In [1]:
import pyspark
print(pyspark.__version__)

3.5.4


In [2]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots


In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col ,when ,first ,coalesce , count , avg , count_distinct

## Load the Data Using Spark

In [4]:
spark = SparkSession.builder.appName("PySpark Transformation ").getOrCreate()
data_path = "hdfs://172.17.0.2:9000/user/strang/ecommerce_data/2019-Oct.csv"


25/02/04 17:16:57 WARN Utils: Your hostname, Inspiron-5570 resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlp3s0)
25/02/04 17:16:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/04 17:16:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df = spark.read.csv(data_path, header=True, inferSchema=True)
df.show()

25/02/04 17:17:14 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 02:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 02:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 02:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 02:00:01|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 02:00:04|      view|   1004237|2053013555631882655|electr

In [6]:
df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



## Data Overview

This dataset represents customer behavior data for one month (October 2019) from a large multi-category online store, containing 42,448,762 rows and 9 columns. The dataset is valuable for implementing advanced analytics such as recommendation systems, sales forecasting, and customer segmentation. Below is a detailed overview of the data and key considerations for handling it.


## Handling Missing Values in `category_code`

Before attempting to handle the missing values, it is important to understand the approach we will take to fill the `NULL` values in the `category_code` column. The goal is to use the corresponding `category_id` to fill the missing `category_code` values. Specifically:

- If a row has a `NULL` in the `category_code` column, we will search for another row with the same `category_id` that has a non-null `category_code`.
- The non-null `category_code` value from the matching `category_id` will be used to replace the `NULL` value in the original row.

This approach assumes that rows with the same `category_id` should have the same `category_code`. However, it is important to note that this method may not work if there are `category_id` values in the dataset that **never have a corresponding `category_code`** in any row. In such cases, additional steps (e.g., filling with a default value or dropping rows) may be required.

In [7]:
# Count the number of NULL values for each column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     13515609|6113008|    0|      0|           2|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+



                                                                                

In [30]:
# Step 1: Create a mapping of category_id to category_code (excluding NULLs)
category_code_mapping = df.filter(col("category_code").isNotNull()) \
    .select("category_id", "category_code") \
    .dropDuplicates(["category_id"])
category_code_mapping.show()



+-------------------+--------------------+
|        category_id|       category_code|
+-------------------+--------------------+
|2053013552226107603|appliances.enviro...|
|2053013552293216471|appliances.enviro...|
|2053013552326770905|appliances.enviro...|
|2053013552351936731|appliances.enviro...|
|2053013552502931683|computers.periphe...|
|2053013552570040549|electronics.video...|
|2053013552603594983|electronics.video.tv|
|2053013552637149417|computers.periphe...|
|2053013552695869677|  electronics.tablet|
|2053013552737812719| stationery.cartrige|
|2053013552888807671|computers.periphe...|
|2053013552913973497|computers.periphe...|
|2053013552955916539|computers.periphe...|
|2053013553031414015|computers.periphe...|
|2053013553056579841|computers.periphe...|
|2053013553090134275|           kids.toys|
|2053013553140465927|           kids.toys|
|2053013553224352013|          kids.dolls|
|2053013553257906447|           kids.toys|
|2053013553316626707|    computers.ebooks|
+----------

                                                                                

In [35]:
# Rename the category_code column in the mapping DataFrame
category_code_mapping = category_code_mapping.withColumnRenamed("category_code", "category_code_mapping")
# Join the original DataFrame with the mapping
df_filled = df.join(category_code_mapping, on="category_id", how="left") \
    .withColumn("category_code", coalesce(col("category_code"), col("category_code_mapping"))) \
    .drop("category_code_mapping")  # Drop the temporary column
df_filled.show()



+-------------------+-------------------+----------+----------+--------------------+--------+-------+---------+--------------------+
|        category_id|         event_time|event_type|product_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+-------------------+----------+----------+--------------------+--------+-------+---------+--------------------+
|2053013555631882655|2019-10-01 02:00:04|      view|   1004237|electronics.smart...|   apple|1081.98|535871217|c6bd7419-2748-4c5...|
|2053013555631882655|2019-10-01 02:00:11|      view|   1004545|electronics.smart...|  huawei| 566.01|537918940|406c46ed-90a4-478...|
|2053013555631882655|2019-10-01 02:00:11|      view|   1005011|electronics.smart...| samsung| 900.64|530282093|50a293fb-5940-41b...|
|2053013555631882655|2019-10-01 02:00:19|      view|   1005135|electronics.smart...|   apple|1747.79|535871217|c6bd7419-2748-4c5...|
|2053013555631882655|2019-10-01 02:00:20|      view|   1003306|electr

                                                                                

