For this assignment, we will use [Brazilian E-Commerce Public Dataset by Olist](https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce?select=olist_customers_dataset.csv) from Kaggle.

In [1]:
# Install pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=644e9bb0e703add2034e845b1822f734d6c1162b65894cc4b8b29892c025d0d7
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


We will use the following tables from the dataset:
- olist_orders_dataset
- olist_order_items_dataset
- olist_customers_dataset
- olist_products_dataset
- product_category_name_translation

In [2]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("Read CSV") \
    .getOrCreate()

# Getting olist_orders_dataset table
order_df = spark.read.csv("olist_orders_dataset.csv", header=True, inferSchema=True)

# Showing the table
order_df.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [3]:
# Showing the schema
order_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [4]:
# Getting the historical order count for each day
from pyspark.sql.functions import date_format

history_order_df = (
    order_df
    .groupBy(date_format("order_purchase_timestamp", "yyyy-MM-dd").alias("day"))
    .count()
    .orderBy("day")
)

history_order_df.show()
history_order_pd = history_order_df.toPandas()

+----------+-----+
|       day|count|
+----------+-----+
|2016-09-04|    1|
|2016-09-05|    1|
|2016-09-13|    1|
|2016-09-15|    1|
|2016-10-02|    1|
|2016-10-03|    8|
|2016-10-04|   63|
|2016-10-05|   47|
|2016-10-06|   51|
|2016-10-07|   46|
|2016-10-08|   42|
|2016-10-09|   26|
|2016-10-10|   39|
|2016-10-22|    1|
|2016-12-23|    1|
|2017-01-05|   32|
|2017-01-06|    4|
|2017-01-07|    4|
|2017-01-08|    6|
|2017-01-09|    5|
+----------+-----+
only showing top 20 rows



In [5]:
history_order_pd.describe()

             count
count   634.000000
mean    156.847003
std      94.224817
min       1.000000
25%      96.000000
50%     148.000000
75%     215.750000
max    1176.000000


In [6]:
import plotly.express as px

# Create the line chart for Order by Day
fig = px.line(history_order_pd, x="day", y="count", title="Order Count by Day")

# Show the chart
fig.show()


In [8]:
# Getting olist_customers_dataset table
customer_df = spark.read.csv("olist_customers_dataset.csv", header=True, inferSchema=True)

customer_df.show()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In the orders table, every order has its own customer_id, so a customer who places several orders appears under several different customer_id values. To identify the actual customer behind each order, we join the orders table with the customers table, which provides the "customer_unique_id" column.

In [11]:
# Joining order table with customers table
order_customer_df = order_df.join(customer_df, order_df["customer_id"] == customer_df["customer_id"], how="left")

order_customer_df.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+--------------------+------------------------+--------------------+--------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+--------------------+------------------------+--------------------+--------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-

In [25]:
# Getting the number of orders placed by each customer (identified by customer_unique_id)
(
    order_customer_df
    .groupBy("customer_unique_id")
    .count()
    .orderBy("count", ascending=False)
    .show(30)
)

+--------------------+-----+
|  customer_unique_id|count|
+--------------------+-----+
|8d50f5eadf50201cc...|   17|
|3e43e6105506432c9...|    9|
|6469f99c1f9dfae77...|    7|
|1b6c7548a2a1f9037...|    7|
|ca77025e7201e3b30...|    7|
|12f5d6e1cbf93dafd...|    6|
|63cfc61cee11cbe30...|    6|
|dc813062e0fc23409...|    6|
|de34b16117594161a...|    6|
|47c1a3033b8b77b3a...|    6|
|f0e310a6839dce9de...|    6|
|fe81bb32c243a86b2...|    5|
|4e65032f1f574189f...|    5|
|b4e4f24de1e8725b7...|    5|
|394ac4de8f3acb142...|    5|
|56c8638e7c058b98a...|    5|
|35ecdf6858edc6427...|    5|
|5e8f38a9a1c023f3d...|    5|
|74cb1ad7e6d567432...|    5|
|83e7958a94bd7f74a...|    4|
|f64ec6d8dd2994026...|    4|
|b896655e2083a1d76...|    4|
|c37cc6c1a59d81460...|    4|
|397b44d5bb99eabf5...|    4|
|7305430719d715992...|    4|
|6358b15ef7fa9b121...|    4|
|d3359a56d96666ac0...|    4|
|12d8b5ed661190a3a...|    4|
|ec7f1811826ab04a2...|    4|
|a1874c5550d2f0bc1...|    4|
+--------------------+-----+
only showing t

In [19]:
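# Getting the number of orders placed by customers in each city
# (stored in customer_city_order_df so later cells can reuse it)
customer_city_order_df = (
    order_customer_df
    .groupBy("customer_city")
    .count()
    .orderBy("count", ascending=False)
)
customer_city_order_df.show()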

+--------------------+-----+
|       customer_city|count|
+--------------------+-----+
|           sao paulo|15540|
|      rio de janeiro| 6882|
|      belo horizonte| 2773|
|            brasilia| 2131|
|            curitiba| 1521|
|            campinas| 1444|
|        porto alegre| 1379|
|            salvador| 1245|
|           guarulhos| 1189|
|sao bernardo do c...|  938|
|             niteroi|  849|
|         santo andre|  797|
|              osasco|  746|
|              santos|  713|
|             goiania|  692|
| sao jose dos campos|  691|
|           fortaleza|  654|
|            sorocaba|  633|
|              recife|  613|
|       florianopolis|  570|
+--------------------+-----+
only showing top 20 rows



In [24]:
customer_city_order_df.tail(20)

[Row(customer_city='sao cristovao do sul', count=1),
 Row(customer_city="itaporanga d'ajuda", count=1),
 Row(customer_city='ipiranga de goias', count=1),
 Row(customer_city='ibitira', count=1),
 Row(customer_city='jardim olinda', count=1),
 Row(customer_city='lagoa da canoa', count=1),
 Row(customer_city='caxambu do sul', count=1),
 Row(customer_city='matias olimpio', count=1),
 Row(customer_city='novo planalto', count=1),
 Row(customer_city='dias d avila', count=1),
 Row(customer_city='patis', count=1),
 Row(customer_city='brejetuba', count=1),
 Row(customer_city='morro da garca', count=1),
 Row(customer_city='palmital de minas', count=1),
 Row(customer_city='delfim moreira', count=1),
 Row(customer_city='mamonas', count=1),
 Row(customer_city='monsenhor paulo', count=1),
 Row(customer_city='inhuma', count=1),
 Row(customer_city='sulina', count=1),
 Row(customer_city='desterro de entre rios', count=1)]

In [20]:
# Getting the number of orders placed by customers in each state

(
    order_customer_df
    .groupBy("customer_state")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



Next, we look at product performance. For this we use the order items table, the products table, and the product category name translation table. The translation table is needed because the category names in the products table are in Brazilian Portuguese.

In [59]:
# Getting the products, category name translation, and order items tables
product_brazil = spark.read.csv("olist_products_dataset.csv", header=True, inferSchema=True)
product_english = spark.read.csv("product_category_name_translation.csv", header=True, inferSchema=True)
order_items_df = spark.read.csv("olist_order_items_dataset.csv", header=True, inferSchema=True)

# Showing the table
product_brazil.show()
product_english.show()
order_items_df.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [74]:
product_brazil.printSchema()
product_english.printSchema()
order_items_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)

root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [64]:
# Join the products table with the translation table to get English category names
product_df = product_brazil.join(product_english, product_brazil["product_category_name"]==product_english["product_category_name"], how="left")
# Drop the Portuguese category name from both sides, keeping only the English one
product_df = product_df.drop(product_brazil["product_category_name"], product_english["product_category_name"])
product_df.show()

+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------------+
|          product_id|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_category_name_english|
+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------------+
|1e9e8ef04dbcff454...|                 40|                       287|                 1|             225|               16|               10|              14|                    perfumery|
|3aa071139cb16b67c...|                 44|                       276|                 1|            1000|               30|               18|              20|                          art|
|96bd76ec8810374ed...|                 46|             

In [65]:
# Joining order items and product table
order_product_df = order_items_df.join(product_df, order_items_df["product_id"]==product_df["product_id"], how="left")

order_product_df.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|          product_id|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_category_name_english|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------------+
|a298b9d499a8e4ba3...|            1|017692475c1c954ff...|3c7c4a49ec3c65508...|2

In [68]:
# Drop the duplicate product_id column that came from product_df
order_product_df = order_product_df.drop(product_df["product_id"])
order_product_df.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_category_name_english|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-----------------------------+
|a298b9d499a8e4ba3...|            1|017692475c1c954ff...|3c7c4a49ec3c65508...|2018-04-05 06:15:08| 13.99|         7.39|                 54|   

In [75]:
order_product_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_category_name_english: string (nullable = true)



In [71]:
# Getting the number of order items sold for each product_id

(
    order_product_df
    .groupBy("product_id")
    .count()
    .orderBy("count", ascending = False)
    .show(30)
)

+--------------------+-----+
|          product_id|count|
+--------------------+-----+
|aca2eb7d00ea1a7b8...|  527|
|99a4788cb24856965...|  488|
|422879e10f4668299...|  484|
|389d119b48cf3043d...|  392|
|368c6c730842d7801...|  388|
|53759a2ecddad2bb8...|  373|
|d1c427060a0f73f6b...|  343|
|53b36df67ebb7c415...|  323|
|154e7e31ebfa09220...|  281|
|3dd2a17168ec895c7...|  274|
|2b4609f8948be1887...|  260|
|7c1bd920dbdf22470...|  231|
|a62e25e09e05e6faf...|  226|
|5a848e4ab52fd5445...|  197|
|bb50f2e236e5eea01...|  195|
|e0d64dcfaa3b6db5c...|  194|
|e53e557d5a159f5aa...|  183|
|42a2c92a0979a949c...|  183|
|b532349fe46b38fbc...|  169|
|35afc973633aaeb6b...|  165|
|a92930c327948861c...|  160|
|19c91ef95d509ea33...|  156|
|6cdd53843498f9289...|  156|
|437c05a395e9e47f9...|  155|
|f1c7f353075ce59d8...|  154|
|3fbc0ef745950c793...|  150|
|c4baedd846ed09b85...|  149|
|461f43be3bdf8844e...|  146|
|06edb72f1e0c64b14...|  143|
|7a10781637204d8d1...|  143|
+--------------------+-----+
only showing t

In [91]:
from pyspark.sql.functions import sum as spark_sum

# Getting top products with highest revenue (sum of item prices, excluding freight)
(
    order_product_df
    .groupBy("product_id")
    .agg(spark_sum("price").alias("total_revenue"))
    .orderBy("total_revenue", ascending=False)
    .show(30)
)


+--------------------+------------------+
|          product_id|     total_revenue|
+--------------------+------------------+
|bb50f2e236e5eea01...|           63885.0|
|6cdd53843498f9289...|54730.200000000106|
|d6160fb7873f18409...|          48899.34|
|d1c427060a0f73f6b...| 47214.50999999998|
|99a4788cb24856965...| 43025.56000000037|
|3dd2a17168ec895c7...| 41082.60000000021|
|25c38557cf793876c...| 38907.32000000001|
|5f504b3a1c75b73d6...| 37733.90000000001|
|53b36df67ebb7c415...| 37683.42000000013|
|aca2eb7d00ea1a7b8...|37608.900000000314|
|e0d64dcfaa3b6db5c...| 31786.82000000007|
|d285360f29ac7fd97...|31623.810000000012|
|7a10781637204d8d1...| 30467.50000000003|
|f1c7f353075ce59d8...|29997.360000000004|
|f819f0c84a64f02d3...|29024.480000000003|
|588531f8ec37e7d5f...|          28291.99|
|422879e10f4668299...| 26577.22000000016|
|16c4e87b98a9370a9...|           25034.0|
|5a848e4ab52fd5445...|24229.030000000075|
|a62e25e09e05e6faf...|           24051.0|
|2b4609f8948be1887...| 22717.22000

In [93]:
# Getting the number of order items in each product category

(
    order_product_df
    .groupBy("product_category_name_english")
    .count()
    .orderBy("count", ascending = False)
    .show(30)
)

+-----------------------------+-----+
|product_category_name_english|count|
+-----------------------------+-----+
|               bed_bath_table|11115|
|                health_beauty| 9670|
|               sports_leisure| 8641|
|              furniture_decor| 8334|
|         computers_accesso...| 7827|
|                   housewares| 6964|
|                watches_gifts| 5991|
|                    telephony| 4545|
|                 garden_tools| 4347|
|                         auto| 4235|
|                         toys| 4117|
|                   cool_stuff| 3796|
|                    perfumery| 3419|
|                         baby| 3065|
|                  electronics| 2767|
|                   stationery| 2517|
|         fashion_bags_acce...| 2031|
|                     pet_shop| 1947|
|             office_furniture| 1691|
|                         NULL| 1627|
|               consoles_games| 1137|
|          luggage_accessories| 1092|
|         construction_tool...|  929|
|           

In [94]:
# Getting top product categories with highest revenue
(
    order_product_df
    .groupBy("product_category_name_english")
    .agg(spark_sum("price").alias("total_revenue"))
    .orderBy("total_revenue", ascending=False)
    .show(30)
)


+-----------------------------+------------------+
|product_category_name_english|     total_revenue|
+-----------------------------+------------------+
|                health_beauty|1258681.3400000238|
|                watches_gifts|1205005.6800000104|
|               bed_bath_table|1036988.6800000357|
|               sports_leisure| 988048.9700000244|
|         computers_accesso...| 911954.3200000162|
|              furniture_decor| 729762.4900000057|
|                   cool_stuff| 635290.8499999939|
|                   housewares| 632248.6600000013|
|                         auto| 592720.1099999964|
|                 garden_tools|485256.45999999496|
|                         toys| 483946.5999999932|
|                         baby|  411764.889999993|
|                    perfumery|399124.86999999464|
|                    telephony| 323667.5299999938|
|             office_furniture| 273960.7000000003|
|                   stationery| 230943.2299999985|
|                    computers|