<a href="https://colab.research.google.com/github/Elie-Makhoul/V-50/blob/main/BD26_code_ElieMakhoul_MichaelAoun_LennyFraise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BIG DATA 0607 Final Project
## Elie Makhoul - Michael Aoun - Lenny Fraise
### 2025 - 2026


## 1. Initialization & Data Ingestion

### 1.1 Libraries and Configuration

In [None]:
# ===============================
# PySpark imports for Big Data project
# ===============================

# Core Spark
from pyspark.sql import SparkSession, DataFrame

# SQL functions
from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp, col, sum  # for datetime conversion and null checks

# Data types (schemas)
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType,
    LongType, BooleanType, TimestampType, DateType
)

# Window functions
from pyspark.sql.window import Window

# ML / Feature Engineering
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder,
    VectorAssembler, StandardScaler
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Spark config
from pyspark import SparkConf

# Utilities
import sys
import os

# Mount Google Drive folder
from google.colab import drive
drive.mount('/content/drive')

# NOTE: this star import rebinds names such as sum, max, min, round, count and
# first to their Spark column-function versions; the cells below call them
# unqualified (e.g. max("created_at")) and depend on that shadowing.
from pyspark.sql.functions import *


Mounted at /content/drive


### 1.2 Initialize Spark Session

In [None]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("BigDataProject")
    .getOrCreate()
)

print("SparkSession created")

SparkSession created


### 1.3 Path Setup

In [None]:
# All five input tables are stored as Parquet in the same Drive folder.
data_dir = "/content/drive/MyDrive/BigData/Project/data"

orders_filepath = f"{data_dir}/orders.parquet"
order_items_filepath = f"{data_dir}/order_items.parquet"
products_filepath = f"{data_dir}/products.parquet"
website_sessions_filepath = f"{data_dir}/website_sessions.parquet"
website_pageviews_filepath = f"{data_dir}/website_pageviews.parquet"



### 1.4 data ingestion

In [None]:
# Load the five source tables.
# Parquet files carry their own schema and have no header row, so the
# CSV-style "header" / "inferSchema" options are not passed to the reader.

# orders
orders_df = spark.read.parquet(orders_filepath)

# order_items
order_items_df = spark.read.parquet(order_items_filepath)

# products
products_df = spark.read.parquet(products_filepath)

# website_sessions
website_sessions_df = spark.read.parquet(website_sessions_filepath)

# website_pageviews
website_pageviews_df = spark.read.parquet(website_pageviews_filepath)




### 1.5 Data Inspection

#### orders

In [None]:
# Preview orders_df (show() prints the first 20 rows by default)
orders_df.show()

# Column names and data types
orders_df.printSchema()

# Number of nulls in each column: sum a 0/1 "is null" indicator per column
orders_df.select(
    *[F.sum(F.col(name).isNull().cast("int")).alias(name) for name in orders_df.columns]
).show()


+--------+-------------------+------------------+-------+------------------+---------------+----------+---------+
|order_id|         created_at|website_session_id|user_id|primary_product_id|items_purchased|price_euro|cogs_euro|
+--------+-------------------+------------------+-------+------------------+---------------+----------+---------+
|       1|2022-03-19 10:42:46|                20|     20|                 1|              1|    149.99|    69.49|
|       2|2022-03-19 19:27:37|               104|    104|                 1|              1|    149.99|    69.49|
|       3|2022-03-20 06:44:45|               147|    147|                 1|              1|    149.99|    69.49|
|       4|2022-03-20 09:41:45|               160|    160|                 1|              1|    149.99|    69.49|
|       5|2022-03-20 11:28:15|               177|    177|                 1|              1|    149.99|    69.49|
|       6|2022-03-20 16:12:47|               232|    232|                 1|            

#### order_items

In [None]:
# Preview order_items_df (show() prints the first 20 rows by default)
order_items_df.show()

# Column names and data types
order_items_df.printSchema()

# Number of nulls in each column: sum a 0/1 "is null" indicator per column
order_items_df.select(
    *[F.sum(F.col(name).isNull().cast("int")).alias(name) for name in order_items_df.columns]
).show()


+-------------+-------------------+--------+----------+---------------+----------+---------+
|order_item_id|         created_at|order_id|product_id|is_primary_item|price_euro|cogs_euro|
+-------------+-------------------+--------+----------+---------------+----------+---------+
|            1|2022-03-19 10:42:46|       1|         1|              1|    149.99|    69.49|
|            2|2022-03-19 19:27:37|       2|         1|              1|    149.99|    69.49|
|            3|2022-03-20 06:44:45|       3|         1|              1|    149.99|    69.49|
|            4|2022-03-20 09:41:45|       4|         1|              1|    149.99|    69.49|
|            5|2022-03-20 11:28:15|       5|         1|              1|    149.99|    69.49|
|            6|2022-03-20 16:12:47|       6|         1|              1|    149.99|    69.49|
|            7|2022-03-20 17:03:41|       7|         1|              1|    149.99|    69.49|
|            8|2022-03-20 23:35:27|       8|         1|              1

#### products

In [None]:
# Preview products_df (show() prints the first 20 rows by default)
products_df.show()

# Column names and data types
products_df.printSchema()

# Number of nulls in each column: sum a 0/1 "is null" indicator per column
products_df.select(
    *[F.sum(F.col(name).isNull().cast("int")).alias(name) for name in products_df.columns]
).show()


+----------+-------------------+------------+
|product_id|         created_at|product_name|
+----------+-------------------+------------+
|         1|2022-03-19 08:00:00|    CorePack|
|         2|2023-01-06 13:00:00|TechFortress|
|         3|2023-12-12 09:00:00|     AirLite|
|         4|2024-02-05 10:00:00|    EcoShell|
+----------+-------------------+------------+

root
 |-- product_id: integer (nullable = true)
 |-- created_at: string (nullable = true)
 |-- product_name: string (nullable = true)

+----------+----------+------------+
|product_id|created_at|product_name|
+----------+----------+------------+
|         0|         0|           0|
+----------+----------+------------+



#### website_sessions

In [None]:
# Preview website_sessions_df (show() prints the first 20 rows by default)
website_sessions_df.show()

