## Step 1: Capture all relevant documents from a single Common Crawl release
* To collect data from *N* releases, run the notebook *N* times, modifying the release name in the cell below

In [1]:
import io
import os
import time

import pandas as pd
import requests
import warcio

# pandarallel provides DataFrame.parallel_apply(), used further down to fetch
# documents concurrently. progress_bar=True displays progress while it runs.
from pandarallel import pandarallel
pandarallel.initialize(progress_bar=True)

INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.


### What release are we interested in?
* Use the dropdown to see all available releases since 2008: https://commoncrawl.org/overview

In [2]:
# Common Crawl release to process. The index URL and every output file name
# below are derived from this value.
# To process a different release, change which line is active and re-run the notebook.
release = 'CC-MAIN-2024-10'
#release = 'CC-MAIN-2023-50'
#release = 'CC-MAIN-2023-40'
#release = 'CC-MAIN-2023-23'
#release = 'CC-MAIN-2023-14'
#release = 'CC-MAIN-2023-06'

### Read index from the specified release

In [3]:
# Fetch the list of columnar-index file paths published for this release
# (one relative path per line, no header row)
index_paths = pd.read_csv(
    f'https://data.commoncrawl.org/crawl-data/{release}/cc-index-table.paths.gz',
    header=None
)[0]

# Keep only the files belonging to the WARC subset of the index
warc_paths = index_paths[index_paths.str.contains("subset=warc")]

# Turn the relative paths into full URLs - we download over HTTPS, not S3
index = [f'https://data.commoncrawl.org/{x}' for x in warc_paths]

### Download lists of URLs in chunks
* Use `duckdb` to query the remote Parquet index files directly over HTTPS, so only the data needed by the query is downloaded
* Read one index file at a time (not all at once) to minimise 503 errors
* Saves to `./urls` folder - make sure the folder exists

In [5]:
for path in index:

    part = path.split('part-')[1].split('-')[0]
    print(f'Processing part {part}...')
    
    duckdb_query = f"""
        load httpfs;
        load parquet;
        
        copy (
            select
                url, content_mime_type, content_mime_detected,
                warc_filename, warc_record_offset, warc_record_length
            from
                parquet_scan('{path}')
            where
                content_mime_type ilike '%gpx%'
                or content_mime_detected ilike '%gpx%'
                or url ilike '%.gpx'
        ) to './urls/{release}-{part}.csv' (delimiter ',', header true);
    """

    !duckdb -c "{duckdb_query}"

Processing part 00000...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00001...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00002...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00003...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00004...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00005...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00006...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00007...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00008...
100% ▕████████████████████████████████████████████████████████████▏ 
Processing part 00009...
100% ▕████████████████████████████████████████████████████████████▏ 


### Glue the chunks back together & save as CSV
* Saves the full index for a particular release in a CSV file to `./interim/` folder

In [18]:
# Make sure the output folder exists so to_csv() does not fail on a fresh checkout
os.makedirs('./interim', exist_ok=True)

# os.listdir() returns entries in arbitrary order; sort the chunk names so the
# combined CSV has the same row order on every run.
chunk_files = sorted(x for x in os.listdir('./urls/') if release in x)

# Read every chunk belonging to this release and write them out as one CSV,
# with `url` as the index column.
pd.concat(
    [pd.read_csv(f'./urls/{x}', index_col='url') for x in chunk_files]
).to_csv(f'./interim/urls.{release}.csv')

## 📁 Capture individual documents from WARC files

In [19]:
# Load the combined URL index for this release (the CSV written to ./interim/ above).
# Read without index_col, so `url` comes back as an ordinary column.
urls = pd.read_csv(f'./interim/urls.{release}.csv')

In [20]:
def get_document_as_string(warc_file, offset, length, delay=0.25, retries=20, timeout=60):
    """
    Retrieves a single document from the WARC file.

    Issues an HTTP range request against data.commoncrawl.org for the bytes
    [offset, offset + length - 1] of `warc_file`, parses the returned bytes
    as a WARC archive and returns the record payload as text.

    Parameters
    ----------
    warc_file : str
        Path of the WARC file relative to https://data.commoncrawl.org/.
    offset : int
        Byte offset of the record inside the WARC file.
    length : int
        Length of the record in bytes.
    delay : float, default 0.25
        Seconds to wait before the first request; twice this value is
        waited before every retry.
    retries : int, default 20
        Number of additional attempts after the first request.
    timeout : float, default 60
        Per-request timeout in seconds, so a stalled connection cannot
        block a worker indefinitely.

    Returns
    -------
    str or None
        The stripped, UTF-8 decoded payload, or None when the record could
        not be fetched (no 206 response within the allowed attempts),
        parsed, or decoded as UTF-8.
    """

    time.sleep(delay)  # don't overload the server

    try:
        url = f'https://data.commoncrawl.org/{warc_file}'
        headers = {'Range': f'bytes={offset}-{offset+length-1}'}

        response = None
        for attempt in range(retries + 1):
            if attempt:
                time.sleep(delay * 2)  # back off before retrying
            try:
                response = requests.get(url, headers=headers, timeout=timeout)
            except requests.exceptions.RequestException:
                # Transient network problem (including a timeout): retry
                response = None
                continue
            if response.status_code == 206:
                break

        # Only a 206 (Partial Content) body is the requested record. Anything
        # else is an error page or the whole file, so never parse it.
        if response is None or response.status_code != 206:
            return None

        output_string = None
        with io.BytesIO(response.content) as stream:
            for record in warcio.ArchiveIterator(stream):
                output_string = record.content_stream().read()

        if output_string is None:  # the response contained no WARC record
            return None

        # Strict decoding: payloads that are not valid UTF-8 end up as None
        return output_string.decode('utf-8').strip()

    except Exception:
        # Best-effort: a single bad record must not abort the whole run.
        # Unlike a bare `except:`, this lets KeyboardInterrupt/SystemExit
        # through so a long job can still be interrupted.
        return None

In [21]:
%%time

# Fetch every document in parallel: one call to get_document_as_string() per
# row, using the WARC location columns from the index. The result is stored in
# a new `responses` column; rows whose document could not be retrieved get None.
# The recorded run below took about 40 minutes of wall time.
urls['responses'] = urls.parallel_apply(
    lambda row: get_document_as_string(
        row.warc_filename,
        row.warc_record_offset,
        row.warc_record_length
    ),
    axis=1
)

VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=2155), Label(value='0 / 2155'))), …

CPU times: user 11.3 s, sys: 4.76 s, total: 16.1 s
Wall time: 39min 41s


### Save dataframe as feather with file content as string
* Saves to `./interim/` folder

In [22]:
# Persist the frame, including the fetched `responses` column, next to the CSV index
# (presumably consumed by a later step of the pipeline - confirm).
urls.to_feather(f'./interim/urls.{release}.feather')