In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Olist').getOrCreate()

25/07/23 08:39:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
hdfs_path = '/data/olist/'

In [4]:
customers_df = spark.read.csv(hdfs_path+'olist_customers_dataset.csv',header=True, inferSchema=True)
products_df = spark.read.csv(hdfs_path+'olist_products_dataset.csv',header=True,inferSchema=True)
order_df = spark.read.csv(hdfs_path+'olist_orders_dataset.csv',header = True , inferSchema = True)
item_df = spark.read.csv(hdfs_path+'olist_order_items_dataset.csv',header = True, inferSchema=True)
payment_df = spark.read.csv(hdfs_path+'olist_order_payments_dataset.csv',header = True , inferSchema = True)
reviews_df = spark.read.csv(hdfs_path+'olist_order_reviews_dataset.csv',header = True , inferSchema = True)
location_df = spark.read.csv(hdfs_path+'olist_geolocation_dataset.csv',header = True , inferSchema = True)
seller_df = spark.read.csv(hdfs_path+'olist_sellers_dataset.csv',header = True , inferSchema = True)
trnaslation_df = spark.read.csv(hdfs_path+'product_category_name_translation.csv',header = True , inferSchema = True)

                                                                                

In [5]:
# caching the most used dfs
order_df.cache()
customers_df.cache()
item_df.cache()

DataFrame[order_id: string, order_item_id: int, product_id: string, seller_id: string, shipping_limit_date: timestamp, price: double, freight_value: double]

In [6]:
# all joins
order_item_joined_df = order_df.join(item_df,'order_id','inner')
order_items_products = order_item_joined_df.join(products_df,'product_id','inner')
order_item_product_seller = order_items_products.join(seller_df,'seller_id','inner')
full_orders_df = order_item_product_seller.join(customers_df,'customer_id','inner')

### A broadcast join sends the smaller table (or DataFrame) to all worker nodes, ensuring each worker node has a complete copy of the smaller table in memory. This allows the join operation to be conducted locally on each worker node, eliminating the network's data shuffle and transfer requirement.

In [7]:
from pyspark.sql.functions import *

full_orders_df =  full_orders_df.join(broadcast(location_df)\
                                      ,full_orders_df.customer_zip_code_prefix == location_df.geolocation_zip_code_prefix,'left')

In [8]:
full_orders_df = full_orders_df.join(broadcast(reviews_df),'order_id','left')

In [9]:
# last is payment one
full_orders_df = full_orders_df.join(payment_df,'order_id','left')

In [10]:
full_orders_df.cache()

25/07/23 08:40:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[order_id: string, customer_id: string, seller_id: string, product_id: string, order_status: string, order_purchase_timestamp: timestamp, order_approved_at: timestamp, order_delivered_carrier_date: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, order_item_id: int, shipping_limit_date: timestamp, price: double, freight_value: double, product_category_name: string, product_name_lenght: int, product_description_lenght: int, product_photos_qty: int, product_weight_g: int, product_length_cm: int, product_height_cm: int, product_width_cm: int, seller_zip_code_prefix: int, seller_city: string, seller_state: string, customer_unique_id: string, customer_zip_code_prefix: int, customer_city: string, customer_state: string, geolocation_zip_code_prefix: int, geolocation_lat: double, geolocation_lng: double, geolocation_city: string, geolocation_state: string, review_id: string, review_score: string, review_comment_title: string, review_commen

In [11]:
full_orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (null

In [12]:
# Top 10 most sold products
top_sold_products_df = full_orders_df.groupBy('product_id').agg(count('order_id').alias('Top_selling_products')).orderBy(desc('Top_selling_products'))

In [13]:
top_sold_products_df.show(10)



+--------------------+--------------------+
|          product_id|Top_selling_products|
+--------------------+--------------------+
|aca2eb7d00ea1a7b8...|               86740|
|422879e10f4668299...|               81110|
|99a4788cb24856965...|               78775|
|389d119b48cf3043d...|               60248|
|d1c427060a0f73f6b...|               59274|
|368c6c730842d7801...|               58358|
|53759a2ecddad2bb8...|               52654|
|53b36df67ebb7c415...|               52105|
|154e7e31ebfa09220...|               42700|
|3dd2a17168ec895c7...|               40787|
+--------------------+--------------------+
only showing top 10 rows



                                                                                

In [15]:
# Top 10 customer by spending
top_10_spending_cust = full_orders_df.groupBy('customer_id').agg(sum('price').alias('Top_10_spending_cust'))\
.orderBy(desc('Top_10_spending_cust')).limit(10)

top_10_spending_cust.show()



+--------------------+--------------------+
|         customer_id|Top_10_spending_cust|
+--------------------+--------------------+
|d3e82ccec3cb5f956...|           6662844.0|
|df55c14d1476a9a34...|           3565657.0|
|fe5113a38e3575c04...|           3293604.0|
|ec5b2ba62e5743423...|           2556120.0|
|63b964e79dee32a35...|           2501664.0|
|46bb3c0b1a65c8399...|           2336752.0|
|05455dfa7cd02f13d...|   2160194.400000087|
|3690e975641f01bd0...|           2124498.0|
|349509b216bd5ec11...|           1923627.0|
|695476b5848d64ba0...|  1820543.1299999943|
+--------------------+--------------------+



                                                                                

## Window Functions and Ranking

In [16]:
from pyspark.sql.window import Window

In [17]:
window_spec = Window.partitionBy('seller_id').orderBy(desc('price'))

In [18]:
# Rank top selling products per seller
top_seller_products_df = full_orders_df.withColumn('rank',rank().over(window_spec)).filter(col('rank')<=5)

top_seller_products_df.select('seller_id','price','rank').show()



+--------------------+-----+----+
|           seller_id|price|rank|
+--------------------+-----+----+
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
|0015a82c2db000af6...|895.0|   1|
+--------------------+-----+----+
only showing top 20 rows



                                                                                

In [19]:
# Dense rank 
top_seller_products_densedf = full_orders_df.withColumn('dense_rank',dense_rank().over(window_spec)).filter(col('dense_rank')<=5)

top_seller_products_densedf.select('seller_id','price','dense_rank').show()

[Stage 43:>                                                         (0 + 1) / 1]

+--------------------+-----+----------+
|           seller_id|price|dense_rank|
+--------------------+-----+----------+
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
|0015a82c2db000af6...|895.0|         1|
+--------------------+-----+----------+
only showing top 20 rows



                                                                                

In [20]:
spark.stop()