# Column names and data types
website_sessions_df.printSchema()

# Number of nulls in each column: sum a 0/1 "is null" indicator per column
website_sessions_df.select(
    *[F.sum(F.col(name).isNull().cast("int")).alias(name) for name in website_sessions_df.columns]
).show()


+------------------+-------------------+-------+-----------------+----------+------------+-----------+-----------+--------------------+--------------+
|website_session_id|         created_at|user_id|is_repeat_session|utm_source|utm_campaign|utm_content|device_type|        http_referer|traffic_source|
+------------------+-------------------+-------+-----------------+----------+------------+-----------+-----------+--------------------+--------------+
|                 1|2022-03-19 08:04:16|      1|                0|   gsearch|    nonbrand|     g_ad_1|     mobile|https://www.gsear...|   paid_search|
|                 2|2022-03-19 08:16:49|      2|                0|   gsearch|    nonbrand|     g_ad_1|    desktop|https://www.gsear...|   paid_search|
|                 3|2022-03-19 08:26:55|      3|                0|   gsearch|    nonbrand|     g_ad_1|    desktop|https://www.gsear...|   paid_search|
|                 4|2022-03-19 08:37:33|      4|                0|   gsearch|    nonbrand|    

#### website_pageviews

In [None]:
# Preview website_pageviews_df (show() prints the first 20 rows by default)
website_pageviews_df.show()

# Column names and data types
website_pageviews_df.printSchema()

# Number of nulls in each column: sum a 0/1 "is null" indicator per column
website_pageviews_df.select(
    *[F.sum(F.col(name).isNull().cast("int")).alias(name) for name in website_pageviews_df.columns]
).show()


+-------------------+-------------------+------------------+-------------+
|website_pageview_id|         created_at|website_session_id| pageview_url|
+-------------------+-------------------+------------------+-------------+
|                  1|2022-03-19 08:04:16|                 1|        /home|
|                  2|2022-03-19 08:16:49|                 2|        /home|
|                  3|2022-03-19 08:26:55|                 3|        /home|
|                  4|2022-03-19 08:37:33|                 4|        /home|
|                  5|2022-03-19 09:00:55|                 5|        /home|
|                  6|2022-03-19 09:05:46|                 6|        /home|
|                  7|2022-03-19 09:06:27|                 7|        /home|
|                  8|2022-03-19 09:10:08|                 6|    /products|
|                  9|2022-03-19 09:10:52|                 6|/the-corepack|
|                 10|2022-03-19 09:14:02|                 6|        /cart|
|                 11|2022

## 2 Data Preprocessing

### 2.1 orders

In [None]:
# Parse the string column "created_at" into a proper timestamp
orders_df = orders_df.withColumn(
    "created_at",
    F.to_timestamp(F.col("created_at"), "yyyy-MM-dd HH:mm:ss"),
)

orders_df.printSchema()


root
 |-- order_id: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- website_session_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- primary_product_id: integer (nullable = true)
 |-- items_purchased: integer (nullable = true)
 |-- price_euro: double (nullable = true)
 |-- cogs_euro: double (nullable = true)



### 2.2 order_items

In [None]:
# Parse the string column "created_at" into a proper timestamp
order_items_df = order_items_df.withColumn(
    "created_at",
    F.to_timestamp(F.col("created_at"), "yyyy-MM-dd HH:mm:ss"),
)

order_items_df.printSchema()


root
 |-- order_item_id: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- is_primary_item: integer (nullable = true)
 |-- price_euro: double (nullable = true)
 |-- cogs_euro: double (nullable = true)



### 2.3 products

In [None]:
# Parse the string column "created_at" into a proper timestamp
products_df = products_df.withColumn(
    "created_at",
    F.to_timestamp(F.col("created_at"), "yyyy-MM-dd HH:mm:ss"),
)

products_df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- product_name: string (nullable = true)



### 2.4 website_sessions

In [None]:
# Convert "created_at" from string to timestamp
from pyspark.sql.functions import to_timestamp, col, when, concat, lit

# Values that are a bare date (yyyy-MM-dd) get " 00:00:00" appended before
# parsing; every other value is parsed directly as "yyyy-MM-dd HH:mm:ss".
website_sessions_df = website_sessions_df.withColumn(
    "created_at",
    F.when(
        F.col("created_at").rlike(r"^\d{4}-\d{2}-\d{2}$"),
        F.to_timestamp(
            F.concat(F.col("created_at"), F.lit(" 00:00:00")),
            "yyyy-MM-dd HH:mm:ss",
        ),
    ).otherwise(F.to_timestamp(F.col("created_at"), "yyyy-MM-dd HH:mm:ss")),
)

# http_referer is not used downstream in this notebook section
website_sessions_df = website_sessions_df.drop("http_referer")

website_sessions_df.printSchema()

root
 |-- website_session_id: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- is_repeat_session: integer (nullable = true)
 |-- utm_source: string (nullable = true)
 |-- utm_campaign: string (nullable = true)
 |-- utm_content: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- traffic_source: string (nullable = true)



### 2.5 website_pageviews

In [None]:
# Convert "created_at" from string to timestamp.
# Values that are a bare date (yyyy-MM-dd) get " 00:00:00" appended before
# parsing; every other value is parsed directly as "yyyy-MM-dd HH:mm:ss".
website_pageviews_df = website_pageviews_df.withColumn(
    "created_at",
    F.when(
        F.col("created_at").rlike(r"^\d{4}-\d{2}-\d{2}$"),
        F.to_timestamp(
            F.concat(F.col("created_at"), F.lit(" 00:00:00")),
            "yyyy-MM-dd HH:mm:ss",
        ),
    ).otherwise(F.to_timestamp(F.col("created_at"), "yyyy-MM-dd HH:mm:ss")),
)

website_pageviews_df.printSchema()

root
 |-- website_pageview_id: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- website_session_id: integer (nullable = true)
 |-- pageview_url: string (nullable = true)



In [None]:
# Re-print the sessions schema after preprocessing as a sanity check
website_sessions_df.printSchema()

root
 |-- website_session_id: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- is_repeat_session: integer (nullable = true)
 |-- utm_source: string (nullable = true)
 |-- utm_campaign: string (nullable = true)
 |-- utm_content: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- traffic_source: string (nullable = true)



