Intro TBD

In [None]:
# This notebook requires the following additional libraries
# (please install using the preferred method for your environment, e.g. pip, conda):
#
# requests, duckdb, warcio, cdx_toolkit, pyarrow, pandas, polars, cdxj-indexer, setuptools
# (the %pip line in this cell installs all of them)

# Import the libraries required for this notebook
# Built-ins
import glob
import gzip
import io
import json
import os.path
import platform
import sys
import time

# Installed libraries
%pip install requests duckdb warcio cdx_toolkit pyarrow pandas polars cdxj-indexer setuptools
import duckdb
import requests
from warcio.archiveiterator import ArchiveIterator
# TODO: Potentially switch to using AWS python libraries
#import boto3, polars, matplotlib.pyplot as plt
#from botocore import UNSIGNED
#from botocore.config import Config


### Task 1: Look at the crawl data

Downloading explanations here.

In [None]:
# Temporarily download the sample files from GitHub
!curl -O https://raw.githubusercontent.com/commoncrawl/whirlwind-python/main/whirlwind.warc.gz
!curl -O https://raw.githubusercontent.com/commoncrawl/whirlwind-python/main/whirlwind.warc.wat.gz
!curl -O https://raw.githubusercontent.com/commoncrawl/whirlwind-python/main/whirlwind.warc.wet.gz

WEB_ARCHIVE_FILES = ['whirlwind.warc.gz',
                     'whirlwind.warc.wat.gz',
                     'whirlwind.warc.wet.gz']


#TODO: Potentially switch to loading data from AWS S3 directly using boto3

# Location of the S3 bucket for this dataset
# bucket = "commoncrawl"

# List the top level of the bucket using boto3. Because this is a public bucket, we don't need to sign requests.
# Here we set the signature version to UNSIGNED, which lets us make anonymous requests to a public bucket.
# s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# Print the items in the top-level prefixes
# for item in s3.list_objects_v2(Bucket=bucket, Delimiter='/')['CommonPrefixes']:
#    print(item['Prefix'])

Data format explanations here.

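1. WARC: the raw crawl data, stored as records such as `request`, `response`, and `metadata` (plus a `warcinfo` record describing the file)
2. WET: the plain text extracted from each response, stored as `conversion` records
3. WAT: JSON metadata computed from the WARC records (for example HTTP headers and links), stored as `metadata` records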

### Task 2: Iterate over WARC, WET, and WAT files

warcio explanations here.

In [None]:
def warcio_iterator(file):
    """Print the type (and target URI, where present) of every record in a web archive file."""
    print('File:', file)
    with open(file, 'rb') as stream:
        for record in ArchiveIterator(stream):
            print(' ', 'WARC-Type:', record.rec_type)
            # Only these record types carry a WARC-Target-URI header
            if record.rec_type in {'request', 'response', 'conversion', 'metadata'}:
                print('   ', 'WARC-Target-URI', record.rec_headers.get_header('WARC-Target-URI'))

for file in WEB_ARCHIVE_FILES:
    warcio_iterator(file)

The `ArchiveIterator` reads the WARC content in a single pass and gives us access to the attributes of each record (e.g. the record type through `record.rec_type`).

The output has three sections, one for each file in `WEB_ARCHIVE_FILES` (the WARC, WAT, and WET, in that order). For each file, it prints the type of every record, plus the `WARC-Target-URI` for those record types that have one.

### Task 3: Index the WARC, WET, and WAT

Explain CDX index vs columnar index.
We will start with CDX index.

##### Task 3.a: CDX(J) index

CDX(J) explanation here.

In [None]:
# Create *.cdxj index files from the local warcs
!cdxj-indexer whirlwind.warc.gz > whirlwind.warc.cdxj
!cdxj-indexer --records conversion whirlwind.warc.wet.gz > whirlwind.warc.wet.cdxj
!cdxj-indexer whirlwind.warc.wat.gz > whirlwind.warc.wat.cdxj

Now look at the `.cdxj` files with `cat whirlwind*.cdxj`. You'll see that each file has one entry in the index. The WARC only has the response record indexed, since by default cdxj-indexer guesses that you won't ever want to random-access the request or metadata records. The WET and WAT have the conversion and metadata records indexed, respectively (Common Crawl doesn't publish a WET or WAT index, just a WARC index).

For each of these records, there's one text line in the index (yes, it's a flat file!). It starts with a string like `org,wikipedia,an)/wiki/escopete 20240518015810`, followed by a JSON blob. The starting string is the primary key of the index. Its first part is a SURT (Sort-friendly URI Reordering Transform) of the URL: the labels of the host name are reversed and joined with commas, followed by `)` and the path. The second part is the capture timestamp, in ISO-8601 format with the delimiters removed (`20240518015810` is `2024-05-18T01:58:10`).

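What is the purpose of this funky format? Because the SURT puts the most general part of the host name first, sorting on it places all the pages of a domain and its subdomains next to each other. These flat files (300 gigabytes total per crawl) can therefore be sorted on the primary key using any out-of-core sort utility, e.g. the standard Linux `sort`, or one of the Hadoop-based out-of-core sort functions.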
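To make the SURT idea concrete, here is a minimal sketch of a *simplified* SURT transform, written for this tour (the `simple_surt` helper is hypothetical, not part of cdxj-indexer or any Common Crawl tool). It assumes we only need to reverse the host labels, drop a leading `www`, and append the path and query; real SURT implementations handle more cases (ports, userinfo, query-argument ordering). Sorting the resulting keys shows how hosts of the same domain end up next to each other.

```python
from urllib.parse import urlsplit


def simple_surt(url: str) -> str:
    """Build a simplified SURT key: reversed host labels, ')' and the path.

    Assumption: real SURT implementations handle many more cases (ports,
    userinfo, query-argument ordering, ...); this is only an illustration.
    """
    parts = urlsplit(url.lower())
    labels = parts.hostname.split('.')
    if labels and labels[0] == 'www':
        labels = labels[1:]  # a leading "www" is commonly dropped from SURT keys
    key = ','.join(reversed(labels)) + ')' + (parts.path or '/')
    if parts.query:
        key += '?' + parts.query
    return key


# Illustrative URLs: sorting the SURT keys groups hosts of the same domain together
urls = [
    'https://en.wikipedia.org/wiki/Example',
    'https://www.example.com/',
    'https://an.wikipedia.org/wiki/Escopete',
]
for key in sorted(simple_surt(u) for u in urls):
    print(key)
```

The two Wikipedia keys sort next to each other because they share the `org,wikipedia` prefix, which is exactly what makes a flat, sorted index efficient to search.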

The JSON blob has enough information to extract individual records: it says which warc file the record is in, and the offset and length of the record. We'll use that in the next section.
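Since each index line is just a key, a timestamp, and a JSON blob separated by spaces, it can be pulled apart with the standard library. A minimal sketch follows; the `parse_cdxj_line` helper is hypothetical, and the `length`, `offset`, and `filename` values in the sample line are made up for illustration (use a line from your own `whirlwind*.cdxj` files for real values).

```python
import json
from datetime import datetime


def parse_cdxj_line(line: str):
    """Split one CDXJ line into its SURT key, capture time, and JSON fields."""
    surt_key, timestamp, blob = line.split(' ', 2)
    captured_at = datetime.strptime(timestamp, '%Y%m%d%H%M%S')
    fields = json.loads(blob)
    return surt_key, captured_at, fields


# Hypothetical index line: the length, offset, and filename values are made up
line = ('org,wikipedia,an)/wiki/escopete 20240518015810 '
        '{"url": "https://an.wikipedia.org/wiki/Escopete", "mime": "text/html", '
        '"status": "200", "length": "2345", "offset": "1234", "filename": "example.warc.gz"}')

surt_key, captured_at, fields = parse_cdxj_line(line)
# int() works whether the index stores these numbers as strings or as integers
offset, length = int(fields['offset']), int(fields['length'])
print(surt_key, captured_at.isoformat(), fields['filename'], offset, length)
```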

### Task 4: Use the CDXJ index to extract raw content from the local WARC, WET, and WAT

Normally, compressed files aren't random access. However, WARC files use a trick to make this possible: every record is compressed separately, as its own gzip member, and the members are concatenated into one file. The gzip format supports such multi-member files, but the feature is rarely used.

To extract one record from a warc file, all you need to know is the filename and the offset into the file. If you're reading over the web, then it really helps to know the exact length of the record.
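The multi-member trick can be reproduced with in-memory bytes and the standard library. This is a sketch of the idea, not warcio's implementation, and the helper names are hypothetical: each record gets its own gzip member, we remember each member's offset and length (as the CDXJ index does), and then we decompress a single record either with both numbers or with just the offset, letting zlib stop at the end of the member.

```python
import gzip
import zlib


def build_multimember_gzip(records):
    """Compress each record as its own gzip member and concatenate the members.

    Returns the combined bytes and an (offset, length) pair per record, the
    same kind of information the CDXJ index stores.
    """
    blob = bytearray()
    index = []
    for record in records:
        member = gzip.compress(record)
        index.append((len(blob), len(member)))
        blob += member
    return bytes(blob), index


def read_with_offset_and_length(blob, offset, length):
    """Decompress one record when both its offset and length are known."""
    return gzip.decompress(blob[offset:offset + length])


def read_with_offset_only(blob, offset):
    """Decompress one record knowing only its offset: zlib stops at the member's end."""
    decompressor = zlib.decompressobj(wbits=31)  # 31 = expect a gzip header
    return decompressor.decompress(blob[offset:])


records = [b'first record\r\n', b'second record\r\n', b'third record\r\n']
blob, index = build_multimember_gzip(records)
print(index)
print(read_with_offset_and_length(blob, *index[1]))
print(read_with_offset_only(blob, index[2][0]))
```

Knowing only the offset works locally, but over the web the length lets you request exactly the bytes of one member instead of the rest of the file.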

Let's get a set of extractions from your local whirlwind.*.gz files with warcio:

In [None]:
# Create extraction.* from local warcs, the offset numbers are from the cdxj index

!warcio extract --payload whirlwind.warc.gz 1023 > extraction.html
!warcio extract --payload whirlwind.warc.wet.gz 466 > extraction.txt
!warcio extract --payload whirlwind.warc.wat.gz 443 > extraction.json

# Hint: You can try python -m json.tool extraction.json

The offset numbers in the cell above are the same ones as in the index. Look at the three output files: `extraction.html`, `extraction.txt`, and `extraction.json` (pretty-print the JSON with `python -m json.tool extraction.json`).

Notice that we extracted HTML from the WARC, text from WET, and JSON from the WAT (as shown in the different file extensions). This is because the payload in each file type is formatted differently!

### Task 5: Wreck the WARC by compressing it wrong

As mentioned earlier, WARC/WET/WAT files look like they're gzipped, but they're actually gzipped in a particular way that allows random access. This means that you can't gunzip and then gzip a warc without wrecking random access. This example:

- creates a copy of one of the WARC files we downloaded
- uncompresses it
- iterates over the uncompressed copy with `warcio_iterator` to show that this works
- recompresses it the wrong way
- runs `warcio_iterator` over it to show that it triggers an error
- recompresses it the right way using `warcio recompress`
- shows that this compressed file works
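One way to see the difference between the two compressions is to count gzip members. The sketch below is a hypothetical helper built only on Python's `zlib` module (not a warcio feature): it decompresses one member at a time and moves on to the bytes left over after each member. A correctly compressed WARC should have one member per record, while plain `gzip` produces a single member for the whole file.

```python
import gzip
import zlib


def count_gzip_members(data: bytes) -> int:
    """Count the gzip members in `data`, decompressing one member at a time."""
    members = 0
    while data:
        decompressor = zlib.decompressobj(wbits=31)  # 31 = expect a gzip header
        decompressor.decompress(data)
        if not decompressor.eof:
            raise ValueError('truncated gzip member')
        members += 1
        data = decompressor.unused_data  # bytes after the end of this member
    return members


# Illustrative records, compressed the right way and the wrong way
records = [b'record one\r\n', b'record two\r\n', b'record three\r\n']
right_way = b''.join(gzip.compress(r) for r in records)  # one member per record
wrong_way = gzip.compress(b''.join(records))             # one member for everything
print(count_gzip_members(right_way), count_gzip_members(wrong_way))  # → 3 1
```

To try it on the files from this task, read them with `open('testing.warc.gz', 'rb').read()`; this sketch holds the whole file in memory, so it is only suitable for small files like the whirlwind samples.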


In [None]:
# We will break and then fix this warc
!cp whirlwind.warc.gz testing.warc.gz
!rm -f testing.warc
!gzip -d testing.warc.gz
	
# Iterate over this uncompressed warc: works
warcio_iterator('testing.warc')

# Compress it the wrong way
!gzip testing.warc

# Iterating over this compressed warc fails
try:
    print("\nThis won't work!")
    warcio_iterator('testing.warc.gz')
except Exception as e:
    print(f"Error iterating over malformed warc.gz: {e}")

# Now let's do it the right way
!gzip -d testing.warc.gz
!warcio recompress testing.warc testing.warc.gz

# And now iterating works
print('\nThis should work:')
warcio_iterator('testing.warc.gz')

Make sure you compress WARCs the right way!

### Task 6: Use cdx_toolkit to query the full CDX index and download those captures from AWS S3

Some of our users only want to download a small subset of the crawl. They want to run queries against an index: either the CDX index we just talked about, or the columnar index, which we'll talk about later.

The cdx_toolkit is a set of tools for working with CDX indices of web crawls and archives. It knows how to query the CDX index across all of our crawls and also can create WARCs of just the records you want. We will fetch the same record from Wikipedia that we've been using for the whirlwind tour.

In [None]:
# Look up this capture in the Common Crawl CDX index
!cdxt --limit 1 --crawl CC-MAIN-2024-22 --from 20240518015810 --to 20240518015810 iter an.wikipedia.org/wiki/Escopete

# Extract the content from the Common Crawl S3 bucket
!rm -f TEST-000000.extracted.warc.gz
!cdxt --limit 1 --crawl CC-MAIN-2024-22 --from 20240518015810 --to 20240518015810 warc an.wikipedia.org/wiki/Escopete

# Index this new warc
!cdxj-indexer TEST-000000.extracted.warc.gz  > TEST-000000.extracted.warc.cdxj
!cat TEST-000000.extracted.warc.cdxj

# Iterate this new warc
warcio_iterator('TEST-000000.extracted.warc.gz')


We look up the capture with the `cdxt` commands by specifying the exact URL (`an.wikipedia.org/wiki/Escopete`) and the date of its capture (`20240518015810`). The output is the WARC file `TEST-000000.extracted.warc.gz`, which contains a warcinfo record explaining what the WARC is, followed by the response record we requested. The cell then runs cdxj-indexer on this new WARC to make a CDXJ index of it, as in Task 3, and finally iterates over the WARC using `warcio_iterator`, as in Task 2.

If you dig into cdx_toolkit's code, you'll find that it uses the offset and length of the WARC record, as returned by the CDX index query, to make an HTTP byte-range request to S3 that downloads just the single WARC record we want. It only downloads the response record because our CDX index only has response records indexed.
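The byte-range idea can be sketched with the standard library. This is not cdx_toolkit's code; it's a minimal illustration that assumes the CDX `filename` field is a path relative to the public `https://data.commoncrawl.org/` endpoint (the same endpoint the columnar-index code in Task 8 uses). Because every WARC record is its own gzip member, the downloaded bytes can be decompressed on their own.

```python
import gzip
import urllib.request

# Assumption: the CDX `filename` field is a path relative to this HTTPS endpoint
BASE_URL = 'https://data.commoncrawl.org/'


def range_header(offset: int, length: int) -> str:
    """Build an HTTP Range header for `length` bytes starting at `offset` (the end is inclusive)."""
    return f'bytes={offset}-{offset + length - 1}'


def fetch_warc_record(filename: str, offset: int, length: int) -> bytes:
    """Download one gzip member (one WARC record) and return its decompressed bytes."""
    request = urllib.request.Request(
        BASE_URL + filename,
        headers={'Range': range_header(offset, length)},
    )
    with urllib.request.urlopen(request) as response:
        return gzip.decompress(response.read())


print(range_header(1234, 2345))  # → bytes=1234-3578
```

Call `fetch_warc_record` with the `filename`, `offset`, and `length` from a real index lookup (such as the `cdxt iter` output above); the HTTP `Range` end position is inclusive, hence the `- 1`.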

### Task 7: Find the right part of the columnar index

Now let's look at the columnar index, the other kind of index that Common Crawl makes available. This index is stored in Parquet files, so you can access it using SQL-based tools like AWS Athena and DuckDB, as well as through table and dataframe libraries such as pandas, pyarrow, and polars.

We could read the data directly from our index in our S3 bucket and analyse it in the cloud through AWS Athena. However, this is a managed service that costs money to use (though usually a small amount). You can read about using it here. This whirlwind tour will only use the free method of either fetching data from outside of AWS (which is kind of slow), or making a local copy of a single columnar index (300 gigabytes per monthly crawl), and then using that.

The columnar index is divided up into a separate index per crawl, which Athena or duckdb can stitch together. The cdx index is similarly divided up, but cdx_toolkit hides that detail from you.

For the purposes of this whirlwind tour, we don't want to configure all the crawl indices because it would be slow. So let's start by figuring out which crawl was ongoing on the date `20240518015810`, and then we'll work with just that one crawl.

##### Task 7.a: Downloading collinfo.json

We're going to use the collinfo.json file to find out which crawl we want. This file includes the dates for the start and end of every crawl and is available through the Common Crawl website at index.commoncrawl.org. 

In [None]:
# Download collinfo.json so we can find out the crawl name
!curl -O https://index.commoncrawl.org/collinfo.json

The date of our test record is `20240518015810`, which is `2024-05-18T01:58:10` if you add the delimiters back in. We can scroll through the records in `collinfo.json` and compare this date with each crawl's from/to values to find the right crawl: `CC-MAIN-2024-22`. Now that we know the crawl name, we can access the correct fraction of the index without having to read the metadata of all the rest.
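Instead of scrolling, the lookup can be scripted. The sketch below assumes each entry in `collinfo.json` has `id`, `from`, and `to` keys, with `from`/`to` beginning with a `YYYY-MM-DDTHH:MM:SS` timestamp; check the downloaded file if the layout differs. The helper names are hypothetical, and the example entries are invented so the lookup can be shown without the real file.

```python
import json
from datetime import datetime


def load_collinfo(path='collinfo.json'):
    """Read the list of crawl descriptions downloaded above."""
    with open(path, encoding='utf-8') as fd:
        return json.load(fd)


def crawl_for_timestamp(collinfo, timestamp: str):
    """Return the id of the first crawl whose from/to window contains `timestamp`.

    Assumption: each entry has 'id', 'from', and 'to' keys, where 'from'/'to'
    start with an ISO-8601 'YYYY-MM-DDTHH:MM:SS' timestamp.
    """
    when = datetime.strptime(timestamp, '%Y%m%d%H%M%S')
    for entry in collinfo:
        # Keep only the first 19 characters so any trailing timezone suffix is ignored
        start = datetime.fromisoformat(entry['from'][:19])
        end = datetime.fromisoformat(entry['to'][:19])
        if start <= when <= end:
            return entry['id']
    return None


# Hypothetical entries, just to show the lookup
example = [
    {'id': 'EXAMPLE-2024-A', 'from': '2024-04-10T00:00:00', 'to': '2024-04-25T23:59:59'},
    {'id': 'EXAMPLE-2024-B', 'from': '2024-05-10T00:00:00', 'to': '2024-05-25T23:59:59'},
]
print(crawl_for_timestamp(example, '20240518015810'))  # → EXAMPLE-2024-B
```

With the real file, `crawl_for_timestamp(load_collinfo(), '20240518015810')` should point to the same crawl we found by hand.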

### Task 8: Query using the columnar index + DuckDB from outside AWS

A single crawl columnar index is around 300 gigabytes. If you don't have a lot of disk space, but you do have a lot of time, you can directly access the index stored on AWS S3. We're going to do just that, and then use DuckDB to make an SQL query against the index to find our webpage. We'll be running the following query:

```
    SELECT
      *
    FROM ccindex
    WHERE subset = 'warc'
      AND crawl = 'CC-MAIN-2024-22'
      AND url_host_tld = 'org' -- help the query optimizer
      AND url_host_registered_domain = 'wikipedia.org' -- ditto
      AND url = 'https://an.wikipedia.org/wiki/Escopete'
    ;
```

In [None]:
def index_download_advice(prefix, crawl):
    # Print the shell commands that mirror one crawl's columnar index under `prefix`
    print('Do you need to download this index?')
    print(f' mkdir -p {prefix}/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/')
    print(f' cd {prefix}/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/')
    print(f' aws s3 sync s3://commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/ .')


def print_row_as_cdxj(row):
    # Print each result row in the CDXJ layout: SURT key, timestamp, JSON blob
    df = row.fetchdf()
    for ro in df.itertuples(index=False):
        d = ro._asdict()
        cdxjd = {
            'url': d['url'],
            'mime': d['content_mime_type'],
            'status': str(d['fetch_status']),
            'digest': 'sha1:' + d['content_digest'],
            'length': str(d['warc_record_length']),
            'offset': str(d['warc_record_offset']),
            'filename': d['warc_filename'],
        }

        timestamp = d['fetch_time'].isoformat(sep='T')
        timestamp = timestamp.translate(str.maketrans('', '', '-T :Z')).replace('+0000', '')

        print(d['url_surtkey'], timestamp, json.dumps(cdxjd))


def print_row_as_kv_list(row):
    # Print every column of each result row as a "name value" pair
    df = row.fetchdf()
    for ro in df.itertuples(index=False):
        d = ro._asdict()
        for k, v in d.items():
            print(' ', k, v)


def get_files(algo, crawl):
    # Return the Parquet files (or a glob pattern) that make up the columnar index of `crawl`
    if algo == 's3_glob':
        # 403 errors with and without credentials. you have to be commoncrawl-pds
        files = f's3://commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet'
        raise NotImplementedError('will cause a 403')
    elif algo == 'local_files':
        files = os.path.expanduser(f'~/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet')
        files = glob.glob(files)
        # did we already download? we expect 300 files of about a gigabyte
        if len(files) < 250:
            index_download_advice('~', crawl)
            # exit() is meant for scripts; in a notebook, raise an error instead
            raise FileNotFoundError(f'columnar index for {crawl} not found locally')
    elif algo == 'ccf_local_files':
        files = glob.glob(f'/home/cc-pds/commoncrawl/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet')
        if len(files) < 250:
            index_download_advice('/home/cc-pds', crawl)
            raise FileNotFoundError(f'columnar index for {crawl} not found on this server')
    elif algo == 'cloudfront_glob':
        # duckdb can't glob this, same reason as s3_glob above
        files = f'https://data.commoncrawl.org/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/*.parquet'
        raise NotImplementedError('duckdb will throw an error because it cannot glob this')
    elif algo == 'cloudfront':
        # Read the list of index files and point each one at the public HTTPS endpoint
        external_prefix = f'https://data.commoncrawl.org/cc-index/table/cc-main/warc/crawl={crawl}/subset=warc/'
        file_file = f'{crawl}.warc.paths.gz'

        with gzip.open(file_file, mode='rt', encoding='utf8') as fd:
            files = fd.read().splitlines()
            files = [external_prefix+f for f in files]
    else:
        raise NotImplementedError('algo: '+algo)
    return files


def run_duckdb_query(algo, crawl):
    if platform.system() == 'Windows' and hasattr(sys.stdout, 'buffer'):
        # Windows stdout is often cp1252; re-wrap it as UTF-8 where the stream allows it
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    files = get_files(algo, crawl)
    retries_left = 100

    while True:
        try:
            ccindex = duckdb.read_parquet(files, hive_partitioning=True)
            break
        except (duckdb.HTTPException, duckdb.InvalidInputException) as e:
            # read_parquet exception seen: HTTPException("HTTP Error: HTTP GET error on 'https://...' (HTTP 403)")
            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
            print('read_parquet exception seen:', repr(e), file=sys.stderr)
            if retries_left:
                print('sleeping for 60s', file=sys.stderr)
                time.sleep(60)
                retries_left -= 1
            else:
                raise

    duckdb.sql('SET enable_progress_bar = true;')
    duckdb.sql('SET http_retries = 100;')
    #duckdb.sql("SET enable_http_logging = true;SET http_logging_output = 'duck.http.log'")

    print('total records for crawl:', crawl)
    retries_left = 100
    while True:
        try:
            print(duckdb.sql('SELECT COUNT(*) FROM ccindex;'))
            break
        except duckdb.InvalidInputException as e:
            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
            print('duckdb exception seen:', repr(e), file=sys.stderr)
            if retries_left:
                print('sleeping for 10s', file=sys.stderr)
                time.sleep(10)
                retries_left -= 1
            else:
                raise

    # Use the crawl passed to this function instead of a hardcoded crawl name
    sq2 = f'''
    select
      *
    from ccindex
    where subset = 'warc'
      and crawl = '{crawl}'
      and url_host_tld = 'org' -- help the query optimizer
      and url_host_registered_domain = 'wikipedia.org' -- ditto
      and url = 'https://an.wikipedia.org/wiki/Escopete'
    ;
    '''

    row2 = duckdb.sql(sq2)
    print('our one row')
    retries_left = 100  # reset the retry budget for this step
    while True:
        try:
            row2.show()
            break
        except duckdb.InvalidInputException as e:
            # duckdb.duckdb.InvalidInputException: Invalid Input Error: No magic bytes found at end of file 'https://...'
            print('duckdb exception seen:', repr(e), file=sys.stderr)
            if retries_left:
                print('sleeping for 10s', file=sys.stderr)
                time.sleep(10)
                retries_left -= 1
            else:
                raise

    print('writing our one row to a local parquet file, whirlwind.parquet')
    row2.write_parquet('whirlwind.parquet')

    cclocal = duckdb.read_parquet('whirlwind.parquet')

    print('total records for local whirlwind.parquet should be 1')
    print(duckdb.sql('SELECT COUNT(*) FROM cclocal;'))

    sq3 = sq2.replace('ccindex', 'cclocal')
    row3 = duckdb.sql(sq3)
    print('our one row, locally')
    row3.show()

    print('complete row:')
    print_row_as_kv_list(row3)
    print('')

    print('equivalent to cdxj:')
    print_row_as_cdxj(row3)

In [None]:
# Warning! This might take 1-10 minutes

# Temporarily download the list of Parquet index files from GitHub
!curl -O https://raw.githubusercontent.com/commoncrawl/whirlwind-python/main/CC-MAIN-2024-22.warc.paths.gz

crawl = 'CC-MAIN-2024-22'
run_duckdb_query('cloudfront', crawl)

On a machine with a 1 gigabit network connection and many cores, this should take about one minute in total and use 8 cores.

The above code accesses the relevant part of the index for our crawl (`CC-MAIN-2024-22`) and then counts the number of records in that crawl (2,709,877,975!). It then runs the SQL query we saw before, which should match the single response record we want.

The program then writes that one record into a local Parquet file, does a second query that returns that one record, and shows the full contents of the record. We can see that the complete row contains many columns containing different information associated with our record. Finally, it converts the row to the CDXJ format we saw before.
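For reference, the row-to-CDXJ conversion done by `print_row_as_cdxj` can also be written with `strftime`, which produces the 14-digit timestamp directly instead of stripping delimiters from an ISO string. This is a hedged sketch: `row_to_cdxj` is a hypothetical helper that takes a plain dict of column values (column names as used in `print_row_as_cdxj`) and assumes `fetch_time` is a `datetime`; the sample row's digest, length, offset, and filename are made up.

```python
import json
from datetime import datetime, timezone


def row_to_cdxj(row: dict) -> str:
    """Format one columnar-index row (a dict of column values) as a CDXJ line."""
    fields = {
        'url': row['url'],
        'mime': row['content_mime_type'],
        'status': str(row['fetch_status']),
        'digest': 'sha1:' + row['content_digest'],
        'length': str(row['warc_record_length']),
        'offset': str(row['warc_record_offset']),
        'filename': row['warc_filename'],
    }
    fetch_time = row['fetch_time']
    if fetch_time.tzinfo is not None:
        fetch_time = fetch_time.astimezone(timezone.utc)  # normalise to UTC
    # strftime produces the 14-digit timestamp directly
    timestamp = fetch_time.strftime('%Y%m%d%H%M%S')
    return f"{row['url_surtkey']} {timestamp} {json.dumps(fields)}"


# Hypothetical row: digest, length, offset, and filename values are made up
row = {
    'url_surtkey': 'org,wikipedia,an)/wiki/escopete',
    'url': 'https://an.wikipedia.org/wiki/Escopete',
    'fetch_time': datetime(2024, 5, 18, 1, 58, 10, tzinfo=timezone.utc),
    'content_mime_type': 'text/html',
    'fetch_status': 200,
    'content_digest': 'EXAMPLEDIGEST',
    'warc_record_length': 2345,
    'warc_record_offset': 1234,
    'warc_filename': 'example.warc.gz',
}
print(row_to_cdxj(row))
```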

### Bonus Task! Download a full crawl index and query with DuckDB

If you want to run many of these queries, and you have a lot of disk space, you'll want to download the 300 gigabyte index and query it repeatedly.

In [None]:
# Warning! 300 gigabyte download

# TODO: Skipping this for now, too large a download potentially for SageMaker
# run_duckdb_query('local_files', crawl)

(**Bonus bonus:** If you happen to be using the Common Crawl Foundation development server, we've already downloaded these files, so you can also try `run_duckdb_query('ccf_local_files', crawl)`.)

All of these options run the same SQL query and should return the same record (written to a Parquet file).

### Bonus Task 2! Combine some steps

Use the DuckDB techniques from Task 8 and the Index Server to find a new webpage in the archives.
Note its URL, WARC filename, and timestamp.
Now go back to Task 6 and look at the `cdxt` commands in the `cdx_toolkit` cell.
Repeat the `cdx_toolkit` steps, but for the page and date range you found above.

### Congratulations!
You have completed the Whirlwind Tour of Common Crawl's Datasets using Python! You should now understand the different file types in our corpus and how to interact with Common Crawl's datasets using Python. To see what other people have done with our data, see the Examples page on our website. Why not join our Discord through the Community tab?