In [36]:
null_counts = df_filled.select([_sum(col(column).isNull().cast("int")).alias(column) for column in df_filled.columns])
null_counts.show()



+-----------+----------+----------+----------+-------------+-------+-----+-------+------------+
|category_id|event_time|event_type|product_id|category_code|  brand|price|user_id|user_session|
+-----------+----------+----------+----------+-------------+-------+-----+-------+------------+
|          0|         0|         0|         0|     13515609|6113008|    0|      0|           2|
+-----------+----------+----------+----------+-------------+-------+-----+-------+------------+



                                                                                

After attempting to fill missing values in the `category_code` column using a mapping approach, it was observed that the `NULL` values remain unchanged. This indicates that the approach did not work as expected, likely due to the following reasons:

**No Matching `category_code` for Some `category_id` Values**:
   - Certain `category_id` values do not have any corresponding non-null `category_code` in the dataset, leaving the `NULL` values unfilled.

#### Next Steps
To address this issue,
**Drop Rows with Missing `category_code` and  `brand` (Optional)**:


##   Transformation & EDA

#### Why Split `event_time`?

Splitting `event_time` into **`date`**, **`weekday`**, and **`time`** provides:

1. **`date`**: Analyze daily, monthly, or yearly trends.  
2. **`weekday`**: Identify weekly patterns (e.g., weekend spikes).  
3. **`time`**: Detect hourly trends for peak activity.  

This enables granular time-series analysis for better insights and decision-making.

In [7]:
## split the event_time column into date, weekday, and time:
from pyspark.sql.functions import col, date_format, to_date, dayofweek

df= df.withColumn("date", to_date(col("event_time"))) \
                   .withColumn("weekday", date_format(col("event_time"), "EEEE")) \
                   .withColumn("time", date_format(col("event_time"), "HH:mm:ss"))

df.show()

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----------+-------+--------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|      date|weekday|    time|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----------+-------+--------+
|2019-10-01 02:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|2019-10-01|Tuesday|02:00:00|
|2019-10-01 02:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|2019-10-01|Tuesday|02:00:00|
|2019-10-01 02:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|  543.1|519107250|566511c2-e2e3-422...|2019-10-01|Tuesday|02:00:01|
|2019-10-01 02:00:01|      view|  

# Category Representation Summary

The dataset includes two types of category representations:

1. **Category ID**  
   - A numeric identifier used for indexing and efficient querying. - It is less human-readable but computationally efficient.

2. **Category Code**  
   - A hierarchical textual representation (e.g., `appliances.environment.water_heater`) offering structured insights.  
   - It is split into:  
     - **Main Category**: The top-level category (e.g., `appliances`).  
     - **Sub-Category**: More specific levels (e.g., `environment.water_heater`).  

While `category_code` provides more detailed and human-readable insights, its high count of missing values (**13515609**) limits its usability in some cases.



In [8]:
## To split the category_code column into main_category and sub_category
from pyspark.sql.functions import col, split

df= df.withColumn("main_category", split(col("category_code"), "\.").getItem(0)) \
                   .withColumn("sub_category", split(col("category_code"), "\.").getItem(1))

df.show()

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----------+-------+--------+-------------+------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|      date|weekday|    time|main_category|sub_category|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----------+-------+--------+-------------+------------+
|2019-10-01 02:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|2019-10-01|Tuesday|02:00:00|         NULL|        NULL|
|2019-10-01 02:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|2019-10-01|Tuesday|02:00:00|   appliances| environment|
|2019-10-01 02:00:01|      view|  17200506|20530135597926324

In [9]:
# Register the DataFrame as a temporary SQL table
df.createOrReplaceTempView("ecommerce_data")

In [10]:
# Count the Total Number of Rows
spark.sql("""
          SELECT COUNT(*) AS total_rows 
          FROM ecommerce_data
""").show()



+----------+
|total_rows|
+----------+
|  42448764|
+----------+



                                                                                

