In [None]:
pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions  import *

In [3]:
# creating a Spark session
spark = SparkSession.builder.master('local[*]').appName('E-commerce').getOrCreate()


In [4]:
spark

# Data preparation

In [None]:
# loading the dataset
df = spark.read.csv(r"2019-Nov.csv",header=True, inferSchema=True)


In [6]:
# inspect the schema
df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [5]:
# checking the 5 first rows
df.show(5)

+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 01:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 01:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 01:00:01|      view|  17302664|2053013553853497655|                null| creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 01:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|    lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-11-01 01:00:01|      view|   1004775|2053013555631882655|electronics.smart...|xiaomi

In [5]:
# removing duplicate rows
df = df.dropDuplicates()

In [7]:
# number of rows
df.count()

67401460

In [10]:
# number of null values in each column
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     21871423|9209177|    0|      0|          10|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+



# Feature engineering

In [8]:
# creating new sub-categories columns based on the column 'category_code'
df = df.withColumn('main_category', split(df['category_code'], '[.]').getItem(0)).withColumn('second_category', split(df['category_code'], '[.]').getItem(1)).withColumn('third_category', split(df['category_code'], '[.]').getItem(2)).withColumn('fourth_category', split(df['category_code'], '[.]').getItem(3)).withColumn('fifth_category', split(df['category_code'], '[.]').getItem(4))

In [9]:
df[["category_code","main_category","second_category","third_category","fourth_category","fifth_category"]].show(truncate=False)

+---------------------------------+-------------+---------------+--------------+---------------+--------------+
|category_code                    |main_category|second_category|third_category|fourth_category|fifth_category|
+---------------------------------+-------------+---------------+--------------+---------------+--------------+
|null                             |null         |null           |null          |null           |null          |
|null                             |null         |null           |null          |null           |null          |
|kids.toys                        |kids         |toys           |null          |null           |null          |
|electronics.smartphone           |electronics  |smartphone     |null          |null           |null          |
|electronics.smartphone           |electronics  |smartphone     |null          |null           |null          |
|electronics.smartphone           |electronics  |smartphone     |null          |null           |null    

In [12]:
# inspecting 'fourth_category' and 'fifth_category'
df.groupBy('fourth_category').count().show()

+---------------+--------+
|fourth_category|   count|
+---------------+--------+
|           null|67338202|
|          piano|   63258|
+---------------+--------+



In [13]:
df.groupBy('fifth_category').count().show()

+--------------+--------+
|fifth_category|   count|
+--------------+--------+
|          null|67401460|
+--------------+--------+



In [9]:
df = df.drop('fifth_category')

In [10]:
# number of views of each product

df_number_views = df.select('product_id').filter(df['event_type']=="view").groupBy('product_id').agg(count('product_id').alias('number_views'))
#df_number_views.show(5)
df = df.join(df_number_views, df.product_id == df_number_views.product_id).select(df["*"],df_number_views["number_views"])
#df.show(5)

In [11]:
# number of purchases of each product

df_number_purchases = df.select('product_id').filter(df['event_type']=="purchase").groupBy('product_id').agg(count('product_id').alias('number_purchases'))
#df_number_purchases.show(5)
df = df.join(df_number_purchases, df.product_id == df_number_purchases.product_id).select(df["*"],df_number_purchases["number_purchases"])
#df.show(5)

In [13]:
# conversion rate of each product

df = df.withColumn("conversion_rate", df.number_purchases/df.number_views) 


In [12]:
# saving the new data file
df.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata2.csv")

# Exploratory data analysis

In [14]:
# creating a new relational model for the data

# products table
products = df[['product_id','category_id','main_category','second_category','third_category','fourth_category','brand','price','number_views','number_purchases',"conversion_rate"]].drop_duplicates()

# user_sessions table
user_sessions = df[['user_session','user_id']].drop_duplicates()

# events table
events = df[['event_time','event_type','product_id','user_session']].drop_duplicates()

