# PayPay Data Engineering Take-home Test

In [1]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, Window

In [2]:
# Create the notebook's Spark session, or reuse one that already exists.
# NOTE: the "local[*]" master is only for demo and testing purposes.
spark = SparkSession.builder.master("local[*]").appName("Paypay Test").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/23 15:29:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Names given, in order, to the 15 space-separated fields of each log line.
columns = [
    # request identity and routing
    "timestamp", "elb", "client:port", "backend:port",
    # timing breakdown
    "request_processing_time", "backend_processing_time", "response_processing_time",
    # status codes and payload sizes
    "elb_status_code", "backend_status_code", "received_bytes", "sent_bytes",
    # request line, client software and TLS details
    "request", "user_agent", "ssl_cipher", "ssl_protocol",
]

In [4]:
# Load the space-delimited log as CSV, letting Spark infer column types,
# then rename the positional columns (_c0, _c1, ...) using `columns`.
df = (
    spark.read
    .option("compression", "gzip")
    .option("sep", " ")
    .option("inferSchema", True)
    .csv("mktplace_shop_web_log_sample.log.gz")
    .toDF(*columns)
)

                                                                                

In [5]:
# Show the inferred column names and types as a sanity check on the load.
df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- elb: string (nullable = true)
 |-- client:port: string (nullable = true)
 |-- backend:port: string (nullable = true)
 |-- request_processing_time: double (nullable = true)
 |-- backend_processing_time: double (nullable = true)
 |-- response_processing_time: double (nullable = true)
 |-- elb_status_code: integer (nullable = true)
 |-- backend_status_code: integer (nullable = true)
 |-- received_bytes: integer (nullable = true)
 |-- sent_bytes: integer (nullable = true)
 |-- request: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- ssl_cipher: string (nullable = true)
 |-- ssl_protocol: string (nullable = true)



## Processing & Analytical goals:
---
### Sessionize the web log by IP. Sessionize = aggregate all page hits by visitor/IP during a session. https://en.wikipedia.org/wiki/Session_(web_analytics)

In [6]:
# Reduce the raw log to the columns the session analysis needs:
#   timestamp (cast to a real timestamp), elb, backend_status_code,
#   client (visitor IP), backend (backend host) and url (requested URL).
df = (
    df
    # "host:port" -> host, for both the visitor and the backend server.
    .withColumn("client", element_at(split(col("client:port"), ":"), 1))
    .withColumn("backend", element_at(split(col("backend:port"), ":"), 1))
    # Keep the requested URL before `request` is dropped, so URL-level
    # analysis stays possible. Assumes `request` is "<METHOD> <URL> <PROTOCOL>"
    # (the documented ELB layout) -- TODO confirm against the sample log.
    # regexp_extract yields "" (not an error) when the pattern does not match.
    .withColumn("url", regexp_extract(col("request"), r"^\S+ (\S+)", 1))
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .drop(*[
        "client:port",
        "backend:port",
        "request_processing_time",
        "backend_processing_time",
        "response_processing_time",
        "elb_status_code",
        "received_bytes",
        "sent_bytes",
        "request",
        "user_agent",
        "ssl_cipher",
        "ssl_protocol"
    ])
    # No global sort here: the window functions and GROUP BY queries that
    # consume this frame define their own ordering, so sorting the whole
    # dataset would only add a full shuffle.
)

Restrict to status codes most likely to be associated with a valid request.

In [7]:
# Keep only rows whose backend answered with a 2xx or 3xx status.
df = df.where(
    (col("backend_status_code") >= 200) & (col("backend_status_code") < 400)
)

In [8]:
# Sessionize hits per visitor (`client`).
#
# Adds four columns:
#   last_event       timestamp of the visitor's previous hit (null for the first)
#   event_duration   seconds since the previous hit *within the same session*
#   is_new_session   1 when this hit opens a session, else 0
#   user_session_id  1-based running session number per visitor

