In [None]:
# This notebook can be used to query the Common Crawl data set.

# http://netpreserve.org/ga2019/wp-content/uploads/2019/07/IIPCWAC2019-SEBASTIAN_NAGEL-Accessing_WARC_files_via_SQL-poster.pdf
# TODO: Automate creating the S3 bucket that stores the Athena query results (see ATHENA_BUCKET).

import json
import boto3
import os

In [None]:
-- Before using this notebook, run the following three queries in the AWS Athena query
-- editor, one at a time, to configure the Common Crawl database and table.
-- Do not run this cell in the notebook: it contains SQL, not Python, and will fail.
-- The table name is qualified with its database (ccindex.ccindex). This ensures it is created
-- in the ccindex database whichever database is currently selected in the editor. The
-- notebook queries it with ATHENA_DB = 'ccindex'.

-- Query 1
CREATE DATABASE IF NOT EXISTS ccindex

-- Query 2
CREATE EXTERNAL TABLE IF NOT EXISTS ccindex.ccindex (
  url_surtkey                   STRING,
  url                           STRING,
  url_host_name                 STRING,
  url_host_tld                  STRING,
  url_host_2nd_last_part        STRING,
  url_host_3rd_last_part        STRING,
  url_host_4th_last_part        STRING,
  url_host_5th_last_part        STRING,
  url_host_registry_suffix      STRING,
  url_host_registered_domain    STRING,
  url_host_private_suffix       STRING,
  url_host_private_domain       STRING,
  url_protocol                  STRING,
  url_port                      INT,
  url_path                      STRING,
  url_query                     STRING,
  fetch_time                    TIMESTAMP,
  fetch_status                  SMALLINT,
  content_digest                STRING,
  content_mime_type             STRING,
  content_mime_detected         STRING,
  content_charset               STRING,
  content_languages             STRING,
  warc_filename                 STRING,
  warc_record_offset            INT,
  warc_record_length            INT,
  warc_segment                  STRING)
PARTITIONED BY (
  crawl                         STRING,
  subset                        STRING)
STORED AS parquet
LOCATION 's3://commoncrawl/cc-index/table/cc-main/warc/';

-- Query 3
-- Registers the crawl/subset partitions found under the table LOCATION. Re-run it to pick up
-- partitions (crawls) added after the previous run.
MSCK REPAIR TABLE ccindex.ccindex

In [None]:
# The core functions notebook contains generalized functions that apply across use cases
%run ./corefunctions.ipynb

# Make sure to update these values
DOMAIN_TO_QUERY = 'derbycon.com' # This should look like 'domain.com'. The wildcard will be added automatically later.
ATHENA_BUCKET = 's3://brevity-athena' # This will need to be customized and specific to your own account (i.e. s3://customname-athena').
ATHENA_DB = 'ccindex' # This should align with the database and not need changed if it was created using the previous queries.
ATHENA_TABLE = 'ccindex' # This should align with the table and not need changed if it was created using the previous queries.

# Do not modify this query unless the intent is to customize
query = "SELECT url, warc_filename, warc_record_offset, warc_record_length FROM %s WHERE subset = 'warc' AND url_host_registered_domain = '%s';" % (ATHENA_TABLE, DOMAIN_TO_QUERY)

execid = queryathena(ATHENA_DB, ATHENA_BUCKET, query)
print(execid)

In [None]:
import json, boto3, time, requests
import pandas as pd
import io

# Load an external notebook with normalized functions
%run ./corefunctions.ipynb
    
# Utilize executionID to retrieve results
downloadURL = retrieveresults(execid)

# Load output into dataframe - re-run the following section of code if the initial process fails.
s=requests.get(downloadURL).content
dfhosts=pd.read_csv(io.StringIO(s.decode('utf-8')))
dfhosts

In [None]:
import warcio
from warcio.archiveiterator import ArchiveIterator
import os
import io
from io import BytesIO

# Thank you to Sebastian Nagel for your instructions and code to perform the following step.
# http://netpreserve.org/ga2019/wp-content/uploads/2019/07/IIPCWAC2019-SEBASTIAN_NAGEL-Accessing_WARC_files_via_SQL-poster.pdf


def _record_output_path(url, offset, output_dir):
    """Build the local file path used to store one fetched WARC record.

    A leading 'https://' or 'http://' is removed from the start of ``url`` only, so
    URLs embedded in a query string are left intact. The record offset plus '.html'
    is then appended. For example, 'https://domain.com/a/' at offset 42 becomes
    '<output_dir>/domain.com/a/42.html'.

    Returns:
        The absolute path. Returns None if the path would fall outside ``output_dir``,
        e.g. for a URL containing '..' path segments.
    """
    for scheme in ('https://', 'http://'):
        if url.startswith(scheme):
            url = url[len(scheme):]
            break
    # Strip leading slashes so os.path.join cannot discard output_dir (e.g. 'http:///x').
    relative = url.lstrip('/') + str(offset) + '.html'
    base = os.path.abspath(output_dir)
    target = os.path.normpath(os.path.join(base, relative))
    if os.path.commonpath([base, target]) != base:
        return None
    return target


