# 1. Data Extraction

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('olist_project') \
    .getOrCreate()

spark

<pyspark.sql.connect.session.SparkSession at 0x7fdca9498df0>

## Linking Azure Data Lake Gen2

In [0]:

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", service_credential)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

## Reading Datasets From ADLS Gen2

In [0]:

storage_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/bronze/"

In [0]:
customer_df = spark.read.format("csv").option('inferSchema',True).option('header',True).load(storage_path + 'customers.csv')
orders_df = spark.read.format("csv").option('inferSchema',True).option('header',True).load(storage_path + 'orders.csv')
items_df = spark.read.format("csv").option('inferSchema',True).option('header',True).load(storage_path + 'items.csv')
payments_df = spark.read.format("csv").option('inferSchema',True).option('header',True).load(storage_path + 'payments.csv')
location_df = spark.read.format("csv").option('inferSchema',True).option('header',True).load(storage_path + 'location.csv') 
products_df = spark.read.format("csv").option('inferSchema',True).option('header',True).load(storage_path + 'products.csv') 
sellers_df = spark.read.format("csv").option('inferSchema',True).option('header',True).load(storage_path + 'sellers.csv')
review_df = spark.read.format("csv").option('inferSchema',True).option('header',True).load(storage_path + 'reviews.csv')


# 2. Data Exploration / EDA

## Data Model

![Data Model](https://i.imgur.com/HRhd2Y0.png)

In [0]:
def print_schema(df, name):
    if df is None:
        print(f"{name} is None. Cannot print schema.")
    else:
        print(f"Schema for {name}:")
        df.printSchema()

In [0]:
print_schema(customer_df,'customer dataset')
print_schema(orders_df,'orders dataset')
print_schema(items_df,'items dataset')
print_schema(payments_df,'payments dataset')
print_schema(location_df,'location dataset')
print_schema(products_df,'products dataset') 
print_schema(sellers_df,'sellers dataset')
print_schema(review_df,'review dataset')
print_schema(payments_df,'payments dataset')

Schema for customer dataset:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)

Schema for orders dataset:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)

Schema for items dataset:
root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- pri

# 3. Data Validation / Quality Analysis

In [0]:
from pyspark.sql.functions import *

def data_validation(df,name):
    total_count = df.count()
    unique = df.dropDuplicates().count()
    dedup = total_count-unique

    print(name)
    print(f"Total Count : {total_count}")
    print(f"Total Duplicates : {dedup}")
    print(f"Total Nulls : ")
    
    null_count = df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show(vertical=True)

In [0]:
data_validation(customer_df,'customer dataset')
data_validation(orders_df,'orders dataset')
data_validation(items_df,'items dataset')
data_validation(payments_df,'payments dataset')
data_validation(location_df,'location dataset')
data_validation(products_df,'products dataset') 
data_validation(sellers_df,'sellers dataset')
data_validation(review_df,'review dataset')
data_validation(payments_df,'payments dataset')

customer dataset
Total Count : 99441
Total Duplicates : 0
Total Nulls : 
-RECORD 0-----------------------
 customer_id              | 0   
 customer_unique_id       | 0   
 customer_zip_code_prefix | 0   
 customer_city            | 0   
 customer_state           | 0   

orders dataset
Total Count : 99441
Total Duplicates : 0
Total Nulls : 
-RECORD 0-----------------------------
 order_id                      | 0    
 customer_id                   | 0    
 order_status                  | 0    
 order_purchase_timestamp      | 0    
 order_approved_at             | 160  
 order_delivered_carrier_date  | 1783 
 order_delivered_customer_date | 2965 
 order_estimated_delivery_date | 0    

items dataset
Total Count : 112650
Total Duplicates : 0
Total Nulls : 
-RECORD 0------------------
 order_id            | 0   
 order_item_id       | 0   
 product_id          | 0   
 seller_id           | 0   
 shipping_limit_date | 0   
 price               | 0   
 freight_value       | 0   

payments 

# 4. Data Curation

## Analysing Required Columns

In [0]:
order = orders_df.select('order_id','order_status','order_purchase_timestamp','order_delivered_customer_date','order_estimated_delivery_date','customer_id')

customer = customer_df.select('customer_id','customer_city','customer_state')

item = items_df.select("order_id","product_id","seller_id","price","freight_value")

payment = payments_df.select('order_id','payment_type','payment_value')

product = products_df.select('product_id','product_category_name')

review = review_df.select("order_id",'review_score')

## Joining Dataset

In [0]:
from pyspark.sql.functions import broadcast

final_df = order.join(broadcast(customer),on='customer_id',how='left') \
                .join(broadcast(review),on='order_id',how='left') \
                .join(payment,on='order_id',how='left') \
                .join(item,on='order_id',how='left') \
                .join(broadcast(product),on='product_id',how='left')

In [0]:
final_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- product_category_name: string (nullable = true)



# 5. Optimization : Caching on final_df

