In [31]:
# Stop a Spark session left over from an earlier run, if there is one.
# On a fresh kernel `spark` is not defined yet (it is created further down),
# so guard the call instead of failing with NameError under "Run All".
if 'spark' in globals():
    spark.stop()

**Setting up Spark Environment**

**1\. Deploy a Spark cluster (Dataproc, EMR, HDInsight, or on-prem)**

**2\. Store Data in HDFS**

*   **Download data from Kaggle ([Olist Brazilian E-Commerce dataset](https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce))**

* **curl command to download dataset -**

`curl -L -o ~/Downloads/brazilian-ecommerce.zip https://www.kaggle.com/api/v1/datasets/download/olistbr/brazilian-ecommerce`

*   **Unzip the file -** `unzip ~/Downloads/brazilian-ecommerce.zip -d ~/olist/Data/`
*   **Put data in HDFS - hadoop fs -put ~/olist/Data/\*.csv /data/olist/**

**3\. Use PySpark to interact with the data**

In [2]:
from pyspark.sql import SparkSession

# Create the notebook's SparkSession on YARN, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('Olist e-commerce')
    .master('yarn')
    .getOrCreate()
)

25/09/23 20:18:44 INFO SparkEnv: Registering MapOutputTracker
25/09/23 20:18:44 INFO SparkEnv: Registering BlockManagerMaster
25/09/23 20:18:44 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/09/23 20:18:44 INFO SparkEnv: Registering OutputCommitCoordinator


In [1]:
# Bare expression: shows the SparkSession object as the cell's output.
spark

In [3]:
# HDFS directory that holds the Olist CSV files; reused by the other load cells.
hdfs_path = '/data/olist/'

# Customers table: first row is the header, column types are inferred by Spark.
customers_df = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load(hdfs_path + 'olist_customers_dataset.csv')
)

                                                                                

In [3]:
# Preview the first 10 rows of the customers table.
customers_df.show(10)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|            

In [4]:
def _read_olist_csv(file_name):
    """Read one CSV file under ``hdfs_path`` with a header row and inferred column types."""
    return spark.read.csv(hdfs_path + file_name, header=True, inferSchema=True)


# One DataFrame per remaining Olist file, loaded in the same order as before.
orders_df = _read_olist_csv('olist_orders_dataset.csv')
order_items_df = _read_olist_csv('olist_order_items_dataset.csv')
payments_df = _read_olist_csv('olist_order_payments_dataset.csv')
reviews_df = _read_olist_csv('olist_order_reviews_dataset.csv')
geolocation_df = _read_olist_csv('olist_geolocation_dataset.csv')
sellers_df = _read_olist_csv('olist_sellers_dataset.csv')
product_df = _read_olist_csv('olist_products_dataset.csv')
category_translation_df = _read_olist_csv('product_category_name_translation.csv')

                                                                                

In [5]:
# Print the column names and the types Spark inferred for the orders table.
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [9]:
# Check for data leakage or dropped rows: print the row count of the customers table.
print(f"Customers : {customers_df.count()} rows")




Customers : 99441 rows


                                                                                

In [10]:
# Print the row count of the orders table.
print(f"Orders : {orders_df.count()} rows")

Orders : 99441 rows


In [7]:
# Null check: one output row holding, for every customers column, the number of null values.
from pyspark.sql.functions import col, when, count

customers_df.select(
    *(
        count(when(col(column_name).isNull(), 1)).alias(column_name)
        for column_name in customers_df.columns
    )
).show()






+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



                                                                                

In [14]:
# Duplicate check: list every customer_id that appears in more than one row.
(
    customers_df
    .groupBy('customer_id')
    .count()
    .filter('count>1')
    .show()
)

                                                                                

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



In [20]:
# Customer distribution: number of customer rows per state, largest first.
(
    customers_df
    .groupBy('customer_state')
    .count()
    .orderBy('count', ascending=False)
    .show()
)

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
|            SC| 3637|
|            BA| 3380|
|            DF| 2140|
|            ES| 2033|
|            GO| 2020|
|            PE| 1652|
|            CE| 1336|
|            PA|  975|
|            MT|  907|
|            MA|  747|
|            MS|  715|
|            PB|  536|
|            PI|  495|
|            RN|  485|
|            AL|  413|
+--------------+-----+
only showing top 20 rows



In [21]:
# Number of orders per order_status, most common status first.
(
    orders_df
    .groupBy('order_status')
    .count()
    .orderBy('count', ascending=False)
    .show()
)



+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



                                                                                

In [6]:
# Payments: preview the payments table.
payments_df.show()


+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|        99.33|
|a9810da82917af2d9...|                 1| credit_card|                   1|        24.39|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|        65.71|
|ba78997921bbcdc13...|                 1| credit_card|                   8|       107.78|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|       128.45|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|        96.12|
|771ee386b001f0620...|                 1| credit_card|                   1|        81.16|
|3d7239c394a212faa...|                 1| credit_card|                   3|        51.84|
|1f78449c8

In [10]:
# Show the payment rows whose payment_sequential value equals 3.
payments_df.where(col('payment_sequential') == 3).show()

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b80ba0f0c3d2a63fe...|                 3|     voucher|                   1|         2.99|
|8ca5bdac5ebe8f2d6...|                 3|     voucher|                   1|         15.0|
|608570b95a4974397...|                 3|     voucher|                   1|        12.29|
|11f97a9967bb081ad...|                 3|     voucher|                   1|        33.56|
|6a0c96e8fd6d1c42b...|                 3|     voucher|                   1|        64.25|
|9c14756d0113f66be...|                 3|     voucher|                   1|         1.39|
|a3a97bc6e236d5351...|                 3|     voucher|                   1|         6.67|
|e47a63ad65b549537...|                 3|     voucher|                   1|         20.0|
|c27a26005

In [14]:
from pyspark.sql import functions as F

# Orders paid with more than one distinct payment type, with their payment
# types and sequence numbers listed in payment_sequential order.
(
    payments_df
    .groupBy("order_id")
    .agg(
        # Collect the (sequence, type) pairs once. Sorting the structs orders
        # them by payment_sequential first, then payment_type.
        F.expr(
            "array_sort(collect_list(struct(payment_sequential, payment_type)))"
        ).alias("ordered_payments"),
        # Materialise the distinct-type count as a real column so the filter
        # below refers to something that exists in the aggregated frame,
        # rather than relying on Spark rewriting an aggregate inside filter().
        F.size(F.collect_set("payment_type")).alias("distinct_payment_types"),
    )
    .filter(F.col("distinct_payment_types") > 1)
    .select(
        "order_id",
        F.expr(
            "concat_ws(', ', transform(ordered_payments, x -> x.payment_type))"
        ).alias("payment_types"),
        F.expr(
            "concat_ws(', ', transform(ordered_payments, x -> cast(x.payment_sequential as string)))"
        ).alias("sequences"),
    )
    .show(truncate=False)
)




+--------------------------------+-----------------------------------------------------------------+-------------------+
|order_id                        |payment_types                                                    |sequences          |
+--------------------------------+-----------------------------------------------------------------+-------------------+
|0016dfedd97fc2950e388d2971d718c7|credit_card, voucher                                             |1, 2               |
|00b4a910f64f24dbcac04fe54088a443|credit_card, voucher                                             |1, 2               |
|00bd50cdd31bd22e9081e6e2d5b3577b|credit_card, voucher, voucher                                    |1, 2, 3            |
|00c405bd71187154a7846862f585a9d4|credit_card, voucher, voucher, voucher, voucher, voucher, voucher|1, 2, 3, 4, 5, 6, 7|
|00e6bc6b166eb28b4502c1cad4457248|credit_card, voucher                                             |1, 2               |
|0218c7a4fb8d5b1bd22c82b783b8359

                                                                                

In [15]:
# Number of payment rows per payment_type, most common type first.
(
    payments_df
    .groupBy('payment_type')
    .count()
    .orderBy('count', ascending=False)
    .show()
)



+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



                                                                                

In [16]:
# Preview the first 5 rows of the order items table.
order_items_df.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

In [17]:
# Preview the first 5 rows of the products table.
product_df.show(5)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [19]:
# Attach the product attributes to every order item (inner join on product_id).
order_products_df = order_items_df.join(product_df, on="product_id", how="inner")
order_products_df.show(n=5)

                                                                                

+--------------------+--------------------+-------------+--------------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|            order_id|order_item_id|           seller_id|shipping_limit_date|price|freight_value|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+--------------------+-------------+--------------------+-------------------+-----+-------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|4244733e06e7ecb49...|00010242fe8c5a6d1...|            1|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|           cool_stuff|                 58|         

In [20]:
# Best-selling product categories: count joined order-item rows per category, top 5.
(
    order_products_df
    .groupBy('product_category_name')
    .count()
    .orderBy('count', ascending=False)
    .show(5)
)



+---------------------+-----+
|product_category_name|count|
+---------------------+-----+
|      cama_mesa_banho|11115|
|         beleza_saude| 9670|
|        esporte_lazer| 8641|
|     moveis_decoracao| 8334|
| informatica_acess...| 7827|
+---------------------+-----+
only showing top 5 rows



                                                                                

In [21]:
# Top 5 categories by revenue, where revenue is the sum of the item `price` column.
# NOTE(review): this import rebinds the name `sum` to Spark's aggregate function,
# shadowing Python's built-in sum() for every cell that runs afterwards.
from pyspark.sql.functions import sum

(
    order_products_df
    .groupBy('product_category_name')
    .agg(sum('price').alias('revenue'))
    .orderBy('revenue', ascending=False)
    .show(5)
)




+---------------------+------------------+
|product_category_name|           revenue|
+---------------------+------------------+
|         beleza_saude| 1258681.340000017|
|   relogios_presentes|1205005.6800000127|
|      cama_mesa_banho|1036988.6800000388|
|        esporte_lazer| 988048.9700000194|
| informatica_acess...| 911954.3200000152|
+---------------------+------------------+
only showing top 5 rows



                                                                                

In [20]:
# Average Delivery Analysis
# Start by previewing the first 5 rows of the orders table.

orders_df.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [26]:
# Keep only the order id plus the purchase and customer-delivery timestamps.
delivery_df = orders_df.select(
    'order_id',
    'order_purchase_timestamp',
    'order_delivered_customer_date',
)
delivery_df.show(n=5)

+--------------------+------------------------+-----------------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|
+--------------------+------------------------+-----------------------------+
|e481f51cbdc54678b...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|
|53cdb2fc8bc7dce0b...|     2018-07-24 20:41:37|          2018-08-07 15:27:45|
|47770eb9100c2d0c4...|     2018-08-08 08:38:49|          2018-08-17 18:06:29|
|949d5b44dbf5de918...|     2017-11-18 19:28:06|          2017-12-02 00:28:42|
|ad21c59c0840e6cb8...|     2018-02-13 21:18:39|          2018-02-16 18:17:02|
+--------------------+------------------------+-----------------------------+
only showing top 5 rows



In [30]:
from pyspark.sql.functions import datediff, to_date, col

# delivery_time = whole days between the purchase timestamp and the customer
# delivery date; the frame is sorted so the longest deliveries come first.
delivery_details_df = (
    delivery_df
    .withColumn(
        'delivery_time',
        datediff(
            col('order_delivered_customer_date'),
            col('order_purchase_timestamp'),
        ),
    )
    .orderBy('delivery_time', ascending=False)
)

delivery_details_df.show(5)

[Stage 28:>                                                         (0 + 1) / 2]

+--------------------+------------------------+-----------------------------+-------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|delivery_time|
+--------------------+------------------------+-----------------------------+-------------+
|ca07593549f1816d2...|     2017-02-21 23:31:27|          2017-09-19 14:36:39|          210|
|1b3190b2dfa9d789e...|     2018-02-23 14:57:35|          2018-09-19 23:24:07|          208|
|440d0d17af552815d...|     2017-03-07 23:59:51|          2017-09-19 15:12:50|          196|
|2fb597c2f772eca01...|     2017-03-08 18:09:02|          2017-09-19 14:33:17|          195|
|285ab9426d6982034...|     2017-03-08 22:47:40|          2017-09-19 14:00:04|          195|
+--------------------+------------------------+-----------------------------+-------------+
only showing top 5 rows



                                                                                