In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Process RDD Logs").getOrCreate()
spark

In [2]:
import os
os.cpu_count()

32

# Apache Access Logs

In [3]:
rdd_2004 = spark.sparkContext.textFile("data/linux_2004/log/httpd/access_log*", 1)
rdd_2004.count()

38487

In [4]:
rdd_2005 = spark.sparkContext.textFile("data/linux_2005/log/httpd/access_log*", 1)
rdd_2005.count()

5936

In [5]:
rdd_2006 = spark.sparkContext.textFile("data/linux_2006/log/httpd/access_log*", 1)
rdd_2006.count()

36310

In [6]:
rdd_access = spark.sparkContext.textFile("data/apache/access.log")
rdd_access.count()

6016792

## Combine into single DataFrame

In [7]:
rdd = spark.sparkContext.union([rdd_access, rdd_2004, rdd_2005, rdd_2006])
total = rdd.count()
total

6097525

In [8]:
rdd.getNumPartitions()

108

# EDA

In [11]:
rdd_distinct = rdd.distinct()
total_distinct = rdd_distinct.count()
total_distinct

4256761

In [16]:
sample_rdd = rdd.sample(False, fraction=10/total)
print(sample_rdd.count())

15


In [21]:
sample = sample_rdd.collect()

## Using Regular Expressions for extracting the fields of the log as columns

In [24]:
import re
from pyspark.sql import functions as F, types as T

In [19]:
log_fields = ('IP', 'UserIdentity', 'Username', 'Timestamp', 'Request', 'StatusCode', 
              'Size(bytes)', 'Referrer', 'UserAgent', 'Unknown')
pattern = r'(.+) (.+) (.+) \[(.+)\] "(.+)" (\d+) (\d+) "(.+?)" "(.+?)" ?(".+")?'

#### Some re pattern explanations
<pre>
(.+?) - non-greedy matching
(".+")? - optional group
</pre>

### Try using sample

In [43]:
def extract(string):
    matched = re.match(pattern, string)
    if matched:
        return matched.groups()
    return ('')*len(log_fields)

In [44]:
sample

