# Spark Training Exercise Book Day 2



In this exercise you will process Apache HTTP access logs with Spark. Below you can see some sample lines from our test dataset.

The security team would like some statistics from this large volume of log files, for example which resource produced the most non-200 (non-OK) HTTP response codes. They also have a list of malicious IP addresses and would like a report on the requests initiated from any of these IPs.

It is up to you how to implement this use case in Spark.

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

sc = pyspark.SparkContext()
spark = SparkSession(sc)

In [2]:
!head -n 5 data/apache_logs.txt

head: cannot open 'data/apache_logs.txt' for reading: No such file or directory


As you can see, the Apache access log is not quite a CSV file, so it needs some additional processing before we can build a DataFrame from it. The helper functions below extract the fields relevant to this exercise. Use the **process_access_log_line** function to process one access log line and get back the IP address, the raw timestamp, the date (as a string), the hour (as an int), the request-line fields (method, resource, protocol), and the HTTP response code returned by the server.

In [3]:
import time
import datetime

def get_ip(s):
    return s.split(' ')[0]

def get_timestamp(line):
    # The timestamp is the text between the first '[' and the first ']'.
    start = line.find('[')
    end = line.find(']')
    return line[start + 1:end]

def get_header(line):
    # The request line ("METHOD resource protocol") is the first double-quoted field.
    start = line.find('"')
    length = line[start + 1:].find('"')
    header = line[start + 1:start + length + 1].split(' ')
    method = header[0] if len(header) > 0 else "malformed"
    resource = header[1] if len(header) > 1 else "malformed"
    protocol = header[2] if len(header) > 2 else "malformed"
    return (method, resource, protocol)

def get_error_code(line):
    # The status code is the 9th space-separated field; return 0 if it is missing or not numeric.
    fields = line.split(' ')
    if len(fields) < 9:
        return 0
    try:
        code = int(fields[8])
    except ValueError:
        code = 0
    return code

# input: one raw access log line from the RDD
# output: structured data: full line -> (ip, ts, date, hour, method, resource, protocol, response code)
def process_access_log_line(log_line):
    header = get_header(log_line)
    ts_str = get_timestamp(log_line)
    date_str = "1980-01-01"
    hour = 12
    try:
        td = datetime.datetime.strptime(ts_str, "%d/%b/%Y:%H:%M:%S %z")
        date_str = '{}-{}-{}'.format(td.year, td.month, td.day)   
        hour = td.hour
    except ValueError:
        pass
    return (get_ip(log_line), ts_str, date_str, hour, header[0], header[1], header[2], get_error_code(log_line))
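
To see what such a parser returns, it can help to run a single line through it in isolation. The sketch below is not the helper code above but a compact regex-based alternative that yields the same eight fields. The names `LOG_PATTERN` and `parse_line` and the sample log line are made up for illustration, and returning `None` for unparseable lines (instead of placeholder values) is an assumption of this sketch.

```python
import datetime
import re

# Start of a Combined Log Format line: ip, identity, user, [timestamp], "request", status
LOG_PATTERN = re.compile(
    r'^(?P<ip>\S+) \S+ \S+ \[(?P<ts>[^\]]+)\] "(?P<request>[^"]*)" (?P<status>\d{3})'
)

def parse_line(line):
    """Return (ip, ts, date, hour, method, resource, protocol, response) or None."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None  # the caller can filter these lines out
    ts_str = match.group('ts')
    try:
        td = datetime.datetime.strptime(ts_str, '%d/%b/%Y:%H:%M:%S %z')
    except ValueError:
        return None
    # Pad the request to three parts so short or empty requests still unpack.
    parts = (match.group('request').split(' ') + ['malformed'] * 3)[:3]
    method, resource, protocol = parts
    # Same unpadded date format as the exercise helper.
    date_str = '{}-{}-{}'.format(td.year, td.month, td.day)
    return (match.group('ip'), ts_str, date_str, td.hour,
            method, resource, protocol, int(match.group('status')))

# Hypothetical sample line, written for this example only.
sample = ('203.0.113.7 - - [12/Dec/2015:18:25:11 +0100] '
          '"GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0"')
print(parse_line(sample))
```

With an RDD of lines, this could be applied as `rdd.map(parse_line).filter(lambda row: row is not None)`, which drops malformed lines before the DataFrame is created.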


Now it is time to read the **data/apache_logs.txt** file and apply some transformations so that you can create a DataFrame and run aggregations on it. We also provide a schema that matches the output of the **process_access_log_line** function and can be used to create the DataFrame.

In [4]:
access_log_schema = StructType([
            StructField('ip', StringType(), True),
            StructField('ts', StringType(), True),
            StructField('date', StringType(), True),
            StructField('hour', IntegerType(), True),
            StructField('method', StringType(), True),
            StructField('resource', StringType(), True),
            StructField('protocol', StringType(), True),
            StructField('response', IntegerType(), True)
        ])

In [7]:
!pwd

/home/andras/git


In [25]:
# Your code comes here

# Step 1: read the data into an RDD
rdd = sc.textFile('spark-training-october-20/data/apache_logs.txt')

# Step 2: transform the lines of the RDD to a "structured" format
#   you can use the process_access_log_line function for the transformation
rdd = rdd.filter(lambda line: len(line) > 0) \
    .map(process_access_log_line)

# Step 3: create the dataframe
df = spark.createDataFrame(rdd, access_log_schema)
df.show()

+---------------+--------------------+----------+----+------+--------------------+--------+--------+
|             ip|                  ts|      date|hour|method|            resource|protocol|response|
+---------------+--------------------+----------+----+------+--------------------+--------+--------+
|109.169.248.247|12/Dec/2015:18:25...|2015-12-12|  18|   GET|     /administrator/|HTTP/1.1|     200|
|109.169.248.247|12/Dec/2015:18:25...|2015-12-12|  18|  POST|/administrator/in...|HTTP/1.1|     200|
|    46.72.177.4|12/Dec/2015:18:31...|2015-12-12|  18|   GET|     /administrator/|HTTP/1.1|     200|
|    46.72.177.4|12/Dec/2015:18:31...|2015-12-12|  18|  POST|/administrator/in...|HTTP/1.1|     200|
| 83.167.113.100|12/Dec/2015:18:31...|2015-12-12|  18|   GET|     /administrator/|HTTP/1.1|     200|
| 83.167.113.100|12/Dec/2015:18:31...|2015-12-12|  18|  POST|/administrator/in...|HTTP/1.1|     200|
|   95.29.198.15|12/Dec/2015:18:32...|2015-12-12|  18|   GET|     /administrator/|HTTP/1.1|

In [15]:
def generator(line):
    s = line.split(' ')
    for words in s:
        yield words
        
for i in generator('sadas sada sd as dasd'):
    print(i)
    
        
#sc.textFile('data/apache_logs.txt').flatMap(generator).take(100)



sadas
sada
sd
as
dasd


The security team would like to see the top 10 resources that generate the highest number of error (non-200) response codes, broken down by hour. They want to store this information on shared distributed storage (such as S3) where they can query it efficiently. Most of their queries filter on one specific day.

In [27]:
# Your code comes here
df.createOrReplaceTempView('access_log')
df_res = spark.sql("""
SELECT date, hour, resource, count(1) as cnt
FROM access_log
WHERE response != 200
GROUP BY date, hour, resource
ORDER BY cnt DESC
LIMIT 10;
""")
df_res.show()

# Homework: write the same query the "Pythonic" way, with the DataFrame API


+----------+----+--------------------+---+
|      date|hour|            resource|cnt|
+----------+----+--------------------+---+
| 2016-1-14|  20|/templates/_syste...| 63|
| 2016-2-11|   0|/apache-log/acces...| 57|
|  2016-1-1|  21|/templates/_syste...| 35|
|  2016-2-8|   9|/templates/_syste...| 31|
|  2016-1-6|  19|/templates/_syste...| 29|
| 2016-1-15|  20|/templates/_syste...| 22|
|2015-12-20|  18|/templates/_syste...| 22|
|2015-12-20|  12|/apache-log/acces...| 20|
| 2016-1-21|  13|      /administrator| 19|
| 2016-1-14|   8|/templates/_syste...| 19|
+----------+----+--------------------+---+
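
One possible shape for the "Pythonic" version is sketched below. `top_error_resources` mirrors the SQL query with DataFrame methods and expects a DataFrame with the `access_log_schema` columns. `top_error_resources_plain` is a plain-Python reference that can be used to cross-check the result on a small collected sample. Both function names and the sample rows are made up for illustration. Like the SQL query, this returns the 10 largest (date, hour, resource) groups overall, not 10 per hour.

```python
from collections import Counter

def top_error_resources(df, n=10):
    """DataFrame API counterpart of the SQL query (needs a Spark DataFrame)."""
    return (df.filter(df.response != 200)
              .groupBy('date', 'hour', 'resource')
              .count()
              .withColumnRenamed('count', 'cnt')
              .orderBy('cnt', ascending=False)
              .limit(n))

def top_error_resources_plain(rows, n=10):
    """Same aggregation in plain Python, for checking a small sample of tuples."""
    counts = Counter(
        (date, hour, resource)
        for (_ip, _ts, date, hour, _method, resource, _protocol, response) in rows
        if response != 200
    )
    return [key + (cnt,) for key, cnt in counts.most_common(n)]

# Made-up rows in the (ip, ts, date, hour, method, resource, protocol, response) layout.
rows = [
    ('198.51.100.1', 't', '2016-1-14', 20, 'GET', '/x.css', 'HTTP/1.1', 404),
    ('198.51.100.2', 't', '2016-1-14', 20, 'GET', '/x.css', 'HTTP/1.1', 404),
    ('198.51.100.1', 't', '2016-1-14', 20, 'GET', '/y', 'HTTP/1.1', 200),
    ('198.51.100.3', 't', '2016-1-14', 21, 'GET', '/x.css', 'HTTP/1.1', 500),
]
print(top_error_resources_plain(rows))
```

In the notebook, `top_error_resources(df).show()` should list the same groups as `df_res.show()`, although rows with equal counts may appear in a different order.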



In [38]:
df_res.write \
    .format('parquet') \
    .mode('overwrite') \
    .save('access_log_report')

In [36]:
!ls access_log_report

_SUCCESS  part-00000-f2fb1794-8cfd-459b-a47f-fc087239531e-c000.snappy.parquet


In [37]:
df_back = spark.read.format('parquet').load('access_log_report')
df_back.show()

+----------+----+--------------------+---+
|      date|hour|            resource|cnt|
+----------+----+--------------------+---+
| 2016-1-14|  20|/templates/_syste...| 63|
| 2016-2-11|   0|/apache-log/acces...| 57|
|  2016-1-1|  21|/templates/_syste...| 35|
|  2016-2-8|   9|/templates/_syste...| 31|
|  2016-1-6|  19|/templates/_syste...| 29|
| 2016-1-15|  20|/templates/_syste...| 22|
|2015-12-20|  18|/templates/_syste...| 22|
|2015-12-20|  12|/apache-log/acces...| 20|
| 2016-1-21|  13|      /administrator| 19|
| 2016-1-14|   8|/templates/_syste...| 19|
+----------+----+--------------------+---+



In [39]:
df_res.write \
    .format('csv') \
    .option('header', 'true') \
    .mode('overwrite') \
    .partitionBy('date') \
    .save('access_log_report')

In [42]:
!cat access_log_report/date=2016-1-6/part-00000-e0f6ca24-c133-42b4-85ba-e846f5f705d1.c000.csv

hour,resource,cnt
19,/templates/_system/css/general.css,29
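
The partition directory above is named `date=2016-1-6` because the helper formats the date with `'{}-{}-{}'`, which does not zero-pad the month and day. Since most queries filter on a single day, a zero-padded ISO date may be easier to work with, because it sorts chronologically as a string. A minimal sketch of the difference follows; the `iso_date` and `unpadded_date` names are assumptions for this example, not part of the exercise code.

```python
import datetime

def unpadded_date(d):
    # Format used by process_access_log_line in this exercise.
    return '{}-{}-{}'.format(d.year, d.month, d.day)

def iso_date(d):
    # Zero-padded alternative: YYYY-MM-DD.
    return d.strftime('%Y-%m-%d')

days = [datetime.date(2016, 1, 6), datetime.date(2016, 1, 14), datetime.date(2015, 12, 20)]

# String sort puts '2016-1-14' before '2016-1-6', which is not chronological.
print(sorted(unpadded_date(d) for d in days))
# Zero-padded strings sort in date order.
print(sorted(iso_date(d) for d in days))
```

Switching the helper to the zero-padded form would change the partition directory names, so any existing output would need to be rewritten.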


In [44]:
df_res.write \
    .format('csv') \
    .option('header', 'true') \
    .mode('overwrite') \
    .bucketBy(6, 'resource') \
    .saveAsTable('access_log_report')
# .save('access_log_report')
    

In [47]:
!ls spark-warehouse/access_log_report

_SUCCESS
part-00000-4ad4f328-cf26-4fe9-9e7f-51a0383e935e_00000.c000.csv
part-00000-4ad4f328-cf26-4fe9-9e7f-51a0383e935e_00001.c000.csv
part-00000-4ad4f328-cf26-4fe9-9e7f-51a0383e935e_00003.c000.csv
part-00000-4ad4f328-cf26-4fe9-9e7f-51a0383e935e_00004.c000.csv


In [49]:
print(df.rdd.getNumPartitions())
df = df.coalesce(1)
print(df.rdd.getNumPartitions())

2
1


In [50]:
df = df.repartition(6)
df.write.format('csv').mode('overwrite').save('access_log_report')

In [51]:
!ls access_log_report

_SUCCESS
part-00000-413ad56e-01d3-4a66-b7b5-8fdf7dfdb0da-c000.csv
part-00001-413ad56e-01d3-4a66-b7b5-8fdf7dfdb0da-c000.csv
part-00002-413ad56e-01d3-4a66-b7b5-8fdf7dfdb0da-c000.csv
part-00003-413ad56e-01d3-4a66-b7b5-8fdf7dfdb0da-c000.csv
part-00004-413ad56e-01d3-4a66-b7b5-8fdf7dfdb0da-c000.csv
part-00005-413ad56e-01d3-4a66-b7b5-8fdf7dfdb0da-c000.csv


In [56]:
df = df.repartition(100, 'hour')
print(df.rdd.getNumPartitions())
df.rdd.glom().take(4)

100


[[],
 [],
 [Row(ip='185.31.167.111', ts='13/Dec/2015:04:01:06 +0100', date='2015-12-13', hour=4, method='GET', resource='/administrator/', protocol='HTTP/1.1', response=200),
  Row(ip='185.31.167.111', ts='13/Dec/2015:04:01:06 +0100', date='2015-12-13', hour=4, method='POST', resource='/administrator/index.php', protocol='HTTP/1.1', response=200),
  Row(ip='31.134.53.96', ts='13/Dec/2015:04:08:50 +0100', date='2015-12-13', hour=4, method='GET', resource='/apache-log/access.log', protocol='HTTP/1.1', response=200),
  Row(ip='46.44.5.179', ts='13/Dec/2015:04:10:24 +0100', date='2015-12-13', hour=4, method='GET', resource='/administrator/', protocol='HTTP/1.1', response=200),
  Row(ip='46.44.5.179', ts='13/Dec/2015:04:10:24 +0100', date='2015-12-13', hour=4, method='POST', resource='/administrator/index.php', protocol='HTTP/1.1', response=200),
  Row(ip='178.204.13.65', ts='13/Dec/2015:04:10:50 +0100', date='2015-12-13', hour=4, method='GET', resource='/administrator/', protocol='HTTP/1.1
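
The empty lists at the start of the `glom()` output are expected: `repartition(100, 'hour')` assigns each row to a partition based on a hash of `hour`, and `hour` has at most 24 distinct values, so at least 76 of the 100 partitions stay empty. The sketch below illustrates the idea in plain Python. It uses `zlib.crc32` as a stand-in hash, which is an assumption for illustration only: Spark uses its own hash function, so the actual partition numbers will differ.

```python
import zlib
from collections import defaultdict

def partition_for(key, num_partitions):
    # Stand-in hash (NOT Spark's): any deterministic hash shows the same effect.
    return zlib.crc32(str(key).encode('utf-8')) % num_partitions

def simulate_repartition(keys, num_partitions):
    """Group keys into num_partitions lists, similar in spirit to rdd.glom()."""
    partitions = defaultdict(list)
    for key in keys:
        partitions[partition_for(key, num_partitions)].append(key)
    return [partitions.get(i, []) for i in range(num_partitions)]

hours = [h for h in range(24) for _ in range(3)]  # three made-up requests per hour
parts = simulate_repartition(hours, 100)
non_empty = [p for p in parts if p]
print(len(parts), len(non_empty))
```

All rows with the same hour land in the same partition, which is useful before an hourly aggregation but leaves most of the 100 partitions with nothing to do.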

The security team also provided a list of malicious IP addresses. Your task is to count how many requests came from each malicious IP address in each hour. You will find the IP list in **data/ip-list.txt**.

In [8]:
!head -10 data/ip-list.txt

Malicious IP address:  109.106.142.176
Malicious IP address:  109.184.11.34
Malicious IP address:  128.72.82.254
Malicious IP address:  130.255.13.57
Malicious IP address:  176.194.74.20
Malicious IP address:  176.214.240.8
Malicious IP address:  177.201.52.125
Malicious IP address:  178.35.29.219
Malicious IP address:  195.8.51.14
Malicious IP address:  2.86.149.141


In [21]:
def process_ip_line(line):
    s = line.split(':')[1].replace(' ', '')
    return s

#process_ip_line('Malicious IP address:  109.106.142.176')

# your code comes here
rdd = sc.textFile('spark-training-october-20/data/ip-list.txt')
rdd = rdd.map(process_ip_line)
print(rdd.take(10))

df_ip = rdd.toDF(StringType())
df_ip.show()

['109.106.142.176', '109.184.11.34', '128.72.82.254', '130.255.13.57', '176.194.74.20', '176.214.240.8', '177.201.52.125', '178.35.29.219', '195.8.51.14', '2.86.149.141']
+---------------+
|          value|
+---------------+
|109.106.142.176|
|  109.184.11.34|
|  128.72.82.254|
|  130.255.13.57|
|  176.194.74.20|
|  176.214.240.8|
| 177.201.52.125|
|  178.35.29.219|
|    195.8.51.14|
|   2.86.149.141|
|    2.93.22.237|
|   204.44.90.14|
|  206.59.253.65|
| 213.24.132.190|
|   37.1.206.196|
| 37.113.244.223|
| 46.146.101.241|
|  46.166.75.133|
|   46.211.46.18|
|   5.138.36.198|
+---------------+
only showing top 20 rows



In [28]:
def process_ip_line(line):
    s = line.split(':')[1].replace(' ', '')
    return (s,)

#process_ip_line('Malicious IP address:  109.106.142.176')

# your code comes here
rdd = sc.textFile('spark-training-october-20/data/ip-list.txt')
rdd = rdd.map(process_ip_line)
df_ip = rdd.toDF(['malicious_ip'])
df_ip.show()

+---------------+
|   malicious_ip|
+---------------+
|109.106.142.176|
|  109.184.11.34|
|  128.72.82.254|
|  130.255.13.57|
|  176.194.74.20|
|  176.214.240.8|
| 177.201.52.125|
|  178.35.29.219|
|    195.8.51.14|
|   2.86.149.141|
|    2.93.22.237|
|   204.44.90.14|
|  206.59.253.65|
| 213.24.132.190|
|   37.1.206.196|
| 37.113.244.223|
| 46.146.101.241|
|  46.166.75.133|
|   46.211.46.18|
|   5.138.36.198|
+---------------+
only showing top 20 rows
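
`process_ip_line` relies on every line containing a colon followed by an IPv4 address, so a blank line raises `IndexError` and an IPv6 address would be cut at its first colon. A more defensive variant, sketched below, splits on the first colon only and validates the remainder with the standard `ipaddress` module. The function name `parse_malicious_ip` and the choice to return `None` for unusable lines are assumptions for this example.

```python
import ipaddress

def parse_malicious_ip(line):
    """Return the IP address after the first ':' or None if the line is unusable."""
    _label, sep, value = line.partition(':')
    if not sep:
        return None  # no colon at all, e.g. a blank line
    candidate = value.strip()
    try:
        ipaddress.ip_address(candidate)  # accepts IPv4 and IPv6
    except ValueError:
        return None
    return candidate

print(parse_malicious_ip('Malicious IP address:  109.106.142.176'))
```

On the RDD this could be used as `rdd.map(parse_malicious_ip).filter(lambda ip: ip is not None).map(lambda ip: (ip,)).toDF(['malicious_ip'])`.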



In [32]:
# 
# df.join(other_df, df.key1 == other_df.key2, 'inner')
# df.join(other_df.hint('broadcast'), df.key1 == other_df.key2, 'inner')
#
# df.explain()
df_joined = df.join(df_ip.hint('broadcast'), df.ip == df_ip.malicious_ip, 'inner')
df_joined.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [ip#191], [malicious_ip#301], Inner, BuildRight
:- *(2) Filter isnotnull(ip#191)
:  +- *(2) Scan ExistingRDD[ip#191,ts#192,date#193,hour#194,method#195,resource#196,protocol#197,response#198]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false])), [id=#334]
   +- *(1) Filter isnotnull(malicious_ip#301)
      +- *(1) Scan ExistingRDD[malicious_ip#301]
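
The plan shows a `BroadcastHashJoin`: the small IP list is shipped to every executor and each access-log row is matched against it locally. To finish the task (requests per malicious IP per hour), the joined rows still need to be grouped and counted. The sketch below shows one way to do that with the DataFrame API, plus a plain-Python analogue of what the broadcast join does conceptually. Both function names and the sample rows are illustrative, and grouping by `date` as well as `hour` is an assumption about what "each hour" means here.

```python
from collections import Counter

def count_malicious_requests(df_access, df_ip):
    """Spark version: join against the broadcast IP list, then count per IP, date and hour."""
    joined = df_access.join(df_ip.hint('broadcast'),
                            df_access.ip == df_ip.malicious_ip, 'inner')
    return (joined.groupBy('malicious_ip', 'date', 'hour')
                  .count()
                  .orderBy('date', 'hour', 'malicious_ip'))

def count_malicious_requests_plain(rows, malicious_ips):
    """Conceptual analogue: the small side becomes an in-memory lookup set."""
    lookup = set(malicious_ips)  # plays the role of the broadcast hash table
    counts = Counter()
    for ip, _ts, date, hour, *_rest in rows:
        if ip in lookup:  # local hash probe, the large side is not shuffled
            counts[(ip, date, hour)] += 1
    return dict(counts)

# Made-up rows in the (ip, ts, date, hour, method, resource, protocol, response) layout.
rows = [
    ('109.184.11.34', 't', '2015-12-13', 4, 'GET', '/', 'HTTP/1.1', 200),
    ('109.184.11.34', 't', '2015-12-13', 4, 'POST', '/', 'HTTP/1.1', 200),
    ('198.51.100.9', 't', '2015-12-13', 4, 'GET', '/', 'HTTP/1.1', 200),
]
print(count_malicious_requests_plain(rows, ['109.184.11.34']))
```

In the notebook, `count_malicious_requests(df, df_ip).show()` would produce the report; the plain-Python version is only meant for reasoning about the result on a few rows.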