## 3. Feature Engineering

In [None]:
# 1️⃣ Training Set (50% of timeline)
# IV (Features): Mar 1, 2022 → Jun 12, 2023 (469 days)

# Gap (2 days): Jun 13, 2023 → Jun 14, 2023

# DV (Target): Jun 15, 2023 → Aug 14, 2023 (61 days)

# (Inter-set Gap: Aug 15, 2023 → Aug 16, 2023)

# 2️⃣ Validation Set (25% of timeline)
# IV (Features): Aug 17, 2023 → Mar 6, 2024 (203 days)

# Gap (2 days): Mar 7, 2024 → Mar 8, 2024

# DV (Target): Mar 9, 2024 → May 8, 2024 (61 days)

# (Inter-set Gap: May 9, 2024 → May 10, 2024)

# 3️⃣ Testing Set (25% of timeline)
# IV (Features): May 11, 2024 → Nov 28, 2024 (202 days)

# Gap (2 days): Nov 29, 2024 → Nov 30, 2024

# DV (Target): Dec 1, 2024 → Jan 31, 2025 (62 days)

In [None]:
# Restrict every table to the training feature (IV) window, then derive the
# per-user / per-session "last event" timestamps used later.

# 1. Window boundaries (both ends inclusive)
train_iv_start = "2022-03-01 00:00:00"
train_iv_end   = "2023-06-12 23:59:59"

# One shared predicate: train_iv_start <= created_at <= train_iv_end
_in_train_iv = F.col("created_at").between(train_iv_start, train_iv_end)

# Sessions, orders, order items and pageviews inside the window
website_sessions_training = website_sessions_df.where(_in_train_iv)
orders_training = orders_df.where(_in_train_iv)
order_items_training = order_items_df.where(_in_train_iv)
website_pageviews_training = website_pageviews_df.where(_in_train_iv)

# Most recent order timestamp per user
last_order_training = (
    orders_training
    .groupBy("user_id")
    .agg(F.max("created_at").alias("last_order_date"))
)

# Most recent pageview timestamp per session
last_website_pageviews = (
    website_pageviews_training
    .groupBy("website_session_id")
    .agg(F.max("created_at").alias("last_website_pageview_date"))
)


In [None]:
# Sessions per user inside the training window
user_counts = (
    website_sessions_training
    .groupBy("user_id")
    .agg(F.count("*").alias("session_count"))
)

# Users that appear more than once, i.e. have several sessions
duplicates = user_counts.filter(F.col("session_count") > 1)

# duplicates.show()

In [None]:
# last_order_training.sort(col("last_order_date").asc()).show(truncate=False)

#### Init Base Table

In [None]:
# Initialize the base table: one row per user with the attributes of that
# user's most recent session in the training window.
#
# All "last_*" attributes are read from ONE struct whose ordering is
# (created_at, website_session_id, ...). Taking a separate max(struct(...)) per
# attribute would let each attribute come from a different session whenever a
# user has two sessions with the same created_at; a single struct guarantees
# they all describe the same session (the one with the highest session id on
# a timestamp tie).
_last_session_fields = [
    # session id & flag
    "website_session_id",
    "is_repeat_session",
    # UTM / marketing info
    "utm_source",
    "utm_campaign",
    "utm_content",
    # tech & traffic
    "device_type",
    "traffic_source",
]

base_table = (
    website_sessions_training
    .groupBy("user_id")
    .agg(
        # timeline info
        F.max("created_at").alias("last_session_date"),
        # whole record of the latest session
        F.max(F.struct("created_at", *_last_session_fields)).alias("_last_session"),
    )
    .select(
        "user_id",
        "last_session_date",
        # unpack the struct into last_<field> columns, same names/order as before
        *[
            F.col("_last_session")[name].alias(f"last_{name}")
            for name in _last_session_fields
        ],
    )
)

In [None]:
# Number of rows in base_table (one row per user_id after the groupBy above)
base_table.count()

89951

#### Compute Recency

In [None]:
# Session recency: days between the user's last session and the window end
base_table = base_table.withColumn(
    "session_recency",
    F.datediff(F.lit(train_iv_end), F.col("last_session_date")),
)

# Purchase recency: attach each user's last order date (null for users with
# no order in the window), then take days between it and the window end
base_table = base_table.join(last_order_training, "user_id", "left")
base_table = base_table.withColumn(
    "purchase_recency",
    F.datediff(F.lit(train_iv_end), F.col("last_order_date")),
)

# base_table.sort(col("session_recency").asc(), col('purchase_recency').desc()).show(truncate=False)

# Put the columns in a fixed, readable order
ordered_columns = [
    "user_id", "last_session_date", "last_website_session_id", "last_order_date",
    "last_is_repeat_session", "last_utm_source", "last_utm_campaign",
    "last_utm_content", "last_device_type", "last_traffic_source",
    "session_recency", "purchase_recency",
]
base_table = base_table.select(*ordered_columns)

# Impute missing values for users without orders: purchase_recency becomes -1.
# NOTE(review): the 0 for the timestamp column last_order_date presumably
# yields the 1970 epoch placeholder that later cells test for; confirm.
base_table = base_table.fillna({"last_order_date": 0, "purchase_recency": -1})


#### Compute Frequency

In [None]:
# Session frequency: number of sessions per user in the training window
session_freq_df = (
    website_sessions_training
    .groupBy("user_id")
    .agg(F.count("*").alias("session_freq"))
)
base_table = base_table.join(session_freq_df, "user_id", "left")

# Purchase frequency: number of orders per user in the training window
purch_freq_df = (
    orders_training
    .groupBy("user_id")
    .agg(F.count("*").alias("purchase_freq"))
)
base_table = base_table.join(purch_freq_df, "user_id", "left")

# Users missing from either count table get 0
base_table = base_table.fillna({"purchase_freq": 0, "session_freq": 0})

#### Compute purchase_session_recency_ratio

In [None]:
# purchase_session_recency_ratio = purchase_recency / session_recency
#   -1  -> user has no orders (purchase_recency <= -1)
#    1  -> both recencies are 0 (last purchase and last session on the end date)
#   session_recency == 0 with purchase_recency > 0 -> divide by
#         (session_recency + 1) to avoid a division by zero
#   otherwise -> plain ratio rounded to 2 decimals
_purchase_rec = F.col("purchase_recency")
_session_rec = F.col("session_recency")

