In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/ubuntu/Github/DataEngineerChallenge/spark-2.4.4-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, max, sum, mean
from pyspark.sql.functions import col, when, count, countDistinct
from pyspark.sql.functions import split, concat_ws
from pyspark.sql.types import StructField, StructType
from pyspark.sql.types import StringType, FloatType


if "spark" not in dir():
    spark = SparkSession.builder \
    .appName("web_log_analysis") \
    .getOrCreate()
    
logFile = "data/2015_07_22_mktplace_shop_web_log_sample.log.gz"
numPartitions = 15
session_time = 15*60


def duration(start, end):
    try:
        num_of_seconds = (end - start).total_seconds()
    except:
        num_of_seconds = 0
    return num_of_seconds;

get_duration = udf(duration, FloatType())



def initialization(spark):
    log_schema = StructType([
        StructField("timestamp", StringType(), False),
        StructField("elb", StringType(), False),
        StructField("client:port", StringType(), False),
        StructField("backend:port", StringType(), False),
        StructField("request_processing_time", StringType(), False),
        StructField("backend_processing_time", StringType(), False),
        StructField("response_processing_time", StringType(), False),
        StructField("elb_status_code", StringType(), False),
        StructField("backend_status_code", StringType(), False),
        StructField("received_bytes", StringType(), False),
        StructField("sent_bytes", StringType(), False),
        StructField("request", StringType(), False),
        StructField("user_agent", StringType(), False),
        StructField("ssl_cipher", StringType(), False),
        StructField("ssl_protocol", StringType(), False)])
    
    df = spark.read.csv(logFile, schema=log_schema, sep=" ").repartition(numPartitions).cache()
    split_client = split(df["client:port"], ":")
    split_backend = split(df["backend:port"], ":")
    split_request = split(df["request"], " ")

    return df.withColumn("client_ip", split_client.getItem(0)) \
                .withColumn("client_port", split_client.getItem(1)) \
                .withColumn("backend_ip", split_backend.getItem(0)) \
                .withColumn("backend_port", split_backend.getItem(1)) \
                .withColumn("request_action", split_request.getItem(0)) \
                .withColumn("request_url", split_request.getItem(1)) \
                .withColumn("request_protocol", split_request.getItem(2)) \
                .withColumn("current_timestamp", col("timestamp").cast("timestamp")) \
                .drop("client:port","backend:port","request").cache()


def preprocess(df_logs):
    window_func_ip = Window.partitionBy("client_ip").orderBy("current_timestamp")
    df = df_logs.withColumn("previous_timestamp",
                            lag(col("current_timestamp")).over(window_func_ip)) \
                .withColumn("session_duration",
                            get_duration(col("previous_timestamp"), col("current_timestamp"))) \
                .withColumn("is_new_session",
                            when((col("session_duration") > session_time), 1).otherwise(0)) \
                .withColumn("count_session",
                            sum(col("is_new_session")).over(window_func_ip)) \
                .withColumn("ip_session_count",
                            concat_ws("_", col("client_ip"), col("count_session")))

    return df.select(["ip_session_count", "client_ip", "request_url",
                               "previous_timestamp", "current_timestamp",
                               "session_duration", "is_new_session", "count_session"]);

def average_session_time(df_ip_session):
    window_func_session = Window.partitionBy("ip_session_count").orderBy("current_timestamp")
    df_session = df_ip_session.withColumn("previous_timestamp_session",
                              lag(df_ip_session["current_timestamp"]).over(window_func_session)) \
                  .withColumn("current_session_duration",
                              get_duration(col("previous_timestamp_session"), col("current_timestamp")))
    df_session_total = df_session.groupby("ip_session_count").agg(
            sum("current_session_duration").alias("total_session_time")).cache()
    df_session_avg = df_session_total.select([mean("total_session_time").alias("avg_session_time")]).cache()
    df_session_avg.show()
    return df_session


def count_unique_request(df_session):
    df_unique_url = df_session.groupby("ip_session_count").agg(
        countDistinct("request_url").alias("count_unique_requests"))
    df_unique_url.show()

def get_longest_session_time(df_session):
    df_ip_time = df_session.groupby("client_ip").agg(
                sum("session_duration").alias("session_time_all"),
                count("client_ip").alias("num_sessions"),
                max("session_duration").alias("session_duration_max")) \
          .withColumn("avg_session_time", col("session_time_all") / col("num_sessions")) \
          .orderBy(col("avg_session_time"), ascending=False)
    df_ip_time.show()


def solve(spark):
    df_logs = initialization(spark).cache()
    df_ip_session = preprocess(df_logs).repartition(numPartitions).cache()
    df_session = average_session_time(df_ip_session).repartition(numPartitions).cache()
    count_unique_request(df_session)
    get_longest_session_time(df_session)

solve(spark)

+------------------+
|  avg_session_time|
+------------------+
|125.63083815583757|
+------------------+

+-----------------+---------------------+
| ip_session_count|count_unique_requests|
+-----------------+---------------------+
|   182.69.48.36_0|                  108|
| 117.210.14.119_0|                    3|
| 59.165.251.191_2|                   86|
|115.248.233.203_2|                   86|
|205.175.226.101_0|                   89|
|  106.51.235.51_0|                   89|
| 122.164.34.125_0|                    8|
|115.242.129.233_0|                    7|
| 115.249.21.130_0|                   10|
| 59.184.184.157_0|                    9|
| 115.111.50.254_1|                   18|
|  182.68.136.65_0|                  104|
|   202.91.134.7_4|                   10|
| 223.255.247.66_0|                    7|
|  188.40.94.195_1|                   89|
|117.239.224.160_1|                   65|
|  101.57.193.44_0|                   82|
|122.179.135.178_0|                    8|
|    8.37.22