In [11]:
# find unique values in the event_type column:
spark.sql("""
    SELECT DISTINCT event_type
    FROM ecommerce_data
""").show()



+----------+
|event_type|
+----------+
|  purchase|
|      view|
|      cart|
+----------+



                                                                                

Understand the distribution of events (e.g., views, purchases, cart additions) to identify user behavior patterns.

In [12]:
# count the number of events by type:
spark.sql("""
    SELECT event_type, COUNT(*) AS event_count
    FROM ecommerce_data
    GROUP BY event_type
""").show()



+----------+-----------+
|event_type|event_count|
+----------+-----------+
|  purchase|     742849|
|      view|   40779399|
|      cart|     926516|
+----------+-----------+



                                                                                

 Identify the most popular products based on views, which can help in inventory management and marketing strategies.

In [13]:
## Top 5 Most Viewed Products
query = """
SELECT product_id, COUNT(*) AS view_count
FROM ecommerce_data
WHERE event_type = 'view'
GROUP BY product_id
ORDER BY view_count DESC
LIMIT 5
"""
spark.sql(query).show()



+----------+----------+
|product_id|view_count|
+----------+----------+
|   1004856|    419287|
|   1004767|    378777|
|   1005115|    327715|
|   1004249|    207422|
|   1004833|    203018|
+----------+----------+



                                                                                

Identify the most expensive products in the dataset.

In [14]:
## Most Expensive Products 
query = """
SELECT product_id, MAX(price) AS max_price
FROM ecommerce_data
GROUP BY product_id
ORDER BY max_price DESC
LIMIT 10
"""
spark.sql(query).show()


[Stage 22:>                                                         (0 + 8) / 8]

+----------+---------+
|product_id|max_price|
+----------+---------+
|  21408154|  2574.07|
|  21407287|  2574.07|
|  21407288|  2574.07|
|  21408165|  2574.07|
|  21408156|  2574.07|
|  21408160|  2574.07|
|   1801658|  2574.04|
|  43600032|  2574.04|
|   1802024|  2574.04|
|   1305976|  2574.04|
+----------+---------+



                                                                                

Determine which brands have the highest user engagement (views, purchases, etc.).

In [15]:
## Top 5 Brands by Engagement
query = """
SELECT brand, COUNT(*) AS engagement_count
FROM ecommerce_data
WHERE brand IS NOT NULL
GROUP BY brand
ORDER BY engagement_count DESC
LIMIT 5
"""
spark.sql(query).show()



+-------+----------------+
|  brand|engagement_count|
+-------+----------------+
|samsung|         5282775|
|  apple|         4122554|
| xiaomi|         3083763|
| huawei|         1111205|
|lucente|          655861|
+-------+----------------+



                                                                                

In [16]:
## Revenue Generated by Each Brand
query = """
SELECT brand, SUM(price) AS total_revenue
FROM ecommerce_data
WHERE event_type = 'purchase'
GROUP BY brand
ORDER BY total_revenue DESC
"""
spark.sql(query).show()



+--------+-------------------+
|   brand|      total_revenue|
+--------+-------------------+
|   apple|1.112092688200001E8|
| samsung|4.640753261000007E7|
|  xiaomi|  9194033.289999995|
|    NULL|  8540601.030000001|
|  huawei|  4883421.739999999|
|    acer| 3576719.5200000014|
|      lg| 3387887.9600000004|
| lucente|  3124113.370000002|
|    sony| 2478196.6799999997|
|    oppo| 2412959.7600000002|
|  lenovo| 1752638.5299999993|
| indesit|         1250060.95|
|   bosch| 1248729.0899999999|
|      hp|         1227215.99|
|   artel| 1034152.4099999999|
|    asus|          970019.39|
|    beko|  963940.5899999999|
|   haier|  892047.6899999998|
|dauscher|          608437.02|
|   canon|          561658.19|
+--------+-------------------+
only showing top 20 rows



                                                                                

Analyze the total price of purchased products in each main category to understand pricing trends.

In [17]:
## Top Main Categories by Sales 
query = """
SELECT main_category, SUM(price) AS total_sales
FROM ecommerce_data
WHERE event_type = 'purchase'
GROUP BY main_category
ORDER BY total_sales DESC
"""
spark.sql(query).show()