def fetch_process_warc_records(dfhosts, output_dir=None):
    """Fetch the WARC record for each row of ``dfhosts`` and save its payload to a file.

    Args:
        dfhosts: DataFrame with the columns 'url', 'warc_filename',
            'warc_record_offset' and 'warc_record_length' (as selected by the
            Athena query).
        output_dir: Directory to write into. Defaults to '<cwd>/tmp'.

    Each record is read from the 'commoncrawl' S3 bucket with a ranged GET. Its
    content stream is decoded as UTF-8 and written to a path derived from the URL
    (see _record_output_path). A record is skipped, and counted, if it cannot be
    decoded or written, or if its path would leave ``output_dir``. S3 errors are
    not caught and propagate.

    Returns:
        dict with the number of 'processed' and 'skipped' records.
    """
    if output_dir is None:
        output_dir = os.path.join(os.getcwd(), 'tmp')
    s3client = boto3.client('s3')
    skippedrecords = 0
    processedrecords = 0
    for _, row in dfhosts.iterrows():
        url = row['url']
        warc_path = row['warc_filename']
        offset = int(row['warc_record_offset'])
        length = int(row['warc_record_length'])
        # HTTP byte ranges are inclusive at both ends, hence the -1.
        rangereq = 'bytes={}-{}'.format(offset, offset + length - 1)
        response = s3client.get_object(Bucket='commoncrawl', Key=warc_path, Range=rangereq)
        record_stream = BytesIO(response["Body"].read())
        for record in ArchiveIterator(record_stream):
            try:
                # Derived from the row's URL each time; the URL variable is never mutated.
                filepath = _record_output_path(url, offset, output_dir)
                if filepath is None:
                    raise ValueError('unsafe output path for %r' % url)
                page = record.content_stream().read().decode("utf-8")
                os.makedirs(os.path.dirname(filepath), exist_ok=True)
                with open(filepath, "w", encoding="utf-8") as text_file:
                    print(page, file=text_file)
            except Exception:
                # Best effort: occasionally a record cannot be decoded or written.
                # Skip it and keep going (KeyboardInterrupt is not swallowed).
                skippedrecords += 1
            else:
                processedrecords += 1
    return {'processed': processedrecords, 'skipped': skippedrecords}

# Writes one file per record under ./tmp and returns the processed/skipped record counts.
dfhostsupdated = fetch_process_warc_records(dfhosts)
dfhostsupdated

In [None]:
# Archive the downloaded pages so they can be downloaded out of Jupyter.
# This creates domainoutput.tar.gz in the notebook's working directory. Inside the archive,
# the HTML files sit under tmp/ in the URL-derived folder structure of the website.
# Python's tarfile is used instead of a `!tar` shell call for two reasons: paths containing
# spaces or shell metacharacters are handled safely, and member names are relative (tmp/...)
# rather than the full absolute path.
import tarfile

filepath = os.getcwd() + '/tmp/'
if os.path.isdir(filepath):
    with tarfile.open('domainoutput.tar.gz', 'w:gz') as archive:
        archive.add(filepath, arcname='tmp')
    print('Created', os.path.abspath('domainoutput.tar.gz'))
else:
    print('Nothing to archive: %s does not exist' % filepath)

In [None]:
-- Additional example queries to run with this configuration.
-- These can be run directly in the Athena query editor in the AWS console, or used in
-- this notebook in place of the pre-defined query.

-- Search the entire Common Crawl data set for URLs whose query string contains a
-- specific parameter (here: cmd=).
-- A plain LIKE 'cmd=' (no % wildcard) would only match a query string exactly equal to
-- 'cmd='. The regex instead matches cmd= at the start of the query string or after
-- '?' / '&', so parameters that merely end in "cmd" (e.g. subcmd=) are not matched.
-- NOTE: without a crawl filter this scans every crawl partition. Add, for example,
--   AND crawl = 'CC-MAIN-2019-35'
-- to reduce the amount of data scanned (and the Athena cost).
SELECT url,
       warc_filename,
       warc_record_offset,
       warc_record_length,
       url_query
FROM "ccindex"."ccindex"
WHERE subset = 'warc'
  AND regexp_like(url_query, '(^|[?&])cmd=')

In [None]:
-- Count the distinct URLs and distinct hosts (websites) captured for a specific
-- registered domain (replace 'domain.com').
-- COUNT(DISTINCT url) de-duplicates URLs that were captured more than once. By contrast,
-- SELECT DISTINCT COUNT(url) only de-duplicates the single result row, so it counts
-- every capture.
SELECT COUNT(DISTINCT url)           AS URLCount,
       COUNT(DISTINCT url_host_name) AS HostCount
FROM "ccindex"."ccindex"
WHERE subset = 'warc'
  AND url_host_registered_domain = 'domain.com'