In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql import SQLContext
from pyspark.sql import Window
from pyspark.sql.types import *

session = SparkSession.builder.appName("Weblog Analytics").master("local[*]").getOrCreate()

In [3]:
baselog_df = session.read.text('./test.log')
baselog_df.show(2, truncate=True)

+--------------------+
|               value|
+--------------------+
|2019-07-22T09:00:...|
|2019-07-22T09:00:...|
+--------------------+
only showing top 2 rows



In [4]:
elbLogRegex = r'(\d{4}-\d{2}-\d{2}\w\d{2}:+\d.+\d\w) (\S+) (\S+) (\S+) ([\d.]+[:\d]) ([\d.]+[:\d]) (\d{3}) (\d{3}) (\d+) (\d+) "(.+?)" "(.+?)" ([\w-]+) ([\w+\d.]+)'

logs_df = baselog_df.select(func.regexp_extract('value', elbLogRegex, 1).alias('timestamp'),
                         func.regexp_extract('value', elbLogRegex, 2).alias('clientIP'),
                         func.regexp_extract('value', elbLogRegex, 3).alias('backendIP'),
                         func.regexp_extract('value', elbLogRegex, 4).alias('request_processing_time'),
                         func.regexp_extract('value', elbLogRegex, 5).alias('backend_processing_time'),
                         func.regexp_extract('value', elbLogRegex, 6).alias('response_process_time'),
                         func.regexp_extract('value', elbLogRegex, 7).alias('status_code'),
                         func.regexp_extract('value', elbLogRegex, 8).alias('backend_status_code'),
                         func.regexp_extract('value', elbLogRegex, 9).alias('received_bytes'),
                         func.regexp_extract('value', elbLogRegex, 10).alias('sent_bytes'),
                         func.regexp_extract('value', elbLogRegex, 11).alias('request'),
                         func.regexp_extract('value', elbLogRegex, 12).alias('user_agent'),
                         func.regexp_extract('value', elbLogRegex, 13).alias('ssl_cipher'),
                         func.regexp_extract('value', elbLogRegex, 13).alias('ssl_protocol'))

In [5]:
clientAddress = func.split(logs_df['clientIP'], ':')
backendAddress = func.split(logs_df['backendIP'], ':')

logs_df_addedColumns = logs_df.withColumn('c_IP', clientAddress.getItem(0)) \
                .withColumn('c_Port', clientAddress.getItem(1)) \
                .withColumn('b_IP', backendAddress.getItem(0)) \
                .withColumn('b_Port', backendAddress.getItem(1)) \
                .withColumn('timestamp', func.col('timestamp').cast('timestamp')
)

### Q1: Aggregate the pages by clientIP during a session.

In [6]:
logs_df_with_lags = logs_df_addedColumns.withColumn('prev_time', func.lag('timestamp', 1).over(Window.partitionBy('c_IP').orderBy('timestamp')))

logs_df_with_new_session = logs_df_with_lags.withColumn('is_new_session',
                func.when(func.unix_timestamp('timestamp') - func.unix_timestamp('prev_time') < func.lit(3600), func.lit(0))
                .otherwise(func.lit(1))
)

logs_df_with_session = logs_df_with_new_session.withColumn('session_id_by_cIP', func.sum(func.col('is_new_session')).over(Window.partitionBy('c_IP').orderBy('c_IP', 'timestamp')))

logs_df_with_session.show(5)
logs_df_with_session.write.parquet('Q1/output.parquet')

### Q2: Find average session time.

In [7]:
logs_df_with_session_time = logs_df_with_session.groupBy('c_IP', 'session_id_by_cIP') \
                .agg((func.unix_timestamp(func.max('timestamp'))- func.unix_timestamp(func.min('timestamp'))).alias("session_time"))

avg_session_time = logs_df_with_session_time.select(func.avg(func.col('session_time')))
avg_session_time.show()

+------------------+
| avg(session_time)|
+------------------+
|190.14408010691253|
+------------------+



### Q3: Find unique URL visits per session.

In [8]:
logs_df_with_req_info = logs_df_with_session.withColumn('request_type', func.split('request', ' ').getItem(0)) \
                .withColumn('temp_url', func.split('request', ' ').getItem(1)) \
                .withColumn('request_url', func.split('temp_url', '\?').getItem(0)) \
                .withColumn('request_params', func.split('temp_url', '\?').getItem(1)) \
                .drop('temp_url')

logs_df_with_unique_url = logs_df_with_req_info.groupBy('c_IP', 'session_id_by_cIP') \
                .agg(func.countDistinct('request_url').alias('distinct_url_count'))

logs_df_with_unique_url.write.parquet('Q3/output.parquet')
logs_df_with_unique_url.show()

+------------+-----------------+------------------+
|        c_IP|session_id_by_cIP|distinct_url_count|
+------------+-----------------+------------------+
|1.186.143.37|                1|                 2|
|1.187.164.29|                1|                 8|
|  1.22.41.76|                1|                 5|
| 1.23.208.26|                1|                 4|
| 1.23.36.184|                1|                 4|
|   1.38.19.8|                1|                 1|
|  1.38.20.34|                1|                14|
|  1.39.13.13|                1|                 2|
| 1.39.32.249|                1|                 4|
| 1.39.32.249|                2|                 2|
|  1.39.32.59|                1|                 1|
| 1.39.33.153|                1|                 6|
|  1.39.33.33|                1|                 2|
|  1.39.33.77|                1|                 2|
|  1.39.33.77|                2|                 4|
|   1.39.34.4|                1|                 1|
|  1.39.40.4

### Q4: Find IPs with the longest session times

In [9]:
logs_df_with_longest_session = logs_df_with_session_time.orderBy(func.col('session_time').desc())

logs_df_with_longest_session.write.parquet('Q4/output.parquet')
logs_df_with_longest_session.show()

+--------------+-----------------+------------+
|          c_IP|session_id_by_cIP|session_time|
+--------------+-----------------+------------+
| 220.226.206.7|                7|        8089|
| 119.81.61.166|                6|        6899|
|  52.74.219.71|                6|        6899|
| 54.251.151.39|                6|        6898|
| 54.169.20.106|                3|        6898|
| 54.169.191.85|                4|        6897|
| 117.242.50.53|                1|        6894|
|  54.232.40.76|                6|        6892|
|  103.5.132.66|                2|        6892|
| 66.249.71.118|                3|        6892|
| 66.249.71.110|                3|        6890|
| 220.227.97.99|                5|        6889|
| 54.244.52.204|                6|        6889|
|175.101.80.134|                5|        6885|
|  202.12.83.44|                5|        6883|
|180.179.213.94|                6|        6880|
|  20.139.66.64|                5|        6879|
|  54.228.16.12|                6|      

In [None]:
logs_df_with_req_info.write.parquet('FINAL/output.parquet')