# E-Commerce Analytics Pipeline

## Step 1: Load & Clean Data (PySpark)

In [0]:
# Read the raw orders export. header=True takes column names from the first CSV
# line; no schema is supplied, so every column is loaded as a string.
orders_raw = spark.read.csv("/FileStore/tables/kz.csv", header=True)

# Show the resulting (all-string) schema and a small sample of rows.
orders_raw.printSchema()
orders_raw.show(5)

root
 |-- event_time: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: string (nullable = true)
 |-- user_id: string (nullable = true)

+--------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|          event_time|           order_id|         product_id|        category_id|       category_code|  brand| price|            user_id|
+--------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|2020-04-24 11:50:...|2294359932054536986|1515966223509089906|2268105426648170900|  electronics.tablet|samsung|162.01|1515915625441993984|
|2020-04-24 11:50:...|2294359932054536986|1515966223509089906|2268105426648170900|  electronics.tablet|samsung|16

In [0]:
from pyspark.sql.functions import col, to_date, to_timestamp

# Clean the raw orders: convert column types first, then drop incomplete rows,
# then de-duplicate. Converting BEFORE dropna matters: when a value fails to parse
# (to_date / cast return NULL for malformed input when ANSI mode is off), the
# resulting NULL is caught by dropna instead of leaking into the cleaned table.
orders_cleaned = (
    orders_raw
    # Parse the raw timestamp string and keep only its calendar date.
    # NOTE(review): 'UTC' is matched as a literal suffix, so the value is read in
    # the Spark session time zone — confirm the session time zone is UTC.
    .withColumn("event_time", to_date(col("event_time"), "yyyy-MM-dd HH:mm:ss 'UTC'"))
    # NOTE(review): a 32-bit float is imprecise for currency; consider
    # decimal(12,2). Changing the type changes the Delta schema written in Step 2.
    .withColumn("price", col("price").cast("float"))
    .dropna(subset=["order_id", "user_id", "event_time", "price"])
    # Keep a single row per order_id. NOTE(review): the raw preview shows an
    # order_id repeated across rows; Spark does not specify which row is kept,
    # and the other rows of that order (and their prices) are discarded.
    .dropDuplicates(["order_id"])
)

orders_cleaned.printSchema()
orders_cleaned.show(5)


root
 |-- event_time: date (nullable = true)
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: string (nullable = true)

+----------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|event_time|           order_id|         product_id|        category_id|       category_code|  brand| price|            user_id|
+----------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|2020-04-24|2294584263154074236|2273948316817424439|2268105471367840086|                null|karcher|217.57|1515915625443148002|
|2020-04-26|2295902490203259134|2273948311742316796|2268105393848713950|appliances.kitche...|     lg|462.94|1515915625450561165|
|2020-04-27|229640048099092

## Step 2: Store as Delta Table


In [0]:
# Persist the cleaned orders as a Delta table and register it for Spark SQL.
ORDERS_DELTA_PATH = "/tmp/orders_cleaned_delta"

# This cell fully regenerates the table from orders_cleaned, so also replace the
# schema: plain overwrite mode rejects a write whose schema differs from the
# existing Delta table (e.g. after a column-type change in the cleaning step).
(
    orders_cleaned.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(ORDERS_DELTA_PATH)
)

# Re-create the SQL table entry so it points at the Delta files just written.
spark.sql("DROP TABLE IF EXISTS orders_cleaned")
spark.sql(f"CREATE TABLE orders_cleaned USING DELTA LOCATION '{ORDERS_DELTA_PATH}'")

Out[14]: DataFrame[]

In [0]:
%sql
-- Quick sanity check: show a few rows of the registered Delta table.
SELECT *
FROM orders_cleaned
LIMIT 5;

event_time,order_id,product_id,category_id,category_code,brand,price,user_id
2020-04-24,2294584263154074236,2273948316817424439,2268105471367840086,,karcher,217.57,1515915625443148002
2020-04-26,2295902490203259134,2273948311742316796,2268105393848713950,appliances.kitchen.refrigerators,lg,462.94,1515915625450561165
2020-04-27,2296400480990920715,2273948308663698152,2374498914000592280,electronics.video.tv,samsung,416.64,1515915625450899340
2020-04-27,2296628237930857206,1515966223509089660,2268105410021949476,computers.components.cpu,intel,91.41,1515915625451131565
2020-04-28,2297034737199350540,1515966223509719628,2268105635507732512,,,6.94,1515915625447779982


## Step 3: Transform with SQL


In [0]:
%sql
-- How much each brand sells per week: row count and total revenue per brand for
-- each ISO week. week_year is the ISO week-numbering year, so the same week
-- number in different years is kept in separate buckets instead of being merged.
CREATE OR REPLACE VIEW weekly_brand_sales
AS
SELECT 
  EXTRACT(YEAROFWEEK FROM event_time) AS week_year,
  WEEKOFYEAR(event_time) AS week_number,
  brand,
  COUNT(*) AS orders_count,
  ROUND(SUM(price), 2) AS total_price_sum
FROM orders_cleaned
WHERE event_time IS NOT NULL AND brand IS NOT NULL AND price IS NOT NULL
GROUP BY week_year, week_number, brand
ORDER BY week_year, week_number, brand;

In [0]:
%sql
-- Total revenue per calendar day. Rows without an event_time are excluded so
-- they do not collapse into a NULL order_date bucket.
-- NOTE(review): days are ordered by revenue (lowest first), not by date —
-- confirm this ordering is intended.
CREATE OR REPLACE VIEW daily_revenue_table AS
SELECT 
  DATE(event_time) AS order_date,
  ROUND(SUM(price), 2) AS daily_total_revenue
FROM orders_cleaned
WHERE event_time IS NOT NULL
GROUP BY DATE(event_time)
ORDER BY daily_total_revenue ASC;

In [0]:
%sql
-- Sales per day of week: average sale price, row count and total revenue,
-- listed Sunday through Saturday. Rows without an event_time are excluded:
-- they would form a NULL day_of_week group, which the CASE below maps to NULL
-- and which Spark sorts first under ascending order, ahead of 'Sun'.
CREATE OR REPLACE VIEW sales_by_day AS
SELECT 
  DATE_FORMAT(event_time, 'E') AS day_of_week,  
  ROUND(AVG(price), 2) AS avg_sale_price,
  COUNT(*) AS total_orders,
  ROUND(SUM(price), 2) AS total_revenue
FROM orders_cleaned
WHERE price IS NOT NULL AND event_time IS NOT NULL
GROUP BY day_of_week
ORDER BY 
  CASE day_of_week  
    WHEN 'Sun' THEN 1
    WHEN 'Mon' THEN 2
    WHEN 'Tue' THEN 3
    WHEN 'Wed' THEN 4
    WHEN 'Thu' THEN 5
    WHEN 'Fri' THEN 6
    WHEN 'Sat' THEN 7
  END;