+-------------+--------------------+
|main_category|         total_sales|
+-------------+--------------------+
|  electronics|1.7646416835999924E8|
|         NULL|2.2924937129999954E7|
|   appliances|       1.358312192E7|
|    computers|1.1378874649999999E7|
|    furniture|          1673728.99|
|         auto|          1274031.65|
| construction|   932995.0199999996|
|         kids|           678140.67|
|      apparel|   624937.7499999998|
|        sport|   322559.0000000001|
|  accessories|            68783.88|
| country_yard|  15695.449999999999|
|     medicine|            13500.42|
|   stationery|  2027.3800000000003|
+-------------+--------------------+



                                                                                

 Identify the most purchase sub-categories to focus on high-demand areas.

In [19]:
## Top Sub-Categories by purchase
query = """
SELECT sub_category, COUNT(*) AS purchase_count
FROM ecommerce_data
WHERE event_type = 'purchase' and sub_category IS NOT NULL
GROUP BY sub_category
ORDER BY purchase_count DESC
LIMIT 10
"""
spark.sql(query).show()




+------------+--------------+
|sub_category|purchase_count|
+------------+--------------+
|  smartphone|        338018|
|     kitchen|         50022|
|       audio|         35592|
|       video|         21647|
| environment|         18074|
|      clocks|         17906|
|    notebook|         15590|
| accessories|         10620|
|       tools|          7814|
|       shoes|          7082|
+------------+--------------+



                                                                                

Analyze the average price of products in each sub-category to identify premium or budget segments.


In [20]:
## Average Price by Sub-Category:
query = """
SELECT sub_category, AVG(price) AS avg_price
FROM ecommerce_data
WHERE sub_category IS NOT NULL
GROUP BY sub_category
ORDER BY avg_price DESC
"""
spark.sql(query).show()



+--------------+------------------+
|  sub_category|         avg_price|
+--------------+------------------+
|      notebook| 711.3579476329248|
|       desktop| 535.6838440559884|
|        camera|486.93246061690394|
|    smartphone| 471.9470821347332|
|         video|444.49498353008744|
|       bicycle| 400.0162413078515|
|       trainer| 396.8443941877593|
|   living_room| 372.5210671259326|
|        tablet| 341.3216198083541|
|        skates|340.33881688018073|
|    cultivator| 331.0647395543177|
|        clocks| 294.4459092562893|
|           ski|278.10828722002645|
|sewing_machine|264.99615828670477|
|       kitchen| 255.8449049242054|
|     snowboard|240.34544098054891|
|     universal| 224.7509408575158|
|    lawn_mower| 222.9096182512271|
|      carriage|199.49357347826566|
|       bedroom| 187.6288388412792|
+--------------+------------------+
only showing top 20 rows



                                                                                

Calculate the conversion rate (purchases/views) for each product to identify high-performing products.

In [21]:
query = """
SELECT product_id,
       SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchases,
       SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) AS views,
       (SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) / SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END)) AS conversion_rate
FROM ecommerce_data
GROUP BY product_id
ORDER BY conversion_rate DESC
"""
spark.sql(query).show()



+----------+---------+-----+---------------+
|product_id|purchases|views|conversion_rate|
+----------+---------+-----+---------------+
|  26008004|        1|    1|            1.0|
|  12716845|        1|    1|            1.0|
|  26015297|        1|    1|            1.0|
|  12720815|        1|    1|            1.0|
|  26010498|        1|    1|            1.0|
|  26011573|        1|    1|            1.0|
|  26001727|        1|    1|            1.0|
|  29502825|        1|    1|            1.0|
|  12901176|        1|    1|            1.0|
|  26009418|        1|    1|            1.0|
|  26016665|        1|    1|            1.0|
|  24100063|        1|    1|            1.0|
|  50300057|        1|    1|            1.0|
|  26009730|        1|    1|            1.0|
|  21405915|        1|    1|            1.0|
|  26205483|        1|    1|            1.0|
|  26006029|        1|    1|            1.0|
|  12901272|        1|    1|            1.0|
|  20500270|        1|    1|            1.0|
|  2810201

                                                                                

Understand which days of the week have the highest user activity.

In [22]:
## Day of the Week with the Most Events
query = """
SELECT weekday, COUNT(*) AS event_count
FROM ecommerce_data
GROUP BY weekday
ORDER BY event_count DESC
"""
spark.sql(query).show()





