In [47]:
# Import required modules
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise start a new one
spark = SparkSession.builder.getOrCreate()

# Handle on the session's SparkContext, needed for the RDD-based cells
sc = spark.sparkContext

In [48]:
# Load the domains file as an RDD of raw text lines (one string per line)
common_crawl_domain_counts = sc.textFile('cc-main-limited-domains.csv')

# Peek at the first 10 lines; each line is a single tab-separated record
common_crawl_domain_counts.take(10)

                                                                                

['367855\t172-in-addr\tarpa\t1',
 '367856\taddr\tarpa\t1',
 '367857\tamphic\tarpa\t1',
 '367858\tbeta\tarpa\t1',
 '367859\tcallic\tarpa\t1',
 '367860\tch\tarpa\t1',
 '367861\td\tarpa\t1',
 '367862\thome\tarpa\t7',
 '367863\tiana\tarpa\t1',
 '367907\tlocal\tarpa\t1']

In [49]:
# take 2 
def fmt_domian_graph_entry(entry):
    """Parse one raw tab-separated line into a typed tuple.

    Parameters
    ----------
    entry : str
        A line holding exactly four tab-separated fields, in order:
        site id, domain, top-level domain, sub-domain count.

    Returns
    -------
    tuple
        ``(site_id, domain, num_subdomains)`` where the id and the count are
        converted to ``int``. The top-level domain field is dropped.

    Raises
    ------
    ValueError
        If the line does not split into exactly four fields, or if the id or
        count field cannot be converted to ``int``.
    """
    fields = entry.split('\t')
    # Unpacking enforces the four-field layout; the TLD is intentionally unused
    raw_id, name, _tld, raw_count = fields
    parsed_id = int(raw_id)
    parsed_count = int(raw_count)
    return (parsed_id, name, parsed_count)

In [15]:
# Parse every raw line into a (site_id, domain, num_subdomains) tuple.
# The parser is handed to map() directly; wrapping it in `lambda e: f(e)`
# only adds an extra call layer without changing the result.
formmated_host_counts = common_crawl_domain_counts.map(fmt_domian_graph_entry)

# Peek at the first 10 parsed records
formmated_host_counts.take(10)

[(367855, '172-in-addr', 1),
 (367856, 'addr', 1),
 (367857, 'amphic', 1),
 (367858, 'beta', 1),
 (367859, 'callic', 1),
 (367860, 'ch', 1),
 (367861, 'd', 1),
 (367862, 'home', 7),
 (367863, 'iana', 1),
 (367907, 'local', 1)]

In [16]:
def extract_subdomian_counts(entry):
    """Return the sub-domain count stored in one raw tab-separated line.

    Parameters
    ----------
    entry : str
        A line holding exactly four tab-separated fields, in order:
        site id, domain, top-level domain, sub-domain count.

    Returns
    -------
    int
        The fourth field converted to ``int``.

    Raises
    ------
    ValueError
        If the line does not split into exactly four fields, or if the count
        field cannot be converted to ``int``.
    """
    fields = entry.split("\t")
    # Only the last field is needed; unpacking still enforces the 4-field layout
    _site_id, _domain, _tld, count_text = fields
    return int(count_text)


# Reduce each raw line to its integer sub-domain count.
# The extractor is passed to map() directly instead of through a pass-through lambda.
host_count = common_crawl_domain_counts.map(extract_subdomian_counts)

# Peek at the first 10 counts
host_count.take(10)

[1, 1, 1, 1, 1, 1, 1, 7, 1, 1]

In [19]:
# Combine the per-domain counts pairwise into one grand total
total_host_count = host_count.reduce(lambda left, right: left + right)
total_host_count

595466

In [20]:
# Stop the session together with its underlying SparkContext; `sc` and the
# RDDs built from it cannot be used after this point.
spark.stop()

In [21]:
from pyspark.sql import SparkSession

# Obtain a session again for the DataFrame / SQL part of the notebook
spark = SparkSession.builder.getOrCreate()

