In [1]:
from operator import add
import re
from collections import OrderedDict
from operator import itemgetter 
import itertools
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.1.19:7077") \
        .appName("common_crawl")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()
#        .config('spark.executor.cores', 2)\


# Old API (RDD)
spark_context = spark_session.sparkContext

# (*/*) - out of memory›
# ~6.4mins for 39496 files. (...00000/)  (takes 1 minute with 40 partitions)
# ~5 secs for 10 files (...00000/0*) 
# ~20 secs for 11110 files (...00000/1*) 


rdd = spark_context.newAPIHadoopFile(
    'hdfs://192.168.1.19:9000/crawl-wet/CC-MAIN-20190318132220-20190318153611-00020.warc.wet',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'WARC/1.0'}
)\
.cache() # Keep this RDD in memory!

rdd.count()
# Only one job (previous .cache() did not trigger a job)

45412

In [3]:
rdd.take(3)
# [(line_number, partition)]

[(0, ''),
 (8,
  '\r\nWARC-Type: warcinfo\r\nWARC-Date: 2019-03-27T04:47:45Z\r\nWARC-Filename: CC-MAIN-20190318132220-20190318153611-00020.warc.wet.gz\r\nWARC-Record-ID: <urn:uuid:14cd40bb-fff3-45dc-95ba-ed9ff0fb9d83>\r\nContent-Type: application/warc-fields\r\nContent-Length: 370\r\n\r\nSoftware-Info: ia-web-commons.1.1.9-SNAPSHOT-20190314010907\r\nExtracted-Date: Wed, 27 Mar 2019 04:47:45 GMT\r\nrobots: checked via crawler-commons 0.11-SNAPSHOT (https://github.com/crawler-commons/crawler-commons)\r\nisPartOf: CC-MAIN-2019-13\r\noperator: Common Crawl Admin (info@commoncrawl.org)\r\ndescription: Wide crawl of the web for March 2019\r\npublisher: Common Crawl\r\n\r\n\r\n\r\n'),
 (645,
  '\r\nWARC-Type: conversion\r\nWARC-Target-URI: http://0-100.hotnews.ro/2013/11/18/noul-mercedes-benz-clasa-c-deconspirat-inainte-de-lansarea-din-2014/\r\nWARC-Date: 2019-03-18T14:25:39Z\r\nWARC-Record-ID: <urn:uuid:aee4e90b-93a6-4100-9186-f7b47caa1c58>\r\nWARC-Refers-To: <urn:uuid:591fe7ff-9e97-4ff4-a87

In [4]:
rdd.getNumPartitions()

4

In [4]:
print(spark_context.uiWebUrl)

http://ben-uppmax-haste-spark-master:4042


In [8]:
## Example #1 - Filter by TLD and compute most common words ##

# Try .ac.uk, .ru, .se, .com
p = re.compile('WARC-Target-URI: \S+\.ac.uk', re.IGNORECASE)

rdd1 = rdd\
.filter(lambda doc: bool(p.search(doc[1])))\
.map(lambda web_text: web_text[1].partition('\r\n\r\n')[2])\
.flatMap(lambda t: t.split(' '))\
.flatMap(lambda w: w.split('\n'))\

rdd2 = rdd1.map(lambda w: w.strip())\
.map(lambda w: (w,1))\
.reduceByKey(add)\
.takeOrdered(40, key=lambda x: -x[1])

rdd2.cache()

print(result)

[('(1)', 1190), ('and', 844), ('the', 757), ('of', 713), ('to', 569), ('TP', 391), ('in', 332), ('a', 312), ('(2)', 260), ('for', 252), ('&', 240), ('Research', 164), ('Test', 162), ('Pitting', 162), ('University', 159), ('1', 158), ('is', 154), ('', 147), ('Overview', 138), ('at', 136), ('with', 130), ('The', 129), ('-', 122), ('(0)', 120), ('results', 114), ('you', 112), ('are', 111), ('this', 111), ('(3)', 107), ('on', 107), ('Cambridge', 102), ('be', 100), ('that', 99), ('or', 99), ('by', 94), ('our', 91), ('as', 90), ('|', 81), ('(4)', 80), ('from', 79)]


In [6]:
## Example #2 - Group by TLD and compute most common words for each ##

ex = "WARC-Type: conversion\
WARC-Target-URI: http://news.bbc.co.uk/2/hi/africa/3414345.stm\
WARC-Date: 2014-08-02T09:52:13Z"

p = re.compile('WARC-Target-URI: \S+\.([a-zA-Z]{2,3})/', re.IGNORECASE)
# print(p.search(ex).group(1))
# uk

def get_tld(content):
    match = p.search(content)
    if match is not None:
        return match.group(1)
    else:
        return None

# discard the line number
# partition() -- python function -- split on the first occurance, returns (before,split,after)
# filter out those with no TLD

    
words_by_tld_rdd = rdd\
.map(lambda filename_content: filename_content[1])\
.map(lambda content: (get_tld(content), content.partition('\r\n\r\n')[2]))\
.filter(lambda tld_content: tld_content[0] is not None)\
.flatMapValues(lambda words: words.split(' '))\
.flatMapValues(lambda words: words.split('\n'))\
.mapValues(lambda word: word.strip())
#.take(10)

# print(words_by_tld_rdd.take(10))

tlds = words_by_tld_rdd.countByKey()
#print(tlds)

tlds = OrderedDict(sorted(tlds.items(), key = itemgetter(1), reverse = True))
# print(tlds)  

top_tlds = dict(itertools.islice(tlds.items(), 10))

# print(top_tlds)

print("Results:")

for tld in top_tlds:
    print(tld)
    top_words_for_tld = words_by_tld_rdd\
        .filter(lambda tld_word: tld_word[0] == tld)\
        .values()\
        .map(lambda w: (w,1))\
        .reduceByKey(add)\
        .takeOrdered(20, key=lambda x: -x[1])
    print(top_words_for_tld)

Results:
com
[('', 313751), ('the', 240565), ('to', 192484), ('and', 188530), ('of', 155312), ('a', 148880), ('-', 136600), ('in', 117613), ('de', 117381), ('for', 79645), ('|', 74581), ('&', 71469), ('is', 68145), ('on', 60071), ('with', 53353), ('The', 51921), ('you', 49771), ('►', 49083), ('(1)', 44558), ('by', 43888)]
ru
[('и', 52076), ('в', 41444), ('на', 24573), ('для', 21241), ('-', 19033), ('с', 18073), ('не', 12507), ('по', 11091), ('', 10914), ('|', 10475), ('В', 7738), ('от', 7037), ('из', 6453), ('—', 6354), ('что', 6198), ('к', 5933), ('1', 5454), ('–', 4968), ('о', 4919), ('за', 4625)]
org
[('', 51714), ('the', 32854), ('and', 23189), ('of', 22675), ('to', 22290), ('a', 16591), ('de', 16393), ('in', 15604), ('-', 11546), ('for', 9734), ('is', 8670), ('|', 7779), ('The', 7509), ('on', 6893), ('that', 6281), ('la', 6092), ('with', 5585), ('by', 5549), ('&', 5169), ('you', 5095)]
net
[('', 18788), ('the', 13865), ('-', 13397), ('to', 11934), ('a', 10647), ('de', 10020), ('an

In [7]:
#file_content = rdd.take(1)[0][1]
#print(file_content.partition('\r\n\r\n')[2])
from operator import add
import re

ex = "WARC-Type: conversion\
WARC-Target-URI: http://news.bbc.co.uk/2/hi/africa/3414345.stm\
WARC-Date: 2014-08-02T09:52:13Z"

p = re.compile('WARC-Target-URI: \S+\.(([a-zA-Z]{2,3}}\.)?[a-zA-Z]{2,3}})/', re.IGNORECASE)

print(p.search(ex))

#print(bool(p.search('\nWARC-Target-URI:\n')))

#rdd\
#.filter(lambda doc: bool(p.search(doc[1])))\
#.map(lambda filename_content: filename_content[1].partition('\r\n\r\n')[2])\
#.flatMap(lambda t: t.split(' '))\
#.flatMap(lambda w: w.split('\n'))\
#.map(lambda w: w.strip())\
#.map(lambda w: (w,1))\
#.reduceByKey(add)\
#.takeOrdered(100, key=lambda x: -x[1])
#.take(100)
#.take(10)
#.flatMap(lambda text: text.split(' ')).take(100)

None


In [8]:
spark_session.stop()