+---------+-----------+
|  weekday|event_count|
+---------+-----------+
|  Tuesday|    6793119|
|Wednesday|    6648531|
| Thursday|    6374825|
|   Sunday|    5865806|
|   Friday|    5842459|
| Saturday|    5600640|
|   Monday|    5323384|
+---------+-----------+



                                                                                

 Identify the most active users for targeted marketing or loyalty programs.

In [23]:
## Most Active Users (by Events)
query = """
SELECT user_id, COUNT(*) AS total_events
FROM ecommerce_data
GROUP BY user_id
ORDER BY total_events DESC
LIMIT 10
"""
spark.sql(query).show()


[Stage 46:>                                                         (0 + 8) / 9]

+---------+------------+
|  user_id|total_events|
+---------+------------+
|512475445|        7436|
|512365995|        4013|
|526731152|        2912|
|512505687|        2894|
|513021392|        2862|
|546159478|        2433|
|546270188|        2426|
|514649263|        2390|
|516308435|        2316|
|512401084|        2232|
+---------+------------+



                                                                                

 Identify the top-spending users for targeted retention strategies.

In [24]:
## Users Who Spent the Most
query = """
SELECT user_id, SUM(price) AS total_spent
FROM ecommerce_data
WHERE event_type = 'purchase'
GROUP BY user_id
ORDER BY total_spent DESC
LIMIT 10
"""
spark.sql(query).show()




+---------+------------------+
|  user_id|       total_spent|
+---------+------------------+
|519267944|265569.51999999996|
|513117637|          244500.0|
|515384420|         210749.77|
|530834332|187128.93000000002|
|512386086|          182470.8|
|519250600|174449.34999999998|
|513784794|143821.03000000003|
|538216048|         134344.69|
|514320330|         119103.99|
|553431815|118762.23999999998|
+---------+------------------+



                                                                                

Analyze user retention by counting the number of days each user was active.

In [25]:
query = """
SELECT user_id, COUNT(DISTINCT date) AS active_days
FROM ecommerce_data
GROUP BY user_id
ORDER BY active_days DESC
"""
spark.sql(query).show()

25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/28 19:13:19 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+-----------+
|  user_id|active_days|
+---------+-----------+
|514649263|         31|
|512873159|         31|
|512412149|         31|
|512367874|         31|
|512386086|         31|
|512475445|         31|
|545478879|         31|
|546998567|         31|
|512388419|         31|
|543712859|         31|
|555394812|         31|
|545404707|         31|
|513847031|         31|
|512414095|         31|
|512771282|         31|
|512437914|         31|
|518539155|         31|
|530265258|         31|
|546159478|         31|
|549029080|         31|
+---------+-----------+
only showing top 20 rows



                                                                                

Analyze user sessions with multiple events to understand user engagement during a single session.


In [26]:
# aflow of a random user what he looked and what bought
spark.sql("""
    SELECT *
    FROM ecommerce_data
    WHERE user_session = '3c80f0d6-e9ec-4181-8c5c-837a30be2d68'
    ORDER BY event_time
""").show()
print("the flow of a random user what he looked and what bought ")



+-------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+----------+-------+--------+-------------+------------+
|         event_time|event_type|product_id|        category_id|       category_code|brand| price|  user_id|        user_session|      date|weekday|    time|main_category|sub_category|
+-------------------+----------+----------+-------------------+--------------------+-----+------+---------+--------------------+----------+-------+--------+-------------+------------+
|2019-10-01 02:01:45|      view|   1003305|2053013555631882655|electronics.smart...|apple|612.43|551377651|3c80f0d6-e9ec-418...|2019-10-01|Tuesday|02:01:45|  electronics|  smartphone|
|2019-10-01 02:02:08|      view|   1002528|2053013555631882655|electronics.smart...|apple|643.23|551377651|3c80f0d6-e9ec-418...|2019-10-01|Tuesday|02:02:08|  electronics|  smartphone|
|2019-10-01 02:02:47|      view|   1002532|2053013555631882655|electronics.smart

                                                                                

