Add README and more examples: HTML tag count, word count
sebastian-nagel committed May 5, 2017
1 parent 22abd7f commit e930b50
Showing 6 changed files with 202 additions and 1 deletion.
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright (c) 2017 Common Crawl

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
103 changes: 103 additions & 0 deletions README.md
@@ -0,0 +1,103 @@
![Common Crawl Logo](http://commoncrawl.org/wp-content/uploads/2016/12/logocommoncrawl.png)

# Common Crawl PySpark Examples

This project provides examples of how to process the Common Crawl dataset with [Apache Spark](http://spark.apache.org/) and Python:

+ count HTML tags in Common Crawl's raw response data (WARC files)
+ count web server names in Common Crawl's metadata (WAT files) and/or WARC files
+ word count (term and document frequency) in Common Crawl's extracted text (WET files)
+ extract links from WAT files and construct a (host-level) web graph


## Setup

To develop and test locally, you will need to install
* Spark, see the [detailed instructions](http://spark.apache.org/docs/latest/), and
* all required Python modules by running
```
pip install -r requirements.txt
```

## Compatibility and Requirements

Tested with Spark 2.1.0 in combination with Python 2.7 and/or 3.5.


## Get Sample Data

To develop locally, you'll need at least three data files -- one for each format the crawl uses. They can be fetched from the following links:
* [warc.gz](https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2017-13/segments/1490218186353.38/warc/CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz)
* [wat.gz](https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2017-13/segments/1490218186353.38/wat/CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.wat.gz)
* [wet.gz](https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2017-13/segments/1490218186353.38/wet/CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.wet.gz)

Alternatively, running `get-data.sh` will download the sample data. It also writes input files containing
* sample input as `file://` URLs
* all input of one monthly crawl as `s3://` URLs
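
An input file such as `input/test_warc.txt` is simply a list of paths or URLs, one per line. The exact file names and paths written by `get-data.sh` depend on where the data was downloaded; the entry below is only illustrative:

```
file:///home/user/cc-pyspark/crawl-data/CC-MAIN-2017-13/segments/1490218186353.38/warc/CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz
```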


### Running locally

First, point the environment variable `SPARK_HOME` to your Spark installation.
Then submit a job via

```
$SPARK_HOME/bin/spark-submit ./server_count.py \
--num_output_partitions 1 --log_level WARN \
./input/test_warc.txt servernames
```

This will count web server names sent in HTTP response headers for the sample WARC input and store the resulting counts in the SparkSQL table "servernames" in your Spark SQL warehouse directory (usually `./spark-warehouse/servernames`).

The output table can be accessed via SparkSQL, e.g.,

```
$SPARK_HOME/bin/pyspark
>>> df = sqlContext.read.parquet("spark-warehouse/servernames")
>>> for row in df.sort(df.val.desc()).take(10): print(row)
...
Row(key=u'Apache', val=9396)
Row(key=u'nginx', val=4339)
Row(key=u'Microsoft-IIS/7.5', val=3635)
Row(key=u'(no server in HTTP header)', val=3188)
Row(key=u'cloudflare-nginx', val=2743)
Row(key=u'Microsoft-IIS/8.5', val=1459)
Row(key=u'Microsoft-IIS/6.0', val=1324)
Row(key=u'GSE', val=886)
Row(key=u'Apache/2.2.15 (CentOS)', val=827)
Row(key=u'Apache-Coyote/1.1', val=790)
```
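
If you prefer plain SQL over the DataFrame API, the Parquet output can also be registered as a temporary view and queried directly. A small sketch, assuming the same warehouse path as above (the `spark` session object is provided by the pyspark shell):

```
>>> df = spark.read.parquet("spark-warehouse/servernames")
>>> df.createOrReplaceTempView("servernames")
>>> spark.sql("SELECT key, val FROM servernames ORDER BY val DESC LIMIT 10").show()
```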

See also
* [running the Spark shell and submitting Spark jobs](http://spark.apache.org/docs/latest/#running-the-examples-and-shell)
* [PySpark SQL API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html)


### Running on a Spark cluster over large amounts of data

As the Common Crawl dataset lives in the Amazon Public Datasets program, you can access and process it on Amazon AWS without incurring any transfer costs. The only cost that you incur is the cost of the machines running your Spark cluster.

1. Spin up a Spark cluster: [AWS EMR](https://aws.amazon.com/documentation/emr/) provides a ready-to-use Spark installation, but you will also find multiple descriptions on the web of how to deploy Spark on a cheap cluster of AWS spot instances. See also [launching Spark on a cluster](http://spark.apache.org/docs/latest/#launching-on-a-cluster).

2. Choose appropriate cluster-specific settings when [submitting jobs](http://spark.apache.org/docs/latest/submitting-applications.html), and check the job's own command-line options (e.g., `--num_input_partitions` or `--num_output_partitions`) by running

        $SPARK_HOME/bin/spark-submit ./server_count.py --help

3. Don't forget to deploy all dependencies to the cluster, see [advanced dependency management](http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management); a sketch of such a submission follows below.
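
For illustration, a cluster submission could look like the sketch below. The packaged dependency archive `deps.zip` (e.g., the modules from `requirements.txt`), the partition numbers, and the input listing `./input/warc_s3.txt` are placeholders to adapt to your cluster; `sparkcc.py` is the module providing `CCSparkJob` and is needed by every example job:

```
$SPARK_HOME/bin/spark-submit \
    --py-files sparkcc.py,deps.zip \
    ./server_count.py \
    --num_input_partitions 400 --num_output_partitions 10 \
    ./input/warc_s3.txt servernames
```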


## Credits

The examples are ported from Stephen Merity's [cc-mrjob](https://github.com/commoncrawl/cc-mrjob/) with a couple of upgrades:
* based on Apache Spark (instead of [mrjob](https://pythonhosted.org/mrjob/))
* [boto3](http://boto3.readthedocs.io/), supporting multi-part download of data from S3
* [warcio](https://github.com/webrecorder/warcio), a Python 2 and Python 3 compatible module to access WARC files

Further inspiration is taken from
* [cosr-back](https://github.com/commonsearch/cosr-back), written by Sylvain Zimmer for Common Search. You should definitely have a look at it if you need more sophisticated processing of the WARC content (an HTML parser, for example).
* Mark Litwintschik's blog post [Analysing Petabytes of Websites](http://tech.marksblogg.com/petabytes-of-website-data-spark-emr.html)


## License

MIT License, as per [LICENSE](./LICENSE)
32 changes: 32 additions & 0 deletions html_tag_count.py
@@ -0,0 +1,32 @@
import re

from collections import Counter

from sparkcc import CCSparkJob


class TagCountJob(CCSparkJob):
    """ Count HTML tag names in Common Crawl WARC files"""

    name = "TagCount"

    # match HTML tags (element names) on binary HTML data
    html_tag_pattern = re.compile(b'<([a-z0-9]+)')

    def process_record(self, record):
        if record.rec_type != 'response':
            # WARC request or metadata records
            return
        content_type = record.http_headers.get_header('content-type', None)
        if content_type is None or 'html' not in content_type:
            # skip non-HTML or unknown content types
            return
        data = record.content_stream().read()
        counts = Counter(TagCountJob.html_tag_pattern.findall(data))
        for tag, count in counts.items():
            yield tag.decode('ascii').lower(), count


if __name__ == '__main__':
    job = TagCountJob()
    job.run()
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,3 +1,4 @@
botocore
boto3
ujson
warcio
2 changes: 1 addition & 1 deletion server_count.py
@@ -13,7 +13,7 @@ class ServerCountJob(CCSparkJob):
    def process_record(self, record):
        server_name = None

-       if self.is_wat_json_record:
+       if self.is_wat_json_record(record):
            # WAT (response) record
            record = json.loads(record.content_stream().read())
            try:
44 changes: 44 additions & 0 deletions word_count.py
@@ -0,0 +1,44 @@
import re

from collections import Counter

from pyspark.sql.types import StructType, StructField, StringType, LongType

from sparkcc import CCSparkJob


class WordCountJob(CCSparkJob):
    """ Word count (frequency list) from texts in Common Crawl WET files"""

    name = "WordCount"

    # output is <word, <term_frequency, document_frequency>>
    output_schema = StructType([
        StructField("key", StringType(), True),
        StructField("val", StructType([
            StructField("tf", LongType(), True),
            StructField("df", LongType(), True)]), True)
    ])

    # simple Unicode-aware tokenization
    # (not suitable for CJK languages)
    word_pattern = re.compile('\w+', re.UNICODE)

    @staticmethod
    def reduce_by_key_func(a, b):
        # sum values of tuple <term_frequency, document_frequency>
        return ((a[0] + b[0]), (a[1] + b[1]))

    def process_record(self, record):
        if not self.is_wet_text_record(record):
            return
        data = record.content_stream().read().decode('utf-8')
        words = map(lambda w: w.lower(),
                    WordCountJob.word_pattern.findall(data))
        for word, count in Counter(words).items():
            yield word, (count, 1)


if __name__ == '__main__':
    job = WordCountJob()
    job.run()
