In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

In [9]:
csv_path = "data/e_commerce_data.csv"
csv = spark.read.csv(csv_path,inferSchema=True, header=True)

""" sql to convert
SELECT COUNT(DISTINCT user_id) AS distinct_users
FROM ecommerce_data;
"""

csv.select('user_id').distinct().count()


                                                                                

3022290

In [None]:
""" sql to convert
SELECT COUNT(DISTINCT user_id) AS purchasing_users
FROM ecommerce_data
WHERE event_type = 'purchase';
"""

csv.filter(csv.event_type == "purchase").select("user_id").distinct().count()

                                                                                

347118

In [11]:
""" sql to convert
SELECT product_id, COUNT(*) AS view_count
FROM ecommerce_data
WHERE event_type = 'view'
GROUP BY product_id
ORDER BY view_count DESC
LIMIT 5;
"""

csv.filter(csv.event_type == "view").groupBy(csv.product_id).count().withColumnRenamed(
    "count", "view_count"
).sort("view_count",ascending=False).limit(5).show()




+----------+----------+
|product_id|view_count|
+----------+----------+
|   1004856|    419287|
|   1004767|    378777|
|   1005115|    327715|
|   1004249|    207422|
|   1004833|    203018|
+----------+----------+



                                                                                

In [None]:
""" sql to convert
SELECT category_code,
       COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) * 1.0 / 
       COUNT(CASE WHEN event_type = 'view' THEN 1 END) AS conversion_rate
FROM ecommerce_data
GROUP BY category_code
ORDER BY conversion_rate DESC;
"""

df = \
csv.select(
    csv.category_code, csv.event_type.alias("purchased"), csv.event_type.alias("viewed")
)



+--------------------+---------+------+
|       category_code|purchased|viewed|
+--------------------+---------+------+
|                NULL|     view|  view|
|appliances.enviro...|     view|  view|
|furniture.living_...|     view|  view|
|  computers.notebook|     view|  view|
|electronics.smart...|     view|  view|
|   computers.desktop|     view|  view|
|                NULL|     view|  view|
|                NULL|     view|  view|
|  apparel.shoes.keds|     view|  view|
|electronics.smart...|     view|  view|
|appliances.kitche...|     view|  view|
|electronics.smart...|     view|  view|
|appliances.enviro...|     view|  view|
|                NULL|     view|  view|
|furniture.bedroom...|     view|  view|
|                NULL|     view|  view|
|electronics.video.tv|     view|  view|
|appliances.kitche...|     view|  view|
|  computers.notebook|     view|  view|
|electronics.smart...|     view|  view|
+--------------------+---------+------+
only showing top 20 rows


In [None]:
"""
SELECT user_session,
       COUNT(DISTINCT CASE WHEN event_type = 'view' THEN product_id END) AS views,
       COUNT(DISTINCT CASE WHEN event_type = 'cart' THEN product_id END) AS cart_adds,
       COUNT(DISTINCT CASE WHEN event_type = 'purchase' THEN product_id END) AS purchases
FROM ecommerce_data
GROUP BY user_session;
"""

In [None]:
"""
SELECT DATE(event_time) AS day,
       COUNT(DISTINCT user_id) AS dau
FROM ecommerce_data
GROUP BY day
ORDER BY day;
"""

In [None]:
"""
SELECT brand,
       SUM(price) AS total_revenue
FROM ecommerce_data
WHERE event_type = 'purchase'
GROUP BY brand
ORDER BY total_revenue DESC
LIMIT 10;"""

In [None]:
"""
WITH first_visit AS (
  SELECT user_id, MIN(DATE(event_time)) AS first_day
  FROM ecommerce_data
  GROUP BY user_id
),
future_visits AS (
  SELECT e.user_id, DATE(e.event_time) AS visit_day, f.first_day
  FROM ecommerce_data e
  JOIN first_visit f ON e.user_id = f.user_id
  WHERE DATE(e.event_time) > f.first_day
)
SELECT COUNT(DISTINCT user_id) AS retained_users
FROM future_visits
WHERE DATEDIFF(visit_day, first_day) <= 7;
"""

In [None]:
""" 
SELECT day,
       SUM(daily_revenue) OVER (
           ORDER BY day ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
       ) AS revenue_7_day_rolling
FROM (
  SELECT DATE(event_time) AS day,
         SUM(price) AS daily_revenue
  FROM ecommerce_data
  WHERE event_type = 'purchase'
  GROUP BY day
) t;
"""

In [None]:
"""
SELECT *
FROM (
  SELECT category_code, product_id, COUNT(*) AS view_count,
         RANK() OVER (PARTITION BY category_code ORDER BY COUNT(*) DESC) AS rank
  FROM ecommerce_data
  WHERE event_type = 'view'
  GROUP BY category_code, product_id
) t
WHERE rank = 1;

"""