base_table = base_table.withColumn(
    "purchase_session_recency_ratio",
    F.when(_purchase_rec <= -1, -1)
    .when((_purchase_rec == 0) & (_session_rec == 0), 1)
    .when((_session_rec == 0) & (_purchase_rec > 0), _purchase_rec / (_session_rec + 1))
    .otherwise(F.round(_purchase_rec / _session_rec, 2)),
)

#### compute purchase_session_freq_ratio

In [None]:
# purchase_session_freq_ratio = purchase_freq / session_freq (2 decimals)
#   0  -> user has no purchases
#   <1 -> more sessions than purchases
#   ~1 -> almost every session has a matching purchase
base_table = base_table.withColumn(
    "purchase_session_freq_ratio",
    F.when(F.col("purchase_freq") <= 0, 0).otherwise(
        F.round(F.col("purchase_freq") / F.col("session_freq"), 2)
    ),
)

#### compute is_session_recency_0

In [None]:
# Boolean flag: True when the user's last session has a recency of 0 days
base_table = base_table.withColumn(
    "is_session_recency_0",
    F.when(F.col("session_recency") == 0, True).otherwise(False),
)

#### compute monetary

In [None]:
# Monetary features per user, from orders in the training window
monetary_df = orders_training.groupBy("user_id").agg(
    F.sum("price_euro").alias("total_revenue"),
    F.round(F.avg("price_euro"), 2).alias("avg_revenue"),
    F.max("price_euro").alias("max_order_amount"),
    F.round(F.sum("cogs_euro"), 2).alias("total_cogs"),
)

# Attach to the base table; users without orders get 0 for every amount
base_table = base_table.join(monetary_df, "user_id", "left")
base_table = base_table.fillna(
    {"total_revenue": 0, "avg_revenue": 0, "max_order_amount": 0, "total_cogs": 0}
)

#### Compute total number of items purchased

In [None]:
# Total number of items purchased per user in the training window
total_items_purchased_df = (
    orders_training
    .groupBy("user_id")
    .agg(F.sum("items_purchased").alias("total_items_purchased"))
)

# Attach to the base table; users without orders get 0 items
base_table = base_table.join(total_items_purchased_df, "user_id", "left")
base_table = base_table.fillna({"total_items_purchased": 0})


#### Compute last primary product id


In [None]:
# Primary product of each user's most recent order: take the max of a
# (created_at, primary_product_id) struct and keep only the product id
_latest_order = F.max(F.struct("created_at", "primary_product_id"))
last_primary_product_df = orders_training.groupBy("user_id").agg(
    _latest_order["primary_product_id"].alias("last_primary_product_id")
)

# Attach to the base table; users without orders get product id 0
base_table = base_table.join(last_primary_product_df, "user_id", "left")
base_table = base_table.fillna({"last_primary_product_id": 0})

#### compute day of week

In [None]:
# Day of week of the user's last order ("Mon", "Tue", ...), or "-1" for users
# with no order in the training window.
#
# Non-buyers are identified from purchase_freq == 0 (or a null order date)
# instead of testing whether the placeholder timestamp falls in 1970: how a
# timestamp's year is rendered depends on the Spark session time zone, so the
# year test is not a reliable "no order" marker.
base_table = base_table.withColumn(
    "day_of_week",
    F.when(
        (F.col("purchase_freq") == 0) | F.col("last_order_date").isNull(),
        "-1",
    ).otherwise(F.date_format(F.col("last_order_date"), "E")),
)

#### Compute time of day

In [None]:
# Time-of-day bucket of the user's last order, or "-1" for users with no
# order in the training window.
#
# Non-buyers are identified from purchase_freq == 0 (or a null order date)
# instead of comparing the placeholder timestamp's text with
# "1970-01-01 00:00:00": the string form of a timestamp depends on the Spark
# session time zone, and when that comparison misses, non-buyers fall through
# to a real time-of-day label.
_no_order = (F.col("purchase_freq") == 0) | F.col("last_order_date").isNull()
_order_hour = F.hour(F.col("last_order_date"))

base_table = base_table.withColumn(
    "time_of_day",
    # 1. Users without any order
    F.when(_no_order, "-1")
    # 2. Hour-based buckets (bounds inclusive)
    .when(_order_hour.between(8, 11), "Morning")
    .when(_order_hour.between(12, 13), "Noon")
    .when(_order_hour.between(14, 17), "Afternoon")
    .when(_order_hour.between(18, 22), "Night")
    # 23:00-23:59 and 00:00-00:59
    .when((_order_hour == 23) | (_order_hour == 0), "Mid-night")
    # remaining hours: 1 to 7
    .otherwise("Middle of night"),
)

#### Compute Landing Page

In [None]:
# 1. Window over each session's pageviews, earliest first
session_window = Window.partitionBy("website_session_id").orderBy("created_at")

# 2. Landing page of every session = URL of its first pageview
session_landing_pages = (
    website_pageviews_training
    .withColumn("landing_page", F.first("pageview_url").over(session_window))
    .select("website_session_id", "landing_page")
    .distinct()
)

# 3. Bring in user_id and session time from the sessions table
user_session_landing = (
    website_sessions_training
    .select("user_id", "website_session_id", "created_at")
    .join(session_landing_pages, "website_session_id", "left")
)

# 4. Window over each user's sessions, latest first
user_window = Window.partitionBy("user_id").orderBy(F.col("created_at").desc())

# 5. Landing page of each user's latest session
last_landing_page_df = (
    user_session_landing
    .withColumn("last_landing_page", F.first("landing_page").over(user_window))
    .select("user_id", "last_landing_page")
    .distinct()
)

# 6. Attach to the base table
base_table = base_table.join(last_landing_page_df, "user_id", "left")

#### Compute last page url

In [None]:
# 1. Window over each session's pageviews, latest first
last_page_window = Window.partitionBy("website_session_id").orderBy(F.col("created_at").desc())