In [None]:
# saving the data files of the new relational model
products.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("products.csv")
user_sessions.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("user_sessions.csv")
events.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("events.csv")

In [15]:
# number of clients
number_clients = user_sessions.select(countDistinct("user_id")).collect()[0][0]
print(number_clients)


3521766


In [48]:
# number of store sessions
number_sessions = user_sessions.select(countDistinct("user_session")).collect()[0][0]
print(number_sessions)


13002010


In [42]:
# count of each event related to the users
df.groupBy('event_type').count().show()

+----------+--------+
|event_type|   count|
+----------+--------+
|  purchase|  916939|
|      view|56973940|
|      cart| 2954620|
+----------+--------+



In [16]:
# creating views of the data to use SQL queries
products.createOrReplaceTempView("df")
products.createOrReplaceTempView("products")
user_sessions.createOrReplaceTempView("user_sessions")
events.createOrReplaceTempView("events")

In [10]:
# number of main categories of product
spark.sql('SELECT COUNT(DISTINCT main_category) AS number_main_category FROM products').show()

+--------------------+
|number_main_category|
+--------------------+
|                  13|
+--------------------+



# What are the 10 most represented product categories ?

In [11]:
spark.sql('''
select
  main_category, count(main_category) AS count_main_category
from products
group by main_category
ORDER BY count_main_category  DESC
''').show(10)

+-------------+-------------------+
|main_category|count_main_category|
+-------------+-------------------+
|  electronics|              43004|
|   appliances|              41750|
|    computers|              26636|
|      apparel|              19849|
| construction|              17278|
|    furniture|              13794|
|         kids|              11275|
|  accessories|               4533|
|         auto|               3999|
|        sport|               3361|
+-------------+-------------------+
only showing top 10 rows



# What are the 10 most represented product brands ?

In [41]:
spark.sql('''
select
  brand, count(brand) AS count_brand
from products
group by brand
ORDER BY count_brand  DESC
''').show(10)


+-------+-----------+
|  brand|count_brand|
+-------+-----------+
|samsung|       5327|
| xiaomi|       4658|
|  apple|       4399|
|  bosch|       3483|
|   sony|       2248|
|     lg|       1974|
|     hp|       1398|
|philips|       1227|
|polaris|       1171|
| huawei|       1036|
+-------+-----------+
only showing top 10 rows



# What are the highest grossing products and brands ?

In [18]:
spark.sql('''
select
  p.product_id, sum(p.price) as total_revenue
from products as p
left join events as e on p.product_id = e.product_id
where e.event_type = "purchase"
group by p.product_id
ORDER BY SUM(p.price) DESC
''').show(3)

+----------+--------------------+
|product_id|       total_revenue|
+----------+--------------------+
|   1005115|1.1340981057989848E9|
|   1005105|  5.58946651429612E8|
|   1005135| 4.050233686803127E8|
|   1005116| 3.459620452602164E8|
|   1004767| 3.169759184599301E8|
|   1004249| 2.927087901905449E8|
|   1004856|2.4265863417013538E8|
|   1004870| 2.385683392305557E8|
|   1002544| 2.117716547197075E8|
|   1005129|1.6546518812991145E8|
+----------+--------------------+
only showing top 10 rows



In [28]:
spark.sql('''
select
  p.brand, sum(p.price) as total_revenue
from products as p
left join events as e on p.product_id = e.product_id
where e.event_type = "purchase" and p.brand is not null
group by p.brand
ORDER BY SUM(p.price) DESC
''').show()

+--------+--------------------+
|   brand|       total_revenue|
+--------+--------------------+
|   apple| 5.439634138695442E9|
| samsung|2.0419447756527982E9|
|  xiaomi| 4.636368421698097E8|
|  huawei| 1.841406506499868E8|
|      lg| 7.391041067999822E7|
|    sony|3.7825916060000986E7|
|    oppo|2.6012030710000783E7|
|   artel| 2.491140440000656E7|
|    acer| 2.304527395999945E7|
|   bosch| 1.570311451000006E7|
|  lenovo| 1.398878022000004E7|
| indesit|1.2443717180000056E7|
|    beko|    9721496.09000004|
|  navien|   9102265.089999808|
|    asus|   6982373.899999894|
|dauscher|    6285338.72999997|
|   midea|   5317062.529999789|
|      hp|   4564293.309999909|
|  janome|   4449710.939999945|
| karcher|   4431845.360000005|
+--------+--------------------+
only showing top 20 rows



