In [0]:
# Path of the sample events CSV that this notebook analyses.
file_path = "/Volumes/workspace/default/ecomm_project/2019-Nov-sample.csv"

# Read the CSV: the first line supplies the column names and Spark infers
# each column's type from the data.
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Print the first five rows as a quick sanity check of the load.
df.show(5)


+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:01|      view|  17302664|2053013553853497655|                NULL| creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|    lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 00:00:01|      view|   1004775|2053013555631882655|electronics.smart...|xiaomi

In [0]:
# Inspect the loaded schema: column names, data types and nullability.
df.printSchema()

# Preview the first ten rows.
df.show(n=10)

# Report the total number of rows in the DataFrame.
print(f"Total rows: {df.count()}")

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|  janome

In [0]:
# Import the functions module under a namespace instead of importing `sum`
# by name: a bare `from pyspark.sql.functions import sum` rebinds Python's
# builtin `sum` for every cell that runs afterwards.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Count the nulls in each column: every value becomes 1 when it is null and
# 0 otherwise, and the flags are summed per column. The result is a single
# row whose columns carry the original column names.
null_count_exprs = [
    F.sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()

+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|         0|          0|         3741| 1805|    0|      0|           0|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



In [0]:
# Read the sample CSV again, this time with only the header option set
# (no schema inference).
df = spark.read.csv("/Volumes/workspace/default/ecomm_project/2019-Nov-sample.csv", header=True)

# Keep only the rows where both category_code and brand are present; the
# per-column null check reported 3741 and 1805 nulls for them respectively.
has_category_code = df["category_code"].isNotNull()
has_brand = df["brand"].isNotNull()
df_clean = df.where(has_category_code & has_brand)

# Compare the row counts before and after the filter.
print(f"Rows before Cleaning: {df.count()}")
print(f"Rows after dropping nulls: {df_clean.count()}")

Rows before Cleaning: 10000
Rows after dropping nulls: 5525


In [0]:
# Read the sample CSV once more, header row only (no schema inference).
df = spark.read.csv("/Volumes/workspace/default/ecomm_project/2019-Nov-sample.csv", header=True)

# Discard every row that contains a null in at least one column.
df_clean = df.na.drop()

# Confirm how many rows are left after the drop.
print("Rows after dropping nulls:", df_clean.count())


Rows after dropping nulls: 5525


In [0]:
from pyspark.sql.functions import col

# The CSV was read without schema inference, so its columns arrive as
# strings. Each entry below is (column name, target type); a target of None
# keeps the column as it is. Note that event_time is left uncast.
cast_plan = [
    ("event_time", None),
    ("event_type", None),
    ("product_id", "long"),
    ("category_id", "long"),
    ("category_code", None),
    ("brand", None),
    ("price", "double"),
    ("user_id", "long"),
    ("user_session", None),
]

# Select the columns in the order listed, casting where a type is given.
df_typed = df_clean.select(
    *[
        col(name) if target_type is None else col(name).cast(target_type)
        for name, target_type in cast_plan
    ]
)

# Show the resulting schema and a five-row preview.
df_typed.printSchema()
df_typed.show(5)

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_session: string (nullable = true)

+--------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:...|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:...|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496

## LOADING CLEANED DATA INTO A TABLE (DELTA FORMAT)

In [0]:
from pyspark.sql.functions import col

# Read the original CSV (header row only, no schema inference).
df = spark.read.csv("/Volumes/workspace/default/ecomm_project/2019-Nov-sample.csv", header=True)

# Drop the rows that have no category_code or no brand.
df_clean = df.na.drop(subset=["category_code", "brand"])

# Target types for the numeric columns; columns not listed here are passed
# through unchanged (event_time included).
numeric_casts = {
    "product_id": "long",
    "category_id": "long",
    "price": "double",
    "user_id": "long",
}

# Output columns, in the order they appear in the resulting DataFrame.
output_columns = [
    "event_time",
    "event_type",
    "product_id",
    "category_id",
    "category_code",
    "brand",
    "price",
    "user_id",
    "user_session",
]

df_typed = df_clean.select(
    *[
        col(name).cast(numeric_casts[name]) if name in numeric_casts else col(name)
        for name in output_columns
    ]
)


In [0]:
# Make sure the target schema exists before the table is saved into it.
# The statement is a no-op when the schema is already there.
create_schema_stmt = "CREATE SCHEMA IF NOT EXISTS workspace.ecomm"
spark.sql(create_schema_stmt)


DataFrame[]

In [0]:
# Save the cleaned, typed events as a Delta table. "overwrite" mode replaces
# whatever the table held before.
(
    df_typed.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.ecomm.cleaned_events")
)


In [0]:
# Show the session's current catalog and schema to confirm the active namespace.
current_namespace = spark.sql("SELECT current_catalog(), current_schema()")
current_namespace.show()

+-----------------+----------------+
|current_catalog()|current_schema()|
+-----------------+----------------+
|        workspace|         default|
+-----------------+----------------+



## DATA EXPLORATION AND TRANSFORMATION

In [0]:
# List the tables in the ecomm schema. The catalog is spelled out so the
# listing does not depend on which catalog is current in the session, and
# so it matches the fully qualified workspace.ecomm names used elsewhere
# in this notebook.
spark.sql("SHOW TABLES IN workspace.ecomm").show()



+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
|   ecomm|cleaned_events|      false|
+--------+--------------+-----------+



In [0]:
%sql
/* Number of events recorded for each event type, most frequent first */
SELECT
  e.event_type,
  COUNT(*) AS total_events
FROM workspace.ecomm.cleaned_events AS e
GROUP BY e.event_type
ORDER BY total_events DESC;

event_type,total_events
view,5372
cart,81
purchase,72


In [0]:
%sql
/* Ten product categories with the most purchase events */
SELECT
  e.category_code,
  COUNT(*) AS purchase_count
FROM workspace.ecomm.cleaned_events AS e
WHERE e.event_type = 'purchase'
GROUP BY e.category_code
ORDER BY purchase_count DESC
LIMIT 10

category_code,purchase_count
electronics.smartphone,39
electronics.video.tv,9
electronics.audio.headphone,7
apparel.shoes.keds,3
computers.desktop,3
construction.tools.welding,2
computers.peripherals.printer,1
appliances.kitchen.blender,1
electronics.audio.subwoofer,1
construction.tools.generator,1


In [0]:
%sql
/* Ten brands with the highest revenue, where revenue is the summed price
   of purchase events rounded to two decimals */
SELECT
  e.brand,
  ROUND(SUM(e.price), 2) AS total_revenue
FROM workspace.ecomm.cleaned_events AS e
WHERE e.event_type = 'purchase'
GROUP BY e.brand
ORDER BY total_revenue DESC
LIMIT 10;



brand,total_revenue
apple,17509.68
samsung,8114.12
netechnics,772.19
xiaomi,729.04
pulser,525.08
arg,347.49
oppo,308.45
magnetta,254.78
sv,195.35
tefal,167.29


In [0]:
%sql
/* Revenue from purchase events per calendar day, in chronological order */
SELECT
  DATE(e.event_time) AS date,
  ROUND(SUM(e.price), 2) AS total_revenue
FROM workspace.ecomm.cleaned_events AS e
WHERE e.event_type = 'purchase'
GROUP BY DATE(e.event_time)
ORDER BY date

date,total_revenue
2019-11-01,29989.86


In [0]:
%sql
/* Ten users with the most events, counting every event type */
SELECT
  e.user_id,
  COUNT(*) AS total_interactions
FROM workspace.ecomm.cleaned_events AS e
GROUP BY e.user_id
ORDER BY total_interactions DESC
LIMIT 10;

user_id,total_interactions
534949460,59
516853100,59
513198013,44
557673447,40
517775322,34
512669930,31
515782589,31
516316320,31
560134869,28
528269305,27


In [0]:
%sql
/* Ten products with the most purchase events, shown with the revenue
   (summed price, rounded to two decimals) each one generated */
SELECT
  e.product_id,
  COUNT(*) AS purchase_count,
  ROUND(SUM(e.price), 2) AS total_revenue
FROM workspace.ecomm.cleaned_events AS e
WHERE e.event_type = 'purchase'
GROUP BY e.product_id
ORDER BY purchase_count DESC
LIMIT 10;


product_id,purchase_count,total_revenue
1801881,8,3910.4
1004856,5,642.1
4804056,3,481.71
1005105,3,4045.83
1005135,3,4996.08
1004768,3,728.16
1004750,2,390.12
1004870,2,550.5
4804295,2,45.6
1480707,1,525.08


In [0]:
%sql
/* Total number of add-to-cart events. This counts events, not distinct
   users: a user who added to cart several times is counted each time. */
SELECT
  COUNT(*) AS total_add_to_cart
FROM workspace.ecomm.cleaned_events AS e
WHERE e.event_type = 'cart';



total_add_to_cart
81


In [0]:
%sql
/* Total number of purchase events. This counts events, not distinct
   users: a user who purchased several times is counted each time. */
SELECT
  COUNT(*) AS total_purchases
FROM workspace.ecomm.cleaned_events AS e
WHERE e.event_type = 'purchase';


total_purchases
72
