In [1]:
from pyspark.sql import SparkSession

# Connect to the standalone cluster. Executors are small (1 core, 1g) and are
# scaled between 0 and 10 by dynamic allocation, starting from 2.
spark = (
    SparkSession.builder
    .appName("maxmind-warc")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "1g")
    .config("spark.executor.cores", 1)
    .config("spark.dynamicAllocation.enabled", "true")
    # Shuffle tracking lets dynamic allocation run without an external shuffle service.
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.initialExecutors", 2)
    .config("spark.dynamicAllocation.minExecutors", 0)
    .config("spark.dynamicAllocation.maxExecutors", 10)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/20 14:13:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pathlib import Path

from warcio import ArchiveIterator
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import udf, col

In [3]:
cc_dir = Path("/opt/workspace/datasets/common-crawl/")
!ls $cc_dir

CC-MAIN-20240724014956-20240724044956-00798.warc     get_files.sh
CC-MAIN-20240725114544-20240725144544-00476.warc     warc.paths
CC-MAIN-20240725114544-20240725144544-00476_extract


In [4]:
# Read the list of WARC files to process, one path per line.
# NOTE(review): this opens "paths.txt" relative to the kernel's working
# directory, while the Spark job below reads "/opt/workspace/paths.txt" --
# confirm both refer to the same file.
warc_files = []
with open("paths.txt") as f:
    for line in f:
        path_str = line.strip()
        if not path_str:
            # A blank (e.g. trailing) line would otherwise become Path(".").
            continue
        warc_files.append(Path(path_str))
        # Print the stripped path so the trailing newline doesn't add blank output lines.
        print(f"file {len(warc_files)}: {path_str}")

file 1: /opt/workspace/datasets/common-crawl/CC-MAIN-20240724014956-20240724044956-00798.warc

file 2: /opt/workspace/datasets/common-crawl/CC-MAIN-20240725114544-20240725144544-00476.warc


In [5]:
# Inspect the parsed list of WARC paths.
warc_files

[PosixPath('/opt/workspace/datasets/common-crawl/CC-MAIN-20240724014956-20240724044956-00798.warc'),
 PosixPath('/opt/workspace/datasets/common-crawl/CC-MAIN-20240725114544-20240725144544-00476.warc')]

In [6]:
max_records = 2
# paths.txt already holds full paths (see the listing above), and the Spark job
# below opens them as-is. Use the path directly rather than joining it onto
# cc_dir; joining an absolute path onto cc_dir silently discards cc_dir anyway.
input_file = warc_files[0]

# Print the WARC headers of the first `max_records` response records.
with open(input_file, 'rb') as file_stream:
    record_num = 0
    for record in ArchiveIterator(file_stream):
        if record.rec_type == "response":
            record_num += 1
            print(f"Record headers: {record.rec_headers}")
        # >= (not ==) so a max_records of 0 or less stops instead of scanning the whole file.
        if record_num >= max_records:
            break

Record headers: WARC/1.0
WARC-Type: response
WARC-Date: 2024-07-24T02:50:03Z
WARC-Record-ID: <urn:uuid:f1ee65d4-9676-42a8-9e58-4255974742d2>
Content-Length: 104808
Content-Type: application/http; msgtype=response
WARC-Warcinfo-ID: <urn:uuid:e9e442c8-3d43-4395-a202-dd2aa8c91b0a>
WARC-Concurrent-To: <urn:uuid:d7712f1c-2f25-4280-9c3c-6a0bb149d67a>
WARC-IP-Address: 107.163.232.92
WARC-Target-URI: http://0.furkid.net/
WARC-Protocol: http/1.1
WARC-Payload-Digest: sha1:7TBKRIWHL3ND2HAIEPKDTARXTOYGYESP
WARC-Block-Digest: sha1:FWGYYUJS6NSB5DLDH2C4PHJDCWQAESPZ
WARC-Identified-Payload-Type: text/html

Record headers: WARC/1.0
WARC-Type: response
WARC-Date: 2024-07-24T02:42:19Z
WARC-Record-ID: <urn:uuid:c0beded9-7d7b-4a3d-b4ef-161431f16cf1>
Content-Length: 154
Content-Type: application/http; msgtype=response
WARC-Warcinfo-ID: <urn:uuid:e9e442c8-3d43-4395-a202-dd2aa8c91b0a>
WARC-Concurrent-To: <urn:uuid:86f9ed18-d923-4e5f-90a7-af5565d2e7bd>
WARC-IP-Address: 192.229.64.48
WARC-Target-URI: http://01r

In [7]:
# Single-column schema for the IP strings extracted from the WARC files.
schema = StructType([StructField("ip", StringType(), nullable=True)])

In [8]:
def process_warc(filepath):
    """Yield the WARC-IP-Address header of every response record in a WARC file.

    Only records whose type is "response" are considered. A response record
    without a WARC-IP-Address header yields the placeholder string "-".

    :param filepath: path of the WARC file to read (opened in binary mode).
    :yields: one IP string (or "-") per response record.
    """
    with open(filepath, 'rb') as stream:
        for record in ArchiveIterator(stream):
            if record.rec_type == "response":
                yield record.rec_headers.get_header("WARC-IP-Address", "-")


def proc_wrapper(_id, iterator):
    """`mapPartitionsWithIndex` callback: yield the IPs from every WARC file in a partition.

    :param _id: partition index supplied by Spark (unused).
    :param iterator: lines of the paths file, each naming one WARC file.
    :yields: the IP strings produced by `process_warc` for each listed file.
    """
    for line in iterator:
        filepath = line.strip()
        # Skip blank lines (e.g. a trailing newline in the paths file) rather
        # than failing the whole task on open("").
        if filepath:
            yield from process_warc(filepath)

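`spark.createDataFrame` expects each RDD element to be a row, i.e. a sequence with one value per schema field. That makes the collection `n × 1` rather than `n` bare strings. Hence the mapping on `output` below, which wraps every IP string in a one-element list.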

In [9]:
# Each line of the paths file names one WARC file; each partition extracts the
# IPs of its files. Every IP is then wrapped in a one-element list so it forms a
# single-column row for createDataFrame.
data_files = spark.sparkContext.textFile("/opt/workspace/paths.txt")
output = data_files.mapPartitionsWithIndex(proc_wrapper).map(lambda ip: [ip])

In [10]:
# Peek at the first five rows without collecting the whole RDD onto the driver.
output.take(5)

                                                                                

[['107.163.232.92'],
 ['192.229.64.48'],
 ['195.170.8.34'],
 ['107.190.226.20'],
 ['172.67.142.198']]

In [11]:
# One row per response record, with its IP in the "ip" column.
warcip_df = spark.createDataFrame(output, schema=schema)
warcip_df.show()

+---------------+
|             ip|
+---------------+
| 107.163.232.92|
|  192.229.64.48|
|   195.170.8.34|
| 107.190.226.20|
| 172.67.142.198|
|   23.108.56.90|
| 137.184.244.32|
| 172.67.198.153|
|  220.228.6.123|
|  220.228.6.241|
|    220.228.6.6|
|   61.66.228.75|
|  77.232.40.211|
|   198.2.232.33|
|  220.228.6.119|
| 114.118.10.124|
|107.163.236.251|
| 208.109.212.43|
| 208.109.212.43|
|107.163.212.188|
+---------------+
only showing top 20 rows



In [12]:
# Total number of response records (IPs) extracted from the listed WARC files.
warcip_df.count()

                                                                                

56366

In [13]:
# Both GeoLite2 City CSV files come from the same release directory.
maxmind_dir = "/opt/workspace/datasets/maxmind/GeoLite2-City-CSV_20240809"
maxmind_blocks_csv = f"{maxmind_dir}/GeoLite2-City-Blocks-IPv4.csv"
maxmind_city_csv = f"{maxmind_dir}/GeoLite2-City-Locations-en.csv"

In [14]:
# Load the GeoLite2 IPv4 blocks table (one row per CIDR network).
# inferSchema costs an extra pass over the CSV to detect column types.
maxmind_blocks_df = spark.read.csv(maxmind_blocks_csv, header=True, inferSchema=True)

                                                                                

In [15]:
# Preview the raw MaxMind blocks table.
maxmind_blocks_df.show()

+-------------+----------+-----------------------------+------------------------------+------------------+---------------------+-----------+--------+---------+---------------+----------+
|      network|geoname_id|registered_country_geoname_id|represented_country_geoname_id|is_anonymous_proxy|is_satellite_provider|postal_code|latitude|longitude|accuracy_radius|is_anycast|
+-------------+----------+-----------------------------+------------------------------+------------------+---------------------+-----------+--------+---------+---------------+----------+
|   1.0.0.0/24|      NULL|                      2077456|                          NULL|                 0|                    0|       NULL|    NULL|     NULL|           NULL|      NULL|
|   1.0.1.0/24|   1814991|                      1814991|                          NULL|                 0|                    0|       NULL| 34.7732|  113.722|           1000|      NULL|
|   1.0.2.0/23|   1814991|                      1814991|         

In [16]:
# Number of IPv4 network rows in the blocks CSV.
maxmind_blocks_df.count()

2419710

In [17]:
# Check the column types chosen by inferSchema (network stays a string).
maxmind_blocks_df.printSchema()

root
 |-- network: string (nullable = true)
 |-- geoname_id: integer (nullable = true)
 |-- registered_country_geoname_id: integer (nullable = true)
 |-- represented_country_geoname_id: integer (nullable = true)
 |-- is_anonymous_proxy: integer (nullable = true)
 |-- is_satellite_provider: integer (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- accuracy_radius: integer (nullable = true)
 |-- is_anycast: string (nullable = true)



In [18]:

import ipaddress

In [19]:
# Sanity check: broadcast_address is the last address of a CIDR block, which
# cidr_to_range below uses as the inclusive end of the range.
ipaddress.IPv4Network("1.0.164.16/30").broadcast_address

IPv4Address('1.0.164.19')

In [20]:
@udf(returnType=LongType())
def ip_to_int(ip):
    """Convert a dotted-quad IPv4 string to its integer representation.

    Returns None (SQL NULL) instead of raising when `ip` is missing or is not a
    valid IPv4 address. Examples are the "-" placeholder that `process_warc`
    emits for records without a WARC-IP-Address header, or an IPv6 address.
    An exception inside a UDF would fail the whole Spark job. A NULL just
    matches no block in the range join, and the left outer join keeps the row.
    """
    if ip is None:
        return None
    try:
        return int(ipaddress.IPv4Address(ip))
    except ValueError:  # AddressValueError is a ValueError subclass
        return None

@udf(returnType=StructType([
    StructField("start_ip", LongType(), False),
    StructField("end_ip", LongType(), False)
]))
def cidr_to_range(cidr):
    """Convert an IPv4 CIDR network string to its inclusive integer address range.

    :param cidr: network in CIDR notation, e.g. "1.0.164.16/30".
    :returns: (start_ip, end_ip) -- the integer forms of the network address and
        the broadcast (last) address of the block.
    """
    cidr = ipaddress.IPv4Network(cidr)
    return int(cidr.network_address), int(cidr.broadcast_address)

In [21]:
# Integer form of each IP, used for the range join against the MaxMind blocks.
warcip_df = warcip_df.withColumn("ip_int", ip_to_int("ip"))

In [22]:
# Expand each CIDR network into inclusive integer bounds [cidr_start, cidr_end]
# so IPs can be matched with a plain range comparison.
maxmind_blocks_df = (
    maxmind_blocks_df
    .withColumn("cidr_range", cidr_to_range(col("network")))
    .withColumn("cidr_start", col("cidr_range.start_ip"))
    .withColumn("cidr_end", col("cidr_range.end_ip"))
    .drop("cidr_range")
)

In [23]:
# Verify the new cidr_start / cidr_end columns.
maxmind_blocks_df.show()

+-------------+----------+-----------------------------+------------------------------+------------------+---------------------+-----------+--------+---------+---------------+----------+----------+--------+
|      network|geoname_id|registered_country_geoname_id|represented_country_geoname_id|is_anonymous_proxy|is_satellite_provider|postal_code|latitude|longitude|accuracy_radius|is_anycast|cidr_start|cidr_end|
+-------------+----------+-----------------------------+------------------------------+------------------+---------------------+-----------+--------+---------+---------------+----------+----------+--------+
|   1.0.0.0/24|      NULL|                      2077456|                          NULL|                 0|                    0|       NULL|    NULL|     NULL|           NULL|      NULL|  16777216|16777471|
|   1.0.1.0/24|   1814991|                      1814991|                          NULL|                 0|                    0|       NULL| 34.7732|  113.722|           10

In [24]:
# Match every crawled IP to the MaxMind block whose [cidr_start, cidr_end]
# range contains it. left_outer keeps every IP; its MaxMind columns are NULL
# when no block matches. The helper columns are dropped afterwards.
# NOTE(review): a pure range condition with no equality key generally cannot
# use a hash or sort-merge join, so Spark likely falls back to a nested-loop
# join here. That is probably why counting this result is so slow.
# TODO confirm with .explain().
maxmind_joined_df = (
    warcip_df
    .join(
        maxmind_blocks_df,
        warcip_df.ip_int.between(maxmind_blocks_df.cidr_start, maxmind_blocks_df.cidr_end),
        "left_outer",
    )
    .drop("ip_int", "cidr_start", "cidr_end")
)

In [25]:
# Preview the IP-to-block join result.
maxmind_joined_df.show()

[Stage 13:>                                                         (0 + 1) / 1]

+---------------+----------------+----------+-----------------------------+------------------------------+------------------+---------------------+-----------+--------+---------+---------------+----------+
|             ip|         network|geoname_id|registered_country_geoname_id|represented_country_geoname_id|is_anonymous_proxy|is_satellite_provider|postal_code|latitude|longitude|accuracy_radius|is_anycast|
+---------------+----------------+----------+-----------------------------+------------------------------+------------------+---------------------+-----------+--------+---------+---------------+----------+
| 107.163.232.92|  107.163.0.0/16|   6252001|                      6252001|                          NULL|                 0|                    0|       NULL|  37.751|  -97.822|           1000|      NULL|
|  192.229.64.48| 192.229.64.0/18|   5368361|                      6252001|                          NULL|                 0|                    0|      90060| 34.0544|-118.244

                                                                                

In [None]:
# Counting forces the full IP-to-block range join to be evaluated;
# this takes forever to execute.
maxmind_joined_df.count()

In [24]:
# Load the GeoLite2 English locations table, keyed by geoname_id.
# inferSchema costs an extra pass over the CSV to detect column types.
maxmind_city_df = spark.read.csv(maxmind_city_csv, header=True, inferSchema=True)

In [25]:
# List the location table's columns; geoname_id is the key shared with the blocks table.
maxmind_city_df.columns

['geoname_id',
 'locale_code',
 'continent_code',
 'continent_name',
 'country_iso_code',
 'country_name',
 'subdivision_1_iso_code',
 'subdivision_1_name',
 'subdivision_2_iso_code',
 'subdivision_2_name',
 'city_name',
 'metro_code',
 'time_zone',
 'is_in_european_union']

In [27]:
# Preview the location table.
maxmind_city_df.show()

+----------+-----------+--------------+--------------+----------------+------------+----------------------+------------------+----------------------+------------------+------------+----------+----------------+--------------------+
|geoname_id|locale_code|continent_code|continent_name|country_iso_code|country_name|subdivision_1_iso_code|subdivision_1_name|subdivision_2_iso_code|subdivision_2_name|   city_name|metro_code|       time_zone|is_in_european_union|
+----------+-----------+--------------+--------------+----------------+------------+----------------------+------------------+----------------------+------------------+------------+----------+----------------+--------------------+
|      5819|         en|            EU|        Europe|              CY|      Cyprus|                    02| Limassol District|                  NULL|              NULL|       Soúni|      NULL|    Asia/Nicosia|                   1|
|     18918|         en|            EU|        Europe|              CY|     

In [28]:
# Attach location (continent/country/city) columns by geoname_id. left_outer
# keeps rows whose geoname_id has no match (or is NULL). The location table's
# copy of geoname_id is dropped so only one key column remains.
# NOTE: this reassigns maxmind_joined_df. Re-running only this cell would join
# the location table a second time and duplicate its columns, so re-run from
# the blocks join instead.
maxmind_joined_df = (
    maxmind_joined_df
    .join(
        maxmind_city_df,
        maxmind_joined_df.geoname_id == maxmind_city_df.geoname_id,
        "left_outer",
    )
    .drop(maxmind_city_df.geoname_id)
)

In [29]:
# Preview the fully enriched table (blocks + locations).
maxmind_joined_df.show()

[Stage 19:>                                                         (0 + 1) / 1]

+---------------+----------------+----------+-----------------------------+------------------------------+------------------+---------------------+-----------+--------+---------+---------------+----------+-----------+--------------+--------------+----------------+-------------+----------------------+------------------+----------------------+------------------+---------------+----------+-------------------+--------------------+
|             ip|         network|geoname_id|registered_country_geoname_id|represented_country_geoname_id|is_anonymous_proxy|is_satellite_provider|postal_code|latitude|longitude|accuracy_radius|is_anycast|locale_code|continent_code|continent_name|country_iso_code| country_name|subdivision_1_iso_code|subdivision_1_name|subdivision_2_iso_code|subdivision_2_name|      city_name|metro_code|          time_zone|is_in_european_union|
+---------------+----------------+----------+-----------------------------+------------------------------+------------------+-------------

                                                                                

In [30]:
# Final column list of the enriched table.
maxmind_joined_df.columns

['ip',
 'network',
 'geoname_id',
 'registered_country_geoname_id',
 'represented_country_geoname_id',
 'is_anonymous_proxy',
 'is_satellite_provider',
 'postal_code',
 'latitude',
 'longitude',
 'accuracy_radius',
 'is_anycast',
 'locale_code',
 'continent_code',
 'continent_name',
 'country_iso_code',
 'country_name',
 'subdivision_1_iso_code',
 'subdivision_1_name',
 'subdivision_2_iso_code',
 'subdivision_2_name',
 'city_name',
 'metro_code',
 'time_zone',
 'is_in_european_union']

In [32]:
# Persist the enriched table as CSV with a header row. mode("overwrite") makes
# the cell re-runnable; the default save mode errors if the directory exists.
# NOTE(review): with a spark:// master, executors write the part files to their
# own filesystem. Confirm this relative path lands on storage shared with the driver.
maxmind_joined_df.write.mode("overwrite").csv("./processed_artifacts/maxmind_warc/", header=True)

                                                                                