23/12/29 15:33:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [50]:
# Read the tab-separated file into a DataFrame, letting Spark infer column types
common_crawl = (
    spark.read
    .option('delimiter', '\t')
    .option('inferSchema', True)
    .csv('cc-main-limited-domains.csv')
)

# Preview the first five rows without truncating long values
common_crawl.show(5, truncate=False)

+------+-----------+----+---+
|_c0   |_c1        |_c2 |_c3|
+------+-----------+----+---+
|367855|172-in-addr|arpa|1  |
|367856|addr       |arpa|1  |
|367857|amphic     |arpa|1  |
|367858|beta       |arpa|1  |
|367859|callic     |arpa|1  |
+------+-----------+----+---+
only showing top 5 rows



In [52]:
# Replace the default positional column names (_c0 .. _c3) with descriptive ones
common_crawl = (
    common_crawl
    .withColumnRenamed("_c0", "site_id")
    .withColumnRenamed("_c1", "domain")
    .withColumnRenamed("_c2", "top_level_domain")
    .withColumnRenamed("_c3", "num_subdomains")
)

In [64]:
# Confirm the renamed columns by previewing the first five rows
common_crawl.show(5,truncate=False)

+-------+-----------+----------------+--------------+
|site_id|domain     |top_level_domain|num_subdomains|
+-------+-----------+----------------+--------------+
|367855 |172-in-addr|arpa            |1             |
|367856 |addr       |arpa            |1             |
|367857 |amphic     |arpa            |1             |
|367858 |beta       |arpa            |1             |
|367859 |callic     |arpa            |1             |
+-------+-----------+----------------+--------------+
only showing top 5 rows



In [54]:
# Show the column names together with the types produced by inferSchema
common_crawl.printSchema()

root
 |-- site_id: integer (nullable = true)
 |-- domain: string (nullable = true)
 |-- top_level_domain: string (nullable = true)
 |-- num_subdomains: integer (nullable = true)



In [65]:
# Alternative way to name columns: toDF() assigns every column name positionally
# in one call. The resulting frame is only displayed here, not stored.
common_crawl.toDF("site_id",'domain','top_level_domain','num_subdomains').show(5,truncate=False)

+-------+-----------+----------------+--------------+
|site_id|domain     |top_level_domain|num_subdomains|
+-------+-----------+----------------+--------------+
|367855 |172-in-addr|arpa            |1             |
|367856 |addr       |arpa            |1             |
|367857 |amphic     |arpa            |1             |
|367858 |beta       |arpa            |1             |
|367859 |callic     |arpa            |1             |
+-------+-----------+----------------+--------------+
only showing top 5 rows



In [66]:
# Persist common_crawl as Parquet, replacing any previous contents of the directory
common_crawl.write.parquet('./results/common_crawl/', mode="overwrite")

In [67]:
# Load the DataFrame back from the Parquet directory written in the previous cell
common_crawl_domains = spark.read.parquet('./results/common_crawl/')

# Check that both the data and the schema survived the round trip
common_crawl_domains.show(5, truncate=False)
common_crawl_domains.printSchema()

+-------+-----------+----------------+--------------+
|site_id|domain     |top_level_domain|num_subdomains|
+-------+-----------+----------------+--------------+
|367855 |172-in-addr|arpa            |1             |
|367856 |addr       |arpa            |1             |
|367857 |amphic     |arpa            |1             |
|367858 |beta       |arpa            |1             |
|367859 |callic     |arpa            |1             |
+-------+-----------+----------------+--------------+
only showing top 5 rows

root
 |-- site_id: integer (nullable = true)
 |-- domain: string (nullable = true)
 |-- top_level_domain: string (nullable = true)
 |-- num_subdomains: integer (nullable = true)



Querying Domain Counts with PySpark

In [68]:
# Register the DataFrame as the temporary view "crawl" so spark.sql() queries can reference it
common_crawl_domains.createOrReplaceTempView("crawl")

## Calculate the total number of domains for each top-level domain in the dataset.

