## 🤖 Spark, PySpark, and Spark SQL Programming Assignment

**due: March 8th (11:59pm)** \
**name:** Sajida Malik

#### part I: 🏠 median rental data (warm-up) 

In [0]:
# uncomment the following code & run it to import libraries

from pyspark.sql import SparkSession

In [0]:
# create a new spark session

spark = SparkSession.builder.getOrCreate()
print(spark) #to check the session is built correctly

<pyspark.sql.session.SparkSession object at 0x7f21b6481db0>


In [0]:
# create an RDD out of the data listed below:

median_rent = [
    ["Bronx", "borough", 2195, 2200],
    ["Brooklyn", "borough", 2999, 2999],
    ["Manhattan", "borough", 4000, 4100],
    ["Queens", "borough", 2500, 2495],
    ["Staten Island", "borough", 1600, 1600]
]
median_rent_rdd = spark.sparkContext.parallelize(median_rent)
# confirming that RDD contains correct data
print(median_rent_rdd.collect())

[['Bronx', 'borough', 2195, 2200], ['Brooklyn', 'borough', 2999, 2999], ['Manhattan', 'borough', 4000, 4100], ['Queens', 'borough', 2500, 2495], ['Staten Island', 'borough', 1600, 1600]]


In [0]:
# display how many partitions are used in this RDD
# (using getNumPartitions function)

print(median_rent_rdd.getNumPartitions())


8


In [0]:
# create a new DataFrame from the RDD, with the following column names:
# "areaName", "areaType", "2023-12", "2024-01" 

median_rent_df = median_rent_rdd.toDF(["areaName","areaType", "2023-12", "2024-01"])


In [0]:
# display the full 5 rows of the DataFrame

median_rent_df.show(5, truncate = False)


+-------------+--------+-------+-------+
|areaName     |areaType|2023-12|2024-01|
+-------------+--------+-------+-------+
|Bronx        |borough |2195   |2200   |
|Brooklyn     |borough |2999   |2999   |
|Manhattan    |borough |4000   |4100   |
|Queens       |borough |2500   |2495   |
|Staten Island|borough |1600   |1600   |
+-------------+--------+-------+-------+



In [0]:
# create a new DataFrame that drops the "2023-12" column
# and uses DataFrame methods (not SQL) to sort the "2024-01" column, highest to lowest 
# display the full new DataFrame
N_median_rent_df = median_rent_df.drop("2023-12")
New_median_rent_df = N_median_rent_df.orderBy(N_median_rent_df["2024-01"].desc())
New_median_rent_df.show(truncate = False)


+-------------+--------+-------+
|areaName     |areaType|2024-01|
+-------------+--------+-------+
|Manhattan    |borough |4100   |
|Brooklyn     |borough |2999   |
|Queens       |borough |2495   |
|Bronx        |borough |2200   |
|Staten Island|borough |1600   |
+-------------+--------+-------+



✍️ Double-click to answer here in full sentence format:\
Look at your line of code in the previous cell. Of the methods that you used: which were **narrow transformations,** which were **wide transformations,** and which were **actions**? When did the series of transformations get "triggered" into actually computing?

*median_rent_df.drop("2023-12")* is a narrow transformation: each input partition maps to exactly one output partition, so the data is transformed without shuffling. This line simply removes a column from the DataFrame and needs no data from other partitions.

*N_median_rent_df.orderBy(N_median_rent_df["2024-01"].desc())* is a wide transformation because it requires shuffling the data. Sorting by a column means comparing rows that live in different partitions, so Spark has to redistribute data across the cluster to produce a globally sorted result.

*New_median_rent_df.show(truncate = False)* is an action: it asks Spark for an actual result (here, rows printed on the driver), which forces the preceding transformations to be evaluated.

The series of transformations is "triggered" into actually computing when an action such as show(truncate = False) is called. Only then does Spark perform the necessary computations to generate and display the output.
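
To make the narrow/wide distinction concrete, here is a minimal plain-Python sketch (no Spark required) that models a DataFrame as a list of partitions. The three-partition layout and the helper names `drop_column` and `sort_desc` are hypothetical and only illustrate the idea; Spark's real shuffle is considerably more involved.

```python
# Toy model: a "DataFrame" is a list of partitions; each partition is a list of rows.
# Row layout mirrors the rental data: (areaName, areaType, 2023-12, 2024-01).
partitions = [
    [("Bronx", "borough", 2195, 2200), ("Brooklyn", "borough", 2999, 2999)],
    [("Manhattan", "borough", 4000, 4100)],
    [("Queens", "borough", 2500, 2495), ("Staten Island", "borough", 1600, 1600)],
]

def drop_column(parts, index):
    """Narrow: each output partition depends on exactly one input partition."""
    return [[row[:index] + row[index + 1:] for row in part] for part in parts]

def sort_desc(parts, index):
    """Wide: rows from every partition must be brought together (a "shuffle")
    before a global order can be produced."""
    all_rows = [row for part in parts for row in part]
    return sorted(all_rows, key=lambda row: row[index], reverse=True)

dropped = drop_column(partitions, 2)  # still 3 partitions, rows stay where they were
ordered = sort_desc(dropped, 2)       # needs data from all partitions at once

print([len(part) for part in dropped])
print([row[0] for row in ordered])
```

In this sketch `drop_column` never looks outside the partition it is working on, while `sort_desc` cannot produce its first output row until it has seen every partition.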



In [0]:
# create a new DataFrame including a new column, "normalized"
# the value of "normalized" is the value of the "2024-01" column, divided by 1000
# display the full new DataFrame

df_New = New_median_rent_df.withColumn("normalized", New_median_rent_df["2024-01"] / 1000)

df_New.show(truncate = False)

+-------------+--------+-------+----------+
|areaName     |areaType|2024-01|normalized|
+-------------+--------+-------+----------+
|Manhattan    |borough |4100   |4.1       |
|Brooklyn     |borough |2999   |2.999     |
|Queens       |borough |2495   |2.495     |
|Bronx        |borough |2200   |2.2       |
|Staten Island|borough |1600   |1.6       |
+-------------+--------+-------+----------+



#### part II: 🌎 common crawl data

In [0]:
# first read the overview of the Common Crawl project's mission, here on their website: https://commoncrawl.org/mission 

In [0]:
# uncomment & run the following code, which "gets" (downloads) a subset of the most recent common crawl 
# this may take a minute!

!wget https://data.commoncrawl.org/cc-index/table/cc-main/warc/crawl=CC-MAIN-2023-50/subset=crawldiagnostics/part-00000-e565b809-b335-4c1d-90fd-54a9a2b7113d.c000.gz.parquet

--2024-03-04 03:34:43--  https://data.commoncrawl.org/cc-index/table/cc-main/warc/crawl=CC-MAIN-2023-50/subset=crawldiagnostics/part-00000-e565b809-b335-4c1d-90fd-54a9a2b7113d.c000.gz.parquet
Resolving data.commoncrawl.org (data.commoncrawl.org)... 18.161.6.121, 18.161.6.27, 18.161.6.34, ...
Connecting to data.commoncrawl.org (data.commoncrawl.org)|18.161.6.121|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 207127367 (198M) [application/octet-stream]
Saving to: ‘part-00000-e565b809-b335-4c1d-90fd-54a9a2b7113d.c000.gz.parquet’


2024-03-04 03:34:44 (114 MB/s) - ‘part-00000-e565b809-b335-4c1d-90fd-54a9a2b7113d.c000.gz.parquet’ saved [207127367/207127367]



In [0]:
# uncomment & run the following code, which lists the local file system 
# to check and see if that Parquet file made it

!ls

azure
conf
eventlogs
hadoop_accessed_config.lst
logs
metastore_db
part-00000-e565b809-b335-4c1d-90fd-54a9a2b7113d.c000.gz.parquet
preload_class.lst


In [0]:
# uncomment & run the following code, which uploads your downloaded Parquet file to the Databricks Distributed File System (and your account)

dbutils.fs.cp('file:/databricks/driver/part-00000-e565b809-b335-4c1d-90fd-54a9a2b7113d.c000.gz.parquet', 'dbfs:/FileStore/tables/')

# after, in the menu to the left, click Catalog -> DBFS, look for the FileStore/tables folder, and check to see if your file made it there

True

In [0]:
# create a DataFrame from this Parquet file, using the `read` function
# hint: your file path should be dbfs:/FileStore/tables/part-00000-e565b809-b335-4c1d-90fd-54a9a2b7113d.c000.gz.parquet

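# read the specific Parquet file (not the whole folder), as the hint above suggests
df = spark.read.parquet('dbfs:/FileStore/tables/part-00000-e565b809-b335-4c1d-90fd-54a9a2b7113d.c000.gz.parquet')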


In [0]:
# remember, every DataFrame is built "on top of" an RDD. 
# uncomment & run the following code to access the underlying RDD
# and display the number of partitions used
# (replace YOURDATAFRAME with the name of your DataFrame from the previous cell)

df.rdd.getNumPartitions()



2

In [0]:
# print the schema for the DataFrame created by the Parquet file

df.printSchema()


root
 |-- url_surtkey: string (nullable = true)
 |-- url: string (nullable = true)
 |-- url_host_name: string (nullable = true)
 |-- url_host_tld: string (nullable = true)
 |-- url_host_2nd_last_part: string (nullable = true)
 |-- url_host_3rd_last_part: string (nullable = true)
 |-- url_host_4th_last_part: string (nullable = true)
 |-- url_host_5th_last_part: string (nullable = true)
 |-- url_host_registry_suffix: string (nullable = true)
 |-- url_host_registered_domain: string (nullable = true)
 |-- url_host_private_suffix: string (nullable = true)
 |-- url_host_private_domain: string (nullable = true)
 |-- url_host_name_reversed: string (nullable = true)
 |-- url_protocol: string (nullable = true)
 |-- url_port: integer (nullable = true)
 |-- url_path: string (nullable = true)
 |-- url_query: string (nullable = true)
 |-- fetch_time: timestamp (nullable = true)
 |-- fetch_status: short (nullable = true)
 |-- fetch_redirect: string (nullable = true)
 |-- content_digest: string (nulla

In [0]:
# looks like a lot of columns! 
# you can read more about each column here: https://data.commoncrawl.org/cc-index/table/cc-main/index.html 

# now: use an action to count the number of "rows" or elements in this DataFrame
df.count()


2565517

In [0]:
# display the first row in the DataFrame only
# use the argument truncate=False to show all the information
df.show(1, truncate = False)


+----------------------------------------------------------------+---------------------------------------------------------------------------+-------------------------------+------------+-----------------------+----------------------+----------------------+----------------------+------------------------+---------------------------+-----------------------+---------------------------+-------------------------------+------------+--------+------------------------------------+---------+-------------------+------------+--------------+--------------------------------+-----------------+---------------------+---------------+-----------------+-----------------+-------------------------------------------------------------------------------------------------------------------------+------------------+------------------+----------------+
|url_surtkey                                                     |url                                                                        |url_host_name        

In [0]:
# create a new DataFrame, called cc_filtered
# that uses the previous DataFrame but displays only the following columns:
# "url"
# "url_protocol"
# "url_host_3rd_last_part"
# "url_host_registered_domain"
# "url_host_registry_suffix"
# "content_mime_type"

# and then display the first 10 rows of cc_filtered
# use the argument truncate=True to clean up the display of the info

cc_filtered = df.select(
                        "url",
                        "url_protocol",
                        "url_host_3rd_last_part",
                        "url_host_registered_domain",
                        "url_host_registry_suffix",
                        "content_mime_type")
cc_filtered.show(10, truncate= True)



+--------------------+------------+----------------------+--------------------------+------------------------+-----------------+
|                 url|url_protocol|url_host_3rd_last_part|url_host_registered_domain|url_host_registry_suffix|content_mime_type|
+--------------------+------------+----------------------+--------------------------+------------------------+-----------------+
|https://www.safeh...|       https|                   www|      safehavencounseli...|                     com|        text/html|
|https://fr.safeha...|       https|                    fr|      safehavencounseli...|                     com|              unk|
|https://fr.safeha...|       https|                    fr|      safehavencounseli...|                     com|              unk|
|https://fr.safeha...|       https|                    fr|      safehavencounseli...|                     com|              unk|
|https://fr.safeha...|       https|                    fr|      safehavencounseli...|            

In [0]:
# re-name the following columns in cc_filtered
# "url_host_3rd_last_part" = "subdomain"
# "url_host_registry_suffix" = "suffix"
# "url_host_registered_domain" = "domain_name")

# and then display the first 3 rows of cc_filtered (with the new column names)

cc_filtered = cc_filtered\
                        .withColumnRenamed("url_host_3rd_last_part","subdomain")\
                        .withColumnRenamed("url_host_registry_suffix","suffix")\
                        .withColumnRenamed("url_host_registered_domain","domain_name")
cc_filtered.show(3, truncate= False)


+----------------------------------------------------------------------------------+------------+---------+---------------------------+------+-----------------+
|url                                                                               |url_protocol|subdomain|domain_name                |suffix|content_mime_type|
+----------------------------------------------------------------------------------+------------+---------+---------------------------+------+-----------------+
|https://www.safehavencounselingpllc.com/post/ecounseling-the-safe-haven-way       |https       |www      |safehavencounselingpllc.com|com   |text/html        |
|https://fr.safehavencounselingpllc.com/bookings-checkout/in-home-family-counseling|https       |fr       |safehavencounselingpllc.com|com   |unk              |
|https://fr.safehavencounselingpllc.com/bookings-checkout/trauma-dbt-informed-group|https       |fr       |safehavencounselingpllc.com|com   |unk              |
+---------------------------------

In [0]:
# we are going to use cc_filtered a lot - so let's cache it.
# write a line of code that caches cc_filtered

cc_filtered.cache()


DataFrame[url: string, url_protocol: string, subdomain: string, domain_name: string, suffix: string, content_mime_type: string]

In [0]:
# uncomment & run the following code to check: is the DataFrame cached?

print(cc_filtered.is_cached)

True


✍️ Double-click to answer here in full sentence format:\
is caching an **action** or a **transformation** in Spark? 
And why would I want to cache the DataFrame?

Caching behaves like a transformation, not an action: calling .cache() is lazy and only marks the DataFrame to be cached. The data is actually stored in memory only when an action such as .show() or .count() is executed.

Caching a DataFrame is beneficial because it significantly speeds up data processing by storing frequently accessed data in memory, reducing the need for repeated computations. This is especially useful in iterative algorithms where the same data is processed multiple times, enhancing overall performance. Moreover, it minimizes I/O operations, saving time and resources by avoiding constant data retrieval from disk or over the network.
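
The lazy behavior described above can be sketched in plain Python. The class `LazyCachedValue` below is a hypothetical stand-in for a cached DataFrame, not Spark's implementation: `cache()` only sets a flag, and the result is stored the first time an "action" (`collect()` here) runs.

```python
class LazyCachedValue:
    """Hypothetical stand-in for a cached DataFrame."""

    def __init__(self, compute):
        self._compute = compute        # the expensive "lineage" to re-run
        self._cache_requested = False
        self._stored = None
        self.compute_calls = 0         # how many times the lineage actually ran

    def cache(self):
        self._cache_requested = True   # lazy: nothing is computed here
        return self

    def collect(self):
        """Acts like an action: runs the computation unless a stored copy exists."""
        if self._stored is not None:
            return self._stored
        self.compute_calls += 1
        result = self._compute()
        if self._cache_requested:
            self._stored = result
        return result

data = LazyCachedValue(lambda: [n * n for n in range(5)]).cache()
calls_after_cache = data.compute_calls  # cache() alone triggered no work
first = data.collect()                  # first action computes and stores
second = data.collect()                 # second action reuses the stored copy
print(calls_after_cache, data.compute_calls, first == second)
```

Without the `cache()` call, every `collect()` in this sketch would re-run the computation, which mirrors why an uncached DataFrame is recomputed from its source for each action.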


In [0]:
# uncomment & run the following code:
cc_mime = cc_filtered.select("content_mime_type").distinct()


In [0]:
# hmm, what is mime type? read up a little here: 
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types

# and show the first 60 rows of the DataFrame cc_mime
# include the argument truncate=False to show all the info
cc_mime.show(60, truncate = False)

+-----------------------------------------------------------------------+
|content_mime_type                                                      |
+-----------------------------------------------------------------------+
|text/x-perl                                                            |
|application/rss+xml                                                    |
|httpd/unix-directory                                                   |
|application/octet-stream                                               |
|redirect                                                               |
|application/x-httpd-ea-php54                                           |
|application/atom+xml                                                   |
|text/xml                                                               |
|text/Calendar                                                          |
|text/csv                                                               |
|application/xml                      

✍️ Double-click to answer here in full sentence format: \
what does the code in cell 27 (`select("content_mime_type").distinct()`) do? 
 
This code selects only the "content_mime_type" column, removes its duplicates, and returns only the unique values of that column.
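
As a rough plain-Python analogy (not Spark code), selecting one column and deduplicating it looks like the sketch below. The `example.com` URLs are made up for illustration; the MIME types are ones that appear in the output above.

```python
# Five toy rows with two columns; the URLs are hypothetical.
rows = [
    {"url": "https://example.com/a", "content_mime_type": "text/html"},
    {"url": "https://example.com/b", "content_mime_type": "text/csv"},
    {"url": "https://example.com/c", "content_mime_type": "text/html"},
    {"url": "https://example.com/d", "content_mime_type": "text/xml"},
    {"url": "https://example.com/e", "content_mime_type": "text/csv"},
]

# "select": keep only the one column of interest
selected = [row["content_mime_type"] for row in rows]

# "distinct": drop repeated values (dict.fromkeys keeps first-seen order here;
# Spark's distinct() makes no promise about row order)
unique = list(dict.fromkeys(selected))

print(len(selected), unique)
```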
 

In [0]:
# create a new DataFrame that counts up the total number of times each mime type occurs in the pages of this dataset
# and then filter so that the DataFrame only includes those types with more than 20 occurrences (or counts)
# and name the column with the total counts for each type "total_counts"
# and display the full results in descending order 

# use DataFrame methods (not SQL yet) to approach this
cc_filtered_new = (cc_filtered.groupBy("content_mime_type")
            .count()
            .filter("count > 20")
            .withColumnRenamed("count", "total_counts")
            .orderBy("total_counts", ascending=False))

cc_filtered_new.show(truncate= False)


+-----------------------------------------------------------------------+------------+
|content_mime_type                                                      |total_counts|
+-----------------------------------------------------------------------+------------+
|text/html                                                              |2206198     |
|unk                                                                    |273188      |
|warc/revisit                                                           |61926       |
|text/plain                                                             |19173       |
|application/json                                                       |1688        |
|application/octet-stream                                               |1240        |
|application/pdf                                                        |460         |
|application/binary                                                     |438         |
|application/rss+xml                       

In [0]:
# what are domains that show up the most in this dataset?
# create a new DataFrame that counts up the occurrences of each domain (grouping by the column "domain_name")
# show the first 50 rows of new DataFrame, including the argument truncate=False
domain_counts = (cc_filtered.groupBy("domain_name")
                 .count())
domain_counts.show(50,truncate = False)



+---------------------------+-----+
|domain_name                |count|
+---------------------------+-----+
|safehavencounselingpllc.com|10   |
|safehost.com               |1    |
|safehs.com                 |1    |
|safekeysdrivingschool.com  |2    |
|safeporntube.com           |2    |
|safeshipmoving.com         |2    |
|safesnout.com              |1    |
|safesoundfamily.com        |12   |
|safetran-traffic.com       |1    |
|safetycasesymposium.com    |66   |
|safetyinbusiness.com       |1    |
|safetyjackpot.com          |6    |
|safeunlocker.com           |8    |
|safewateredu.com           |1    |
|saffgroup.com              |13   |
|saffronbusiness.com        |4    |
|safimusic.com              |1    |
|safirhdd.com               |13   |
|sag-elm.com                |1    |
|saga-grybow.com            |26   |
|sagakraft.com              |1    |
|sagamihara-shounika.com    |4    |
|saganocoin.com             |4    |
|sagarpadwal.com            |3    |
|sagarsamy.com              

✍️ Look at your results from the previous code (cell 31). \
What does this tell us about the organization of this entire dataset? \
What do you think each line in `cc_filtered` represents?

In the rows shown, the domains appear in alphabetical order (every one begins with "saf" or "sag"), which suggests that the dataset is organized by domain name and that this file holds one alphabetical slice of it. The dataset includes a wide range of domains, and there is significant variability in how often a domain appears, from a single row to dozens. This suggests that the data encompasses a broad spectrum of web activity.

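Each line in `cc_filtered` represents a single crawled URL (one page capture), together with its protocol, subdomain, domain name, suffix, and MIME type. Counting rows per domain therefore shows how many captures of that domain this segment contains, which can be useful for analytical purposes.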
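
A small plain-Python sketch of the same grouping idea, using `collections.Counter` in place of `groupBy(...).count()`. The five URLs are taken from outputs shown in this notebook; treating them as the whole dataset is of course an assumption made only for illustration.

```python
from collections import Counter

# Each tuple is one row: (url, domain_name). One row corresponds to one crawled URL.
rows = [
    ("https://www.safehavencounselingpllc.com/post/ecounseling-the-safe-haven-way",
     "safehavencounselingpllc.com"),
    ("https://fr.safehavencounselingpllc.com/bookings-checkout/in-home-family-counseling",
     "safehavencounselingpllc.com"),
    ("https://fr.safehavencounselingpllc.com/bookings-checkout/trauma-dbt-informed-group",
     "safehavencounselingpllc.com"),
    ("https://www.sat24.com/", "sat24.com"),
    ("https://sat24.com/", "sat24.com"),
]

# Group by domain and count how many rows (URLs) fall under each one.
domain_counts = Counter(domain for _url, domain in rows)

for domain, count in sorted(domain_counts.items()):
    print(domain, count)
```

The per-domain counts always add back up to the number of rows, which is consistent with reading each row as a single URL rather than a single domain.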


In [0]:
# create a new DataFrame holding all rows where domain_name = "sat24.com"
# use DataFrame methods (no SQL yet) to do this
# give the DataFrame only 1 column: "url"

# display the first 100 rows of this DataFrame, using the argument truncate=False
cc_filtered_df = cc_filtered.filter(cc_filtered.domain_name=="sat24.com").select('url')

cc_filtered_df.show(100, truncate = False)

+------------------------------------------------------------------+
|url                                                               |
+------------------------------------------------------------------+
|https://www.sat24.com/                                            |
|https://sat24.com/                                                |
|https://www.sat24.com/                                            |
|https://www.sat24.com/                                            |
|https://www.sat24.com/                                            |
|http://www.sat24.com/                                             |
|https://www.sat24.com/                                            |
|https://www.sat24.com/                                            |
|https://www.sat24.com/                                            |
|https://www.sat24.com/                                            |
|https://www.sat24.com/                                            |
|https://www.sat24.com/           

In [0]:
# are there more pages using http or https in this dataset?
# use DataFrame methods to create a new DataFrame that counts up and groups by the "url_protocol" column
cc_filtered_url_protocol = cc_filtered.groupBy("url_protocol").count()
cc_filtered_url_protocol.show()

+------------+-------+
|url_protocol|  count|
+------------+-------+
|        http| 536729|
|       https|2028788|
+------------+-------+



In [0]:
# what is the most common suffix (.com, .org, etc.) seen in this segment of the dataset?
# use DataFrame methods to create a new DataFrame that counts up and groups by the "suffix" column
cc_filtered_suffix = cc_filtered.groupBy("suffix").count()
cc_filtered_suffix.show(truncate= False)


+------+-------+
|suffix|count  |
+------+-------+
|com   |2565517|
+------+-------+



In [0]:
# ... and now for some SQL!
# use the "createOrReplaceTempView" function on cc_filtered
# to create a view called "common_index"

cc_filtered.createOrReplaceTempView("common_index")

In [0]:
# use SQL to answer this question:
# how many pages in this segment of the dataset include the word "sailing" in their domain name?
# hint: check out the SQL LIKE keyword: https://www.w3schools.com/sql/sql_like.asp

# show the first 50 rows of your new DataFrame
# and then print the count of rows in that new DataFrame

sailing_query = """
SELECT domain_name, COUNT(*) as count
FROM common_index
WHERE LOWER(domain_name) LIKE '%sailing%'
GROUP BY domain_name
"""
sailing_df = spark.sql(sailing_query)

sailing_df.show(50, truncate = False)

print("Total row counts: ", sailing_df.count())



+-----------------------------+-----+
|domain_name                  |count|
+-----------------------------+-----+
|sailing-week.com             |1    |
|sailingmachine.com           |2    |
|sailingportocristo.com       |1    |
|sailingyun.com               |1    |
|sailingdirections.com        |3    |
|sailingmeters.com            |14   |
|sailing-whitsundays.com      |84   |
|sailingjuniper.com           |1    |
|sailing-boats-for-sale.com   |58   |
|sailing-serbia.com           |4    |
|sailinglogistics.com         |4    |
|sailingsquadron.com          |6    |
|sailingway.com               |1    |
|sailingcielo.com             |3    |
|sailingellidah.com           |6    |
|sailingphuket.com            |6    |
|sailing-news.com             |9    |
|sailing-boats-croatia.com    |7    |
|sailinghollyblue.com         |9    |
|sailingtexas.com             |874  |
|sailingtortuga.com           |3    |
|sailingfoxes.com             |1    |
|sailingpassionata.com        |1    |
|sailingshot

In [0]:
# we are going to use DataFrame methods to ask a similar question:
# how many pages in this segment of the dataset might be translated into French?
# we are going to assume the page might be translated if it includes "/fr/" in the url
# hint: .contains("/fr/")

# create a new DataFrame with this French filter
# and drop all columns except "url" 
# show the first 10 rows of your new DataFrame, using truncate=False
# finally, print the count of rows in that new DataFrame

cc_filtered_fr = cc_filtered.filter(cc_filtered["url"].contains("/fr/")).select("url")

cc_filtered_fr.show(10, truncate = False)

print("Total row counts: ", cc_filtered_fr.count())

+---------------------------------------------------------------------------------+
|url                                                                              |
+---------------------------------------------------------------------------------+
|https://www.safehost.com/fr/centre-d-hebergement/sh3                             |
|https://safelagoon.com/fr/                                                       |
|https://safelightberlin.com/fr/blogs/news/silberra-the-path-of-the-pan-perfection|
|https://safelightberlin.com/fr/collections/35mm-slr                              |
|https://safelightberlin.com/fr/collections/bf22                                  |
|https://safelightberlin.com/fr/collections/lomography                            |
|https://safelightberlin.com/fr/collections/point-and-shoot                       |
|https://safelightberlin.com/fr/collections/silbersalz35                          |
|https://safelightberlin.com/fr/pages/contact-new                           

In [0]:
# what is the average page count per domain in this list?
# use whichever methods you choose (SQL or DataFrame functions)
# to approach this

# show your result as a new DataFrame with a single column called "average"
avg_page_count_per_domain_query = """
SELECT AVG(count) as average
FROM(
    SELECT domain_name, COUNT(*) as count
    FROM common_index
    GROUP BY domain_name
)
"""
avg_page_count_per_domain_df = spark.sql(avg_page_count_per_domain_query)

avg_page_count_per_domain_df.show(truncate = False)


+------------------+
|average           |
+------------------+
|22.041660223035553|
+------------------+
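
The nested query above can be sanity-checked on a toy table. The sketch below uses Python's built-in `sqlite3` module rather than Spark SQL, and the six rows and the `page_count` alias are invented for illustration; the point is only that the average of the per-domain counts equals total rows divided by the number of distinct domains.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE common_index (url TEXT, domain_name TEXT)")

# Hypothetical rows: a.com has 3 pages, b.com has 1, c.com has 2.
toy_rows = [
    ("https://a.com/1", "a.com"),
    ("https://a.com/2", "a.com"),
    ("https://a.com/3", "a.com"),
    ("https://b.com/1", "b.com"),
    ("https://c.com/1", "c.com"),
    ("https://c.com/2", "c.com"),
]
conn.executemany("INSERT INTO common_index VALUES (?, ?)", toy_rows)

# Inner query: pages per domain. Outer query: average of those counts.
average = conn.execute(
    """
    SELECT AVG(page_count) AS average
    FROM (
        SELECT domain_name, COUNT(*) AS page_count
        FROM common_index
        GROUP BY domain_name
    )
    """
).fetchone()[0]

total_rows = len(toy_rows)
distinct_domains = len({domain for _url, domain in toy_rows})
print(average, total_rows / distinct_domains)
conn.close()
```

Applied to the real result, the same identity implies that 2565517 rows at an average of about 22.04 per domain correspond to roughly 116,400 distinct domains in this segment.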



In [0]:
# now let's pull in some more data!
# uncomment & run the following code:
!wget https://data.commoncrawl.org/crawl-data/CC-MAIN-2023-50/segments/1700679099281.67/wet/CC-MAIN-20231128083443-20231128113443-00000.warc.wet.gz



--2024-03-04 03:35:54--  https://data.commoncrawl.org/crawl-data/CC-MAIN-2023-50/segments/1700679099281.67/wet/CC-MAIN-20231128083443-20231128113443-00000.warc.wet.gz
Resolving data.commoncrawl.org (data.commoncrawl.org)... 99.84.66.123, 99.84.66.102, 99.84.66.103, ...
Connecting to data.commoncrawl.org (data.commoncrawl.org)|99.84.66.123|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 112806267 (108M) [application/octet-stream]
Saving to: ‘CC-MAIN-20231128083443-20231128113443-00000.warc.wet.gz’


2024-03-04 03:35:56 (70.1 MB/s) - ‘CC-MAIN-20231128083443-20231128113443-00000.warc.wet.gz’ saved [112806267/112806267]



In [0]:
# uncomment & run the following code:
!zcat CC-MAIN-20231128083443-20231128113443-00000.warc.wet.gz | head -n 500

# this reads the first 500 lines of the file you downloaded into the notebook's local file system (not distributed file)



WARC/1.0
WARC-Type: warcinfo
WARC-Date: 2023-12-12T01:49:07Z
WARC-Filename: CC-MAIN-20231128083443-20231128113443-00000.warc.wet.gz
WARC-Record-ID: <urn:uuid:965b7f0e-e769-4c47-9e28-f7d9120e1027>
Content-Type: application/warc-fields
Content-Length: 382

Software-Info: ia-web-commons.1.1.10-SNAPSHOT-20231102093126
Extracted-Date: Tue, 12 Dec 2023 01:49:07 GMT
robots: checked via crawler-commons 1.5-SNAPSHOT (https://github.com/crawler-commons/crawler-commons)
isPartOf: CC-MAIN-2023-50
operator: Common Crawl Admin (info@commoncrawl.org)
description: Wide crawl of the web for November/December 2023
publisher: Common Crawl



WARC/1.0
WARC-Type: conversion
WARC-Target-URI: http://0-50.ru/news/line/2014-07-13/id_43733.html
WARC-Date: 2023-11-28T11:34:01Z
WARC-Record-ID: <urn:uuid:ea93f3a5-6e42-46bb-b2da-bbc91ff80ef0>
WARC-Refers-To: <urn:uuid:60a1e8f3-68a9-437c-937c-e52edd95c91c>
WARC-Block-Digest: sha1:IAHHMFYRVDRXUQQMQTZA3KR4GKQEF2XB
WARC

✍️  What kind of data is this, compared to the first dataset?


Compared with the first dataset, which is structured (a Parquet table with named, typed columns), this file looks like raw material. It is plain text in which each record has a block of WARC headers followed by the text extracted from a page. It needs further processing before it can be analyzed, whereas the first dataset is already organized and cleaned.
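
As an example of the "further processing" this kind of file needs, the sketch below turns the header lines of one record into a dictionary. The sample text is copied from the output above; the function name `parse_warc_headers` is hypothetical, and the sketch assumes every header line has the form `Name: value`.

```python
# Header lines of one WET record, copied from the notebook output above.
sample_record = """WARC/1.0
WARC-Type: conversion
WARC-Target-URI: http://0-50.ru/news/line/2014-07-13/id_43733.html
WARC-Date: 2023-11-28T11:34:01Z
WARC-Record-ID: <urn:uuid:ea93f3a5-6e42-46bb-b2da-bbc91ff80ef0>"""

def parse_warc_headers(record_text):
    """Split a record's header block into (version line, {header name: value})."""
    lines = record_text.splitlines()
    version = lines[0]
    headers = {}
    for line in lines[1:]:
        if not line.strip():
            break  # a blank line ends the header block
        name, _, value = line.partition(": ")
        headers[name] = value
    return version, headers

version, headers = parse_warc_headers(sample_record)
print(version)
print(headers["WARC-Type"], headers["WARC-Target-URI"])
```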

In [0]:
# let's pull in more common crawl data!
# uncomment & run the following code:
!wget https://data.commoncrawl.org/crawl-data/CC-MAIN-2023-50/segments/1700679099281.67/warc/CC-MAIN-20231128083443-20231128113443-00000.warc.gz


--2024-03-04 03:35:56--  https://data.commoncrawl.org/crawl-data/CC-MAIN-2023-50/segments/1700679099281.67/warc/CC-MAIN-20231128083443-20231128113443-00000.warc.gz
Resolving data.commoncrawl.org (data.commoncrawl.org)... 99.84.66.110, 99.84.66.123, 99.84.66.102, ...
Connecting to data.commoncrawl.org (data.commoncrawl.org)|99.84.66.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1208969113 (1.1G) [application/octet-stream]
Saving to: ‘CC-MAIN-20231128083443-20231128113443-00000.warc.gz’


2024-03-04 03:36:20 (48.4 MB/s) - ‘CC-MAIN-20231128083443-20231128113443-00000.warc.gz’ saved [1208969113/1208969113]



In [0]:
# uncomment & run the following code:
!zcat CC-MAIN-20231128083443-20231128113443-00000.warc.gz | head -n 500




WARC/1.0
WARC-Type: warcinfo
WARC-Date: 2023-11-28T08:34:43Z
WARC-Record-ID: <urn:uuid:2b6daf87-e3fb-4851-a842-b24612e49256>
Content-Length: 499
Content-Type: application/warc-fields
WARC-Filename: CC-MAIN-20231128083443-20231128113443-00000.warc.gz

isPartOf: CC-MAIN-2023-50
publisher: Common Crawl
description: Wide crawl of the web for November/December 2023
operator: Common Crawl Admin (info@commoncrawl.org)
hostname: ip-10-67-67-56
software: Apache Nutch 1.19 (modified, https://github.com/commoncrawl/nutch/)
robots: checked via crawler-commons 1.5-SNAPSHOT (https://github.com/crawler-commons/crawler-commons)
format: WARC File Format 1.1
conformsTo: https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/


WARC/1.0
WARC-Type: request
WARC-Date: 2023-11-28T11:34:01Z
WARC-Record-ID: <urn:uuid:0ca80b2d-5a43-41a0-8809-bfe4626f2f8d>
Content-Length: 277
Content-Type: application/http; msgtype=request
WARC-Warcinfo-I

✍️  Now, what kind of data is **this**, compared to the first dataset and the second file?

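At first this dataset looks corrupted or improperly encoded, because it contains many ? and diamond-shaped replacement characters that the second file does not. Like the second file it is raw crawl data, but it holds the raw HTTP requests and responses along with the HTML rather than cleaned text, so some of its content does not display as readable text. The first dataset is far more structured than this one, and this file would need additional decoding and parsing before it could be analyzed.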

In [0]:
# uncomment & run the following code:
!gunzip CC-MAIN-20231128083443-20231128113443-00000.warc.gz

# this code "unzips" the compressed .gz file and may take a minute - !

In [0]:
# uncomment & run the following code 
# to upload the file to the DBFS distributed file system  - this might take a few minutes!

dbutils.fs.cp('file:/databricks/driver/CC-MAIN-20231128083443-20231128113443-00000.warc', 'dbfs:/FileStore/tables/')

# wait until the Output below says "True," and you could also check for the file in the Catalog -> DBFS menu on the left.

True

In [0]:
# create an RDD with this most recent data using the .textFile function
# call this RDD warc_rdd
# hint: the file path should be dbfs:/FileStore/tables/CC-MAIN-20231128083443-20231128113443-00000.warc

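# read only the WARC file; pointing at the whole folder would also read the Parquet file stored there
warc_rdd = spark.sparkContext.textFile("dbfs:/FileStore/tables/CC-MAIN-20231128083443-20231128113443-00000.warc")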

In [0]:
# how many partitions are used in this RDD?

warc_rdd.getNumPartitions()

84

In [0]:
# display the first 500 elements in this RDD
print(warc_rdd.take(500))


['WARC/1.0', 'WARC-Type: warcinfo', 'WARC-Date: 2023-11-28T08:34:43Z', 'WARC-Record-ID: <urn:uuid:2b6daf87-e3fb-4851-a842-b24612e49256>', 'Content-Length: 499', 'Content-Type: application/warc-fields', 'WARC-Filename: CC-MAIN-20231128083443-20231128113443-00000.warc.gz', '', 'isPartOf: CC-MAIN-2023-50', 'publisher: Common Crawl', 'description: Wide crawl of the web for November/December 2023', 'operator: Common Crawl Admin (info@commoncrawl.org)', 'hostname: ip-10-67-67-56', 'software: Apache Nutch 1.19 (modified, https://github.com/commoncrawl/nutch/)', 'robots: checked via crawler-commons 1.5-SNAPSHOT (https://github.com/crawler-commons/crawler-commons)', 'format: WARC File Format 1.1', 'conformsTo: https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/', '', '', 'WARC/1.0', 'WARC-Type: request', 'WARC-Date: 2023-11-28T11:34:01Z', 'WARC-Record-ID: <urn:uuid:0ca80b2d-5a43-41a0-8809-bfe4626f2f8d>', 'Content-Length: 277', 'Content-Type: application/http; msgtype

✍️  What does each "element" of the RDD here represent?

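Each element in the RDD is a single line of text from the WARC file, because `.textFile` splits its input on newlines. A line can be a WARC record header (such as `WARC-Type: request`), an HTTP header (such as `Content-Length: 277`), a line of a page's HTML, or an empty separator line. A single crawled page is therefore spread across many consecutive elements rather than stored in one.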
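
To make the line-versus-record distinction concrete, here is a minimal plain-Python sketch (no Spark needed) that groups a few lines into WARC records. The sample lines are copied from the `take(500)` output above; `group_into_records` is a hypothetical helper, and treating every `WARC/1.0` line as the start of a new record is a simplifying assumption.

```python
# Each string below is what one RDD element looks like: a single line of the file.
sample_lines = [
    "WARC/1.0",
    "WARC-Type: warcinfo",
    "WARC-Date: 2023-11-28T08:34:43Z",
    "Content-Length: 499",
    "",
    "WARC/1.0",
    "WARC-Type: request",
    "WARC-Date: 2023-11-28T11:34:01Z",
    "Content-Length: 277",
]


def group_into_records(lines):
    """Group consecutive lines into records, starting a new one at each 'WARC/1.0' line."""
    records = []
    for line in lines:
        if line == "WARC/1.0" or not records:
            records.append([])
        records[-1].append(line)
    return records


records = group_into_records(sample_lines)

# Pull the WARC-Type value out of each record's header lines.
record_types = [
    next(l.split(": ", 1)[1] for l in rec if l.startswith("WARC-Type: "))
    for rec in records
]
print(len(sample_lines), "lines ->", len(records), "records:", record_types)
```

Nine elements collapse into two records here, which is why counts taken over `warc_rdd` are counts of lines, not of pages.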

In [0]:
# you are given the function below:

def count_script_tags(line):
    return line.lower().count('<script type="text/javascript">')

# use this function, and lambda functions, to count the number of times that Javascript tag
# appears in this segment of the dataset
# hint: you will want to use a mapping function, and `.reduce()` 

# print the resulting value, the final count

# map each line to its tag count, then sum the per-line counts
javascript_tag_rdd = warc_rdd.map(lambda line: count_script_tags(line))
total_js_tags = javascript_tag_rdd.reduce(lambda a, b: a + b)
print("The final count: ", total_js_tags)



The final count: 91982
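
The same map-then-reduce pattern can be checked on a handful of lines in plain Python before trusting the cluster-wide number. This is only a sketch: the sample lines are hypothetical (not taken from the crawl), and Python's built-in `map` and `functools.reduce` stand in for the RDD methods.

```python
from functools import reduce


def count_script_tags(line):
    # same function as given in the assignment cell above
    return line.lower().count('<script type="text/javascript">')


# Hypothetical sample lines, chosen to exercise the edge cases
sample_lines = [
    '<html><head><SCRIPT TYPE="text/javascript">var a = 1;</script>',  # upper case
    '<script type="text/javascript">x()</script><script type="text/javascript">y()</script>',  # two tags
    '<script src="app.js"></script>',  # different attribute, not counted
    'Content-Length: 499',  # no tag at all
]

# "map": one count per line; "reduce": add the counts pairwise
per_line_counts = list(map(lambda line: count_script_tags(line), sample_lines))
total = reduce(lambda a, b: a + b, per_line_counts)
print(per_line_counts, total)
```

Note that the function only matches this exact tag text, so `<script>` tags written with other attributes or spacing are not part of the count.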


In [0]:
# there are usually multiple ways of accomplishing the same task ...
# use an accumulator variable to count the number of Javascript tags 
# instead of the mapping + reducing that you did previously

# print the resulting value of the accumulator variable

js_tags_count_accumulator = spark.sparkContext.accumulator(0)

def count_accumulates(line):
    # count the tag on this line and add it to the shared accumulator
    count = line.lower().count('<script type="text/javascript">')
    js_tags_count_accumulator.add(count)

warc_rdd.foreach(count_accumulates)

print("Final value of accumulator variable: ", js_tags_count_accumulator.value)


Final value of accumulator variable:  91982


✍️  Check out the Spark UI (under View menu) to investigate whether there was any performance difference between \
your 2 approaches for counting the Javascript tags.

In the Spark UI, the accumulator approach finished a few seconds faster than the map + reduce approach. Both approaches scan every line of the file once and return the same count (91982), so the difference is small.
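
As a complement to reading durations off the Spark UI, wall-clock time can also be measured directly in a cell. The sketch below uses a hypothetical helper, `timed`, and a stand-in workload so that it runs without a cluster; the commented lines show how the two notebook approaches might be wrapped.

```python
import time


def timed(label, action):
    """Run a zero-argument callable once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.3f} s")
    return result, elapsed


# Stand-in workload so this sketch runs anywhere
result, elapsed = timed("stand-in sum", lambda: sum(range(1_000_000)))

# Hypothetical usage in this notebook:
# timed("map + reduce", lambda: warc_rdd.map(count_script_tags).reduce(lambda a, b: a + b))
# timed("accumulator", lambda: warc_rdd.foreach(count_accumulates))
```

Two cautions apply. Rerunning `foreach` adds to the existing accumulator, so a fresh accumulator is needed for each timed run. A single run is also a noisy measurement, so repeating each approach a few times gives a fairer comparison.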


In [0]:
# let's try and figure out the average content length of these pages from the common crawl
# each page gives that information in a header in the data
# first: create a new RDD that filters warc_rdd
# using a lambda function that runs on each line: 
# if the string 'Content-Length' from the header is in the line, then that row should be in the new RDD

# display the first 10 rows of this RDD

warc_rdd_filtered = warc_rdd.filter(lambda line: 'Content-Length' in line)

warc_rdd_filtered.take(10)


['Content-Length: 499',
 'Content-Length: 277',
 'Content-Length: 42587',
 'Content-Length: 42070',
 'Content-Length: 208',
 'Content-Length: 327',
 'Content-Length: 39949',
 'Content-Length: 39528',
 'Content-Length: 202',
 'Content-Length: 295']

In [0]:
# the goal now = to get a DataFrame with only the content length, as a number, on each line

# you are given the following function, which parses the text on each line and returns tuples:
# (the function uses regular expressions, or RegEx, to search through strings for numbers)
# (more on RegEx: https://en.wikipedia.org/wiki/Regular_expression)

import re
# import the re (or RegEx) library

def parse_length_line(line):
    # regular expression to find 'Content-Length:' followed by any number of digits
    match = re.search(r'Content-Length:\s*(\d+)', line)
    if match:
        # if a match is found, convert the matching group (the digits) to an integer
        return (int(match.group(1)),)
    else:
        # if no match, return None
        return (None,)
    
# using RDD methods, run this function on every line in the RDD you just created in the previous cell
# and then convert the result to a new DataFrame called length_df
# it should have only 1 column: "Length"

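# parse each filtered line into a 1-tuple: (length,) or (None,)
warc_rdd_parsed = warc_rdd_filtered.map(parse_length_line)

# drop lines that mention 'Content-Length' but carry no number
new_length_rdd = warc_rdd_parsed.filter(lambda x: x[0] is not None)

# each row is already a 1-tuple, so it can be converted directly
length_df = new_length_rdd.toDF(["Length"])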




In [0]:
# show the first 10 rows of length_df
# and print the schema of this DataFrame

length_df.show(10)
length_df.printSchema()

+------+
|Length|
+------+
|   499|
|   277|
| 42587|
| 42070|
|   208|
|   327|
| 39949|
| 39528|
|   202|
|   295|
+------+
only showing top 10 rows

root
 |-- Length: long (nullable = true)
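
The filter on `None` matters because a line can contain the text 'Content-Length' without being a header. The plain-Python sketch below runs the same filter, parse, and drop steps on three lines; the first is copied from the output above and the other two are hypothetical.

```python
import re


def parse_length_line(line):
    # same function as given in the assignment cell above
    match = re.search(r'Content-Length:\s*(\d+)', line)
    if match:
        return (int(match.group(1)),)
    return (None,)


sample_lines = [
    "Content-Length: 499",               # real header line from the output above
    "Content-Length:42587",              # hypothetical: no space after the colon
    "<p>The Content-Length header</p>",  # hypothetical: passes the filter, has no number
]

# Same three steps as the RDD pipeline: filter, parse, drop the None rows
parsed = [parse_length_line(l) for l in sample_lines if "Content-Length" in l]
lengths = [row[0] for row in parsed if row[0] is not None]
print(parsed)
print(lengths)
```

The third line survives the substring filter but yields `(None,)`, so without the second filter it would become a null row in `length_df`.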



In [0]:
# now for some SQL!
# use createOrReplaceTempView to create a view based on length_df
# called "content_length"

length_df.createOrReplaceTempView("content_length")


In [0]:
# use SQL to print out the average content length for pages in this segment of the dataset

avg_content_length_query = """
SELECT AVG(Length) AS average_content_length
FROM content_length
"""

spark.sql(avg_content_length_query).show(truncate= False)


+----------------------+
|average_content_length|
+----------------------+
|93421.36533395662     |
+----------------------+



In [0]:
# invent your own question to query from this dataset, or previous DataFrames in this notebook
# you may use any methods that you choose to answer the question

# Q: How many pages have a content length above the average, and how many below it?
# a CTE computes the average once; two scalar subqueries then count the rows on each side of it
query = """
WITH AvgLength AS (
  SELECT AVG(Length) AS avg_content_length 
  FROM content_length
)
SELECT 
  (SELECT COUNT(*) FROM content_length WHERE Length > (SELECT avg_content_length FROM AvgLength)) AS count_above_avg,
  (SELECT COUNT(*) FROM content_length WHERE Length < (SELECT avg_content_length FROM AvgLength)) AS count_below_avg
"""

result = spark.sql(query)
result.show(truncate=False)



+---------------+---------------+
|count_above_avg|count_below_avg|
+---------------+---------------+
|33732          |124567         |
+---------------+---------------+
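
The counting logic can be sanity-checked in plain Python on the ten lengths displayed earlier by `length_df.show(10)`. This is a sketch on that small sample only, so its numbers are not expected to match the full-dataset result.

```python
from statistics import mean, median

# the ten values shown by length_df.show(10) above
lengths = [499, 277, 42587, 42070, 208, 327, 39949, 39528, 202, 295]

avg = mean(lengths)
count_above = sum(1 for n in lengths if n > avg)
count_below = sum(1 for n in lengths if n < avg)

print("mean:", avg, "median:", median(lengths))
print("above:", count_above, "below:", count_below)
```

Even in this sample, a few large values pull the mean well above the median, so more rows fall below the average than above it. The full result (33732 above, 124567 below) is consistent with the same kind of skew.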



✍️  Please **cite any sources** that you used, other than class notes and the Codecademy course, \
to help with or complete this notebook. You should list any websites, tools, videos, etc.!

- https://medium.com/@charchitpatidar/how-cache-works-in-apache-spark-aea6eeb3fd03#:~:text=Since%20cache()%20is%20a,RDD%20in%20a%20single%20action.
- https://www.freshers.in/article/aws-glue/pyspark-data-processing-in-aws-glue-dataframe-cache/
- https://www.w3schools.com/


🚀 **To submit this assignment,** go to **File -> Export -> HTML.**  \
Upload this file and your Codecademy screenshot using [this form](https://airtable.com/appgQV4bSPRVdHEWt/shrOahNbuss9wxvyN)