# 2. Exit page of every session = URL of its last pageview
session_exit_pages = (
    website_pageviews_training
    .withColumn("last_page_url", F.first("pageview_url").over(last_page_window))
    .select("website_session_id", "last_page_url")
    .distinct()
)

# 3. Match on the user's last session id, then discard the duplicate key column
_exit_page_match = base_table.last_website_session_id == session_exit_pages.website_session_id
base_table = base_table.join(session_exit_pages, _exit_page_match, "left")
base_table = base_table.drop("website_session_id")

#### Compute last session pageview depth

In [None]:
# Number of pageviews recorded in each session
pageview_depth_df = (
    website_pageviews_training
    .groupBy("website_session_id")
    .agg(F.count("website_pageview_id").alias("last_session_pageview_depth"))
)

# Keep the depth of each user's last session; drop the duplicate key column
_depth_match = base_table.last_website_session_id == pageview_depth_df.website_session_id
base_table = base_table.join(pageview_depth_df, _depth_match, "left")
base_table = base_table.drop("website_session_id")



#### Compute visited cart

In [None]:
# Sessions that contain at least one "/cart" pageview, flagged with 1
cart_visitors = (
    website_pageviews_training
    .where(F.col("pageview_url") == "/cart")
    .select("website_session_id")
    .distinct()
    .withColumn("visited_cart", F.lit(1))
)

# Flag users whose last session is among them; everyone else gets 0
_cart_match = base_table.last_website_session_id == cart_visitors.website_session_id
base_table = base_table.join(cart_visitors, _cart_match, "left")
base_table = base_table.drop("website_session_id")
base_table = base_table.fillna({"visited_cart": 0})

In [None]:
# Display the DataFrame repr (column names and types) built so far
base_table

DataFrame[user_id: int, last_session_date: timestamp, last_website_session_id: int, last_order_date: timestamp, last_is_repeat_session: int, last_utm_source: string, last_utm_campaign: string, last_utm_content: string, last_device_type: string, last_traffic_source: string, session_recency: int, purchase_recency: int, session_freq: bigint, purchase_freq: bigint, purchase_session_recency_ratio: double, purchase_session_freq_ratio: double, is_session_recency_0: boolean, total_revenue: double, avg_revenue: double, max_order_amount: double, total_cogs: double, total_items_purchased: bigint, last_primary_product_id: int, day_of_week: string, time_of_day: string, last_landing_page: string, last_page_url: string, last_session_pageview_depth: bigint, visited_cart: int]

#### Compute whether Brand or not Brand

In [None]:
# 1 when the last session came from the "brand" campaign, 0 otherwise
# (including null campaigns). An exact match is required: a substring test
# for "brand" would also flag the "nonbrand" campaign as brand traffic.
base_table = base_table.withColumn(
    "is_brand_traffic",
    F.when(F.col("last_utm_campaign") == "brand", 1).otherwise(0),
)

#### Compute revenue thresholds

In [None]:
# 1. Buyers only: users with positive revenue in the training window
buyers_df = base_table.where(F.col("total_revenue") > 0)

# 2. 50th, 70th and 90th percentiles of buyer revenue (relative error 0.0)
thresholds = buyers_df.approxQuantile("total_revenue", [0.5, 0.7, 0.9], 0.0)
p50 = thresholds[0]
p70 = thresholds[1]
p90 = thresholds[2]

# 3. Segment rules, evaluated top to bottom:
#      revenue >  p90 -> "VIP (Top 10%)"
#      revenue >= p50 -> "Medium Value"
#      revenue >  0   -> "Standard"
#      otherwise      -> "No Buyer"
_revenue = F.col("total_revenue")
base_table = base_table.withColumn(
    "revenue_segment",
    F.when(_revenue > p90, "VIP (Top 10%)")
    .when(_revenue >= p50, "Medium Value")
    .when(_revenue > 0, "Standard")
    .otherwise("No Buyer"),
)

In [None]:
# Show the 90th-percentile revenue threshold computed above
p90

159.99

In [None]:
# Distribution of users by number of purchases, smallest count first
(
    base_table
    .groupBy("purchase_freq")
    .count()
    .orderBy(F.col("purchase_freq").asc())
    .show(20)
)

+-------------+-----+
|purchase_freq|count|
+-------------+-----+
|            0|84781|
|            1| 5120|
|            2|   48|
|            3|    2|
+-------------+-----+



#### Compute top and top 50 percent Frequency flags

In [None]:
# Highest purchase count observed for any user
max_freq = base_table.agg(F.max("purchase_freq")).first()[0]

# is_top_freq:   1 when the user reaches that maximum
# is_top50_freq: 1 when the user has at least 2 purchases (fixed cut-off)
base_table = base_table.withColumn(
    "is_top_freq",
    F.when(F.col("purchase_freq") == max_freq, 1).otherwise(0),
)
base_table = base_table.withColumn(
    "is_top50_freq",
    F.when(F.col("purchase_freq") >= 2, 1).otherwise(0),
)

#### Compute is cart abandonment

In [None]:
# 1. Per session: did it view "/cart" (1/0), and did it place an order?
_is_cart_view = F.when(F.col("pageview_url") == "/cart", 1).otherwise(0)
session_cart_info = (
    website_pageviews_training
    .groupBy("website_session_id")
    .agg(F.max(_is_cart_view).alias("viewed_cart"))
)

session_order_info = orders_training.select(
    "website_session_id", F.lit(1).alias("placed_order")
)

# 2. Abandoned = cart was viewed and no order row matched the session
_abandoned = (F.col("viewed_cart") == 1) & F.col("placed_order").isNull()
session_abandonment = (
    session_cart_info
    .join(session_order_info, "website_session_id", "left")
    .withColumn("is_abandoned", F.when(_abandoned, 1).otherwise(0))
)

# 3. Attach user_id and session time; remaining numeric nulls become 0
user_session_abandon = (
    website_sessions_training
    .select("user_id", "website_session_id", "created_at")
    .join(
        session_abandonment.select("website_session_id", "is_abandoned"),
        "website_session_id",
        "left",
    )
    .fillna(0)
)

# 4. Window over each user's sessions, latest first
user_window = Window.partitionBy("user_id").orderBy(F.col("created_at").desc())

