# PySpark and Big Data Project 

* [Project Page Link](https://www.codecademy.com/courses/big-data-pyspark/projects/pyspark-common-crawl)

## Task Group 1 - Analyzing Common Crawl Data with RDDs

### Task 1

Initialize a new Spark Context and read in the domain graph as an RDD.


One of your colleagues has made good progress analyzing this dataset using only PySpark RDDs, but has asked you to continue work on this project with SparkSQL. To get familiar with the dataset, you should run their analysis.

In [1]:
# Import required modules
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()

# Get SparkContext
sc = spark.sparkContext

In [3]:
# Read Domains CSV File into an RDD
common_crawl_domain_counts = sc.textFile('./crawl/cc-main-limited-domains.csv')

# Display first few domains from the RDD
common_crawl_domain_counts.take(10)

['367855\t172-in-addr\tarpa\t1',
 '367856\taddr\tarpa\t1',
 '367857\tamphic\tarpa\t1',
 '367858\tbeta\tarpa\t1',
 '367859\tcallic\tarpa\t1',
 '367860\tch\tarpa\t1',
 '367861\td\tarpa\t1',
 '367862\thome\tarpa\t7',
 '367863\tiana\tarpa\t1',
 '367907\tlocal\tarpa\t1']

### Task 2

Your colleague has written a function called `fmt_domain_graph_entry` that formats an entry in the domain dataset.

Apply `fmt_domain_graph_entry` over `common_crawl_domain_counts` and save the result as a new RDD named `formatted_host_counts`.

In [2]:
def fmt_domain_graph_entry(entry):
    """
    Formats a Common Crawl domain graph entry. Extracts the site_id,
    domain name, top-level domain (tld), and subdomain count as separate items.
    """

    # Split the entry on delimiter ('\t') into site_id, domain, tld, and num_subdomains
    site_id, domain, tld, num_subdomains = entry.split('\t')        
    return int(site_id), domain, tld, int(num_subdomains)

In [4]:
# Apply `fmt_domain_graph_entry` to the raw data RDD
formatted_host_counts = common_crawl_domain_counts\
        .map(fmt_domain_graph_entry)

# Display the first few entries of the new RDD
formatted_host_counts.take(10)

[(367855, '172-in-addr', 'arpa', 1),
 (367856, 'addr', 'arpa', 1),
 (367857, 'amphic', 'arpa', 1),
 (367858, 'beta', 'arpa', 1),
 (367859, 'callic', 'arpa', 1),
 (367860, 'ch', 'arpa', 1),
 (367861, 'd', 'arpa', 1),
 (367862, 'home', 'arpa', 7),
 (367863, 'iana', 'arpa', 1),
 (367907, 'local', 'arpa', 1)]
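
Because `map` calls the function once per element, the function can be checked on a single line in plain Python before it is applied to the whole RDD. The sketch below repeats the function so that it runs on its own; the sample line is copied from the `take(10)` output above, and the malformed line is a made-up input used only to show the failure mode.

```python
def fmt_domain_graph_entry(entry):
    """Split a tab-delimited entry into (site_id, domain, tld, num_subdomains)."""
    site_id, domain, tld, num_subdomains = entry.split('\t')
    return int(site_id), domain, tld, int(num_subdomains)

# One raw line, copied from the `take(10)` output above
sample_line = '367862\thome\tarpa\t7'
parsed = fmt_domain_graph_entry(sample_line)
print(parsed)

# A made-up line with only three fields fails at the tuple unpacking step
try:
    fmt_domain_graph_entry('367862\thome\tarpa')
    malformed_ok = True
except ValueError:
    malformed_ok = False
print(malformed_ok)
```

If the real file contained lines like the malformed one, the error would only surface when an action such as `take()` runs, since `map` itself is lazy.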

### Task 3

Your colleague has written another function called `extract_subdomain_counts`.

Apply `extract_subdomain_counts` over `common_crawl_domain_counts` and save the result as a new RDD named `host_counts`.

Study the function. What do you think the result will look like?

In [6]:
def extract_subdomain_counts(entry):
    """
    Extract the subdomain count from a Common Crawl domain graph entry.
    """
    
    # Split the entry on delimiter ('\t') into site_id, domain, tld, and num_subdomains
    site_id, domain, tld, num_subdomains = entry.split('\t')
    
    # return ONLY the num_subdomains
    return int(num_subdomains)


# Apply `extract_subdomain_counts` to the raw data RDD
host_counts = common_crawl_domain_counts\
        .map(extract_subdomain_counts)

# Display the first few entries
host_counts.take(10)

[1, 1, 1, 1, 1, 1, 1, 7, 1, 1]

### Task 4

Using `host_counts`, calculate the total number of subdomains across all domains in the dataset and save the result to a variable named `total_host_counts`.

In [7]:
# Reduce the RDD to a single value, the sum of subdomains, with a lambda function
# as the reduce function
total_host_counts = host_counts\
        .reduce(lambda a, b: a + b)

# Display result count
total_host_counts

595466
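
As a rough illustration of what `reduce` does, the same lambda can be folded over a plain Python list with `functools.reduce`. The list below holds only the ten counts shown in the `host_counts.take(10)` output, so its total is not the dataset total. Spark combines values within and across partitions in no fixed order, which is why the reduce function should be commutative and associative, as addition is.

```python
from functools import reduce

# The ten counts shown in the `host_counts.take(10)` output above
sample_counts = [1, 1, 1, 1, 1, 1, 1, 7, 1, 1]

# Fold the list pairwise with the same lambda used in the RDD version
sample_total = reduce(lambda a, b: a + b, sample_counts)
print(sample_total)
```

For a simple sum, `host_counts.sum()` should give the same result as the explicit `reduce` call.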

### Task 5

Some further analysis is easier with PySpark SQL.

Stop the current `SparkSession` and `sparkContext` before moving on to analyze the data with SparkSQL.

In [8]:
# Stop the sparkContext and the SparkSession
spark.stop()

## Task Group 2 - Exploring Domain Counts with PySpark DataFrames and SQL

### Task 6

Create a new `SparkSession` and assign it to a variable named `spark`.

In [10]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession.builder.getOrCreate() 

### Task 7

Read `./crawl/cc-main-limited-domains.csv` into a new Spark DataFrame named `common_crawl`.

This dataset doesn’t have headers, so we can use Spark’s auto-generated column names for now. 

In [14]:
# Read the target file into a DataFrame
common_crawl = spark.read \
    .option('delimiter', '\t') \
    .option('inferSchema', True) \
    .csv('./crawl/cc-main-limited-domains.csv')

# Display the DataFrame to the notebook
common_crawl.show(5, truncate=False)

+------+-----------+----+---+
|_c0   |_c1        |_c2 |_c3|
+------+-----------+----+---+
|367855|172-in-addr|arpa|1  |
|367856|addr       |arpa|1  |
|367857|amphic     |arpa|1  |
|367858|beta       |arpa|1  |
|367859|callic     |arpa|1  |
+------+-----------+----+---+
only showing top 5 rows



### Task 8

Because this dataset doesn’t have headers, we’ll have to set them ourselves. 

Rename the DataFrame's columns to the following: 

- site_id
- domain
- top_level_domain
- num_subdomains


In [17]:
# Rename the DataFrame's columns with `withColumnRenamed()`
common_crawl = common_crawl\
    .withColumnRenamed('_c0', 'site_id')\
    .withColumnRenamed('_c1', 'domain')\
    .withColumnRenamed('_c2', 'top_level_domain')\
    .withColumnRenamed('_c3', 'num_subdomains')

# Display the first few rows of the DataFrame and the new schema
common_crawl.show(5, truncate=False)
common_crawl.printSchema()

+-------+-----------+----------------+--------------+
|site_id|domain     |top_level_domain|num_subdomains|
+-------+-----------+----------------+--------------+
|367855 |172-in-addr|arpa            |1             |
|367856 |addr       |arpa            |1             |
|367857 |amphic     |arpa            |1             |
|367858 |beta       |arpa            |1             |
|367859 |callic     |arpa            |1             |
+-------+-----------+----------------+--------------+
only showing top 5 rows

root
 |-- site_id: integer (nullable = true)
 |-- domain: string (nullable = true)
 |-- top_level_domain: string (nullable = true)
 |-- num_subdomains: integer (nullable = true)



## Task Group 3 - Reading and Writing Datasets to Disk

### Task 9


Before moving on to analyzing this dataset, let’s save it as parquet files. This will help our other colleagues work with it more easily.

Save the `common_crawl` DataFrame as parquet files in a directory called `./results/common_crawl/`.

In [18]:
# Save the `common_crawl` DataFrame to a series of parquet files
common_crawl.write.parquet('./results/common_crawl/', mode='overwrite')

# Display the first few rows of the DataFrame that was written
common_crawl.show(5, truncate=False)


+-------+-----------+----------------+--------------+
|site_id|domain     |top_level_domain|num_subdomains|
+-------+-----------+----------------+--------------+
|367855 |172-in-addr|arpa            |1             |
|367856 |addr       |arpa            |1             |
|367857 |amphic     |arpa            |1             |
|367858 |beta       |arpa            |1             |
|367859 |callic     |arpa            |1             |
+-------+-----------+----------------+--------------+
only showing top 5 rows



### Task 10

Read `./results/common_crawl/` into a new DataFrame to confirm our DataFrame was saved properly.

In [24]:
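# Read from parquet directory
common_crawl_domains = spark.read.parquet('./results/common_crawl/')

# Display the first few rows of the DataFrame and the schema in the notebook
common_crawl_domains.show(5, truncate=False)
common_crawl_domains.printSchema()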


## Task Group 4 - Querying Domain Counts with PySpark DataFrames and SQL

### Task 11

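Create a local temporary view from `common_crawl_domains`.

In [None]:
# Create a temporary view in the metadata for this `SparkSession`
# (the view name `crawl` is an assumption; the task does not specify one)
common_crawl_domains.createOrReplaceTempView('crawl')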


### Task 12

Calculate the total number of domains for each top-level domain in the dataset.

In [None]:
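# Aggregate the DataFrame using DataFrame methods
common_crawl_domains\
    .groupBy('top_level_domain')\
    .count()\
    .orderBy('count', ascending=False)\
    .show(5, truncate=False)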



In [None]:
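# Aggregate the DataFrame using SQL (assumes the Task 11 view is named `crawl`)
spark.sql("""
    SELECT top_level_domain, COUNT(domain) AS domain_count FROM crawl
    GROUP BY top_level_domain ORDER BY domain_count DESC
""").show(5, truncate=False)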



### Task 13

Calculate the total number of subdomains for each top-level domain in the dataset.
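
Tasks 12 and 13 group by the same column and differ only in the aggregate: one counts rows per group, the other sums `num_subdomains` per group. The plain-Python sketch below shows that difference on a few hypothetical rows; the values are made up for illustration and are not taken from the dataset.

```python
from collections import Counter, defaultdict

# Hypothetical rows in (site_id, domain, top_level_domain, num_subdomains) form;
# these values are invented for illustration only
rows = [
    (1, 'example', 'com', 3),
    (2, 'sample', 'com', 1),
    (3, 'agency', 'gov', 5),
]

# Task 12 analogue: number of domains (rows) per top-level domain
domains_per_tld = Counter(tld for _, _, tld, _ in rows)

# Task 13 analogue: total number of subdomains per top-level domain
subdomains_per_tld = defaultdict(int)
for _, _, tld, num_subdomains in rows:
    subdomains_per_tld[tld] += num_subdomains

print(dict(domains_per_tld))
print(dict(subdomains_per_tld))
```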

In [None]:
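# Aggregate the DataFrame using DataFrame methods
common_crawl_domains\
    .groupBy('top_level_domain')\
    .sum('num_subdomains')\
    .orderBy('sum(num_subdomains)', ascending=False)\
    .show(5, truncate=False)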



In [None]:
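# Aggregate the DataFrame using SQL (assumes the Task 11 view is named `crawl`)
spark.sql("""
    SELECT top_level_domain, SUM(num_subdomains) AS total_subdomains FROM crawl
    GROUP BY top_level_domain ORDER BY total_subdomains DESC
""").show(5, truncate=False)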



### Task 14

How many subdomains does `nps.gov` have? Filter the dataset to that website's entry and display the columns `top_level_domain`, `domain`, and `num_subdomains` in your result.

In [None]:
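# Filter the DataFrame using DataFrame Methods
common_crawl_domains\
    .filter("domain = 'nps' AND top_level_domain = 'gov'")\
    .select('top_level_domain', 'domain', 'num_subdomains')\
    .show(truncate=False)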



In [None]:
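# Filter the DataFrame using SQL (assumes the Task 11 view is named `crawl`)
spark.sql("""
    SELECT top_level_domain, domain, num_subdomains FROM crawl
    WHERE domain = 'nps' AND top_level_domain = 'gov'
""").show(truncate=False)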



### Task 15

Close the `SparkSession` and underlying `sparkContext`.

In [None]:
# Stop the notebook's `SparkSession` and `sparkContext`
spark.stop()