# Inactivity gap (in seconds) after which the next hit starts a new session.
SESSION_TIMEOUT_SECONDS = 60 * 15

# One time-ordered window per visitor, shared by the look-back and the running sum.
client_timeline = Window.partitionBy("client").orderBy("timestamp")

# get last event timestamp
df = df.withColumn("last_event", lag("timestamp", 1).over(client_timeline))

# Whole seconds between this hit and the previous one (null for a first hit).
seconds_since_last_event = unix_timestamp(col("timestamp")) - unix_timestamp(col("last_event"))

# A hit opens a session when the visitor has no earlier hit, or when the
# visitor was idle for at least the timeout.
opens_session = col("last_event").isNull() | (seconds_since_last_event >= SESSION_TIMEOUT_SECONDS)

# The first hit of every session contributes 0 seconds, so the idle gap that
# preceded the session is not counted as time spent inside it. Summing
# event_duration over a session therefore gives last hit minus first hit.
df = df.withColumn(
    "event_duration",
    when(opens_session, 0).otherwise(seconds_since_last_event)
)

df = df.withColumn(
    "is_new_session",
    when(opens_session, 1).otherwise(0)
)

# assign session ID by summing yes/no of new_session? events
# (`sum` is the Spark aggregate brought in by the star import, not the builtin)
df = df.withColumn(
    "user_session_id",
    sum("is_new_session").over(client_timeline)
)

In [9]:
# Expose the sessionized frame to Spark SQL under the name "sessions".
df.createOrReplaceTempView("sessions")

### Determine the average session time

In [19]:
# Per-visitor mean session length: the inner query totals event_duration for
# each (client, session); the outer query averages those totals per client.
average_session_length = spark.sql(
    """
    SELECT 
        client, 
        AVG(session_length) AS average_session_length
    FROM (
        SELECT
            client,
            user_session_id,
            SUM(event_duration) AS session_length
        FROM sessions
        GROUP BY 1, 2
    ) t1
    GROUP BY 1
    """
)

# Write the result as a single CSV part file with a header row.
(
    average_session_length
    .repartition(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("average_session_length.csv")
)

                                                                                

### Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.

In [20]:
# Count distinct `backend` values per (client, session) and write them as a
# single CSV part file with a header row.
# NOTE(review): `backend` is the host part of the log's "backend:port" field,
# i.e. the backend server, not the page that was requested. To count unique
# URLs per session, COUNT(DISTINCT ...) should target the request URL (the
# URL token of the log's `request` field) instead -- confirm and fix.
spark.sql("""
    SELECT 
        client, 
        user_session_id, 
        COUNT(DISTINCT backend) AS unique_urls_per_session
    FROM sessions
    GROUP BY 1, 2
""").repartition(1).write.csv("unique_visits_per_session.csv", header=True, mode = "overwrite")

                                                                                

### Find the most engaged users, i.e. the IPs with the longest session times

In [25]:
# Top 100 visitors by total time spent across all of their sessions: the inner
# query totals event_duration per (client, session), the outer query sums those
# totals per client and keeps the 100 largest.
# NOTE(review): this ranks by the SUM over a visitor's sessions; if "longest
# session" should mean the single longest session, use MAX instead -- confirm.
#
# coalesce(1) rather than repartition(1): repartition always inserts a shuffle,
# and a shuffle does not promise to keep the ORDER BY ordering, so the written
# file could lose its ranking. coalesce merges partitions without shuffling.
spark.sql("""
    SELECT 
        client,
        SUM(session_length) AS total_sessions_length
     FROM (
        SELECT
            client,
            user_session_id,
            SUM(event_duration) AS session_length
        FROM sessions
        GROUP BY 1, 2
    ) t1
    GROUP BY 1
    ORDER BY 2 DESC
    LIMIT 100
""").coalesce(1).write.csv("100_most_engaged_users.csv", header=True, mode = "overwrite")

                                                                                