# 5. Abandonment flag of each user's latest session
last_cart_abandon_df = (
    user_session_abandon
    .withColumn("last_session_cart_abandoned", F.first("is_abandoned").over(user_window))
    .select("user_id", "last_session_cart_abandoned")
    .distinct()
)

# 6. Attach to the base table; users without a match get 0
base_table = base_table.join(last_cart_abandon_df, "user_id", "left")
base_table = base_table.fillna({"last_session_cart_abandoned": 0})

#### Compute start and end time per session

In [None]:
# 1. Seconds between the first and the last pageview of every session
_first_view = F.unix_timestamp(F.min("created_at"))
_last_view = F.unix_timestamp(F.max("created_at"))
session_duration_df = (
    website_pageviews_training
    .groupBy("website_session_id")
    .agg((_last_view - _first_view).alias("time_spent_seconds"))
)

# 2. Keep the duration of each user's last session; drop the duplicate key
_duration_match = base_table.last_website_session_id == session_duration_df.website_session_id
base_table = base_table.join(session_duration_df, _duration_match, "left")
base_table = base_table.drop("website_session_id")

# 3. Users whose last session has no duration row get 0
#    (a session with a single pageview already yields 0 from step 1)
base_table = base_table.fillna({"time_spent_seconds": 0})

#### Compute user tenure (how long a user has been active / registered)

In [None]:
# User tenure = days between the user's first session in the training window
# and the window end (train_iv_end)

# 1. First session timestamp per user
first_session_df = (
    website_sessions_training
    .groupBy("user_id")
    .agg(F.min("created_at").alias("first_session_date"))
)

# 2. Attach it to the base table
base_table = base_table.join(first_session_df, "user_id", "left")

# 3. Day difference to the snapshot date
base_table = base_table.withColumn(
    "user_tenure",
    F.datediff(F.lit(train_iv_end), F.col("first_session_date")),
)

#### Compute difference between first and last session

In [None]:
# 1. First session timestamp per user
first_session_df = (
    website_sessions_training
    .groupBy("user_id")
    .agg(F.min("created_at").alias("first_session_date"))
)

# 2. Join it only when base_table does not already carry first_session_date,
#    so the column is never duplicated
needs_first_session = "first_session_date" not in base_table.columns
if needs_first_session:
    base_table = base_table.join(first_session_df, "user_id", "left")

# 3. Days between the user's first and last session
base_table = base_table.withColumn(
    "activity_window_days",
    F.datediff(F.col("last_session_date"), F.col("first_session_date")),
)

In [None]:
# Number of columns currently in base_table
len(base_table.columns)

38

#### Compute average time per session per user

In [None]:
# 1. Duration in seconds of EVERY session (first to last pageview)
_session_span = F.unix_timestamp(F.max("created_at")) - F.unix_timestamp(F.min("created_at"))
all_sessions_duration = (
    website_pageviews_training
    .groupBy("website_session_id")
    .agg(_session_span.alias("session_seconds"))
)

# 2. Map each session to its user (inner join: sessions without pageviews drop out)
user_sessions_duration = (
    website_sessions_training
    .select("website_session_id", "user_id")
    .join(all_sessions_duration, "website_session_id", "inner")
)

# 3. Mean session duration per user, 2 decimals
avg_duration_df = (
    user_sessions_duration
    .groupBy("user_id")
    .agg(F.round(F.avg("session_seconds"), 2).alias("avg_session_duration"))
)

# 4. Attach to the base table; 5. users without a value get 0
base_table = base_table.join(avg_duration_df, "user_id", "left")
base_table = base_table.fillna({"avg_session_duration": 0})

#### Compute view share

In [None]:
# View share: of all pages a user viewed, which fraction matched each product's URL keyword?

# Attach user_id to each pageview through the sessions table.
pageviews_with_user = website_pageviews_training.join(
    website_sessions_training.select("website_session_id", "user_id"),
    on="website_session_id",
    how="inner",
)

# URL keyword used to recognise each product's page.
# Adjust these strings if the site uses different URL names.
product_url_keywords = {
    "p1": "the-corepack",
    "p2": "the-techfortress",
    "p3": "the-airlite",
    "p4": "the-ecoshell",
}

# Per user: one pageview count per product plus the total number of pageviews.
product_views_df = pageviews_with_user.groupBy("user_id").agg(
    *[
        F.count(F.when(F.col("pageview_url").contains(keyword), 1)).alias(f"views_{tag}")
        for tag, keyword in product_url_keywords.items()
    ],
    F.count("*").alias("total_pageviews"),
)

# Ratios in [0.0, 1.0], rounded to two decimals.
view_cols = [f"{tag}_view_share" for tag in product_url_keywords]
view_share_final = product_views_df.select(
    "user_id",
    *[
        F.round(F.col(f"views_{tag}") / F.col("total_pageviews"), 2).alias(f"{tag}_view_share")
        for tag in product_url_keywords
    ],
)

# Remove results of a previous run, then merge; users without pageviews get 0.
stale_view_cols = [c for c in view_cols if c in base_table.columns]
base_table = base_table.drop(*stale_view_cols)
base_table = base_table.join(view_share_final, on="user_id", how="left").fillna(0, subset=view_cols)

#### Compute cart share

In [None]:
# Cart share: of all of a user's transitions into the cart, which fraction
# came from each product's page?

# 1. Attach user_id to each pageview through the sessions table
pv_with_user = website_pageviews_training.join(
    website_sessions_training.select("website_session_id", "user_id"),
    on="website_session_id", how="inner"
)

# 2. Use a Window to look at the NEXT pageview in the same session.
# created_at alone can tie between two pageviews of a session, which would make
# lead() pick an arbitrary neighbour; website_pageview_id is added as a
# tie-breaker so the result is reproducible.
# NOTE(review): assumes pageview ids increase in visit order - confirm.
window_spec = Window.partitionBy("website_session_id").orderBy(
    "created_at", "website_pageview_id"
)
pv_with_next_page = pv_with_user.withColumn(
    "next_page", F.lead("pageview_url").over(window_spec)
)

# 3. Identify "cart adds": rows whose NEXT page is the cart.
# The current page is not filtered here; it is attributed to a product (or to
# none) by the URL keyword checks in step 4.
cart_adds = pv_with_next_page.filter(F.col("next_page").contains("/cart"))