In [25]:
spark.sql('''
select distinct product_id, brand, main_category
from products
where product_id = 1005115 or product_id = 1005105 or product_id = 1005135
''').show()

+----------+-----+-------------+
|product_id|brand|main_category|
+----------+-----+-------------+
|   1005135|apple|  electronics|
|   1005105|apple|  electronics|
|   1005115|apple|  electronics|
+----------+-----+-------------+



# What are the brands with the highest average conversion rates ? 

In [26]:
products.groupBy('brand').agg(mean('conversion_rate').alias('conversion_rate_brand')).orderBy('conversion_rate_brand', ascending=False).show()

+----------------+---------------------+
|           brand|conversion_rate_brand|
+----------------+---------------------+
|          zvezda|   0.3333333333333333|
|     pixiebelles|                 0.25|
|        yoshioki|  0.22802197802197804|
|          kayser|                  0.2|
|           denso|                  0.2|
|           jetem|                  0.2|
|          yokito|   0.1989247311827957|
|           lamel|  0.18181818181818182|
|           freya|   0.1715686274509804|
|       superfine|  0.16666666666666666|
|           vasin|  0.16260987153482082|
|      florentina|  0.15512820512820513|
|           eksmo|  0.15269663916813578|
|           moose|  0.14583333333333334|
|           qplay|  0.14285714285714285|
|          kumano|  0.13392857142857142|
|  prajm-evroznak|                0.125|
|        likebook|                0.125|
|alexandermcqueen|                0.125|
|          vasden|                0.125|
+----------------+---------------------+
only showing top

# Average order value

In [34]:
# total number of orders
number_orders = df.filter(df['event_type']=="purchase").select(countDistinct("user_session")).collect()[0][0]
print(number_orders)

773214


In [38]:
# total revenue
total_revenue = df.filter(df['event_type']=="purchase").agg(sum('price')).collect()[0][0]
print(total_revenue)

275194890.4999921


In [40]:
average_order_value = total_revenue/number_orders
print(average_order_value)

355.9103825072904


# Average number of items per order

In [45]:
#df.filter(df['event_type']=="purchase").groupBy('user_session').agg(count('product_id').alias('number_items')).show()

+--------------------+------------+
|        user_session|number_items|
+--------------------+------------+
|6dceba3d-f2be-4a0...|           1|
|5a335441-d095-4de...|           1|
|f95850c8-4618-428...|           1|
|e701954f-6f16-48e...|           1|
|0a25d210-9e7e-4d0...|           1|
|53583d8e-c026-433...|           1|
|01bb1857-393f-456...|           1|
|77b1d2b6-a3b8-4ee...|           3|
|e929197c-e1be-44d...|           1|
|c2cae34d-ef2c-428...|           1|
|ed68b468-8f3f-450...|           1|
|2f49325e-57e2-4fe...|           1|
|3779910e-a0e2-436...|           1|
|3191f50b-88bb-471...|           1|
|87cd2b19-5644-48b...|           2|
|d2eeaf0b-f683-404...|           2|
|9629a3c3-c6ad-408...|           1|
|1ab43d26-65b7-465...|           1|
|218ca27d-8339-4d6...|           1|
|1667d4a3-2194-46c...|           1|
+--------------------+------------+
only showing top 20 rows



In [47]:
df.filter(df['event_type']=="purchase").groupBy('user_session').agg(count('product_id').alias('number_items')).select(mean('number_items').alias('mean_number_items')).show()

+------------------+
| mean_number_items|
+------------------+
|1.1858799763066887|
+------------------+