In [48]:
# Get hourly traffic data
hourly_traffic = spark.sql("""
    SELECT 
        HOUR(time) as hour,
        COUNT(CASE WHEN event_type = 'view' THEN 1 END) as views,
        COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) as purchases
    FROM ecommerce_data
    GROUP BY HOUR(time)
    ORDER BY hour
""")

traffic_pd = hourly_traffic.toPandas()

fig = make_subplots(specs=[[{"secondary_y": True}]])

fig.add_trace(
    go.Scatter(x=traffic_pd['hour'], y=traffic_pd['views'],
               name="Views", line=dict(color='blue')),
    secondary_y=False,
)

fig.add_trace(
    go.Scatter(x=traffic_pd['hour'], y=traffic_pd['purchases'],
               name="Purchases", line=dict(color='red')),
    secondary_y=True,
)

fig.update_layout(
    title='Hourly Traffic Pattern',
    xaxis_title='Hour of Day',
    yaxis_title='Number of Views',
    yaxis2_title='Number of Purchases',
    hovermode='x unified'
)

fig.show()

In [None]:
# Get brand performance data
brand_perf = spark.sql("""
    SELECT brand,COUNT(*) as order_count,   SUM(price) as total_revenue
    FROM ecommerce_data
    WHERE brand IS NOT NULL AND event_type = 'purchase'
    GROUP BY brand
    ORDER BY total_revenue DESC
    LIMIT 5
""")

brand_pd = brand_perf.toPandas()

fig = px.bar(
    brand_pd, 
    x="brand", 
    y=["total_revenue", "order_count"], 
    title="Top Brand Performance", 
    labels={"value": "Count/Revenue", "variable": "Metric"}, 
    barmode="group",
    color_discrete_map={
        "total_revenue": "royalblue",  # Vibrant blue for total_revenue
        "order_count": "red"          # Red for order_count
    }
)

fig.show()

In [None]:
category_dist = spark.sql("""
    SELECT    main_category,  COUNT(*) as event_count,   COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () as percentage
    FROM ecommerce_data
    WHERE main_category IS NOT NULL
    GROUP BY main_category
    ORDER BY event_count DESC
""")
category_pd = category_dist.toPandas()

fig = go.Figure(data=[go.Pie(
    labels=category_pd['main_category'],
    values=category_pd['event_count'],
    hole=0.4,  # Increased donut hole size
    textinfo='label+percent',
    textposition='inside',
    textfont=dict(size=14),
    marker=dict(colors=px.colors.qualitative.Set3),  # Enhanced color palette
    rotation=90,  # Rotate to better display labels
    pull=[0.1 if i == 0 else 0 for i in range(len(category_pd))]  # Pull out largest segment
)])

fig.update_layout(
    title={ 'text': 'Product Category Distribution', 'y':0.95, 'x':0.5,  'xanchor': 'center','yanchor': 'top', 'font': dict(size=24)},
    width=1000,  height=800,  showlegend=True,
    legend=dict( orientation="v", yanchor="middle", y=0.5, xanchor="left", x=1.1,font=dict(size=14)
),
    annotations=[  dict(text='Total Events: {:,}'.format(category_pd['event_count'].sum()),  x=0.5, y=-0.1, showarrow=False, font_size=16) ]
)
fig.show()

25/01/29 01:14:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/29 01:14:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/29 01:16:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/29 01:16:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/29 01:16:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/01/29 01:16:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [42]:
activity = spark.sql("""
    SELECT   HOUR(time) as hour,  weekday,  COUNT(*) as activity_count
    FROM ecommerce_data
    GROUP BY HOUR(time), weekday
    ORDER BY hour, weekday
""")
activity_pd = activity.toPandas()
activity_pivot = activity_pd.pivot(index='hour', columns='weekday', values='activity_count')

fig = go.Figure(data=go.Heatmap( z=activity_pivot.values, x=activity_pivot.columns,  y=activity_pivot.index, colorscale='Viridis'))

fig.update_layout( title='User Activity Heatmap', xaxis_title='Day of Week', yaxis_title='Hour of Day')

fig.show()

In [43]:
# 5. Average Price Per Main Category
query = """
SELECT main_category, AVG(price) as avg_price
FROM ecommerce_data
WHERE main_category IS NOT NULL AND price IS NOT NULL
GROUP BY main_category
ORDER BY avg_price DESC
"""
avg_price_df = spark.sql(query).toPandas()

