### Exploratory Data Analysis of Common Crawl Web Archive Data

In this notebook, we perform an initial assessment of the data provided in the web archive (WARC) files made available in the <a href="https://commoncrawl.org/">Common Crawl</a> web data repository. Each crawl in this repository comprises several hundred terabytes of web data, including HTTP headers and HTML content, which we will explore here. We note the different data elements in these WARC files, how they can be interpreted, and how we might transform them into a tabular form for downstream storage and analysis.

For the purposes of this EDA, we narrow the scope to a single WARC file to begin characterizing the data, and we use the <a href="https://resiliparse.chatnoir.eu/en/stable/man/fastwarc.html">FastWARC</a> package to open and explore each web archive.

In [1]:
# Packages
import numpy as np
import pandas as pd
import re
import os
from fastwarc.warc import ArchiveIterator
from fastwarc.stream_io import GZipStream
import findspark
findspark.init()
import pyspark
import pprint
from timeit import default_timer as timer
import json

#### Notes on WARC Record Types
From the FastWARC documentation, we note that not every record in a WARC file is an HTTP response containing headers and HTML; a record may instead be, for example, a request record holding only the crawler's HTTP request. Because analytics use cases are more likely to be interested in records with actual HTML data, we can limit our results to response records. The documentation shows how to create an iterator that yields only these response records, which we do with a filter function in the cells below.
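To make the record-type distinction concrete, here is a standard-library-only sketch of how an uncompressed WARC stream is laid out: each record is a version line, a block of `Name: value` header fields ending in a blank line, and then exactly `Content-Length` bytes of content. This is a simplified illustration, not FastWARC's implementation or API (`iter_warc_records` and `make_record` are hypothetical names). In FastWARC itself, the same restriction can be expressed with a `func_filter` as in this notebook, or with `ArchiveIterator(stream, record_types=WarcRecordType.response)`.

```python
import io
from typing import BinaryIO, Dict, Iterator, Optional, Tuple


def iter_warc_records(
    stream: BinaryIO, record_type: Optional[str] = None
) -> Iterator[Tuple[Dict[str, str], bytes]]:
    """Yield (warc_headers, content_block) pairs from an uncompressed WARC byte stream.

    If record_type is given (e.g. "response"), other record types are skipped.
    Simplified sketch: no continuation lines, digest checks, or error handling.
    """
    while True:
        line = stream.readline()
        if not line:
            return  # end of stream
        if not line.strip():
            continue  # blank CRLF lines separate consecutive records
        # `line` is now the version line, e.g. b"WARC/1.1\r\n"
        headers: Dict[str, str] = {}
        for raw in iter(stream.readline, b""):
            field = raw.rstrip(b"\r\n")
            if not field:
                break  # an empty line ends the header block
            name, _, value = field.decode("utf-8").partition(":")
            headers[name.strip()] = value.strip()
        # Content-Length gives the exact size of the block that follows
        block = stream.read(int(headers.get("Content-Length", "0")))
        if record_type is None or headers.get("WARC-Type") == record_type:
            yield headers, block


def make_record(warc_type: str, block: bytes) -> bytes:
    """Build a minimal WARC record for demonstration purposes only."""
    head = f"WARC/1.1\r\nWARC-Type: {warc_type}\r\nContent-Length: {len(block)}\r\n\r\n"
    return head.encode("utf-8") + block + b"\r\n\r\n"


sample = b"".join([
    make_record("warcinfo", b"software: demo\r\n"),
    make_record("request", b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"),
    make_record("response", b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n<html></html>"),
    make_record("metadata", b"fetchTimeMs: 12\r\n"),
])

all_types = [h["WARC-Type"] for h, _ in iter_warc_records(io.BytesIO(sample))]
responses = list(iter_warc_records(io.BytesIO(sample), record_type="response"))
print(all_types)       # ['warcinfo', 'request', 'response', 'metadata']
print(len(responses))  # 1
```

For a real `.warc.gz` file, `gzip.open(path, 'rb')` should supply the decompressed stream, since WARC files are typically compressed record by record as concatenated gzip members, which Python's `gzip` module reads transparently; FastWARC remains the faster choice for actual ingestion.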

In [2]:
# Constants
DATA_LOC = "../data"
WARCS = [os.path.join(DATA_LOC, f) for f in os.listdir(DATA_LOC) if f.endswith('.warc.gz')]

In [3]:
# Helper functions


def parse_record(record, return_html=False):
    """Extract the WARC and HTTP headers and, optionally, the raw HTTP payload"""

    # Headers
    warc_header = record.headers
    http_header = record.http_headers

    # Output (the payload is only read when it is requested)
    if return_html:
        raw_html = record.reader.read()
        return (warc_header, http_header, raw_html)
    return (warc_header, http_header)

#### Example Record

We can begin by extracting the WARC headers, HTTP headers, and payload from a single record known to have all of these elements. Because FastWARC accepts a custom filter function, we can select one specific site's response record by its target URI:

In [4]:
# Pull an example record

with open(WARCS[0], 'rb') as stream:
    example_uri = 'http://1836saloon.net/2019/09/22/specials-for-the-week-of-9-22-thru-9-28/'
    # Keep only the response record for the example URI
    example_iterator = ArchiveIterator(
        stream,
        func_filter=lambda x: (x.headers.get('WARC-Type') == 'response')
                              and (x.headers.get('WARC-Target-URI') == example_uri))

    example_record = next(example_iterator)

    # Read the payload while the file is still open
    warc_header, http_header, raw_html = parse_record(example_record, True)

##### WARC Headers

From the documentation, we can see that the WARC files used by Common Crawl carry a standard set of named header fields that convey different types of information. These fields follow the format defined in the <a href="https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#named-fields">WARC 1.1 specification</a> (hosted on GitHub by the IIPC), which describes each field in more detail. One item of analytics interest is `Content-Length`, which gives the size of the record's content block in bytes; for a response record, this block includes the HTTP status line and headers as well as the payload:

In [5]:
# View the WARC Header
pprint.pp(dict(warc_header))

{'WARC-Type': 'response',
 'WARC-Date': '2023-02-06T06:46:56Z',
 'WARC-Record-ID': '<urn:uuid:f863b6b3-0632-4524-9b1a-e7cd3ea7a231>',
 'Content-Length': '43014',
 'Content-Type': 'application/http; msgtype=response',
 'WARC-Warcinfo-ID': '<urn:uuid:2c7f7c4e-d996-4046-a11d-ce5ea7367928>',
 'WARC-Concurrent-To': '<urn:uuid:82c7afee-8991-4186-a2ec-a54984ced648>',
 'WARC-IP-Address': '50.63.8.18',
 'WARC-Target-URI': 'http://1836saloon.net/2019/09/22/specials-for-the-week-of-9-22-thru-9-28/',
 'WARC-Payload-Digest': 'sha1:B5R5RCDZQDU5DJSEMCNXL7M3ENJCRO5Y',
 'WARC-Block-Digest': 'sha1:EIXW4QUYQBU6D3RLK4VHEFXVJ5U7U3LH',
 'WARC-Identified-Payload-Type': 'text/html'}
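Since every header value arrives as a string, one possible step toward a tabular form is to cast selected fields to proper types. The sketch below is a hypothetical helper (`to_typed_warc_row` is not part of FastWARC) that parses `Content-Length` as an integer, `WARC-Date` as a UTC timestamp, and strips the angle brackets from `WARC-Record-ID`, using the values printed above.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def to_typed_warc_row(warc_header: Dict[str, str]) -> Dict[str, Any]:
    """Hypothetical helper: cast selected WARC header strings to typed values."""
    length = warc_header.get("Content-Length")
    date = warc_header.get("WARC-Date")
    record_id = warc_header.get("WARC-Record-ID")
    return {
        # Size of the record block in bytes (HTTP headers + payload for a response)
        "content_length": int(length) if length is not None else None,
        # WARC-Date is ISO 8601 in UTC with a trailing 'Z'; swap in '+00:00'
        # so that datetime.fromisoformat also accepts it on Python < 3.11
        "warc_date": datetime.fromisoformat(date.replace("Z", "+00:00")) if date else None,
        # Drop the angle brackets around <urn:uuid:...>
        "warc_record_id": record_id.strip("<>") if record_id else None,
        "warc_target_uri": warc_header.get("WARC-Target-URI"),
    }


example = {
    "WARC-Type": "response",
    "WARC-Date": "2023-02-06T06:46:56Z",
    "WARC-Record-ID": "<urn:uuid:f863b6b3-0632-4524-9b1a-e7cd3ea7a231>",
    "Content-Length": "43014",
    "WARC-Target-URI": "http://1836saloon.net/2019/09/22/specials-for-the-week-of-9-22-thru-9-28/",
}
row = to_typed_warc_row(example)
print(row["content_length"], row["warc_date"], row["warc_record_id"])
```

Typed columns like these would let a later Spark schema use `LongType` and `TimestampType` rather than strings throughout.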


##### HTTP Headers
We can similarly view the HTTP headers. As described in <a href="https://en.wikipedia.org/wiki/List_of_HTTP_header_fields">this article</a>, many HTTP header fields exist, and their use is by no means uniform across websites. Of these, the following data elements could be of interest to analytics use cases:

- Server: Name of the server hosting the site

- Content-Language: Language(s) for the intended audience of the site

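- Referer: The address of the page from which a link was followed to reach this page. Note that this is normally sent by the client as a request header, so it is expected to appear only rarely among response headers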

- Last-Modified: Date and time at which the server reports the resource was last modified

In [6]:
# View the HTTP Headers
pprint.pp(dict(http_header))

{'Date': 'Mon, 06 Feb 2023 06:46:51 GMT',
 'Server': 'Apache',
 'X-Pingback': 'http://1836saloon.net/xmlrpc.php',
 'Link': '<http://1836saloon.net/wp-json/>; rel="https://api.w.org/", '
         '<http://1836saloon.net/?p=529>; rel=shortlink',
 'Upgrade': 'h2,h2c',
 'Connection': 'Upgrade, Keep-Alive',
 'Vary': 'Accept-Encoding',
 'X-Crawler-Content-Encoding': 'br',
 'X-Crawler-Content-Length': '10028',
 'Content-Length': '42559',
 'Keep-Alive': 'timeout=5',
 'Content-Type': 'text/html; charset=UTF-8'}
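Because the fields of interest are often absent, and HTTP field names are case-insensitive, a tolerant selection step helps when flattening these headers into rows. The sketch below is a hypothetical helper (`select_http_fields` and `HTTP_FIELDS` are illustrative names) that looks fields up after lower-casing, in case the header map's own lookup is case-sensitive, returns `None` for missing fields, and parses `Last-Modified` with the standard library's `email.utils.parsedate_to_datetime`.

```python
from email.utils import parsedate_to_datetime
from typing import Any, Dict

# HTTP field name (lower-cased) -> column name used in this notebook's schema
HTTP_FIELDS = {
    "server": "server_name",
    "content-language": "content_language",
    "referer": "referer",
    "last-modified": "last_modified",
}


def select_http_fields(http_header: Dict[str, str]) -> Dict[str, Any]:
    """Hypothetical helper: pick selected HTTP fields case-insensitively; missing ones become None."""
    lowered = {name.lower(): value for name, value in http_header.items()}
    row: Dict[str, Any] = {col: lowered.get(name) for name, col in HTTP_FIELDS.items()}
    if row["last_modified"] is not None:
        try:
            # HTTP dates look like 'Thu, 01 Jan 1970 00:00:00 GMT'
            row["last_modified"] = parsedate_to_datetime(row["last_modified"])
        except (TypeError, ValueError):
            row["last_modified"] = None  # unparseable date string
    return row


example_http = {
    "Date": "Mon, 06 Feb 2023 06:46:51 GMT",
    "Server": "Apache",
    "Content-Type": "text/html; charset=UTF-8",
}
print(select_http_fields(example_http))
```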


##### Raw HTML
The raw HTML is also available. For now, we can simply ingest it as-is, but at some point we may wish to parse these records to extract elements of interest such as page titles. Here, we can see that the page content concerns weekly specials at a restaurant:

In [8]:
pprint.pp({'html':raw_html.decode('utf-8')[0:1000]})

{'html': '\r\n'
         '<!DOCTYPE html>\n'
         '<html lang="en-US">\n'
         '<head>\n'
         '<meta charset="UTF-8">\n'
         '<meta name="viewport" content="width=device-width, '
         'initial-scale=1">\n'
         '<link rel="profile" href="http://gmpg.org/xfn/11">\n'
         '<link rel="pingback" href="http://1836saloon.net/xmlrpc.php">\n'
         '\n'
         '<!-- WP_Head -->\n'
         '<title>SPECIALS FOR THE WEEK OF 9/22 THRU 9/28 &#8211; 1836 Saloon '
         '&amp; Grille</title>\n'
         '<meta property="og:title" content="SPECIALS FOR THE WEEK OF 9/22 '
         'THRU 9/28"/>\n'
         '<meta property="og:description" content="SUNDAY FOOTBALL SPECIALS '
         'CHANGE WEEKLY SO COME ON IN TO SEE WHATS NEW!!!!!!!!!!!!!!  DAILY '
         'APPETIZERS:  $3.75 CUP OF THE SOUP OF THE DAY  $3.95 CORN NUGG"/>\n'
         '<meta property="og:url" '
         'content="http://1836saloon.net/2019/09/22/specials-for-the-week-of-9-22-thru-9-28/"/>\n'
   

#### Considerations for Extraction at Scale
There are a few things to consider regarding how we might ingest these files at scale. For context, a single crawl usually consists of ~90,000 WARC files. While we will not be ingesting the entire Common Crawl dataset, we would like to measure how long processing takes for a single file.

To start, we count the total number of records in a single WARC file, and then time an extraction restricted to response records. Even if we were to ingest just one crawl, and assuming similar read times per file, processing a large number of files serially could take a long time, so we may wish to explore options available on distributed platforms to reduce ingestion times.

In [6]:
def warc_simple_iteration(path):
    """Count every record (of any type) in a gzipped WARC file"""
    num_records = 0
    with GZipStream(open(path, 'rb')) as stream:
        for record in ArchiveIterator(stream):
            num_records += 1
    return num_records

In [7]:
import time
start_time = time.time()
num_records = warc_simple_iteration(WARCS[0])
end_time = time.time()
elapsed_time = round(end_time - start_time,3)

print(f"Iterated through {num_records} total records in {elapsed_time} seconds.")

Iterated through 108754 total records in 9.113 seconds.


In [8]:
def warc_to_dictionary(path):
    """Create a dictionary of WARC records with HTTP responses"""
    num_records = 0
    with GZipStream(open(path, 'rb')) as stream:
        iterator = ArchiveIterator(stream, func_filter=lambda x: x.headers.get('WARC-Type')=='response')
        warc_headers = []
        http_headers = []
        raw_html = []
        for record in iterator:
            warc_headers.append(record.headers)
            http_headers.append(record.http_headers)
            raw_html.append(record.reader.read())
            num_records += 1
    
    proposed_output = {'num_records': num_records,
              'warc_headers': [dict(i) for i in warc_headers],
              'http_headers': [dict(i) for i in http_headers],
              'raw_html': raw_html
              }
    
    return proposed_output['num_records']

In [9]:
start_time = time.time()
num_records = warc_to_dictionary(WARCS[0])
end_time = time.time()
elapsed_time = round(end_time - start_time,3)

print(f"Obtained usable results for {num_records} records in {elapsed_time} seconds.")

Obtained usable results for 36251 records in 15.709 seconds.
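To put these timings in perspective, here is a back-of-the-envelope extrapolation to a full crawl. It assumes every file takes about as long as this one and, for the parallel case, a hypothetical 64 workers with perfect scaling; `crawl_hours` is an illustrative helper. The record counts are also consistent with each capture contributing a request, a response, and a metadata record, plus one `warcinfo` record per file, which explains why only about a third of the records are responses.

```python
# Figures reported above for a single WARC file
TOTAL_RECORDS = 108_754
RESPONSE_RECORDS = 36_251
ITERATE_SECONDS = 9.113    # count all records
EXTRACT_SECONDS = 15.709   # read headers + payload of response records
FILES_PER_CRAWL = 90_000   # approximate figure quoted above


def crawl_hours(seconds_per_file: float, n_files: int = FILES_PER_CRAWL, workers: int = 1) -> float:
    """Wall-clock hours, assuming equal per-file cost and perfect parallel scaling."""
    return seconds_per_file * n_files / workers / 3600


# One warcinfo record plus request/response/metadata triples gives 3 * 36,251 + 1
print(3 * RESPONSE_RECORDS + 1 == TOTAL_RECORDS)
print(f"Response share of records: {RESPONSE_RECORDS / TOTAL_RECORDS:.1%}")
print(f"Iterate one crawl serially: {crawl_hours(ITERATE_SECONDS):,.0f} h")
print(f"Extract one crawl serially: {crawl_hours(EXTRACT_SECONDS):,.0f} h")
print(f"Extract with 64 workers:    {crawl_hours(EXTRACT_SECONDS, workers=64):,.1f} h")
```

Serial extraction of one crawl would take roughly 390 hours (over two weeks), which motivates the distributed approach explored next.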


#### Extraction and Data Exploration in Spark
One option is to leverage Spark to distribute ingestion. Here, we adapt a PySpark implementation documented in the <a href="https://github.com/edsu/spn#readme">Save Page Now</a> GitHub repository, which uses Spark to ingest WARC data. Performance is difficult to gauge at this point because Spark is running only on a local machine, but this methodology lets us continue our exploration and start to formulate how we will ingest this data at scale.

Here, we build a simple schema from selected header fields to round out the EDA. This could also serve as a starting point for building the actual pipeline later in the project.

In [10]:
# PySpark packages and session
from pyspark.sql import SparkSession
from pyspark.sql import types as T

# 'local[*]' runs one worker thread per available core; plain 'local' would use a single thread
spark = SparkSession.builder.appName('SparkApp').master('local[*]').getOrCreate()
sc = spark.sparkContext

In [4]:
# Setup schema
field_dict = {
    'headers':
        {
            'WARC-Record-ID': 'warc_record_id',
            'WARC-Date': 'warc_date',
            'Content-Length': 'content_length',
            'Content-Type': 'content_type',
            'WARC-Target-URI': 'warc_target_uri'
        },
    'http_headers':
        {
            'Server': 'server_name',
            'Content-Language': 'content_language',
            'Referer': 'referer',
            'Last-Modified': 'last_modified'
        }
}

sorted_header_keys = sorted(field_dict['headers'].keys())
sorted_http_header_keys = sorted(field_dict['http_headers'].keys())
sorted_keys = sorted_header_keys + sorted_http_header_keys
schema = T.StructType([T.StructField(field_dict['headers'][i], T.StringType(), True) for i in sorted_header_keys] +
                      [T.StructField(field_dict['http_headers'][i], T.StringType(), True) for i in sorted_http_header_keys])

In [5]:
# Helper function to extract WARC records via PySpark
def extract_warcs(warc_files):
    """Extraction function using FastWARC, yielding records for creating a Spark RDD, based on Save Page Now methodology"""
    for warc_file in warc_files:
        with open(warc_file, 'rb') as stream:
            for record in ArchiveIterator(stream, func_filter=lambda x: x.headers.get('WARC-Type')=='response'):
                yield tuple([record.headers.get(i) for i in sorted_header_keys]+[record.http_headers.get(i) for i in sorted_http_header_keys])

In [6]:
warcs = sc.parallelize(WARCS)
results = warcs.mapPartitions(extract_warcs)
results_df = spark.createDataFrame(results, schema)
results_df.cache()
results_df.count()

74713
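The pipeline above relies on `mapPartitions` calling `extract_warcs` once per partition with an iterator over that partition's file paths, so only the paths are shipped to workers and each worker opens its own files. The pure-Python sketch below imitates that contract locally; `split_into_partitions`, `map_partitions_locally`, and `fake_extract` are hypothetical helpers, and the contiguous slicing only approximates how Spark divides a parallelized list.

```python
from itertools import chain
from typing import Callable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def split_into_partitions(items: List[T], num_slices: int) -> List[List[T]]:
    """Split a list into num_slices contiguous, near-equal slices (some may be empty)."""
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


def map_partitions_locally(
    items: List[T], func: Callable[[Iterator[T]], Iterable[U]], num_slices: int
) -> List[U]:
    """Call func once per partition, as mapPartitions does, and concatenate the outputs."""
    partitions = split_into_partitions(items, num_slices)
    return list(chain.from_iterable(func(iter(part)) for part in partitions))


def fake_extract(paths: Iterator[str]) -> Iterator[Tuple[str, int]]:
    """Stand-in for extract_warcs: yields two fake 'records' per file path."""
    for path in paths:
        for i in range(2):
            yield (path, i)


files = ["a.warc.gz", "b.warc.gz", "c.warc.gz"]
print(split_into_partitions(files, 4))
print(len(map_partitions_locally(files, fake_extract, num_slices=4)))
```

With more partitions than files, some partitions are simply empty and contribute no rows, so a local master with many cores does no harm even for a handful of WARC files.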

In [19]:
# View a few rows (suppressing uri to avoid hyperlink generation)
results_df.drop('warc_target_uri').show(5)

+--------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+--------------------+
|content_length|        content_type|           warc_date|      warc_record_id|content_language|       last_modified|referer|         server_name|
+--------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+--------------------+
|         30124|application/http;...|2023-02-06T05:59:35Z|<urn:uuid:c1162ac...|            NULL|                NULL|   NULL|               nginx|
|        309861|application/http;...|2023-02-06T07:20:15Z|<urn:uuid:baff24c...|            NULL|                NULL|   NULL|               nginx|
|         38628|application/http;...|2023-02-06T06:14:31Z|<urn:uuid:768ec28...|            NULL|                NULL|   NULL|              Apache|
|           336|application/http;...|2023-02-06T05:53:28Z|<urn:uuid:6cb60ab...|            NULL|Tue, 08 Sep 2020 ...| 

We can also pull this result into a Pandas DataFrame to get a quick assessment of potential sparsity in the data. The WARC metadata fields are fully populated, as is the `Server` HTTP header for most records, but the other HTTP header fields are sparse (`Referer` in particular appears only a handful of times) and may require ingesting many more files to glean insights. We also have the option of ingesting the raw HTML, and we may wish to process these larger text chunks in a Spark environment as well.

In [17]:
# Basic assessment of selected features, ingested as strings
results_df.toPandas().describe()

| | content_length | content_type | warc_date | warc_record_id | warc_target_uri | content_language | last_modified | referer | server_name |
|---|---|---|---|---|---|---|---|---|---|
| count | 74713 | 74713 | 74713 | 74713 | 74713 | 8720 | 14808 | 6 | 70115 |
| unique | 63376 | 1 | 18069 | 74713 | 74713 | 254 | 13485 | 4 | 1964 |
| top | 1754 | application/http; msgtype=response | 2023-09-24T20:33:42Z | `<urn:uuid:c1162acc-54e4-4178-9009-dec83781ddbe>` | http://0337.com.cn/news/shownews.php?id=937 | en | Thu, 01 Jan 1970 00:00:00 GMT | | nginx |
| freq | 16 | 74713 | 14 | 1 | 1 | 4193 | 16 | 3 | 16016 |
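Because `describe()` on string columns reports non-null counts, the sparsity can be quantified directly as a fill rate: the non-null count divided by the 74,713 rows. The small sketch below uses only the counts reported in the table above; `fill_rates` is an illustrative name.

```python
# Non-null counts from the describe() output above (74,713 response records in total)
TOTAL_ROWS = 74_713
non_null = {
    "content_language": 8_720,
    "last_modified": 14_808,
    "referer": 6,
    "server_name": 70_115,
}

fill_rates = {col: n / TOTAL_ROWS for col, n in non_null.items()}
for col, rate in sorted(fill_rates.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{col:<17} {rate:7.2%}")
```

This gives roughly 94% for `server_name`, 20% for `last_modified`, 12% for `content_language`, and about 0.01% for `referer`. With the full DataFrame in hand, `results_df.toPandas().notna().mean()` would compute the same ratios for every column.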


We could also leverage Spark to examine raw HTML at scale, though we still need to be mindful of memory limits in the driver: on a local machine, attempting to ingest the full payload of every record leads to memory errors. For the purposes of this EDA (though we might employ a similar method later), we test ingesting only the first 200 characters of each payload. From here, we can start to explore this text data:

In [38]:
import pyspark.sql.functions as F

# Generator to extract just the first 200 characters from the HTML payload
def extract_warc_html(warc_files):
    """Extraction function using FastWARC, yielding records for creating a Spark RDD, based on Save Page Now methodology"""
    for warc_file in warc_files:
        with open(warc_file, 'rb') as stream:
            for record in ArchiveIterator(stream, func_filter=lambda x: x.headers.get('WARC-Type')=='response'):
                yield (record.reader.read().decode('utf-8', errors='replace')[0:200],)

In [31]:
# Map this function as before
warcs = sc.parallelize(WARCS)
html_data = warcs.mapPartitions(extract_warc_html)
html_data_df = html_data.toDF().withColumnRenamed('_1', 'raw_html')

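From here, we can use regular expressions to begin searching for interesting data elements such as page titles. Note that the pattern below only captures titles made up entirely of letters and digits (no spaces or punctuation), which is why every sampled title is a single word: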

In [52]:
html_data_df.select(F.regexp_extract('raw_html', r"<title>([A-Za-z0-9]*?)</title>", 1).alias('title')) \
    .filter(F.col('title')!='') \
    .sample(fraction=0.05) \
    .show(10)

+--------+
|   title|
+--------+
|Infiniti|
|Findings|
|    News|
| Scholia|
| Welcome|
|   About|
| Donetsk|
|Pesumati|
|    News|
|Promocje|
+--------+
only showing top 10 rows
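Two limitations are worth checking before reusing this approach. The pattern above rejects any title containing spaces or punctuation, and in the example record shown earlier, `<title>` starts more than 200 characters into the payload, so the truncated column would not contain it at all. The sketch below compares the original pattern with a hypothetical, more permissive one using Python's `re` on the head of that example page; `STRICT_TITLE`, `LOOSE_TITLE`, and `first_title` are illustrative names. The inline `(?is)` flags should also be accepted by the Java-based regex engine behind Spark's `regexp_extract`, but that is worth verifying.

```python
import re

# Pattern used in the Spark cell above: a single run of letters/digits only
STRICT_TITLE = re.compile(r"<title>([A-Za-z0-9]*?)</title>")
# Hypothetical, more permissive alternative: any case, optional attributes,
# titles spanning lines, surrounding whitespace trimmed
LOOSE_TITLE = re.compile(r"(?is)<title[^>]*>\s*(.*?)\s*</title>")


def first_title(html: str, pattern: re.Pattern) -> str:
    """Mimic regexp_extract(..., 1): group 1 of the first match, or '' if none."""
    match = pattern.search(html)
    return match.group(1) if match else ""


# Head of the example record printed earlier in this notebook
EXAMPLE_HEAD = (
    '\r\n<!DOCTYPE html>\n<html lang="en-US">\n<head>\n<meta charset="UTF-8">\n'
    '<meta name="viewport" content="width=device-width, initial-scale=1">\n'
    '<link rel="profile" href="http://gmpg.org/xfn/11">\n'
    '<link rel="pingback" href="http://1836saloon.net/xmlrpc.php">\n'
    '\n<!-- WP_Head -->\n'
    '<title>SPECIALS FOR THE WEEK OF 9/22 THRU 9/28 &#8211; 1836 Saloon &amp; Grille</title>\n'
)

print(repr(first_title(EXAMPLE_HEAD, STRICT_TITLE)))
print(repr(first_title(EXAMPLE_HEAD, LOOSE_TITLE)))
print(repr(first_title(EXAMPLE_HEAD[:200], LOOSE_TITLE)))
```

Capturing the title inside the extraction generator, before truncating the payload, would avoid the 200-character cutoff while still keeping the shipped data small.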



In [53]:
# End the session
sc.stop()

#### Next Steps
In the next steps of this project, we will begin prototyping the ingestion pipeline in earnest, combining the download process with an expanded version of the PySpark ingestion performed here as part of the EDA. At that stage, we will also perform data cleansing and additional transformation steps to begin shaping the data into its final format.