# 4. Per user, count the transitions coming from each product page and the
# total number of transitions into the cart
product_carts_df = cart_adds.groupBy("user_id").agg(
    F.count(F.when(F.col("pageview_url").contains("the-corepack"), 1)).alias("cart_p1"),
    F.count(F.when(F.col("pageview_url").contains("the-techfortress"), 1)).alias("cart_p2"),
    F.count(F.when(F.col("pageview_url").contains("the-airlite"), 1)).alias("cart_p3"),
    F.count(F.when(F.col("pageview_url").contains("the-ecoshell"), 1)).alias("cart_p4"),
    F.count("*").alias("total_cart_adds")
)

# 5. Ratios rounded to two decimals. The denominator counts every transition
# into the cart, so the four shares add up to 1 only when every such transition
# started on one of the four product pages.
cart_share_final = product_carts_df.withColumn(
    "p1_cart_share", F.round(F.col("cart_p1") / F.col("total_cart_adds"), 2)
).withColumn(
    "p2_cart_share", F.round(F.col("cart_p2") / F.col("total_cart_adds"), 2)
).withColumn(
    "p3_cart_share", F.round(F.col("cart_p3") / F.col("total_cart_adds"), 2)
).withColumn(
    "p4_cart_share", F.round(F.col("cart_p4") / F.col("total_cart_adds"), 2)
).select("user_id", "p1_cart_share", "p2_cart_share", "p3_cart_share", "p4_cart_share")

# 6. Remove results of a previous run, then merge; users with no cart
# transition get 0 for every share
cart_cols = ["p1_cart_share", "p2_cart_share", "p3_cart_share", "p4_cart_share"]
base_table = base_table.drop(*[c for c in cart_cols if c in base_table.columns])
base_table = base_table.join(cart_share_final, on="user_id", how="left").fillna(0, subset=cart_cols)

In [None]:
# Preview the per-user cart shares, sorted by product-4 share in descending order
cart_share_final.sort(F.col("p4_cart_share").desc()).show()

+-------+-------------+-------------+-------------+-------------+
|user_id|p1_cart_share|p2_cart_share|p3_cart_share|p4_cart_share|
+-------+-------------+-------------+-------------+-------------+
|   1238|          1.0|          0.0|          0.0|          0.0|
|   1645|          1.0|          0.0|          0.0|          0.0|
|   7880|          1.0|          0.0|          0.0|          0.0|
|   7982|          1.0|          0.0|          0.0|          0.0|
|  15619|          1.0|          0.0|          0.0|          0.0|
|  15846|          1.0|          0.0|          0.0|          0.0|
|  16386|          1.0|          0.0|          0.0|          0.0|
|  19079|          1.0|          0.0|          0.0|          0.0|
|  20735|          1.0|          0.0|          0.0|          0.0|
|  25591|          1.0|          0.0|          0.0|          0.0|
|  26583|          1.0|          0.0|          0.0|          0.0|
|  26623|          1.0|          0.0|          0.0|          0.0|
|  28577| 

In [None]:
# Display the DataFrame repr (column names and types) of base_table
base_table