fig = px.bar(avg_price_df, x='avg_price', y='main_category', orientation='h', 
             title='Average Price Per Main Category', 
             labels={'avg_price': 'Average Price', 'main_category': 'Main Category'},
             color='avg_price')
fig.show()

In [44]:
# 7. Top 10 Brands by Event Counts
query = """
SELECT brand, COUNT(*) as count
FROM ecommerce_data
WHERE brand IS NOT NULL
GROUP BY brand
ORDER BY count DESC
LIMIT 10
"""
brand_df = spark.sql(query).toPandas()

fig = px.bar(brand_df, y='brand', x='count', orientation='h', 
             title='Top 10 Brands by Event Counts', labels={'brand': 'Brand', 'count': 'Count'},
             color='brand')
fig.show()

In [45]:
# 4. Sales Volume Over Time
query = """
SELECT date, COUNT(*) as count
FROM ecommerce_data
GROUP BY date
ORDER BY date
"""
sales_time_df = spark.sql(query).toPandas()

fig = px.line(sales_time_df, x='date', y='count', title='Sales Volume Over Time', 
              labels={'date': 'Date', 'count': 'Sales Volume'})
fig.show()

#### we need to delet all null values before we start at ml model

In [7]:
# Drop rows with any null values (adjust based on your data)
df = df.dropna()

In [8]:
print("Null values per column after cleaning:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

Null values per column after cleaning:




+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|         0|          0|            0|    0|    0|      0|           0|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



                                                                                


# Purchase Conversion Prediction Model

The steps to build a machine learning model for predicting purchase conversions using PySpark.

---

#### **1. Data Aggregation**
Grouped raw event data by `user_session` to create session-level features:
- `total_views`: Number of product views
- `total_carts`: Number of cart additions  
- `total_purchases`: Number of purchases
- `avg_price_viewed`: Average price of viewed products
- `distinct_brands`: Unique brands interacted with

In [9]:
from pyspark.sql.functions import countDistinct

In [10]:
# Aggregate data by user session (now using the cleaned DataFrame)
session_df = df.groupBy("user_session").agg(
    count(when(col("event_type") == "view", 1)).alias("total_views"),
    count(when(col("event_type") == "cart", 1)).alias("total_carts"),
    count(when(col("event_type") == "purchase", 1)).alias("total_purchases"),
    avg("price").alias("avg_price_viewed"),
    countDistinct("brand").alias("distinct_brands")
)

# Add target column (label)
session_df = session_df.withColumn("label", when(col("total_purchases") > 0, 1).otherwise(0))

# Handle nulls after aggregation (e.g., avg_price_viewed might still have nulls)
session_df = session_df.fillna(0, subset=["avg_price_viewed"])

#### Feature Engineering
Added two new ratio features:

    - view_to_cart_ratio: total_carts / (total_views + 1)

    - cart_to_purchase_ratio: total_purchases / total_carts (0 if no carts)


In [11]:
# Add view-to-cart ratio
session_df = session_df.withColumn(
    "view_to_cart_ratio", 
    col("total_carts") / (col("total_views") + 1)  # +1 to avoid division by zero
)
session_df = session_df.withColumn(
    "cart_to_purchase_ratio", 
    when(col("total_carts") > 0, col("total_purchases") / col("total_carts")).otherwise(0)
)

#### Train-Test Split

Split data into 80% `training` and 20% `testing` sets:

In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# Split data into training and test sets
train_df, test_df = session_df.randomSplit([0.8, 0.2], seed=42)


# Assemble features into a single vector
feature_columns = ["total_views", "total_carts", "avg_price_viewed", "distinct_brands", "view_to_cart_ratio","cart_to_purchase_ratio"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

# Train a Random Forest model
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100, maxDepth=5)
model = rf.fit(train_df)

25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 17:28:42 WARN RowBasedKeyValueBatch: Calling spill() on

In [13]:
# Evaluate
predictions = model.transform(test_df)
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)
print(f"ROC AUC: {roc_auc}")

25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/02/04 18:08:25 WARN RowBasedKeyValueBatch: Calling spill() on

ROC AUC: 0.8568817890358003


### Results Summary

| Metric   | Score  |
|----------|--------|
| ROC AUC  | 0.8569 |

The model shows strong predictive power in distinguishing purchasing vs non-purchasing sessions.
