### Notebook Summary

In this notebook, we use PySpark on EMR (via JupyterHub) to read the Google Books Ngrams CSV that was copied into HDFS. We validate the load by inspecting the schema and reporting row and column counts, an approximate distinct-token count, per-column null counts, and value ranges. Next, we run a Spark SQL query to keep only the rows where token = "data" and briefly examine the filtered dataset. Finally, we write the filtered results back to HDFS as a CSV, verify the output, and prepare for the downstream step of merging/uploading to S3 for local analysis.

### Preliminaries

In [17]:
# Imports

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Get the active Spark session (or create one if none exists)

spark = SparkSession.builder.getOrCreate()

In [5]:
# Read csv from HDFS

PATH = "hdfs:///user/hadoop/eng_1M_1gram/eng_1M_1gram.csv"

ngram_df = (spark.read
            .option("header", True)
            .csv(PATH, inferSchema=True))
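
With ``inferSchema=True``, Spark has to scan the file an extra time to guess column types, which can be slow on a file this size. A minimal sketch of an alternative, assuming the column names and types match the schema this notebook prints further down: pass a fixed schema as a DDL string. The names ``NGRAM_SCHEMA_DDL`` and ``read_ngrams`` are hypothetical helpers, not part of the original notebook.

```python
# Column names and types as reported by printSchema() in this notebook.
NGRAM_SCHEMA_DDL = "token STRING, year INT, frequency INT, pages INT, books INT"


def read_ngrams(spark, path, schema=NGRAM_SCHEMA_DDL):
    """Read the Ngrams CSV with a fixed schema instead of inferring types."""
    return (
        spark.read
        .option("header", True)   # first line holds the column names
        .schema(schema)           # DataFrameReader.schema accepts a DDL string
        .csv(path)
    )


# Usage in the notebook (needs the live Spark session):
# ngram_df = read_ngrams(spark, "hdfs:///user/hadoop/eng_1M_1gram/eng_1M_1gram.csv")
```

Values that cannot be parsed as the declared type come back as null under the default permissive mode, so the null check later in the notebook remains worthwhile either way.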

In [7]:
# Cache the dataframe to speed up repeated operations (lazy: filled by the first action)

ngram_df.cache()

DataFrame[token: string, year: int, frequency: int, pages: int, books: int]

In [9]:
# Confirm this is a Spark dataframe

print(f"Dataframe type is {type(ngram_df)}.")

Dataframe type is <class 'pyspark.sql.dataframe.DataFrame'>.

### Inspect data

In [12]:
# Count the number of rows and columns

rows = ngram_df.count()
cols = len(ngram_df.columns)


print(f"The number of rows is {rows}. The number of columns is {cols}.")

The number of rows is 261823225. The number of columns is 5.

The dataset is large (261,823,225 rows) but narrow, with only 5 columns.

In [14]:
# Examine schema

ngram_df.printSchema()

root
 |-- token: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- frequency: integer (nullable = true)
 |-- pages: integer (nullable = true)
 |-- books: integer (nullable = true)

In [16]:
# Inspect 10 rows

ngram_df.show(10, truncate=False, vertical=True) 

-RECORD 0--------------
 token     | inGermany 
 year      | 1927      
 frequency | 2         
 pages     | 2         
 books     | 2         
-RECORD 1--------------
 token     | inGermany 
 year      | 1929      
 frequency | 1         
 pages     | 1         
 books     | 1         
-RECORD 2--------------
 token     | inGermany 
 year      | 1930      
 frequency | 1         
 pages     | 1         
 books     | 1         
-RECORD 3--------------
 token     | inGermany 
 year      | 1933      
 frequency | 1         
 pages     | 1         
 books     | 1         
-RECORD 4--------------
 token     | inGermany 
 year      | 1934      
 frequency | 1         
 pages     | 1         
 books     | 1         
-RECORD 5--------------
 token     | inGermany 
 year      | 1935      
 frequency | 1         
 pages     | 1         
 books     | 1         
-RECORD 6--------------
 token     | inGermany 
 year      | 1938      
 frequency | 5         
 pages     | 5         
 books     | 5  

Based on the schema and the above sample, we infer the data format (1-gram Ngrams, per year).

Each row represents a single token ``token`` in a specific year ``year``. The other columns are:

``frequency``: total occurrences of the token that year.

``pages``: number of distinct pages containing the token that year.

``books``: number of distinct books containing the token that year.

In the previewed rows, the token is ``inGermany``, which appears sparsely between 1927 and 1942 with small counts.

In [18]:
# Determine approximate count of distinct tokens

ngram_df.select(F.approx_count_distinct("token").alias("approx_unique_tokens")).show()


+--------------------+
|approx_unique_tokens|
+--------------------+
|             3051522|
+--------------------+

``approx_count_distinct`` estimates roughly 3.05 million (3,051,522) unique tokens in our data. The figure is an approximation, not an exact count.
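
To get a feel for how far off the estimate might be, here is a rough back-of-the-envelope sketch. It assumes the call used the default ``rsd`` (maximum relative standard deviation) of 0.05, since no ``rsd`` argument was passed. The resulting band is a one-standard-deviation guide, not a hard bound.

```python
estimate = 3_051_522      # approx_unique_tokens reported above
total_rows = 261_823_225  # row count reported earlier in the notebook
rsd = 0.05                # default max relative standard deviation (assumed unchanged)

# One standard deviation on either side of the estimate.
margin = estimate * rsd
low, high = estimate - margin, estimate + margin

# Average number of rows (token-year records) per distinct token.
rows_per_token = total_rows / estimate

print(f"One-sigma band: {low:,.0f} to {high:,.0f}")
print(f"Average rows per token: {rows_per_token:.1f}")
```

If an exact figure were needed, ``F.countDistinct("token")`` would provide it at a higher computational cost.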

In [22]:
# Check null count for each column

nulls = ngram_df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in ngram_df.columns
])
nulls.show(truncate=False)

+-----+----+---------+-----+-----+
|token|year|frequency|pages|books|
+-----+----+---------+-----+-----+
|0    |432 |432      |432  |432  |
+-----+----+---------+-----+-----+

As expected, there are no null ``token`` records. Each of the other four columns has exactly 432 nulls, which suggests (but does not prove) that the same 432 rows are missing all four values. Against 261,823,225 rows, that is negligible: roughly 0.0002% of the data.
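
Whether the nulls really fall in the same rows can be checked directly. The sketch below builds a SQL predicate that is true only when all four numeric columns are null; ``all_null_condition`` is a hypothetical helper, and the commented lines show how it might be applied to ``ngram_df``. It also computes the share of affected rows from the counts reported above.

```python
def all_null_condition(columns):
    """Build a SQL predicate that holds only when every listed column is NULL."""
    return " AND ".join(f"`{c}` IS NULL" for c in columns)


numeric_cols = ["year", "frequency", "pages", "books"]
condition = all_null_condition(numeric_cols)
print(condition)

# Usage against the notebook's DataFrame (needs the live Spark session):
# rows_all_null = ngram_df.filter(condition).count()
# A result of 432 would confirm that the nulls sit in the same rows.
# ngram_df.filter(condition).select("token").show(10, truncate=False)

# Share of rows affected, using the counts reported in this notebook.
null_rows, total_rows = 432, 261_823_225
null_fraction = null_rows / total_rows
print(f"{null_fraction:.6%}")
```

Looking at the ``token`` values of those rows may also hint at why the other fields failed to parse.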

In [23]:
# Check ranges for each numeric column

ngram_df.agg(
    F.min("year").alias("min_year"), F.max("year").alias("max_year"),
    F.min("frequency").alias("min_freq"), F.max("frequency").alias("max_freq"),
    F.min("pages").alias("min_pages"), F.max("pages").alias("max_pages"),
    F.min("books").alias("min_books"), F.max("books").alias("max_books")
).show()


+--------+--------+--------+--------+---------+---------+---------+---------+
|min_year|max_year|min_freq|max_freq|min_pages|max_pages|min_books|max_books|
+--------+--------+--------+--------+---------+---------+---------+---------+
|    1520|    2008|       1|43571378|        1|  2035864|        1|     6174|
+--------+--------+--------+--------+---------+---------+---------+---------+

The years span 1520–2008. The earliest year predates the referenced 1800s coverage, so we may later restrict to ≥ 1800 for consistency. Frequency ranges from 1 to 43,571,378, with pages up to 2,035,864 and books up to 6,174, reflecting heavy usage of the most common tokens. Note that ``min`` and ``max`` skip nulls, so the 432 null rows do not affect these ranges.
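
If the restriction to years ≥ 1800 is applied later, it could look like the sketch below. ``restrict_years`` is a hypothetical helper, and the cutoff of 1800 is only the value suggested in the text. It relies on ``DataFrame.filter`` accepting a SQL expression string.

```python
def restrict_years(df, min_year=1800):
    """Keep only rows whose year is at or after min_year.

    Rows with a null year are dropped as well, because the comparison
    evaluates to NULL for them and filter() keeps only true rows.
    """
    return df.filter(f"year >= {int(min_year)}")


# Usage in the notebook (needs the live Spark session):
# recent_df = restrict_years(ngram_df)          # years 1800 and later
# recent_df.agg({"year": "min"}).show()         # should report a minimum >= 1800
```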

### Filter data and describe new dataset

In [24]:
# Filter for token == "data" using Spark SQL

ngram_df.createOrReplaceTempView("ngrams")
filtered_df = spark.sql("""
  SELECT token, year, frequency, pages, books
  FROM ngrams
  WHERE token = 'data'
  ORDER BY year
""")

In [26]:
# Count the number of rows and columns

rows_filtered = filtered_df.count()
cols_filtered = len(filtered_df.columns)


print(f"The number of rows is {rows_filtered}. The number of columns is {cols_filtered}.")

The number of rows is 316. The number of columns is 5.

Filtering to the token "data" leaves only 316 rows.

In [29]:
# Inspect 10 rows

filtered_df.show(10, truncate=False, vertical=True) 

-RECORD 0---------
 token     | data 
 year      | 1584 
 frequency | 16   
 pages     | 14   
 books     | 1    
-RECORD 1---------
 token     | data 
 year      | 1614 
 frequency | 3    
 pages     | 2    
 books     | 1    
-RECORD 2---------
 token     | data 
 year      | 1627 
 frequency | 1    
 pages     | 1    
 books     | 1    
-RECORD 3---------
 token     | data 
 year      | 1631 
 frequency | 22   
 pages     | 18   
 books     | 1    
-RECORD 4---------
 token     | data 
 year      | 1637 
 frequency | 1    
 pages     | 1    
 books     | 1    
-RECORD 5---------
 token     | data 
 year      | 1638 
 frequency | 2    
 pages     | 2    
 books     | 1    
-RECORD 6---------
 token     | data 
 year      | 1640 
 frequency | 1    
 pages     | 1    
 books     | 1    
-RECORD 7---------
 token     | data 
 year      | 1642 
 frequency | 1    
 pages     | 1    
 books     | 1    
-RECORD 8---------
 token     | data 
 year      | 1644 
 frequency | 4    
 pages     |

The first 10 records of the new dataset (the query orders by year) show early occurrences of the word "data", the earliest in 1584.

In [30]:
# Confirm the only token is now "data"

uniq = filtered_df.select("token").distinct()
n_uniq = uniq.count()
print("Distinct tokens:", n_uniq)

Distinct tokens: 1

Since our ``token`` column no longer distinguishes records, we drop the column.

In [31]:
# Drop the token column

filtered_df = filtered_df.drop("token")

print("Columns after drop:", filtered_df.columns)

Columns after drop: ['year', 'frequency', 'pages', 'books']

In [32]:
# Confirm there are no nulls

nulls_filtered = filtered_df.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in filtered_df.columns
])
nulls_filtered.show(truncate=False)

+----+---------+-----+-----+
|year|frequency|pages|books|
+----+---------+-----+-----+
|0   |0        |0    |0    |
+----+---------+-----+-----+

In [33]:
# Determine numeric ranges for all columns

mins = filtered_df.agg(*(F.min(c).alias(f"min_{c}") for c in filtered_df.columns))
maxs = filtered_df.agg(*(F.max(c).alias(f"max_{c}") for c in filtered_df.columns))
mins.show(truncate=False)
maxs.show(truncate=False)

+--------+-------------+---------+---------+
|min_year|min_frequency|min_pages|min_books|
+--------+-------------+---------+---------+
|1584    |1            |1        |1        |
+--------+-------------+---------+---------+

+--------+-------------+---------+---------+
|max_year|max_frequency|max_pages|max_books|
+--------+-------------+---------+---------+
|2008    |254561       |122472   |4372     |
+--------+-------------+---------+---------+

In the filtered data, the years span 1584 to 2008. The yearly frequency of the word "data" ranges from 1 to 254,561, while pages peak at 122,472 and books at 4,372.

### Write filtered data back into HDFS

In [34]:
# Use .write.csv() to write filtered data back into HDFS

out_dir = "hdfs:///user/hadoop/outputs/data_token_csv"


(filtered_df
    .orderBy("year")         # Sort by year
    .coalesce(1)             # Ensure single file
    .write
    .csv(out_dir, mode="overwrite", header=True)) # Specify header = True

print("Wrote CSV directory:", out_dir)

Wrote CSV directory: hdfs:///user/hadoop/outputs/data_token_csv

In [35]:
# Read filtered data back to confirm the last step executed correctly

written_df = spark.read.csv(out_dir, header=True, inferSchema=True)
print("Rows written:", written_df.count())
written_df.printSchema()
written_df.show(5, truncate=False)

Rows written: 316
root
 |-- year: integer (nullable = true)
 |-- frequency: integer (nullable = true)
 |-- pages: integer (nullable = true)
 |-- books: integer (nullable = true)

+----+---------+-----+-----+
|year|frequency|pages|books|
+----+---------+-----+-----+
|1584|16       |14   |1    |
|1614|3        |2    |1    |
|1627|1        |1    |1    |
|1631|22       |18   |1    |
|1637|1        |1    |1    |
+----+---------+-----+-----+
only showing top 5 rows
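
For the downstream merge and upload step mentioned in the summary, one possible approach is sketched below. It assumes the ``hdfs`` and ``aws`` command-line tools are available where the code runs (for example, a shell on the cluster's primary node). The local file name and the S3 bucket are hypothetical placeholders, and ``build_export_commands`` and ``export_to_s3`` are illustrative helpers rather than part of the notebook.

```python
import subprocess


def build_export_commands(hdfs_dir, local_csv, s3_uri):
    """Return the merge and upload commands as argument lists."""
    return [
        # Concatenate the files in the HDFS output directory into one local file.
        ["hdfs", "dfs", "-getmerge", hdfs_dir, local_csv],
        # Copy the merged local file to S3.
        ["aws", "s3", "cp", local_csv, s3_uri],
    ]


def export_to_s3(hdfs_dir, local_csv, s3_uri):
    """Run the commands in order, stopping at the first failure."""
    for cmd in build_export_commands(hdfs_dir, local_csv, s3_uri):
        subprocess.run(cmd, check=True)


commands = build_export_commands(
    "hdfs:///user/hadoop/outputs/data_token_csv",
    "data_token.csv",                      # hypothetical local file name
    "s3://example-bucket/data_token.csv",  # hypothetical bucket and key
)
for cmd in commands:
    print(" ".join(cmd))
```

Because the write used ``coalesce(1)``, the output directory holds a single part file and the header appears once in the merged CSV. With several part files, each would carry its own header, and a plain merge would repeat it.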