# Olist E-Commerce - Extract, Transform, Load

- Load the cleaned datasets (CSV) from the Silver layer of the ADLS container (`container-olist-ecommerce-data/Silver`)

- Create Fact, Dimension, KPI-ready, and Feature tables

- Write the new tables as Parquet to the Gold layer of the same ADLS container (`container-olist-ecommerce-data/Gold`)

In [0]:
# Display the SparkSession supplied by the notebook environment (it is used throughout
# without being created here) as a quick check that a session is attached.
spark

## Connecting to Azure Data Lake Storage

In [0]:
import os

# Configure OAuth (client-credentials) access for the ABFS driver to the ADLS Gen2 account
# that holds the Silver input and Gold output folders.
storage_account = "storageaccountolistecom"
application_id = "10a55e8d-b4d6-4dd2-b74f-ee734d146ac0"  # service principal client id
directory_id = "0a1ff938-feb1-4e8d-95b9-e2f699407226"    # tenant id used in the token endpoint

# SECURITY: the client secret must never be written into notebook source. The value that was
# previously hard-coded here has been exposed and must be rotated in the app registration.
# Resolution order: the OLIST_SP_CLIENT_SECRET environment variable, then a Databricks secret scope.
# NOTE(review): the scope/key names below are placeholders - create them (or adjust) before running.
client_secret = (os.environ.get("OLIST_SP_CLIENT_SECRET")
                 or dbutils.secrets.get(scope="olist-ecommerce", key="sp-client-secret"))

# Every ABFS setting is keyed by the account's DFS endpoint host.
abfs_host = f"{storage_account}.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.auth.type.{abfs_host}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{abfs_host}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{abfs_host}", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{abfs_host}", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{abfs_host}", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

## Loading datasets from Azure Storage Account

In [0]:
# Notebook variable name -> Silver sub-folder it is loaded from.
# Most tables follow the "<folder>_df" naming pattern; the category-translation table
# keeps its folder name as its variable name.
datasets = {
    **{f'{folder}_df': folder for folder in ('customer', 'order', 'order_item', 'order_payment',
                                             'order_review', 'product', 'seller', 'geolocation')},
    'product_name_English': 'product_name_English',
}

In [0]:
def _read_silver_csv(folder):
    """Read one Silver-layer CSV folder, using its header row and inferring column types."""
    reader = spark.read.option("header", "true").option("inferSchema", "true")
    return reader.csv(f"abfss://container-olist-ecommerce-data@{storage_account}.dfs.core.windows.net/Silver/{folder}")

# Bind every dataset to a notebook-level variable named after its key (e.g. customer_df).
for var_name, folder in datasets.items():
    globals()[var_name] = _read_silver_csv(folder)

In [0]:
# Print the column list of every loaded DataFrame as a quick schema sanity check.
for var_name in datasets:
    frame = globals()[var_name]
    print(f'Columns in {var_name}:')
    print(frame.columns, '\n')

