In [1]:
from warcio.recordloader import ArcWarcRecord

In [2]:
from warcio.archiveiterator import ArchiveIterator
import os
import gzip
import shutil
from urllib.parse import urlparse
from urllib.request import urlretrieve
from pyspark.sql.types import StructType, StructField, StringType

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.master("spark://spark-master:7077") \
.config("spark.executor.memory", "2g") \
.config("spark.executor.instances", "6") \
.config("spark.executor.cores", "2") \
.config("spark.driver.memory", "4g") \
.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/27 13:41:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Downloads sample warc file and extracts it if not already available

warc_url = "https://data.commoncrawl.org/crawl-data/CC-MAIN-2017-13/segments/1490218186353.38/warc/CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz"
warcgz_filepath = "CC-MAIN-20170322212946-00000-ip-10-233-31-227.ec2.internal.warc.gz"
warc_filepath = ".".join(warcgz_filepath.split(".")[:-1])

if not os.path.isfile(warc_filepath):
    _, _ = urlretrieve(warc_url, warcgz_filepath)
    with gzip.open(warcgz_filepath, 'rb') as f_in:
        with open(warc_filepath, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

    os.remove(warcgz_filepath)

In [5]:
types = set()
ctypes = set()
with open(warc_filepath, 'rb') as stream:
    for i,record in enumerate(ArchiveIterator(stream)):
        types.add(record.rec_type)
        ctypes.add(record.content_type)

print(types, ctypes)

{'warcinfo', 'request', 'metadata', 'response'} {'application/warc-fields', 'application/http; msgtype=request', 'application/http; msgtype=response'}


In [6]:
temp_record = None
with open(warc_filepath, 'rb') as stream:
    for i,record in enumerate(ArchiveIterator(stream)):
        temp_record = record
        if record.rec_type == "response":
            print(record.rec_headers)
            break

WARC/1.0
WARC-Type: response
WARC-Date: 2017-03-22T22:16:45Z
WARC-Record-ID: <urn:uuid:1eba28d7-5c50-4520-a58b-b18bb9691201>
Content-Length: 36415
Content-Type: application/http; msgtype=response
WARC-Warcinfo-ID: <urn:uuid:c9737a57-b812-4c1c-b82c-66f820799890>
WARC-Concurrent-To: <urn:uuid:b0f277a6-5b6a-45dc-a17a-dbf2bd24f959>
WARC-IP-Address: 104.244.98.64
WARC-Target-URI: http://00ena00.blog.fc2.com/?tag=SL
WARC-Payload-Digest: sha1:W2ZCZ4N7UPYD3SIOVWQQVJ7RVIEJNQ6A
WARC-Block-Digest: sha1:OCC7ULZJRWXPVVMQNRLNOQ7KY5BH46HQ



In [7]:
def get_header(record: ArcWarcRecord, header: str):
    """Utility function to get header"""
    return record.rec_headers.get_header(header, "na")

ip = get_header(temp_record, "WARC-IP-Address")
url = get_header(temp_record, "WARC-Target-URI")

In [8]:
ip, url

('104.244.98.64', 'http://00ena00.blog.fc2.com/?tag=SL')

In [9]:
urlparse(url).hostname

'00ena00.blog.fc2.com'

In [10]:
def process_record(record):
    """Get ip and url headers"""
    ip = get_header(record, "WARC-IP-Address")
    url = get_header(record, "WARC-Target-URI")
    return ip, url

def process_warc(filepath):
    """Yield tuple containing ip, url if record is of response type"""
    with open(filepath, 'rb') as stream:
        for record in ArchiveIterator(stream):
            if record.rec_type == "response":
                res = process_record(record)
                yield res

def proc_wrapper(_id, iterator):
    """Wrapper function for `process_warp` to handle multiple `warc` files"""
    for filepath in iterator:
        for res in process_warc(filepath):
            yield res

In [11]:
output_schema = StructType([
    StructField("ip", StringType(), True),
    StructField("host", StringType(), True)
])

Add all `warc` files to be processed to a file named `paths.txt`.

In [12]:
inp_data = spark.sparkContext.textFile("paths.txt")
output = inp_data.mapPartitionsWithIndex(proc_wrapper)
df = spark.createDataFrame(output, schema=output_schema)

In [13]:
df.show()

                                                                                

+---------------+--------------------+
|             ip|                host|
+---------------+--------------------+
|  104.244.98.64|http://00ena00.bl...|
|  104.244.98.63|http://00pon00.bl...|
|  104.244.98.65|http://00pon00.bl...|
|136.243.111.229|http://03online.c...|
|136.243.111.229|http://03online.c...|
|136.243.111.229|http://03online.c...|
|136.243.111.229|http://03online.c...|
|136.243.111.229|http://03online.c...|
|  213.155.18.48|http://08.od.ua/n...|
|  213.155.18.48|http://08.od.ua/o...|
|  213.155.18.48|http://08.od.ua/p...|
|  213.155.18.48|http://08.od.ua/s...|
|  104.244.98.62|http://09pilgrim....|
|  172.217.7.193|http://0baby.blog...|
| 138.201.16.125|http://0lik.ru/cl...|
| 138.201.16.125|http://0lik.ru/cl...|
| 138.201.16.125|http://0lik.ru/te...|
| 138.201.16.125|http://0lik.ru/te...|
|  185.103.37.43|http://1000seeds....|
|   185.99.11.75|http://1001.ru/ar...|
+---------------+--------------------+
only showing top 20 rows



In [14]:
df.count()

                                                                                

46288

In [15]:
df.write.parquet("/opt/workspace/warc_data/ip_host.parquet")

                                                                                

In [16]:
! du -h /opt/workspace/warc_data/ip_host.parquet/

2.5M	/opt/workspace/warc_data/ip_host.parquet/