In [0]:
final_df.cache()

DataFrame[product_id: string, order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: timestamp, order_delivered_customer_date: timestamp, order_estimated_delivery_date: timestamp, customer_city: string, customer_state: string, review_score: string, payment_type: string, payment_value: double, seller_id: string, price: double, freight_value: double, product_category_name: string]

In [0]:
final_df.columns

['product_id',
 'order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'customer_city',
 'customer_state',
 'review_score',
 'payment_type',
 'payment_value',
 'seller_id',
 'price',
 'freight_value',
 'product_category_name']

# 6. Re-Validation on Merged Dataset

In [0]:
data_validation(final_df,"Merged Dataset")

Merged Dataset
Total Count : 119143
Total Duplicates : 11696
Total Nulls : 
-RECORD 0-----------------------------
 product_id                    | 833  
 order_id                      | 0    
 customer_id                   | 0    
 order_status                  | 0    
 order_purchase_timestamp      | 0    
 order_delivered_customer_date | 3421 
 order_estimated_delivery_date | 0    
 customer_city                 | 0    
 customer_state                | 0    
 review_score                  | 997  
 payment_type                  | 3    
 payment_value                 | 3    
 seller_id                     | 833  
 price                         | 833  
 freight_value                 | 833  
 product_category_name         | 2542 



# 7. Data Transformation

## Renaming Columns 

In [0]:
final_df = final_df.withColumnRenamed("order_purchase_timestamp","purchase_ts")
final_df = final_df.withColumnRenamed("order_delivered_customer_date","delivered_date")
final_df = final_df.withColumnRenamed("order_estimated_delivery_date","est_delivery")
final_df = final_df.withColumnRenamed("product_category_name","category")
final_df = final_df.withColumnRenamed("payment_type","pay_type")
final_df = final_df.withColumnRenamed("seller_id","seller")
final_df = final_df.withColumnRenamed("product_id","product")
final_df = final_df.withColumnRenamed("freight_value","shipping_fee")

In [0]:
final_df.printSchema()

