# WARC Processing with Spark and Python

There are several [full featured](https://github.com/archivesunleashed/aut) [options](https://github.com/helgeho/ArchiveSpark) for processing [WARC](https://en.wikipedia.org/wiki/Web_ARChive) data with Scala in Spark. Unfortunately if you aren't familiar with Scala these can seem like a little bit of a black box, where you aren't quite sure what's going on behind the scenes. This can be particularly problematic when understanding what is going on behind the scenes is critical for your research.

Fortunately, if you happen to know Python there is [PySpark](https://spark.apache.org/docs/0.9.0/python-programming-guide.html) which allows you to process your WARC data with the clearly documented [warcio](https://pypi.org/project/warcio/) library. The JVM isn't able to optimize Python code the same way it can Scala. But these performance difference really depend on what you are doing. And at any rate, you may be willing to trade off a bit of performance for an increase in readability. This notebook is a brief guide for processing WARC data using PySpark and warcio. Somewhat unhelpfully this notebook kind of assumes you are passingly familiar with Python, Spark and WARC already.

*You may notice that some of the notes refer specifically to the [XSEDE platform](https://www.xsede.org/). You can comfortably ignore those bits.*

### XSEDE Setup

This is what you need to do to use PySpark in Jupyter on XSEDE.

In [2]:
import sys
sys.path.append("/opt/packages/spark/latest/python/lib/py4j-0.10.7-src.zip")
sys.path.append("/opt/packages/spark/latest/python/")
sys.path.append("/opt/packages/spark/latest/python/pyspark")
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
sqlc = SparkSession(sc)

## Input Data

Great, now lets create a list of all the WARC files we want to process. You can adjust the pattern to match where the data is on your filesystem. You may need to change this up depending on where your WARC files are stored.

In [3]:
import glob

warc_files = glob.glob('/pylon5/ec5fp4p/edsu/spn/*/*.warc.gz')[0:2]
print("found {} files".format(len(warc_files)))
print("e.g. {}".format(warc_files[0]))

found 2 files
e.g. /pylon5/ec5fp4p/edsu/spn/liveweb-20161025000152/live-20161024234426-wwwb-app13.us.archive.org.warc.gz


## Extractor Functions 

The needs of research are *many*, and it's not possible to anticipate all the possible ways you might want to analyze your WARC records. In fact, researchers are often concerned with asking *novel* questions, that have never been asked before. So they are unlikely to be interested in your code that has already answered it.

However in the case of web archive data, it should be possible to have a simple framework for writing your own functions that analyze WARC records and use Spark to apply them right? An *extractor* function is a simple Python function that takes a `warcio.ArcWarcRecord` as an argument and returns data for processing in Spark. We will take a look at a simple *extractor* function in a moment. But before we do, how will we apply this function to all of our WARC data?

To do do this we introduce the extractor *decorator*, which is a bit of syntactic sugar you use to annotate your extractor functions, to indicate that they are a function that will be iterating through a set of WARC records in a set of WARC files, and will apply your custom *extractor* function to each one. All the heavy lifting here is done by warcio, which is parsing the records.

In [4]:
import warcio

def extractor(f):
    def new_f(warc_files):
        for warc_file in warc_files:
            with open(warc_file, 'rb') as stream:
                for record in warcio.ArchiveIterator(stream):
                    yield from f(record)
    return new_f

So `extractor` is a function, that takes a function as an argument, that returns a new function. If this sounds complicated that's because, well, it is. But as complicated as it sounds [decorators](https://www.python.org/dev/peps/pep-0318/) are a standard part of Python for a reason--they cut down on code repetition, which can lead to bugs. Fortunately this hard part is over and we won't be writing any code like that again.

Instead we will be writing consise (or complicated if you want) *extractor functions* that look something like this one, which returns the URL belonging to *WARC Response* records in our WARC data (yes there are [other types](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#warc-type-mandatory) of WARC records.

In [5]:
@extractor
def url(record):
    if record.rec_type == 'response':
        yield (record.rec_headers.get_header('WARC-Target-URI'), )

This one is pretty simple, but there are a few things to note about *extractor functions*:

* You need to *deocrate* them with `@extractor` just before the function definition. Yep, this is the extractor function we just created.
* Your function needs to take one argument, which will be the WARC record.
* Your function needs to `yield` a `tuple` containing some piece or pieces of information from the WARC record that you would like to process with Spark.

The *extractor functions* need to return a tuple because the thing being returned may be more complicated than a single value. And the function needs to *yield* instead of simply *return* because there may be more than one of these items to to return from a given WARC Record. For example, consider a function that returns all the hyperlinks in a WARC Response. More on this in a bit.

In this case we'll start with a super simple function that is going to return the URL for a web resource that was was archived, which can be found in any *WARC Response* record. Remember there are  of WARC Records in a WARC File.


In [6]:
def process(f):
    def new_f(warc_files):
        for warc_file in warc_files:
            with open(warc_file, 'rb') as stream:
                for record in warcio.ArchiveIterator(stream):
                    yield from f(record)
    return new_f

## Analyze

And finally for the fun bit, we've all been waiting for--to run the analysis! We apply tell Spark to process all our WARC files:

In [7]:
warcs = sc.parallelize(warc_files[0:1])

At this point `warcs` is a Spark Resilient Distributed Dataset (RDD). We then apply our extractor function to each one and get the result as...yes, another RDD.

In [8]:
output = warcs.mapPartitions(url)

We can convert our RDD to a Spark Data Frame (giving it column headers to use), which for comfort we can even turn into a [Pandas](https://pandas.pydata.org/) dataframe. 

In [9]:
df = output.toDF(["url"])

Ahhh pandas....

In [None]:
df.url.value_counts()

In [None]:
sc.stop()

In [None]:
sqlc.stop()