# Big Data Wrangling with Google Books Ngrams

#### Name: Amirhossein Kiani
#### Email: amkoxia@gmail.com
#### Date: July 23, 2023

## Setting Things Up

In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
spark

Starting Spark application


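| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 1 | application_1690152117275_0005 | pyspark | idle | Link | Link | | ✔ |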



SparkSession available as 'spark'.



<pyspark.sql.session.SparkSession object at 0x7f2ee8e594d0>

In [3]:
sc.list_packages()


Package                    Version
-------------------------- ----------
aws-cfn-bootstrap          2.0
beautifulsoup4             4.9.3
boto                       2.49.0
click                      8.1.3
docutils                   0.14
jmespath                   1.0.1
joblib                     1.2.0
lockfile                   0.11.0
lxml                       4.9.2
mysqlclient                1.4.2
nltk                       3.8
nose                       1.3.4
numpy                      1.20.0
pip                        20.2.2
py-dateutil                2.2
pystache                   0.5.4
python-daemon              2.2.3
python37-sagemaker-pyspark 1.4.2
pytz                       2022.7
PyYAML                     5.4.1
regex                      2021.11.10
setuptools                 28.8.0
simplejson                 3.2.0
six                        1.13.0
tqdm                       4.64.1
wheel                      0.29.0
windmill                   1.6


In [4]:
# install data science and plotting packages

sc.install_pypi_package("pandas==1.0.5") 
sc.install_pypi_package("matplotlib==3.1.1")


Collecting pandas==1.0.5
  Downloading pandas-1.0.5-cp37-cp37m-manylinux1_x86_64.whl (10.1 MB)
Collecting python-dateutil>=2.6.1
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.5 python-dateutil-2.8.2

Collecting matplotlib==3.1.1
  Downloading matplotlib-3.1.1-cp37-cp37m-manylinux1_x86_64.whl (13.1 MB)
Collecting cycler>=0.10
  Downloading cycler-0.11.0-py3-none-any.whl (6.4 kB)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Downloading pyparsing-3.1.0-py3-none-any.whl (102 kB)
Collecting kiwisolver>=1.0.1
  Downloading kiwisolver-1.4.4-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.1 MB)
Collecting typing-extensions; python_version < "3.8"
  Downloading typing_extensions-4.7.1-py3-none-any.whl (33 kB)
Installing collected packages: cycler, pyparsing, typing-extensions, kiwisolver, matplotlib
Successfully installed cycler-0.11.0 kiwisolver-1.4.4 matplotlib-3.1.1 

In [6]:
import pandas as pd
import matplotlib.pyplot as plt


## Q1-3
The following steps were carried out separately; they are detailed in a PDF document available in the repository.

**Q1:** Spin up a new EMR cluster on AWS for using Spark and EMR notebooks - follow the same instructions as for the Spark Lab.

**Q2:** Connect to the head node of the cluster using SSH.

**Q3:** Copy the data folder from the S3 bucket directly into a directory on the Hadoop File System (HDFS) named

I ran the following in Git Bash, in the order shown. Screenshots are included in the deliverable folder:

- cd '<the directory where the .pem file is located>'

- ssh -i amirkia_hadoop.pem -L 9995:localhost:9443 hadoop@ec2-3-133-154-124.us-east-2.compute.amazonaws.com

- hadoop distcp s3://brainstation-dsft/eng_1M_1gram.csv /user/hadoop/eng_1M_1gram

- Open https://localhost:9995 in my browser.

## Q4. Working with Spark DataFrames

In [7]:
# Read from S3 directly
books = spark.read.csv('s3://brainstation-dsft/eng_1M_1gram.csv', header=True, inferSchema=True)


### 4.a. Describe the dataset (examples include size, shape, schema) in PySpark

`df.printSchema()` gives us the following information for each column in the DataFrame:
- Column names
- Data type of each column
- Whether the column can contain null values ('nullable')

In [8]:
books.printSchema()


root
 |-- token: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- frequency: integer (nullable = true)
 |-- pages: integer (nullable = true)
 |-- books: integer (nullable = true)

We have 5 columns: `token`, `year`, `frequency`, `pages`, and `books`.

In [9]:
books.count()


261823225

We have 261,823,225 rows!

In [10]:
books.show()


+---------+----+---------+-----+-----+
|    token|year|frequency|pages|books|
+---------+----+---------+-----+-----+
|inGermany|1927|        2|    2|    2|
|inGermany|1929|        1|    1|    1|
|inGermany|1930|        1|    1|    1|
|inGermany|1933|        1|    1|    1|
|inGermany|1934|        1|    1|    1|
|inGermany|1935|        1|    1|    1|
|inGermany|1938|        5|    5|    5|
|inGermany|1939|        1|    1|    1|
|inGermany|1940|        1|    1|    1|
|inGermany|1942|        2|    2|    2|
|inGermany|1944|        2|    2|    2|
|inGermany|1946|        2|    2|    2|
|inGermany|1947|        3|    3|    2|
|inGermany|1948|        1|    1|    1|
|inGermany|1949|        1|    1|    1|
|inGermany|1952|        1|    1|    1|
|inGermany|1956|        1|    1|    1|
|inGermany|1957|        2|    2|    2|
|inGermany|1959|        1|    1|    1|
|inGermany|1960|        3|    3|    3|
+---------+----+---------+-----+-----+
only showing top 20 rows

### 4.b. Create a new DataFrame from a query using Spark SQL, filtering to include only the rows where the token is "data" and describe the new dataset

In [11]:
# Register the dataframe as a view
books.createOrReplaceTempView("books")    # Creates or replaces a local temporary view with this DataFrame.

# Execute a SQL query
spark.sql("SELECT token FROM books WHERE token == 'data'").show(5)


+-----+
|token|
+-----+
| data|
| data|
| data|
| data|
| data|
+-----+
only showing top 5 rows

In [12]:
# Count the number of rows that have 'data' in the 'token' column
count = spark.sql("SELECT COUNT(*) FROM books WHERE token == 'data'").collect()[0][0]

# Check the result
print("Number of rows with 'data' in the token column:", count)


Number of rows with 'data' in the token column: 316

### 4.c. Write the filtered data back to a directory in the HDFS from Spark using `df.write.csv()`. Be sure to pass the `header=True` parameter and examine the contents of what you've written.

In [13]:
filtered = spark.sql("SELECT token, year, COUNT(*) AS count \
                     FROM books \
                     WHERE token == 'data' \
                     GROUP BY token, year \
                     ORDER BY count DESC")


This query filters the dataframe to only include rows where the value in the `token` column is equal to 'data'. Then the query groups the remaining rows by both the `token` and `year` columns and counts the number of rows in each group. The resulting dataframe is sorted in descending order using the `count` column.
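To make the behaviour of this query concrete, here is a minimal sketch that runs the same SQL on a handful of made-up rows. It uses Python's built-in `sqlite3` as a stand-in for Spark SQL (an assumption made purely so the example runs without a cluster), the sample rows are hypothetical, and a `year` tie-break is added to the `ORDER BY` only to make the output order deterministic.

```python
import sqlite3

# Hypothetical sample rows shaped like the `books` view (token, year, frequency).
sample_rows = [
    ("data", 1927, 40),
    ("data", 1928, 55),
    ("inGermany", 1927, 2),
]

conn = sqlite3.connect(":memory:")  # in-memory database, nothing is written to disk
conn.execute("CREATE TABLE books (token TEXT, year INTEGER, frequency INTEGER)")
conn.executemany("INSERT INTO books VALUES (?, ?, ?)", sample_rows)

# Same query as above; ", year" is added only as a deterministic tie-break.
query = """
    SELECT token, year, COUNT(*) AS count
    FROM books
    WHERE token == 'data'
    GROUP BY token, year
    ORDER BY count DESC, year
"""
result = conn.execute(query).fetchall()
conn.close()

for row in result:
    print(row)
```

Because `COUNT(*)` counts rows rather than adding up the `frequency` column, each (token, year) pair that appears once yields a count of 1. This matches the notebook, where the 316 "data" rows produce 316 groups.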

In [None]:
filtered.show(5)

Now we write the contents of the `filtered` dataframe to a CSV file located at `/user/hadoop/filtered.csv` in the HDFS.

In [19]:
filtered.write.csv("/user/hadoop/filtered.csv", header=True)


## Q5. Collect the contents of the directory into a single file on the local drive of the head node using `getmerge` and move this file into an S3 bucket in your personal account.

I created an S3 bucket named 'amirkiabucket' and ran the commands below in the Git Bash terminal:

In [20]:
# hadoop fs -getmerge /user/hadoop/filtered.csv /home/hadoop/filtered.csv
# aws s3 cp filtered.csv s3://amirkiabucket/filtered.csv


The first line merges all files in the `/user/hadoop/filtered.csv` directory in HDFS into a single file and saves the result to `/home/hadoop/filtered.csv` on the local file system of the head node. The second line copies that file to the S3 bucket (`amirkiabucket`).
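One detail worth keeping in mind: when Spark writes CSV with `header=True`, each part file in the output directory carries its own header line, and a plain concatenation keeps all of them. The sketch below is a local, pure-Python illustration of a merge that keeps only the first header. It is not what `getmerge` itself does, and the function name and the demo part files are hypothetical.

```python
import tempfile
from pathlib import Path


def merge_part_files(part_dir: Path, merged_path: Path) -> int:
    """Concatenate part-* CSV files, writing the header line only once.

    Returns the number of data rows written.
    """
    data_rows = 0
    header_written = False
    with merged_path.open("w", encoding="utf-8") as out:
        for part in sorted(part_dir.glob("part-*")):
            with part.open(encoding="utf-8") as src:
                header = src.readline()
                if not header:
                    continue  # skip empty part files
                if not header_written:
                    out.write(header)
                    header_written = True
                for line in src:
                    out.write(line)
                    data_rows += 1
    return data_rows


# Demo with two hypothetical part files in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    part_dir = Path(tmp) / "filtered.csv"
    part_dir.mkdir()
    (part_dir / "part-00000.csv").write_text(
        "token,year,count\ndata,1764,1\n", encoding="utf-8"
    )
    (part_dir / "part-00001.csv").write_text(
        "token,year,count\ndata,1817,1\n", encoding="utf-8"
    )
    merged = Path(tmp) / "merged.csv"
    n_rows = merge_part_files(part_dir, merged)
    merged_text = merged.read_text(encoding="utf-8")

print(merged_text)
```

If the output directory holds a single part file, the plain `getmerge` result is already clean and no such step is needed.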

## Q6. On your local machine (or on AWS outside of Spark) in Python, read the CSV data from the S3 folder into a pandas DataFrame (you will have to research how to read data into pandas from S3 buckets).

In [22]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV from S3") \
    .getOrCreate()

# Define the S3 bucket and file path
bucket_name = "amirkiabucket"
file_name = "filtered.csv"
file_path = f"s3a://{bucket_name}/{file_name}"

# Read the CSV file from S3 and create a DataFrame
df = spark.read \
    .format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load(file_path)

# Show the DataFrame
df.show()


+-----+----+-----+
|token|year|count|
+-----+----+-----+
| data|1764|    1|
| data|1817|    1|
| data|1840|    1|
| data|1846|    1|
| data|1855|    1|
| data|1769|    1|
| data|1963|    1|
| data|1627|    1|
| data|1965|    1|
| data|2002|    1|
| data|1767|    1|
| data|1798|    1|
| data|1968|    1|
| data|1805|    1|
| data|1874|    1|
| data|1775|    1|
| data|1945|    1|
| data|1753|    1|
| data|1825|    1|
| data|1834|    1|
+-----+----+-----+
only showing top 20 rows
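The question asks for a pandas DataFrame, whereas the cell above reads the file with Spark. A minimal pandas-only sketch might look like the following. It assumes `pandas` and `s3fs` are installed on the local machine and that AWS credentials with read access to the bucket are configured. `build_s3_uri` and `read_filtered_csv` are hypothetical helper names, not part of the original notebook.

```python
def build_s3_uri(bucket: str, key: str) -> str:
    """Return an s3:// URI for the given bucket and object key."""
    return f"s3://{bucket}/{key.lstrip('/')}"


def read_filtered_csv(bucket: str = "amirkiabucket", key: str = "filtered.csv"):
    """Read the merged CSV from S3 into a pandas DataFrame.

    Assumes pandas and s3fs are installed and AWS credentials are configured.
    """
    import pandas as pd  # imported lazily so build_s3_uri works without pandas

    return pd.read_csv(build_s3_uri(bucket, key))


uri = build_s3_uri("amirkiabucket", "filtered.csv")
print(uri)
```

Calling `read_filtered_csv()` outside of Spark would then return the `token`, `year`, and `count` columns as a pandas DataFrame, provided the assumptions above hold.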

## Q7. Plot the number of occurrences of the token "data" (the frequency column) over the years using matplotlib

In [30]:
from pyspark.sql import functions as F

# Group by the 'token' column and count occurrences
token_counts = df.groupBy("token").count().orderBy("token")

# Show the token counts
token_counts.show()


+-----+-----+
|token|count|
+-----+-----+
| data|  316|
+-----+-----+

In [34]:
import pandas as pd

# filter for the "data" token
counts = df.filter(df.token == "data")

# group by year and sum the counts
counts = counts.groupBy("year").sum("count")

# convert to Pandas DataFrame and sort by year
counts_pd = counts.toPandas().sort_values("year")

# plot
plt.figure(figsize=(10, 5))
plt.plot(counts_pd["year"], counts_pd["sum(count)"], marker='o')
plt.title("Number of occurrences of 'data' over the years")
plt.xlabel("Year")
plt.ylabel("Frequency")
plt.show()

%matplot plt


An error was encountered:
Pandas >= 1.0.5 must be installed; however, your version was 0.25.1.
Traceback (most recent call last):
  File "/mnt1/yarn/usercache/livy/appcache/application_1690152117275_0004/container_1690152117275_0004_01_000001/pyspark.zip/pyspark/sql/pandas/conversion.py", line 86, in toPandas
    require_minimum_pandas_version()
  File "/mnt1/yarn/usercache/livy/appcache/application_1690152117275_0004/container_1690152117275_0004_01_000001/pyspark.zip/pyspark/sql/pandas/utils.py", line 40, in require_minimum_pandas_version
    "your version was %s." % (minimum_pandas_version, pandas.__version__)
ImportError: Pandas >= 1.0.5 must be installed; however, your version was 0.25.1.



**NOTE:** I re-ran everything, but my connection was unstable and, since I was being charged for the EMR cluster, I decided to shut it down. This cell should work now that the required version of pandas is installed earlier in the notebook.
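The traceback above reports pandas 0.25.1 against a required minimum of 1.0.5, and its application ID (ending in `_0004`) differs from the session listed at the top of the notebook (ending in `_0005`), which suggests the error came from an earlier session. As a rough illustration of the comparison behind that error, here is a small sketch; the helper names are hypothetical and it assumes purely numeric, dot-separated version strings.

```python
def version_tuple(version: str) -> tuple:
    """Turn '1.0.5' into (1, 0, 5); assumes numeric, dot-separated parts only."""
    return tuple(int(part) for part in version.split("."))


def meets_minimum(installed: str, minimum: str) -> bool:
    """Return True if `installed` is at least `minimum`."""
    return version_tuple(installed) >= version_tuple(minimum)


old_ok = meets_minimum("0.25.1", "1.0.5")  # version reported in the traceback
new_ok = meets_minimum("1.0.5", "1.0.5")   # version installed in In [4]
print(old_ok, new_ok)
```

Printing `pd.__version__` in the session right before calling `toPandas()` is a quick way to confirm which pandas version is actually being picked up.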

## Q8. Compare Hadoop and Spark as distributed file systems.

#### a) What are the advantages/differences between Hadoop and Spark? List two advantages for each.

#### Hadoop:
**Advantages:**

- Hadoop offers a cost-efficient model: it is free to use and runs on inexpensive commodity hardware.
- The system's high scalability allows it to distribute large datasets efficiently across multiple low-cost machines in a cluster, enabling parallel processing.

#### Spark:
**Advantages:**

- Spark's speed, which can be up to 100 times faster than Hadoop for some large-scale data processing workloads, is achieved through in-memory computing and various optimizations.

- Spark comes with a comprehensive set of higher-level libraries supporting SQL queries, streaming data, machine learning, and graph processing, which makes it more versatile for diverse data tasks.

Spark also outperforms Hadoop because it keeps intermediate data in RAM instead of reading and writing it to disk. In contrast, Hadoop processes data in batches through MapReduce, which can be comparatively slower. Hadoop's advantage lies in its lower cost, since it operates efficiently with various types of disk storage. Spark's in-memory approach, though fast, may require more memory and potentially higher infrastructure costs.

#### b) Explain how the HDFS stores the data

The Hadoop Distributed File System (HDFS) is the primary storage system used by Hadoop. It splits each file into blocks and stores those blocks on DataNodes, the worker nodes of the Hadoop cluster. Each block is typically replicated on several DataNodes for fault tolerance. The NameNode is the central metadata server in HDFS: it records which blocks make up each file and which DataNodes hold them. Applications access data stored in HDFS through the HDFS API, which provides a simple interface for reading and writing files.
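The block-and-metadata idea can be sketched as a toy model in plain Python. This is an illustration only, not how HDFS is implemented: the 128 MB block size and replication factor of 3 are commonly used defaults taken here as assumptions, the round-robin placement is a simplification of HDFS's real rack-aware policy, and all names (`split_into_blocks`, `place_blocks`, `dn1` to `dn4`) are hypothetical.

```python
def split_into_blocks(file_size: int, block_size: int) -> list:
    """Return the size in bytes of each block for a file of `file_size` bytes."""
    full_blocks, remainder = divmod(file_size, block_size)
    return [block_size] * full_blocks + ([remainder] if remainder else [])


def place_blocks(block_sizes: list, datanodes: list, replication: int) -> dict:
    """Toy NameNode metadata: block id -> DataNodes holding a replica.

    Replicas are placed round-robin, which is a simplification.
    """
    placement = {}
    for block_id in range(len(block_sizes)):
        placement[block_id] = [
            datanodes[(block_id + offset) % len(datanodes)]
            for offset in range(replication)
        ]
    return placement


MB = 1024 * 1024
blocks = split_into_blocks(file_size=300 * MB, block_size=128 * MB)
metadata = place_blocks(blocks, ["dn1", "dn2", "dn3", "dn4"], replication=3)

for block_id, nodes in metadata.items():
    print(f"block {block_id} ({blocks[block_id] // MB} MB) -> {nodes}")
```

In this model a 300 MB file becomes two full 128 MB blocks plus one 44 MB block, and the `metadata` dictionary plays the role of the NameNode's block-to-DataNode map.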