root
 |-- product: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- purchase_ts: timestamp (nullable = true)
 |-- delivered_date: timestamp (nullable = true)
 |-- est_delivery: timestamp (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- pay_type: string (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- seller: string (nullable = true)
 |-- price: double (nullable = true)
 |-- shipping_fee: double (nullable = true)
 |-- category: string (nullable = true)



## Structuring Columns

In [0]:
final_df.select('review_score','payment_value','price','shipping_fee').show(10)

+------------+-------------+-----+------------+
|review_score|payment_value|price|shipping_fee|
+------------+-------------+-----+------------+
|           4|        18.59|29.99|        8.72|
|           4|          2.0|29.99|        8.72|
|           4|        18.12|29.99|        8.72|
|           4|       141.46|118.7|       22.76|
|           5|       179.12|159.9|       19.22|
|           5|         72.2| 45.0|        27.2|
|           5|        28.62| 19.9|        8.72|
|           4|       175.26|147.9|       27.36|
|           2|        65.95| 49.9|       16.05|
|           5|        75.16|59.99|       15.17|
+------------+-------------+-----+------------+
only showing top 10 rows



In [0]:
from pyspark.sql.functions import *

final_df = final_df.withColumn('review_score',col('review_score').cast('Integer'))

In [0]:
final_df.printSchema()

root
 |-- product: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- purchase_ts: timestamp (nullable = true)
 |-- delivered_date: timestamp (nullable = true)
 |-- est_delivery: timestamp (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- pay_type: string (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- seller: string (nullable = true)
 |-- price: double (nullable = true)
 |-- shipping_fee: double (nullable = true)
 |-- category: string (nullable = true)



## Data Cleaning

In [0]:
final_df = final_df.dropDuplicates()

In [0]:
data_validation(final_df,'Deduped Data')

Deduped Data
Total Count : 107447
Total Duplicates : 0
Total Nulls : 
-RECORD 0--------------
 product        | 812  
 order_id       | 0    
 customer_id    | 0    
 order_status   | 0    
 purchase_ts    | 0    
 delivered_date | 3153 
 est_delivery   | 0    
 customer_city  | 0    
 customer_state | 0    
 review_score   | 847  
 pay_type       | 1    
 payment_value  | 1    
 seller         | 812  
 price          | 812  
 shipping_fee   | 812  
 category       | 2341 



# 8. Data Serving to Silver Layer

In [0]:
silver_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/silver/"

In [0]:
# Storing Transformed dataset into Silver layer

final_df.write \
    .mode("overwrite") \
    .format('delta') \
    .save(silver_path)

# 9 Feature Engineering

In [0]:
# filling nulls with mean values on price , payment_value and shipping_fee

from pyspark.sql.functions import col

def impute_with_mean(df, columns):
    for c in columns:
        mean_val = df.selectExpr(f"avg({c}) as mean").collect()[0]['mean']
        df = df.fillna({c: mean_val})
    return df

final_df = impute_with_mean(final_df, ['price', 'payment_value', 'shipping_fee','review_score'])

In [0]:
final_df.printSchema()

root
 |-- product: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- purchase_ts: timestamp (nullable = true)
 |-- delivered_date: timestamp (nullable = true)
 |-- est_delivery: timestamp (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- pay_type: string (nullable = true)
 |-- payment_value: double (nullable = false)
 |-- seller: string (nullable = true)
 |-- price: double (nullable = false)
 |-- shipping_fee: double (nullable = false)
 |-- category: string (nullable = true)



In [0]:
# filling nulls on delivered_date with estimated delivery date

from pyspark.sql.functions import when, col

final_df = final_df.withColumn(
    "delivered_date",
    when(col("delivered_date").isNull(), col("est_delivery")).otherwise(col("delivered_date"))
)

In [0]:
# Extracting Year From Pruchase Date

final_df.select(min('purchase_ts'),max('purchase_ts')).show()

+-------------------+-------------------+
|   min(purchase_ts)|   max(purchase_ts)|
+-------------------+-------------------+
|2016-09-04 21:15:19|2018-10-17 17:30:18|
+-------------------+-------------------+



In [0]:
final_df = final_df.withColumn(
    "year", \
    year(col("purchase_ts"))    
)

In [0]:
# finding Delivery Gap (delivery_gap = delivered_date - purchase_date)

from pyspark.sql.functions import datediff, col

final_df = final_df.withColumn(
    "delivery_gap",
    datediff(col("delivered_date").cast("date"), col("purchase_ts").cast("date"))
)

# 10. Business Insights

In [0]:
# Analysing Top 10 Customers based on Total Spent

top_cutomers = final_df.groupBy("customer_id")\
    .agg(sum('payment_value').alias('total_spent')) \
    .orderBy(desc('total_spent')) \
    .limit(10) 
display(top_cutomers)

customer_id,total_spent
71901689c5f3e5adc27b1dd16b33f0b8,15978.65
1617b1357756262bfa56ab541c47bc16,13664.08
0d861a5e4dd6a9079d89e1330848f0ab,9258.24
cc803a2c412833101651d3f90ca7de24,8891.0
30bb84b541c96af98ba7d90b9ebf35d0,8697.99
5e312b0d9bf5c91599eb42120e633b19,8530.14
ec5b2ba62e574342386871631fafd3fc,7274.88
c6e2731c5b391845f6800c97401a43a9,6929.31
f48d464a0baaea338cb25f816991ab1f,6922.21
3fd6777bbce08a352fddd04e4a7cc8f6,6726.66


Databricks visualization. Run in Databricks to view.

In [0]:
# Top Sellers Based on Revenue Generated

top_sellers = final_df.groupBy('seller') \
    .agg(sum('price').alias('total_revenue')) \
    .orderBy(desc('total_revenue')) \
    .limit(10)
display(top_sellers)

seller,total_revenue
4869f7a5dfa277a7dca6462dcf3b52b2,235266.04000000047
53243585a1d6dc2643021fd1853d8905,222269.31000000008
4a3ca9315b744ce9f8e9374361493884,203791.87000000128
fa1c13f2614d7b5c4749cbc52fecda94,203789.32000000044
7e93a43ef30c4f03f38b393420bc753a,182594.21
da8622b14eb17ae2831f4ac5b9dab84a,155550.8800000011
7c67e1448b00f6e969d365cea6b010ab,150306.55000000022
7a67c85e85bb2ce8582c35f2203ad736,148941.0000000005
46dc3b2cc0980fb8ec44634e21d2718e,130422.15000000034
6560211a19b47992c3666cc44a7e94c0,125397.81


Databricks visualization. Run in Databricks to view.

In [0]:
# Top Sellers Based on Reviews

top_sellers_by_review = final_df.groupBy('seller') \
    .agg(sum('review_score').alias('total_score')) \
    .orderBy(desc('total_score')) \
    .limit(10)
display(top_sellers_by_review)

seller,total_score
6560211a19b47992c3666cc44a7e94c0,8067
4a3ca9315b744ce9f8e9374361493884,7745
cc419e0650a3c5ba77189a1882b7556a,7267
1f50f920176fa81dab994f9023523100,6273
da8622b14eb17ae2831f4ac5b9dab84a,6203
955fee9216a65b617aa5c0531780ce60,5468
7a67c85e85bb2ce8582c35f2203ad736,5178
3d871de0142ce09b7081e2b9d1733cb1,4822
4869f7a5dfa277a7dca6462dcf3b52b2,4820
ea8482cd71df3c1969d7b9473ff13abc,4785


Databricks visualization. Run in Databricks to view.

# 11. Data Serving to Gold Layer

In [0]:
gold_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/gold/"

In [0]:
# Storing Transformed dataset into Gold layer

final_df.write \
    .mode("overwrite") \
    .format('delta') \
    .partitionBy("year") \
    .save(gold_path)