Columns in customer_df:
['customer_id', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state'] 

Columns in order_df:
['order_id', 'customer_id', 'order_status', 'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date', 'order_purchase_time', 'order_purchase_date', 'order_approved_at_time', 'order_approved_at_date', 'order_delivered_carrier_time', 'order_delivered_customer_time', 'order_estimated_delivery_time'] 

Columns in order_item_df:
['order_id', 'order_item_id', 'product_id', 'seller_id', 'shipping_limit_date', 'price', 'freight_value', 'shipping_limit_time'] 

Columns in order_payment_df:
['order_id', 'payment_sequential', 'payment_type', 'payment_installments', 'payment_value'] 

Columns in order_review_df:
['review_id', 'order_id', 'review_score', 'review_comment_title', 'review_comment_message', 'review_creation_date', 'review_answer_time', 'review_answer_date'] 

Columns in product_df:
['product_id', 'pro

## Joining Datasets

- Splitting the joined data into fact and dimension tables

In [0]:
from pyspark.sql.functions import *

### Order Fact Table

In [0]:
# Grain: one row per order item (order_df inner-joined with order_item_df).
# Payments and reviews are first collapsed to one row per order_id. Joining the raw
# order_payment_df rows (several per order are possible, see payment_sequential) and
# order_review_df rows directly multiplied each item row by (#payments x #reviews) for its
# order, inflating any sum or count taken over this table.
# NOTE: payment_value is the order's total payment and repeats on every item row of that
# order - de-duplicate on order_id before summing it. price / freight_value are per item.
fact_order_payments = (order_payment_df
                       .groupBy("order_id")
                       .agg(round(sum("payment_value"), 2).alias("payment_value")))
# order_review_df is not guaranteed unique on order_id; keep the mean score, rounded back to an int.
fact_order_reviews = (order_review_df
                      .groupBy("order_id")
                      .agg(round(avg(col("review_score").cast("int"))).cast("int").alias("review_score")))

# Left joins keep items of orders that have no payment or review row (those columns are null)
# instead of silently dropping them.
fact_orders = (order_df
               .join(order_item_df, "order_id", "inner")
               .join(fact_order_payments, "order_id", "left")
               .join(fact_order_reviews, "order_id", "left")
               .select("order_id", "customer_id", "product_id", "seller_id",
                       "order_status", "payment_value", "price", "freight_value",
                       "review_score", "order_purchase_date", "order_delivered_customer_date",
                       "order_estimated_delivery_date"))
fact_orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- payment_value: double (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- order_purchase_date: date (nullable = true)
 |-- order_delivered_customer_date: date (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)



### Daily Sales Fact Table

In [0]:
# Grain: one row per purchase date.
# Built from per-order aggregates rather than from fact_orders. fact_orders is item-grain
# (and payment/review rows may be joined onto it), so counting order_id or summing
# payment_value / price over it counts an order once per joined row and inflates
# total_orders, total_revenue, avg_order_value and total_price.
daily_order_items = (order_item_df
                     .groupBy('order_id')
                     .agg(sum('price').alias('order_price')))
daily_order_payments = (order_payment_df
                        .groupBy('order_id')
                        .agg(sum('payment_value').alias('order_payment_value')))
daily_order_reviews = (order_review_df
                       .groupBy('order_id')
                       .agg(avg(col('review_score').cast('int')).alias('order_review_score')))

# Inner join on items keeps the scope to orders that have items; payments/reviews are
# optional, so orders missing them still count (their values are null and ignored by sum/avg).
fact_sales_daily = (order_df
                    .join(daily_order_items, 'order_id', 'inner')
                    .join(daily_order_payments, 'order_id', 'left')
                    .join(daily_order_reviews, 'order_id', 'left')
                    .groupBy('order_purchase_date')
                    .agg(count('order_id').alias('total_orders'),
                         round(sum('order_payment_value'), 2).alias('total_revenue'),
                         round(sum('order_payment_value') / count('order_id'), 2).alias('avg_order_value'),
                         round(sum('order_price'), 2).alias('total_price'),
                         round(avg('order_review_score'), 1).alias('avg_review_score')))
fact_sales_daily.printSchema()

root
 |-- order_purchase_date: date (nullable = true)
 |-- total_orders: long (nullable = false)
 |-- total_revenue: double (nullable = true)
 |-- avg_order_value: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- avg_review_score: double (nullable = true)



### Delivery Fact Table

In [0]:
# Grain: one row per order item, carrying the order's delivery dates.
# delivery_delay_days = delivered date - estimated date (datediff(end, start) returns end - start):
#   > 0  -> delivered that many days after the estimate (late)
#   <= 0 -> delivered on or before the estimate
#   null -> no order_delivered_customer_date (not delivered)
# The previous argument order (estimated, delivered) flipped the sign, so early deliveries
# were flagged as delayed.
# delay_flag is True only for late deliveries; a null delay (undelivered) falls through to False.
fact_delivery = (order_df
                 .join(order_item_df, 'order_id', "inner")
                 .withColumn('delivery_delay_days',
                             datediff('order_delivered_customer_date', 'order_estimated_delivery_date'))
                 .withColumn('delay_flag',
                             when(col('delivery_delay_days') > 0, lit(True)).otherwise(lit(False))))
fact_delivery.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_delivered_carrier_date: date (nullable = true)
 |-- order_delivered_customer_date: date (nullable = true)
 |-- order_estimated_delivery_date: date (nullable = true)
 |-- order_purchase_time: timestamp (nullable = true)
 |-- order_purchase_date: date (nullable = true)
 |-- order_approved_at_time: timestamp (nullable = true)
 |-- order_approved_at_date: date (nullable = true)
 |-- order_delivered_carrier_time: timestamp (nullable = true)
 |-- order_delivered_customer_time: timestamp (nullable = true)
 |-- order_estimated_delivery_time: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: date (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- shipping_limit_time: tim

### Product Dimension Table

In [0]:
# Product dimension: product attributes plus the English category name.
# Left join so products whose category is null or has no row in the translation table stay
# in the dimension (product_category_name_english is then null). An inner join dropped them,
# leaving fact rows whose product_id is missing from dim_product.
dim_product = product_df.join(product_name_English, 'product_category_name', "left")
dim_product.printSchema()

root
 |-- product_category_name: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)
 |-- product_category_name_english: string (nullable = true)



### Customer Dimension Table

In [0]:
# Customer dimension: one row per customer_id with lifetime order KPIs.
# Payments and reviews are reduced to one row per order before joining. Joining their raw
# rows multiplied each order by (#payments x #reviews), so count('*') counted payment/review
# combinations rather than orders, and total_revenue double-counted payments.
dim_customer_payments = (order_payment_df
                         .groupBy('order_id')
                         .agg(sum('payment_value').alias('order_payment_value')))
dim_customer_reviews = (order_review_df
                        .groupBy('order_id')
                        .agg(avg('review_score').alias('order_review_score')))

# Left joins keep every customer and order even when a payment or review row is missing.
dim_customer = (customer_df
                .join(order_df.select('customer_id', 'order_id', 'order_purchase_date'), 'customer_id', "left")
                .join(dim_customer_payments, 'order_id', "left")
                .join(dim_customer_reviews, 'order_id', "left"))

dim_customer = (dim_customer.groupBy('customer_id')
                            .agg(
                                # NOTE(review): assumes customer_df is unique on customer_id; otherwise
                                # first() picks an arbitrary row's attributes.
                                first('customer_unique_id').alias('customer_unique_id'),
                                first('customer_zip_code_prefix').alias('customer_zip_code_prefix'),
                                first('customer_city').alias('customer_city'),
                                first('customer_state').alias('customer_state'),
                                # One row per order after pre-aggregation; null order_id (no orders) counts as 0.
                                count('order_id').alias('total_orders'),
                                round(sum('order_payment_value'), 2).alias('total_revenue'),
                                round(avg('order_review_score'), 1).alias('avg_review_score'),
                                min('order_purchase_date').alias('first_purchase_date'),
                                max('order_purchase_date').alias('last_purchase_date'))
                            .select(
                                'customer_id', 'customer_unique_id', 'customer_zip_code_prefix',
                                'customer_city', 'customer_state', 'total_orders', 'total_revenue',
                                'avg_review_score', 'first_purchase_date', 'last_purchase_date'))
dim_customer.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- total_orders: long (nullable = false)
 |-- total_revenue: double (nullable = true)
 |-- avg_review_score: double (nullable = true)
 |-- first_purchase_date: date (nullable = true)
 |-- last_purchase_date: date (nullable = true)



### Seller Dimension Table

In [0]:
# Seller attributes need no reshaping for Gold, so the Silver DataFrame is promoted as-is.
seller_df.printSchema()
dim_seller = seller_df

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



### Geolocation Dimension Table

In [0]:
# Geolocation dimension keyed by zip code prefix (the key customer/seller zip prefixes join to).
# geolocation_df is not guaranteed unique on geolocation_zip_code_prefix; duplicate prefixes
# would fan out every join against this dimension. Collapse to one row per prefix:
# mean coordinates, and min() as a deterministic pick for city/state.
# NOTE(review): if the Silver layer already deduplicates prefixes, this aggregation is a no-op.
dim_geolocation = (geolocation_df
                   .groupBy('geolocation_zip_code_prefix')
                   .agg(avg('geolocation_lat').alias('geolocation_lat'),
                        avg('geolocation_lng').alias('geolocation_lng'),
                        min('geolocation_city').alias('geolocation_city'),
                        min('geolocation_state').alias('geolocation_state')))
dim_geolocation.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



## Writing the fact and dimension tables to the Azure Storage Account (Gold layer)

In [0]:
# Persist every fact/dimension table as Parquet under the Gold folder, replacing any previous run.
gold_root = f"abfss://container-olist-ecommerce-data@{storage_account}.dfs.core.windows.net/Gold"
gold_tables = [
    ("fact_orders", fact_orders),
    ("fact_delivery", fact_delivery),
    ("fact_sales_daily", fact_sales_daily),
    ("dim_customer", dim_customer),
    ("dim_product", dim_product),
    ("dim_seller", dim_seller),
    ("dim_geolocation", dim_geolocation),
]
for table_name, table_df in gold_tables:
    table_df.write.mode("overwrite").parquet(f"{gold_root}/{table_name}")