In [69]:
# Number of rows (domains) per top-level domain, most frequent first
(
    common_crawl_domains
    .groupby('top_level_domain')
    .count()
    .orderBy('count', ascending=False)
    .show(10, truncate=False)
)

+----------------+-----+
|top_level_domain|count|
+----------------+-----+
|edu             |18547|
|gov             |15007|
|travel          |6313 |
|coop            |5319 |
|jobs            |3893 |
|post            |117  |
|map             |34   |
|arpa            |11   |
+----------------+-----+



In [70]:
# SQL version of the per-TLD domain count, using `COUNT`, `GROUP BY`, and `ORDER BY`
# against the "crawl" temp view.
# NOTE(review): COUNT(domain) ignores rows whose domain is NULL, whereas the
# DataFrame `.count()` in the previous cell counts every row. The two results
# match only while no domain value is NULL -- confirm which is intended.
spark.sql(
    """
    SELECT 
        top_level_domain, 
        COUNT(domain) AS count
    FROM crawl
    GROUP BY top_level_domain
    ORDER BY COUNT(domain) DESC
    """
).show(10, truncate=False)

+----------------+-----+
|top_level_domain|count|
+----------------+-----+
|edu             |18547|
|gov             |15007|
|travel          |6313 |
|coop            |5319 |
|jobs            |3893 |
|post            |117  |
|map             |34   |
|arpa            |11   |
+----------------+-----+



In [75]:
# Total sub-domains per top-level domain, largest total first.
# sum() names its result column 'sum(num_subdomains)', which orderBy refers to.
(
    common_crawl_domains
    .groupBy('top_level_domain')
    .sum('num_subdomains')
    .orderBy('sum(num_subdomains)', ascending=False)
    .show(10, truncate=False)
)

+----------------+-------------------+
|top_level_domain|sum(num_subdomains)|
+----------------+-------------------+
|edu             |484438             |
|gov             |85354              |
|travel          |10768              |
|coop            |8683               |
|jobs            |6023               |
|post            |143                |
|map             |40                 |
|arpa            |17                 |
+----------------+-------------------+



In [78]:
# SQL version of the previous cell: total sub-domains per top-level domain,
# largest total first, queried from the "crawl" temp view.
spark.sql(
    """
    select top_level_domain,
    sum(num_subdomains) as total_count
    from crawl
    group by top_level_domain
    order by sum(num_subdomains) desc
    """
).show(10,truncate=False)

+----------------+-----------+
|top_level_domain|total_count|
+----------------+-----------+
|edu             |484438     |
|gov             |85354      |
|travel          |10768      |
|coop            |8683       |
|jobs            |6023       |
|post            |143        |
|map             |40         |
|arpa            |17         |
+----------------+-----------+





## 

How many sub-domains does nps.gov have? Filter the dataset to that website's entry and display the columns `top_level_domain`, `domain`, and `num_subdomains` in your result.

In [79]:
# Look up the single entry for nps.gov and show only the requested columns
(
    common_crawl_domains
    .select('top_level_domain', 'domain', 'num_subdomains')
    .filter(
        (common_crawl_domains.domain == "nps")
        & (common_crawl_domains.top_level_domain == "gov")
    )
    .show(10, truncate=False)
)

+----------------+------+--------------+
|top_level_domain|domain|num_subdomains|
+----------------+------+--------------+
|gov             |nps   |178           |
+----------------+------+--------------+



In [80]:
# Same nps.gov lookup expressed in SQL against the "crawl" temp view.
# Both string literals use single quotes: that is the standard SQL form, and a
# double-quoted "nps" is parsed as an identifier (a column name) rather than a
# string when ANSI double-quoted identifiers are enabled.
spark.sql(
    """
    SELECT top_level_domain, domain, num_subdomains
    FROM crawl
    WHERE domain = 'nps'
      AND top_level_domain = 'gov'
    """
).show(truncate=False)

+----------------+------+--------------+
|top_level_domain|domain|num_subdomains|
+----------------+------+--------------+
|gov             |nps   |178           |
+----------------+------+--------------+



In [81]:
# Stop the Spark session and its underlying SparkContext
spark.stop()