['128.72.153.89 - - [18/Feb/2016:20:37:00 +0100] "POST /administrator/index.php HTTP/1.1" 200 4494 "http://almhuette-raith.at/administrator/" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.71 Safari/537.36" "-"',
 '193.186.225.253 - - [04/Oct/2016:11:36:21 +0200] "GET /templates/_system/css/general.css HTTP/1.1" 404 239 "http://www.almhuette-raith.at/" "Mozilla/5.0 (Windows NT 6.3; WOW64; Trident/7.0; Touch; rv:11.0) like Gecko" "-"',
 '185.190.150.10 - - [21/Feb/2018:13:09:55 +0100] "GET /favicon.ico HTTP/1.1" 404 217 "http://www.almhuette-raith.at/apache-log/access.log" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 OPR/51.0.2830.34" "-"',
 '5.112.66.178 - - [21/Jun/2018:17:01:04 +0200] "GET /apache-log/access.log HTTP/1.1" 206 43192 "http://www.almhuette-raith.at/apache-log/" "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko" "-"',
 '79.62.229.212 - - [23/Jun/201

In [45]:
[print(row, len(row)) for row in map(extract, sample)];

('128.72.153.89', '-', '-', '18/Feb/2016:20:37:00 +0100', 'POST /administrator/index.php HTTP/1.1', '200', '4494', 'http://almhuette-raith.at/administrator/', 'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.71 Safari/537.36', '"-"') 10
('193.186.225.253', '-', '-', '04/Oct/2016:11:36:21 +0200', 'GET /templates/_system/css/general.css HTTP/1.1', '404', '239', 'http://www.almhuette-raith.at/', 'Mozilla/5.0 (Windows NT 6.3; WOW64; Trident/7.0; Touch; rv:11.0) like Gecko', '"-"') 10
('185.190.150.10', '-', '-', '21/Feb/2018:13:09:55 +0100', 'GET /favicon.ico HTTP/1.1', '404', '217', 'http://www.almhuette-raith.at/apache-log/access.log', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 OPR/51.0.2830.34', '"-"') 10
('5.112.66.178', '-', '-', '21/Jun/2018:17:01:04 +0200', 'GET /apache-log/access.log HTTP/1.1', '206', '43192', 'http://www.almhuette-raith.at/apache-log/', 'Mozilla/5.0 (Wind

## Using map transformation in RDD

In [46]:
%%time
matched_rdd = rdd.map(extract)
matched_rdd.count()

CPU times: user 15.2 ms, sys: 1.81 ms, total: 17 ms
Wall time: 1min 5s


6097525

In [52]:
%%time
filtered_rdd = matched_rdd.filter(lambda row: row is not ('')*len(log_fields))
filtered_rdd.count()

CPU times: user 14.4 ms, sys: 2.39 ms, total: 16.8 ms
Wall time: 1min 4s


6084776

In [51]:
x = ('')*10
x == ('')*10

True

In [None]:
print("We can see that {-} rows were not successfully matched")

In [48]:
matched_rdd.take(3)

['',
 ('109.169.248.247',
  '-',
  '-',
  '12/Dec/2015:18:25:11 +0100',
  'GET /administrator/ HTTP/1.1',
  '200',
  '4263',
  '-',
  'Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0',
  '"-"'),
 ('109.169.248.247',
  '-',
  '-',
  '12/Dec/2015:18:25:11 +0100',
  'POST /administrator/index.php HTTP/1.1',
  '200',
  '4494',
  'http://almhuette-raith.at/administrator/',
  'Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0',
  '"-"')]

## Using DataFrames
### Using regexp_extract function

In [49]:
rdd.take(3)

['',
 '109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"',
 '109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] "POST /administrator/index.php HTTP/1.1" 200 4494 "http://almhuette-raith.at/administrator/" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"']

In [41]:
%%time
raw_df = rdd.toDF()

ValueError: The first row in RDD is empty, can not infer schema

In [26]:
%%time
extracted_df = raw_df.select(*[F.regexp_extract("value", pattern, idx).alias(colname) 
                               for idx, colname in zip(range(1,11), log_format)])
print(extracted_df.count())
extracted_df.limit(5).toPandas()

NameError: name 'raw_df' is not defined

In [177]:
%%time
extracted_df = raw_df.select(F.regexp_extract("value", pattern, 1).alias('IP'),
                             F.regexp_extract("value", pattern, 2).alias('UserIdentity'),
                             F.regexp_extract("value", pattern, 3).alias('Username'),
                             F.to_timestamp(regexp_extract("value", pattern, 4),
                                            '%d/%b/%Y:%H:%M:%S %z').alias('Timestamp'),
                             F.regexp_extract("value", pattern, 5).alias('Request'),
                             F.regexp_extract("value", pattern, 6).cast("integer").alias('StatusCode'),
                             F.regexp_extract("value", pattern, 7).cast("integer").alias('Size(bytes)'),
                             F.regexp_extract("value", pattern, 8).alias('Referrer'),
                             F.regexp_extract("value", pattern, 9).alias('UserAgent'),
                             F.regexp_extract("value", pattern, 10).alias('Unknown')
                             )
print(extracted_df.count())
extracted_df.show(5)

6097525
+---------------+------------+--------+---------+--------------------+----------+-----------+--------------------+--------------------+-------+
|             IP|UserIdentity|Username|Timestamp|             Request|StatusCode|Size(bytes)|            Referrer|           UserAgent|Unknown|
+---------------+------------+--------+---------+--------------------+----------+-----------+--------------------+--------------------+-------+
|               |            |        |     null|                    |      null|       null|                    |                    |       |
|109.169.248.247|           -|       -|     null|GET /administrato...|       200|       4263|                   -|Mozilla/5.0 (Wind...|    "-"|
|109.169.248.247|           -|       -|     null|POST /administrat...|       200|       4494|http://almhuette-...|Mozilla/5.0 (Wind...|    "-"|
|    46.72.177.4|           -|       -|     null|GET /administrato...|       200|       4263|                   -|Mozilla/5.0 (W

### Extract remote hostname/ip

In [12]:
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
hosts = []
for row in sample:
    hostname = None
    m = re.match(host_pattern, row)
    if m:
        hostname = m.group(1)
    hosts.append(hostname)
hosts

['198.50.156.189',
 '198.50.156.189',
 '158.69.225.36',
 '5.114.64.184',
 '213.137.37.58',
 '13.84.43.203',
 '84.115.26.129',
 '5.112.125.111']

#### Check

In [13]:
diff = total - df.select(regexp_extract('value', host_pattern, 1)).count()
if diff:
    print(f"{diff} rows did not match, need to work on the Regex pattern")
else:
    print("Good to go")

Good to go


### Extract timestamp

In [14]:
import time

In [15]:
ts_pattern = r'.*\[(.*)\].*'
ts = []
for row in sample:
    t = None
    m = re.match(ts_pattern, row)
    if m:
        t = m.group(1)
    ts.append(t)
ts

['01/Apr/2017:18:38:47 +0200',
 '01/Apr/2017:19:30:00 +0200',
 '21/Jan/2018:15:06:03 +0100',
 '27/Jun/2018:12:06:49 +0200',
 '08/Jan/2019:16:49:55 +0100',
 '05/Nov/2019:11:00:11 +0100',
 '23/Mar/2020:08:56:01 +0100',
 '30/Apr/2020:23:44:42 +0200']

In [165]:
time.strptime('01/Apr/2017:18:38:47 +0200' ,'%d/%b/%Y:%H:%M:%S %z')

time.struct_time(tm_year=2017, tm_mon=4, tm_mday=1, tm_hour=18, tm_min=38, tm_sec=47, tm_wday=5, tm_yday=91, tm_isdst=-1)

In [16]:
diff = total - df.select(regexp_extract('value', ts_pattern, 1)).count()
if diff:
    print(f"{diff} rows did not match, need to work on the Regex pattern")
else:
    print("Good to go")

Good to go


### Extract HTTP Request