DataFrame[user_id: int, last_session_date: timestamp, last_website_session_id: int, last_order_date: timestamp, last_is_repeat_session: int, last_utm_source: string, last_utm_campaign: string, last_utm_content: string, last_device_type: string, last_traffic_source: string, session_recency: int, purchase_recency: int, session_freq: bigint, purchase_freq: bigint, purchase_session_recency_ratio: double, purchase_session_freq_ratio: double, is_session_recency_0: boolean, total_revenue: double, avg_revenue: double, max_order_amount: double, total_cogs: double, total_items_purchased: bigint, last_primary_product_id: int, day_of_week: string, time_of_day: string, last_landing_page: string, last_page_url: string, last_session_pageview_depth: bigint, visited_cart: int, is_brand_traffic: int, revenue_segment: string, is_top_freq: int, is_top50_freq: int, last_session_cart_abandoned: int, time_spent_seconds: bigint, first_session_date: timestamp, user_tenure: int, activity_window_days: int, avg_s

In [None]:
# Number of users for each value of total_items_purchased, in ascending order
(
    base_table
    .groupBy("total_items_purchased")
    .count()
    .orderBy(F.col("total_items_purchased").asc())
    .show(20)
)

+---------------------+-----+
|total_items_purchased|count|
+---------------------+-----+
|                    0|84781|
|                    1| 5120|
|                    2|   48|
|                    3|    2|
+---------------------+-----+



In [None]:
# Inspect the rows where total_items_purchased equals 3
base_table.where(F.col("total_items_purchased") == 3).show()

+-------+-------------------+-----------------------+-------------------+----------------------+---------------+-----------------+----------------+----------------+-------------------+---------------+----------------+------------+-------------+------------------------------+---------------------------+--------------------+-------------+-----------+----------------+----------+---------------------+-----------------------+-----------+---------------+-----------------+-------------+---------------------------+------------+----------------+---------------+-----------+-------------+---------------------------+------------------+-------------------+-----------+--------------------+--------------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|user_id|  last_session_date|last_website_session_id|    last_order_date|last_is_repeat_session|last_utm_source|last_utm_campaign|last_utm_content|last_device_type|last_traffic_sourc

#### Compute purchased-product flags (bought_p1, bought_p2, bought_p3, bought_p4)

In [None]:
# Link every ordered item to the user who placed the order.
order_users = orders_training.select("order_id", "user_id")
order_products = order_items_training.select("order_id", "product_id")
user_product_purchases = order_users.join(order_products, on="order_id", how="inner")

# Pivot on an explicit product list so all four columns exist even when a
# product never appears in the data.
product_ids = [1, 2, 3, 4]
user_product_flags = (
    user_product_purchases
    .groupBy("user_id")
    .pivot("product_id", product_ids)
    .agg(F.lit(1))
)

# Give the pivoted columns ("1".."4") descriptive names.
for pid in product_ids:
    user_product_flags = user_product_flags.withColumnRenamed(str(pid), f"bought_p{pid}")

# Remove results of a previous run, then merge; a missing flag means "not bought" (0).
product_cols = [f"bought_p{pid}" for pid in product_ids]
stale_product_cols = [c for c in product_cols if c in base_table.columns]
base_table = base_table.drop(*stale_product_cols)

base_table = base_table.join(user_product_flags, on="user_id", how="left").fillna(0, subset=product_cols)

#### Compute user_bounce_rate
Description: the share (between 0 and 1, rounded to two decimals) of this user's historical sessions in which they viewed only one page.

In [None]:
# Count pageviews per session and flag sessions with exactly one pageview as bounces.
pageviews_per_session = website_pageviews_training.groupBy("website_session_id").agg(
    F.count("website_pageview_id").alias("pageviews")
)
session_bounces_df = pageviews_per_session.withColumn(
    "is_bounce",
    F.when(F.col("pageviews") == 1, 1).otherwise(0),
)

# Attach the bounce flag to each session's user.
user_bounces = (
    website_sessions_training
    .select("website_session_id", "user_id")
    .join(session_bounces_df, on="website_session_id", how="inner")
)

# Bounce rate per user = mean of the 0/1 flag, rounded to two decimals.
user_bounce_rate_df = user_bounces.groupBy("user_id").agg(
    F.round(F.avg("is_bounce"), 2).alias("user_bounce_rate")
)

# Merge into base_table; users without a computed rate get 0.
base_table = (
    base_table
    .join(user_bounce_rate_df, on="user_id", how="left")
    .fillna({"user_bounce_rate": 0})
)

#### Compute most viewed product

In [None]:
# Most viewed product: the product id whose pages a user viewed most often
# (0 when the user viewed no product page).

# 1. Map URLs to product ids by keyword; URLs matching no keyword stay null
# and are filtered out (this removes non-product pages)
product_views_df = website_pageviews_training.withColumn(
    "viewed_product_id",
    F.when(F.col("pageview_url").contains("corepack"), 1)
    .when(F.col("pageview_url").contains("techfortress"), 2)
    .when(F.col("pageview_url").contains("airlite"), 3)
    .when(F.col("pageview_url").contains("ecoshell"), 4)
).filter(F.col("viewed_product_id").isNotNull())

# 2. Join with sessions to get the user id
user_product_views = product_views_df.join(
    website_sessions_training.select("website_session_id", "user_id"),
    on="website_session_id",
    how="inner"
)

# 3. Count how many times each user viewed each product
product_counts = user_product_views.groupBy("user_id", "viewed_product_id") \
    .agg(F.count("*").alias("view_count"))

# 4. Keep the product with the highest view count per user.
# Ordering by view_count alone lets row_number() pick an arbitrary product when
# two products are tied, so the feature could change between runs. The product
# id is added as a secondary key: on a tie the lowest product id wins.
window_spec = Window.partitionBy("user_id").orderBy(
    F.desc("view_count"), F.asc("viewed_product_id")
)

most_viewed_df = product_counts.withColumn("rank", F.row_number().over(window_spec)) \
    .filter(F.col("rank") == 1) \
    .select(F.col("user_id"), F.col("viewed_product_id").alias("most_viewed_product_id"))

# 5. Merge with base_table. Drop the column from a previous run of this cell
# first so the join does not create a duplicate; DataFrame.drop is a no-op
# when the column is absent.
base_table = base_table.drop("most_viewed_product_id") \
    .join(most_viewed_df, on="user_id", how="left")

# 6. Fill missing values with 0 (no product page viewed)
base_table = base_table.fillna({"most_viewed_product_id": 0})

In [None]:
# Preview base_table sorted by most_viewed_product_id in descending order
base_table.sort(F.col("most_viewed_product_id").desc()).show()

+-------+-------------------+-----------------------+-------------------+----------------------+---------------+-----------------+----------------+----------------+-------------------+---------------+----------------+------------+-------------+------------------------------+---------------------------+--------------------+-------------+-----------+----------------+----------+---------------------+-----------------------+-----------+-----------+-----------------+-----------------+---------------------------+------------+----------------+---------------+-----------+-------------+---------------------------+------------------+-------------------+-----------+--------------------+--------------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+---------+---------+---------+---------+----------------+----------------------+
|user_id|  last_session_date|last_website_session_id|    last_order_date|last_is_repeat_session|last_u

In [None]:
# Per-product purchase quantity: how many items of each product a user bought.

# Link every ordered item to the user who placed the order.
order_users = orders_training.select("order_id", "user_id")
order_products = order_items_training.select("order_id", "product_id")
user_product_qty = order_users.join(order_products, on="order_id", how="inner")

# Pivot on an explicit product list and count rows per (user, product).
# The count is taken on order_id rather than "*".
product_ids = [1, 2, 3, 4]
product_qty_pivot = (
    user_product_qty
    .groupBy("user_id")
    .pivot("product_id", product_ids)
    .agg(F.count("order_id"))
)

# Give the pivoted columns ("1".."4") descriptive names.
for pid in product_ids:
    product_qty_pivot = product_qty_pivot.withColumnRenamed(str(pid), f"qty_p{pid}_bought")

# Remove results of a previous run, then merge; a missing quantity becomes 0.
qty_cols = [f"qty_p{pid}_bought" for pid in product_ids]
stale_qty_cols = [c for c in qty_cols if c in base_table.columns]
base_table = base_table.drop(*stale_qty_cols)

base_table = (
    base_table
    .join(product_qty_pivot, on="user_id", how="left")
    .fillna(0, subset=qty_cols)
)

In [None]:
# Inspect the rows where total_items_purchased equals 3, now including the qty_* columns
base_table.where(F.col("total_items_purchased") == 3).show()

+-------+-------------------+-----------------------+-------------------+----------------------+---------------+-----------------+----------------+----------------+-------------------+---------------+----------------+------------+-------------+------------------------------+---------------------------+--------------------+-------------+-----------+----------------+----------+---------------------+-----------------------+-----------+---------------+-----------------+-------------+---------------------------+------------+----------------+---------------+-----------+-------------+---------------------------+------------------+-------------------+-----------+--------------------+--------------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+---------+---------+---------+---------+----------------+----------------------+-------------+-------------+-------------+-------------+
|user_id|  last_session_date|last_website_sess