# Common Crawl AWS EMR Notebook for processing WARC/WET/WAT/ARC files

This notebook shows you how to:
- Configure your EMR PySpark kernel
- Generate Parquet files from WARC/WET/WAT/ARC files without `spark-submit`

Requirements:
- [AWS EMR Cluster](./cluster_setup.md)
- S3 Bucket

First, we configure the PySpark kernel. We set the driver memory and the executor memory to 1 gigabyte each, and we enable the Spark Python profiler (`spark.python.profile`) in the Spark configuration. More details on [configuring](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) and using Spark can be found in the [AWS EMR Release Guide](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-launch.html).

In [5]:
%%configure -f
{"driverMemory": "1G", "executorMemory" : "1G", "conf": {"spark.python.profile": "true"} }

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1606097548163_0005,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.




Next, we check the current session's configuration to confirm that the driver memory, the executor memory, and the profiler option are set as intended.

In [6]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1606097548163_0005,pyspark,idle,Link,Link,✔


We now store a list of links to the WARC/WET/WAT/ARC files that we want to process. For now we start with two ARC files. If you are unfamiliar with the format or usage of these links, please check the [getting started](https://commoncrawl.org/the-data/get-started/) guide, specifically the section on data location. Note that we can load an arbitrary number of data locations, and they do not necessarily have to be ARC files.

In [7]:
file_list = ["s3://commoncrawl/crawl-001/2008/07/22/3/1216753395382_3.arc.gz",
"s3://commoncrawl/crawl-002/2009/09/17/12/1253234344648_12.arc.gz"]

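Each entry in the list has the form `s3://<bucket>/<key>`. As a quick sanity check, the sketch below splits a URI into those two parts using the same regular expression that the job further down applies before downloading. The helper name `split_s3_uri` is hypothetical and only serves this illustration.

```python
import re

# Same pattern the job below uses to split an S3 URI into bucket and key.
S3_PATTERN = re.compile(r'^s3://([^/]+)/(.+)')


def split_s3_uri(uri):
    """Return (bucket, key) for an s3:// URI, or None if it does not match.

    Hypothetical helper for illustration; it is not part of cc-pyspark.
    """
    match = S3_PATTERN.match(uri)
    if match is None:
        return None
    return match.group(1), match.group(2)


example_uris = [
    "s3://commoncrawl/crawl-001/2008/07/22/3/1216753395382_3.arc.gz",
    "s3://commoncrawl/crawl-002/2009/09/17/12/1253234344648_12.arc.gz",
]

for uri in example_uris:
    print(split_s3_uri(uri))
```

A URI without a key part, such as `s3://commoncrawl`, does not match the pattern and yields `None`; the job logs such entries as invalid and skips them.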

If you are familiar with [cc-pyspark](https://github.com/commoncrawl/cc-pyspark), the following will look familiar. We define a class `Args` that holds further settings for what the resulting dataframe should look like and where it will be written. These fields are similar to those found in cc-pyspark, except that here we must set everything programmatically (that is, without using `spark-submit` and passing the options on the command line).

The most important class member to note is `output`. This is the S3 location that the resulting dataframe is written to. In this example we write to `s3://emr-arc-notebook/test_arc_output`.

You may also change the number of input and output partitions. The input partitions split up the list of data locations (that is, our `file_list`). The output partitions determine how many partitions the resulting dataframe is written in. You can read more about partitions and effective ways to partition data [here](https://www.dezyre.com/article/how-data-partitioning-in-spark-helps-achieve-more-parallelism/297).
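To get a feel for what the input partition count means for a short file list, here is a plain-Python sketch that splits a list into contiguous slices of near-equal size. It is illustrative only: the function `split_into_partitions` is hypothetical, the file names are made up, and Spark's own slicing may differ in detail.

```python
def split_into_partitions(items, num_partitions):
    """Split items into num_partitions contiguous slices of near-equal size.

    Illustrative only: this is not Spark code, and Spark's own slicing
    may differ in detail.
    """
    n = len(items)
    return [
        items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


# Two made-up input files spread over ten partitions.
uris = ["a.arc.gz", "b.arc.gz"]
partitions = split_into_partitions(uris, 10)
non_empty = [p for p in partitions if p]
print(len(partitions), len(non_empty))
```

With only two inputs, a split into ten partitions leaves most of them empty, so at most two tasks do real work. A larger input partition count starts to pay off once the file list grows well beyond the number of partitions.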

In [8]:
class Args:
    warc_parse_http_header = True
    records_processed = None
    warc_input_processed = None
    warc_input_failed = None
    num_input_partitions = 10
    num_output_partitions = 2
    output = "s3://emr-arc-notebook/test_arc_output"
    output_format = "parquet"
    output_compression = "gzip"
    output_option = []
    local_temp_dir = None
    spark_profiler = False


This is similar to the `CCSparkJob` class found in the `sparkcc.py` file of [cc-pyspark](https://github.com/commoncrawl/cc-pyspark/blob/master/sparkcc.py). First we retrieve the SparkContext (from the PySpark kernel provided by AWS EMR, which we configured earlier). Then we run our job by partitioning the input files. For each input file, we iterate over its records with the `ArchiveIterator` from the `warcio` library, which can read WARC/WAT/WET/ARC files. Here we run the `ServerCountJob` class, although we could use any class that inherits from the `CCSparkJob` class found [here](https://github.com/commoncrawl/cc-pyspark) (for example, `word_count.py`, `wat_extract_links.py`, etc.).
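Before reading the full job, it may help to see the calling convention it relies on: `mapPartitionsWithIndex` calls a function once per partition with the partition index and an iterator over that partition's elements, and the function yields results. The sketch below imitates this in plain Python without Spark. Both `map_partitions_with_index` and `process_uris` are hypothetical stand-ins, and the file names are made up.

```python
def map_partitions_with_index(partitions, func):
    """Plain-Python stand-in for RDD.mapPartitionsWithIndex.

    Calls func(index, iterator) once per partition and chains the results.
    """
    for index, partition in enumerate(partitions):
        yield from func(index, iter(partition))


def process_uris(index, uris):
    """Hypothetical stand-in for process_warcs.

    The real method opens each archive and yields one result per record;
    here we only report which partition handled which URI.
    """
    for uri in uris:
        yield index, uri


# Three partitions, one of them empty (made-up file names).
partitions = [["a.arc.gz"], [], ["b.arc.gz", "c.arc.gz"]]
results = list(map_partitions_with_index(partitions, process_uris))
print(results)
```

Because the per-partition function is a generator, results are produced lazily, which is why `process_warcs` below can `yield` records one at a time instead of building a list per file.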

In [9]:
import argparse
import json
import logging
import os
import re

from io import BytesIO
from tempfile import TemporaryFile

import boto3
import botocore

from warcio.archiveiterator import ArchiveIterator
from warcio.recordloader import ArchiveLoadFailed

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
import tempfile

LOGGING_FORMAT = '%(asctime)s %(levelname)s %(name)s: %(message)s'

    
class JupyterCCSparkJob(object):
    """
    A simple Spark job definition to process Common Crawl data
    """

    name = 'CCSparkJob'

    output_schema = StructType([
        StructField("key", StringType(), True),
        StructField("val", LongType(), True)
    ])

    # description of input and output shown in --help
    input_descr = "Path to file listing input paths"
    output_descr = "Name of output table (saved in spark.sql.warehouse.dir)"

    warc_parse_http_header = True

    args = None
    records_processed = None
    warc_input_processed = None
    warc_input_failed = None
    log_level = 'INFO'
    logging.basicConfig(level=log_level, format=LOGGING_FORMAT)


    def parse_arguments(self):
        """Return the job settings from the notebook's Args class (no command line parsing)."""

        description = self.name
        if self.__doc__ is not None:
            description += " - "
            description += self.__doc__
        args = Args()
        return args

    def add_arguments(self, parser):
        pass

    def validate_arguments(self, args):
        if "orc" == args.output_format and "gzip" == args.output_compression:
            # gzip for Parquet, zlib for ORC
            args.output_compression = "zlib"
        return True

    def get_output_options(self):
        return {x[0]: x[1] for x in map(lambda x: x.split('=', 1),
                                        self.args.output_option)}

    def init_logging(self, level=None):
        if level is None:
            level = self.log_level
        else:
            self.log_level = level
        logging.basicConfig(level=level, format=LOGGING_FORMAT)

    def init_accumulators(self, sc):
        self.records_processed = sc.accumulator(0)
        self.warc_input_processed = sc.accumulator(0)
        self.warc_input_failed = sc.accumulator(0)

    def get_logger(self, spark_context=None):
        """Get logger from SparkContext or (if None) from logging module"""
        if spark_context is None:
            return logging.getLogger(self.name)
        return spark_context._jvm.org.apache.log4j.LogManager \
            .getLogger(self.name)

    def run(self):
        self.args = self.parse_arguments()
        
        conf = SparkConf()
        
        if self.args.spark_profiler:
            conf = conf.set("spark.python.profile", "true")
        
        sc = SparkContext.getOrCreate(conf=conf)
        
        
        sqlc = SQLContext(sparkContext=sc)

        self.init_accumulators(sc)

        self.run_job(sc, sqlc)

        if self.args.spark_profiler:
            sc.show_profiles()

        sc.stop()

    def log_aggregator(self, sc, agg, descr):
        self.get_logger(sc).info(descr.format(agg.value))

    def log_aggregators(self, sc):
        self.log_aggregator(sc, self.warc_input_processed,
                            'WARC/WAT/WET input files processed = {}')
        self.log_aggregator(sc, self.warc_input_failed,
                            'WARC/WAT/WET input files failed = {}')
        self.log_aggregator(sc, self.records_processed,
                            'WARC/WAT/WET records processed = {}')

    @staticmethod
    def reduce_by_key_func(a, b):
        return a + b

    def run_job(self, sc, sqlc):
        
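        # Split the file list into the configured number of input partitions
        input_data = sc.parallelize(
            file_list, numSlices=self.args.num_input_partitions)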
           
        output = input_data.mapPartitionsWithIndex(self.process_warcs)
        sqlc.createDataFrame(output, schema=self.output_schema) \
            .coalesce(self.args.num_output_partitions) \
            .write \
            .format(self.args.output_format) \
            .option("compression", self.args.output_compression) \
            .options(**self.get_output_options()) \
            .save(self.args.output)
    
        self.log_aggregators(sc)
    
    def process_warcs(self, id_, iterator):
        s3pattern = re.compile('^s3://([^/]+)/(.+)')
        base_dir = "/user/"

        # S3 client (not thread-safe, initialize outside parallelized loop)
        s3client = boto3.client('s3')
       
        for uri in iterator:
            self.warc_input_processed.add(1)
            if uri.startswith('s3://'):
                self.get_logger().info('Reading from S3 {}'.format(uri))
                s3match = s3pattern.match(uri)
                if s3match is None:
                    self.get_logger().error("Invalid S3 URI: " + uri)
                    continue
                bucketname = s3match.group(1)
                path = s3match.group(2)
                warctemp = TemporaryFile(mode='w+b',
                                         dir=self.args.local_temp_dir)
                try:
                    s3client.download_fileobj(bucketname, path, warctemp)
                except botocore.client.ClientError as exception:
                    self.get_logger().error(
                        'Failed to download {}: {}'.format(uri, exception))
                    self.warc_input_failed.add(1)
                    warctemp.close()
                    continue
                warctemp.seek(0)
                stream = warctemp
            elif uri.startswith('hdfs://'):
                self.get_logger().error("HDFS input not implemented: " + uri)
                continue
            else:
                self.get_logger().info('Reading local stream {}'.format(uri))
                if uri.startswith('file:'):
                    uri = uri[5:]
                uri = os.path.join(base_dir, uri)
                try:
                    stream = open(uri, 'rb')
                except IOError as exception:
                    self.get_logger().error(
                        'Failed to open {}: {}'.format(uri, exception))
                    self.warc_input_failed.add(1)
                    continue

            no_parse = (not self.warc_parse_http_header)
           
            try:
                archive_iterator = ArchiveIterator(stream,
                                                   no_record_parse=no_parse,
                                                   arc2warc=True)
                for res in self.iterate_records(uri, archive_iterator):
                    yield res
            except ArchiveLoadFailed as exception:
                self.warc_input_failed.add(1)
                self.get_logger().error(
                    'Invalid WARC: {} - {}'.format(uri, exception))
            finally:
                stream.close()

    def process_record(self, record):
        raise NotImplementedError('Processing record needs to be customized')

    def iterate_records(self, _warc_uri, archive_iterator):
        """Iterate over all WARC records. This method can be customized
           and allows to access also values from ArchiveIterator, namely
           WARC record offset and length."""
    
        for record in archive_iterator:
            for res in self.process_record(record):
                yield res

            self.records_processed.add(1)
            # WARC record offset and length should be read after the record
            # has been processed, otherwise the record content is consumed
            # while offset and length are determined:
            #  warc_record_offset = archive_iterator.get_record_offset()
            #  warc_record_length = archive_iterator.get_record_length()

    @staticmethod
    def is_wet_text_record(record):
        """Return true if WARC record is a WET text/plain record"""
        return (record.rec_type == 'conversion' and
                record.content_type == 'text/plain')

    @staticmethod
    def is_wat_json_record(record):
        """Return true if WARC record is a WAT record"""
        return (record.rec_type == 'metadata' and
                record.content_type == 'application/json')

    @staticmethod
    def is_html(record):
        """Return true if (detected) MIME type of a record is HTML"""
        html_types = ['text/html', 'application/xhtml+xml']
        if (('WARC-Identified-Payload-Type' in record.rec_headers) and
            (record.rec_headers['WARC-Identified-Payload-Type'] in
             html_types)):
            return True
        for html_type in html_types:
            if html_type in record.content_type:
                return True
        return False


class ServerCountJob(JupyterCCSparkJob):
    """ Count server names sent in HTTP response header
        (WARC and WAT is allowed as input)"""

    name = "CountServers"
    fallback_server_name = '(no server in HTTP header)'

    def process_record(self, record):
        server_name = None

        if self.is_wat_json_record(record):
            # WAT (response) record
            record = json.loads(record.content_stream().read())
            try:
                payload = record['Envelope']['Payload-Metadata']
                if 'HTTP-Response-Metadata' in payload:
                    server_name = payload['HTTP-Response-Metadata'] \
                                         ['Headers'] \
                                         ['Server'] \
                                         .strip()
                else:
                    # WAT request or metadata records
                    return
            except KeyError:
                pass
        elif record.rec_type == 'response':
            # WARC response record
            server_name = record.http_headers.get_header('server', None)
        else:
            # warcinfo, request, non-WAT metadata records
            return

        if server_name and server_name != '':
            yield server_name, 1
        else:
            yield ServerCountJob.fallback_server_name, 1

job = ServerCountJob()
job.run()

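`ServerCountJob` yields one `(server_name, 1)` pair per record, and the base class carries `reduce_by_key_func` as a combiner for summing such pairs. The `run_job` method above writes the pairs without applying it, so the output holds one row per record rather than one row per server. The plain-Python sketch below shows what summing by key would produce; `reduce_by_key` is a hypothetical stand-in for Spark's `reduceByKey`, and the sample pairs are made up.

```python
def reduce_by_key_func(a, b):
    # Same combiner as JupyterCCSparkJob.reduce_by_key_func.
    return a + b


def reduce_by_key(pairs, func):
    """Plain-Python stand-in for RDD.reduceByKey (no Spark required)."""
    result = {}
    for key, value in pairs:
        # Combine with the running value if the key was seen before.
        result[key] = func(result[key], value) if key in result else value
    return result


# Made-up sample of what process_record might yield for four records.
pairs = [
    ("Apache", 1),
    ("nginx", 1),
    ("Apache", 1),
    ("(no server in HTTP header)", 1),
]
counts = reduce_by_key(pairs, reduce_by_key_func)
print(counts)
```

The same totals can also be obtained later by grouping on `key` and summing `val` once the Parquet files are loaded as a dataframe.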

Now that we've created some parquet files, let's learn how to process them in the [dataframe analysis notebook](dataframe_analysis.ipynb)!