
####  Configure the connection between Azure Data Lake Storage  (ADLS) and Databricks

In [0]:
storage_account = "olistdatastorageaccountd"
application_id = "615d8082-bfcb-4fd8-8f69-e1089ea2a457"
directory_id = "39bafab8-b8b9-4cfc-8316-5116be5db767"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", "SDg8Q~986FB3IdCu4bDdTnfifTFQjr7TCttbpbvJ")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")


### Validate the connection by reading a file from ADLS Bronze folder using spark

In [0]:
## df = spark.read.csv(f"abfss://container@datalakestorage.dfs.core.windows.net/Bronze/ olist_customers_dataset.csv", header=True, inferSchema=True)

customer_df = spark.read.csv(f"abfss://olistdata@olistdatastorageaccountd.dfs.core.windows.net/Bronze/olist_customers_dataset.csv", header=True, inferSchema=True)
customer_df.show(5)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows




### Read Bronze layer Raw data into Spark datasets

In [0]:
base_path = "abfss://olistdata@olistdatastorageaccountd.dfs.core.windows.net/Bronze/"

orders_path = base_path + "olist_orders_dataset.csv"
payments_path = base_path + "olist_order_payments.csv"
reviews_path = base_path + "olist_order_reviews_dataset.csv"
order_items_path = base_path + "olist_order_items_dataset.csv"
customers_path = base_path + "olist_customers_dataset.csv"
sellers_path = base_path + "olist_sellers_dataset.csv"
geolocation_path = base_path + "olist_geolocation_dataset.csv"
products_path = base_path + "olist_products_dataset.csv"

orders_df = spark.read.csv(orders_path, header=True, inferSchema=True)
payments_df = spark.read.csv(payments_path, header=True, inferSchema=True)  
reviews_df = spark.read.csv(reviews_path, header=True, inferSchema=True)
order_items_df = spark.read.csv(order_items_path, header=True, inferSchema=True)
customers_df = spark.read.csv(customers_path, header=True, inferSchema=True)
sellers_df = spark.read.csv(sellers_path, header=True, inferSchema=True)
geolocation_df = spark.read.csv(geolocation_path, header=True, inferSchema=True)
products_df = spark.read.csv(products_path, header=True, inferSchema=True)


### Validate the mongo DB connection

In [0]:
# importing module
from pymongo import MongoClient

hostname = "q9g8j.h.filess.io"
database = "olistDataNoSQL_thencheese"
port = "61004"
username = "olistDataNoSQL_thencheese"
password = "916bc2473e798b3db3f94215c2b576b27b57fb3b"

uri = "mongodb://" + username + ":" + password + "@" + hostname + ":" + port + "/" + database

# Connect with the portnumber and host
client = MongoClient(uri)

# Access database
mydatabase = client[database]
display(mydatabase)


Database(MongoClient(host=['q9g8j.h.filess.io:61004'], document_class=dict, tz_aware=False, connect=True), 'olistDataNoSQL_thencheese')


### Read Mongo DB data into spark dataset

In [0]:
import pandas as pd

collection = mydatabase['product_categories']
mongo_data = pd.DataFrame(list(collection.find()))

mongo_data.head(5)

Unnamed: 0,_id,product_category_name,product_category_name_english
0,67f25de733881b9ebe2533d6,beleza_saude,health_beauty
1,67f25de733881b9ebe2533d7,informatica_acessorios,computers_accessories
2,67f25de733881b9ebe2533d8,automotivo,auto
3,67f25de733881b9ebe2533d9,cama_mesa_banho,bed_bath_table
4,67f25de733881b9ebe2533da,moveis_decoracao,furniture_decor


In [0]:
# drop the _id column before converting to Spark dataset
 
mongo_data.drop("_id",axis=1,inplace=True)
mongo_spark_df = spark.createDataFrame(mongo_data)


#### View the dataFrames and its schema

In [0]:
orders_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [0]:
print(f'Orders    : {orders_df.count()} rows')
print(f'customers : {customers_df.count()} rows')

Orders    : 99441 rows
customers : 99441 rows


In [0]:
customer_df.columns

['customer_id',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state']

In [0]:
# Null or Say Duplicate Values
from pyspark.sql.functions import col

customers_df.select([col(c).isNull().alias(c) for c in customers_df.columns]).show(5)

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
|      false|             false|                   false|        false|         false|
+-----------+------------------+------------------------+-------------+--------------+
only showing top 5 rows



In [0]:
# Null or Say Duplicate Values
from pyspark.sql.functions import col, when, count

customers_df.select([count(when(col(c).isNull(), 1)).alias(c) for c in customers_df.columns]).show()

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [0]:
# Duplicates values
customer_df.groupBy('customer_id').count().filter('count>1').show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



In [0]:
# Customer Distribution by state and diaplaying top5
customer_df.groupBy('customer_state').count().orderBy('count',ascending=False).show(5)

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            SP|41746|
|            RJ|12852|
|            MG|11635|
|            RS| 5466|
|            PR| 5045|
+--------------+-----+
only showing top 5 rows



In [0]:
# Order - Order status distribution
display(orders_df.head(5))

print(orders_df.columns)

orders_df.groupBy('order_status').count().orderBy('count',ascending=False).show()

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02T10:56:33Z,2017-10-02T11:07:15Z,2017-10-04T19:55:00Z,2017-10-10T21:25:13Z,2017-10-18T00:00:00Z
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24T20:41:37Z,2018-07-26T03:24:27Z,2018-07-26T14:31:00Z,2018-08-07T15:27:45Z,2018-08-13T00:00:00Z
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08T08:38:49Z,2018-08-08T08:55:23Z,2018-08-08T13:50:00Z,2018-08-17T18:06:29Z,2018-09-04T00:00:00Z
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18T19:28:06Z,2017-11-18T19:45:59Z,2017-11-22T13:39:59Z,2017-12-02T00:28:42Z,2017-12-15T00:00:00Z
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13T21:18:39Z,2018-02-13T22:20:29Z,2018-02-14T19:46:34Z,2018-02-16T18:17:02Z,2018-02-26T00:00:00Z


['order_id', 'customer_id', 'order_status', 'order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date', 'order_estimated_delivery_date']
+------------+-----+
|order_status|count|
+------------+-----+
|   delivered|96478|
|     shipped| 1107|
|    canceled|  625|
| unavailable|  609|
|    invoiced|  314|
|  processing|  301|
|     created|    5|
|    approved|    2|
+------------+-----+



In [0]:
# Payments
display(payments_df.head(5))

print(payments_df.columns)

payments_df.groupBy('payment_type').count().orderBy('count',ascending=False).show()

order_id,payment_sequential,payment_type,payment_installments,payment_value
b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.3300018
a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.3899994
25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.7099991
ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.779999
42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.449997


['order_id', 'payment_sequential', 'payment_type', 'payment_installments', 'payment_value']
+------------+-----+
|payment_type|count|
+------------+-----+
| credit_card|76795|
|      boleto|19784|
|     voucher| 5775|
|  debit_card| 1529|
| not_defined|    3|
+------------+-----+



In [0]:
# Top selling product
display(order_items_df.head(5))

from pyspark.sql.functions import sum
top_products = order_items_df.groupBy('product_id').agg(sum('price').alias('total_sales'))

top_products.orderBy('total_sales',ascending=False).show(5)

order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19T09:45:35Z,58.9,13.29
00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03T11:05:13Z,239.9,19.93
000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18T14:48:30Z,199.0,17.87
00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,2018-08-15T10:10:18Z,12.99,12.79
00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13T13:57:51Z,199.9,18.14


+--------------------+-----------------+
|          product_id|      total_sales|
+--------------------+-----------------+
|bb50f2e236e5eea01...|          63885.0|
|6cdd53843498f9289...|54730.19999999998|
|d6160fb7873f18409...|         48899.34|
|d1c427060a0f73f6b...|47214.50999999998|
|99a4788cb24856965...| 43025.5599999999|
+--------------------+-----------------+
only showing top 5 rows



In [0]:
# Average Delivery Time Analysis
from pyspark.sql.functions import datediff
orders_df.printSchema()

# create a df with 3 columns of orders_df
delivery_df = orders_df.select('order_id','order_purchase_timestamp','order_delivered_customer_date')
delivery_df.show(5)

# create new delivery_time by calculating its value
delivery_detail_df = delivery_df.withColumn('delivery_time', datediff(col('order_delivered_customer_date'), col('order_purchase_timestamp')))

# display top 5 orders that too more delivery time
delivery_detail_df.orderBy('delivery_time',ascending=False).show(5)
                                        

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)

+--------------------+------------------------+-----------------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|
+--------------------+------------------------+-----------------------------+
|e481f51cbdc54678b...|     2017-10-02 10:56:33|          2017-10-10 21:25:13|
|53cdb2fc8bc7dce0b...|     2018-07-24 20:41:37|          2018-08-07 15:27:45|
|47770eb9100c2d0c4...|     2018-08-08 08:38:49|          2018-08-17 18:06:29|
|949d5b44dbf5de918...|     2017-11-18 19:28:06|          2017-12-02 00:28:42|
|ad21c59c0840e6cb8.

### Cleaning Data


In [0]:
# View the customers_df and its schema
customers_df.printSchema()
display(customer_df.head(5))

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP


In [0]:
from pyspark.sql.functions import *


##### Identify missing values

In [0]:

# Defining the function to find missing values. 
def missing_values(df, df_name):
    print(f'Missing values in {df_name}:\n')
    df.select([count(when(col(c).isNull(),1)).alias(c) for c in df.columns]).show()

In [0]:
# Call the function missing_values function for customer_df
missing_values(customer_df, 'customer')

Missing values in customer:

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [0]:
# Call the function missing_values function for orders_df
missing_values(orders_df, 'orders')

Missing values in orders:

+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



In [0]:
# Call the function missing_values function for order_items_df
missing_values(order_items_df, 'order_items')

Missing values in order_items:

+--------+-------------+----------+---------+-------------------+-----+-------------+
|order_id|order_item_id|product_id|seller_id|shipping_limit_date|price|freight_value|
+--------+-------------+----------+---------+-------------------+-----+-------------+
|       0|            0|         0|        0|                  0|    0|            0|
+--------+-------------+----------+---------+-------------------+-----+-------------+



##### Handling Missing Values

1. Drop Missing Values (For Non-Critical Columns)
2. Fill Missing Values (For Numerical Columns)
3. Impute Missing Values (For Continous Data)

In [0]:
# Dropping Missing Values
missing_values(orders_df, 'orders')

orders_df_cleaned = orders_df.na.drop(subset=['order_id','customer_id','order_status'])
orders_df_cleaned.show(5)

Missing values in orders:

+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+--

In [0]:
# Filling Missing Values. 

orders_df_cleaned = orders_df.fillna({'order_delivered_customer_date': '9999-12-31'})
orders_df_cleaned.show(5)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [0]:
# Impute Missing Values

######### Purposefully making the value to have null to understand Imputer function ########
payments_df_with_null=payments_df.withColumn('payment_value',when(col('payment_value')!=99.3300018,col('payment_value')).otherwise(lit(None)))

from pyspark.ml.feature import Imputer

# Imputer function calculates the Median value for the null value data in column "Payment_value"
imputer = Imputer(inputCols=['payment_value'], outputCols=['payment_value_imputed']).setStrategy("median")
payments_df_cleaned = imputer.fit(payments_df_with_null).transform(payments_df_with_null)

# Display the orginal df to see the NULL value
payments_df_with_null.show(10)
print("============================================================================")
# Display the df with Imputer data replacing the Null value with Median data
payments_df_cleaned.show(10)

+--------------------+------------------+------------+--------------------+-------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+------------------+------------+--------------------+-------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         NULL|
|a9810da82917af2d9...|                 1| credit_card|                   1|   24.3899994|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|   65.7099991|
|ba78997921bbcdc13...|                 1| credit_card|                   8|   107.779999|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|   128.449997|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|   96.1200027|
|771ee386b001f0620...|                 1| credit_card|                   1|   81.1600037|
|3d7239c394a212faa...|                 1| credit_card|                   3|   51.8400002|
|1f78449c8

### Standarizing the format

In [0]:
def print_schema(df,df_name):
    print(f'schema of {df_name}:')
    df.printSchema()

In [0]:
print_schema(customer_df,'customer')

schema of customer:
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [0]:
print_schema(payments_df,'payments')

schema of payments:
root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [0]:
print_schema(orders_df,'orders')

schema of orders:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [0]:
print("Before Datatype conversion of order_purchase_timestamp")
#orders_df_cleaned.show(5)
print_schema(orders_df_cleaned,"orders")
orders_df_cleaned = orders_df_cleaned.withColumn('order_purchase_timestamp',to_date(col('order_purchase_timestamp')))
print("After Datatype conversion of order_purchase_timestamp")
#orders_df_cleaned.show(5)
print_schema(orders_df_cleaned,"orders")


Before Datatype conversion of order_purchase_timestamp
schema of orders:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)

After Datatype conversion of order_purchase_timestamp
schema of orders:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable

In [0]:
payments_df_cleaned.show(10)

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         NULL|                100.0|
|a9810da82917af2d9...|                 1| credit_card|                   1|   24.3899994|           24.3899994|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|   65.7099991|           65.7099991|
|ba78997921bbcdc13...|                 1| credit_card|                   8|   107.779999|           107.779999|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|   128.449997|           128.449997|
|298fcdf1f73eb413e...|                 1| credit_card|                   2|   96.1200027|           96.1

### Data transformation.. 
#### replacing 'credit_card' to 'Credit Card'

In [0]:
print("Payment DF before transformation of payment_type column data")
payments_df_cleaned.show(10)

print("Payment DF before transformation of payment_type column data")
payments_df_cleaned=payments_df_cleaned.withColumn('payment_type', when(col('payment_type')=='credit_card',lit('Credit Card')).otherwise(col('payment_type')))
payments_df_cleaned.show(10)

Payment DF before transformation of payment_type column data
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| credit_card|                   8|         NULL|                100.0|
|a9810da82917af2d9...|                 1| credit_card|                   1|   24.3899994|           24.3899994|
|25e8ea4e93396b6fa...|                 1| credit_card|                   1|   65.7099991|           65.7099991|
|ba78997921bbcdc13...|                 1| credit_card|                   8|   107.779999|           107.779999|
|42fdf880ba16b47b5...|                 1| credit_card|                   2|   128.449997|           128.449997|
|298fcdf1f73eb413e...|                 1| c

### Data transformation.. 
#### replacing datatype of  customer_zip_code_prefix from interger to string

In [0]:
customer_df.show(5)
customer_df.printSchema()

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows

root
 |-- customer_id: string 

In [0]:
customer_df_cleaned = customer_df.withColumn('customer_zip_code_prefix',col('customer_zip_code_prefix').cast('string'))

customer_df_cleaned.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)




### Remove Duplicates

In [0]:
customer_df_cleaned = customer_df_cleaned.dropDuplicates(['customer_id'])

print(customer_df.count())
print(customer_df_cleaned.count())

99441
99441



### Joining Order_df and Order_item_df

In [0]:
print("The schema of orders_df is:")
orders_df_cleaned.printSchema()

order_with_details = orders_df_cleaned.join(order_items_df,'order_id','left')

# order_with_details.show(5)
print("The schema of joined df is:")
order_with_details.printSchema()

The schema of orders_df is:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)

The schema of joined df is:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: date (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = tru

In [0]:
# Joining Payments_df to the above joined df
order_with_details = order_with_details.join(payments_df_cleaned,'order_id','left')

In [0]:
# Finding the Total Value of the payment value

order_with_total_value = order_with_details.groupBy('order_id')\
    .agg(sum('payment_value').alias('total_order_value'))
order_with_total_value.show(5)

+--------------------+------------------+
|            order_id| total_order_value|
+--------------------+------------------+
|f373335aac9a659de...|        59.1800003|
|118045506e1c1dda0...|1801.9999699999998|
|cc66dee6fbc18bb79...|        136.399994|
|f44cb69655f8e4d13...|        164.320007|
|edcc6b79e8394346b...|        162.630005|
+--------------------+------------------+
only showing top 5 rows




### Advance Transformations

#### Identifying the Outliers and removing them to remove data Skewing

In [0]:
order_items_df.show(5)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30|199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18|12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51|199.9|        18.14|
+--------------------+-------------+------------

In [0]:
# Use the approxQuantile to identify the low cutoff and high cutoff. 

quantiles = order_items_df.approxQuantile('price',[0.01,0.99],0.0)
low_cutoff, high_cutoff = quantiles[0],quantiles[1]
low_cutoff,high_cutoff

(9.99, 890.0)

In [0]:
# However from the dataFrame order_items_df, we can see that the min and max values are 0.85 and 6735. Data is skewed with outliers. 

mx=order_items_df.select('price').summary("max").show()
mn=order_items_df.select('price').summary("min").show()
order_items_df.select('price').summary().show()
mx,mn

+-------+------+
|summary| price|
+-------+------+
|    max|6735.0|
+-------+------+

+-------+-----+
|summary|price|
+-------+-----+
|    min| 0.85|
+-------+-----+

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            112650|
|   mean|120.65373901464986|
| stddev|183.63392805025924|
|    min|              0.85|
|    25%|              39.9|
|    50%|             74.99|
|    75%|             134.9|
|    max|            6735.0|
+-------+------------------+



(None, None)

In [0]:
# Filter the data to exculude the outliers below low_cutoff and above high_cutoff. 

order_items_df_cleaned = order_items_df.filter((col('price') >= low_cutoff) & (col('price') <= high_cutoff))
order_items_df_cleaned.select('price').summary().show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|            110453|
|   mean|108.49213068001424|
| stddev|112.87303173792664|
|    min|              9.99|
|    25%|             39.99|
|    50%|             74.98|
|    75%|             130.0|
|    max|             890.0|
+-------+------------------+



In [0]:
# Lets review the Payments Dataframe now for any outliers. 

payments_df_cleaned.show(5)

+--------------------+------------------+------------+--------------------+-------------+---------------------+
|            order_id|payment_sequential|payment_type|payment_installments|payment_value|payment_value_imputed|
+--------------------+------------------+------------+--------------------+-------------+---------------------+
|b81ef226f3fe1789b...|                 1| Credit Card|                   8|         NULL|                100.0|
|a9810da82917af2d9...|                 1| Credit Card|                   1|   24.3899994|           24.3899994|
|25e8ea4e93396b6fa...|                 1| Credit Card|                   1|   65.7099991|           65.7099991|
|ba78997921bbcdc13...|                 1| Credit Card|                   8|   107.779999|           107.779999|
|42fdf880ba16b47b5...|                 1| Credit Card|                   2|   128.449997|           128.449997|
+--------------------+------------------+------------+--------------------+-------------+---------------

In [0]:
# As seen the min installment is 0 months and Max installment month is 24 months. So looks like dataset is clean. 

payments_df_cleaned.select('payment_installments').summary().show()

+-------+--------------------+
|summary|payment_installments|
+-------+--------------------+
|  count|              103886|
|   mean|   2.853348863176944|
| stddev|   2.687050673856479|
|    min|                   0|
|    25%|                   1|
|    50%|                   1|
|    75%|                   4|
|    max|                  24|
+-------+--------------------+



In [0]:
# Lets review products dataFrame now
products_df.show(5)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [0]:
# Adding a new column product_size_category based on product_weight_g. 
products_df_cleaned = products_df.withColumn(
    'product_size_category',
    when(col('product_weight_g') <= 500, lit('Small'))
    .when(col('product_weight_g').between(500,2000), 'Medium')
    .otherwise('Large')
    )
products_df_cleaned.show(5)

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|product_size_category|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|                Small|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|        

### Joining multiple dataframes to form a single dataframe



In [0]:
# Joining the Raw datasets from Bronze folder post Data Cleansing

orders_customers_df = orders_df_cleaned.join(customers_df, orders_df.customer_id == customers_df.customer_id, "left")

orders_payments_df = orders_customers_df.join(payments_df_cleaned, orders_customers_df.order_id == payments_df.order_id, "left")

orders_items_df = orders_payments_df.join(order_items_df_cleaned,"order_id", "left")    ## order_items_df

orders_items_products_df = orders_items_df.join(products_df_cleaned, orders_items_df.product_id == products_df.product_id, "left")

final_df = orders_items_products_df.join(sellers_df, orders_items_products_df.seller_id == sellers_df.seller_id, "left")



In [0]:
display(final_df)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,customer_id.1,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,order_id.1,payment_sequential,payment_type,payment_installments,payment_value,payment_value_imputed,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,product_id.1,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,product_size_category,seller_id.1,seller_zip_code_prefix,seller_city,seller_state
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02,2017-10-02T11:07:15Z,2017-10-04T19:55:00Z,2017-10-10T21:25:13Z,2017-10-18T00:00:00Z,9ef432eb6251297304e76186b10a928d,7c396fd4830fd04220f754e42b4e5bff,3149,sao paulo,SP,e481f51cbdc54678b7cc49136f2d6af7,2,voucher,1,18.5900002,18.5900002,1,87285b34884572647811a353c7ac498a,3504c0cb71d7fa48d967e0e4c94d59d9,2017-10-06T11:07:15Z,29.99,8.72,87285b34884572647811a353c7ac498a,utilidades_domesticas,40.0,268.0,4.0,500,19,8,13,Small,3504c0cb71d7fa48d967e0e4c94d59d9,9350,maua,SP
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24,2018-07-26T03:24:27Z,2018-07-26T14:31:00Z,2018-08-07T15:27:45Z,2018-08-13T00:00:00Z,b0830fb4747a6c6d20dea0b8c802d7ef,af07308b275d755c9edb36a90c618231,47813,barreiras,BA,53cdb2fc8bc7dce0b6741e2150273451,1,boleto,1,141.460007,141.460007,1,595fac2a385ac33a80bd5114aec74eb8,289cdb325fb7e7f891c38608bf9e0962,2018-07-30T03:24:27Z,118.7,22.76,595fac2a385ac33a80bd5114aec74eb8,perfumaria,29.0,178.0,1.0,400,19,13,19,Small,289cdb325fb7e7f891c38608bf9e0962,31570,belo horizonte,SP
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08,2018-08-08T08:55:23Z,2018-08-08T13:50:00Z,2018-08-17T18:06:29Z,2018-09-04T00:00:00Z,41ce2a54c0b03bf3443c3d931a367089,3a653a41f6f9fc3d2a113cf8398680e8,75265,vianopolis,GO,47770eb9100c2d0c44946d9cf07ec65d,1,Credit Card,3,179.119995,179.119995,1,aa4383b373c6aca5d8797843e5594415,4869f7a5dfa277a7dca6462dcf3b52b2,2018-08-13T08:55:23Z,159.9,19.22,aa4383b373c6aca5d8797843e5594415,automotivo,46.0,232.0,1.0,420,24,19,21,Small,4869f7a5dfa277a7dca6462dcf3b52b2,14840,guariba,SP
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18,2017-11-18T19:45:59Z,2017-11-22T13:39:59Z,2017-12-02T00:28:42Z,2017-12-15T00:00:00Z,f88197465ea7920adcdbec7375364d82,7c142cf63193a1473d2e66489a9ae977,59296,sao goncalo do amarante,RN,949d5b44dbf5de918fe9c16f97b45f8a,1,Credit Card,1,72.1999969,72.1999969,1,d0b61bfb1de832b15ba9d266ca96e5b0,66922902710d126a0e7d26b0e3805106,2017-11-23T19:45:59Z,45.0,27.2,d0b61bfb1de832b15ba9d266ca96e5b0,pet_shop,59.0,468.0,3.0,450,30,10,20,Small,66922902710d126a0e7d26b0e3805106,31842,belo horizonte,MG
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13,2018-02-13T22:20:29Z,2018-02-14T19:46:34Z,2018-02-16T18:17:02Z,2018-02-26T00:00:00Z,8ab97904e6daea8866dbdbc4fb7aad2c,72632f0f9dd73dfee390c9b22eb56dd6,9195,santo andre,SP,ad21c59c0840e6cb83a9ceb5573f8159,1,Credit Card,1,28.6200008,28.6200008,1,65266b2da20d04dbe00c5c2d3bb7859e,2c9e548be18521d1c43cde1c582c6de8,2018-02-19T20:31:37Z,19.9,8.72,65266b2da20d04dbe00c5c2d3bb7859e,papelaria,38.0,316.0,4.0,250,51,15,15,Small,2c9e548be18521d1c43cde1c582c6de8,8752,mogi das cruzes,SP
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09,2017-07-09T22:10:13Z,2017-07-11T14:58:04Z,2017-07-26T10:57:55Z,2017-08-01T00:00:00Z,503740e9ca751ccdda7ba28e9ab8f608,80bb27c7c16e8f973207a5086ab329e2,86320,congonhinhas,PR,a4591c265e18cb1dcee52889e2d8acc3,1,Credit Card,6,175.259995,175.259995,1,060cb19345d90064d1015407193c233d,8581055ce74af1daba164fdbd55a40de,2017-07-13T22:10:13Z,147.9,27.36,060cb19345d90064d1015407193c233d,automotivo,49.0,608.0,1.0,7150,65,10,65,Large,8581055ce74af1daba164fdbd55a40de,7112,guarulhos,SP
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11,2017-04-13T13:25:17Z,,9999-12-31T00:00:00Z,2017-05-09T00:00:00Z,ed0271e0b7da060a393796590e7b737a,36edbb3fb164b1f16485364b6fb04c73,98900,santa rosa,RS,136cce7faa42fdb2cefd53fdc79a6098,1,Credit Card,1,65.9499969,65.9499969,1,a1804276d9941ac0733cfd409f5206eb,dc8798cbf453b7e0f98745e396cc5616,2017-04-19T13:25:17Z,49.9,16.05,a1804276d9941ac0733cfd409f5206eb,,,,,600,35,35,15,Medium,dc8798cbf453b7e0f98745e396cc5616,5455,sao paulo,SP
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16,2017-05-16T13:22:11Z,2017-05-22T10:07:46Z,2017-05-26T12:55:51Z,2017-06-07T00:00:00Z,9bdf08b4b3b52b5526ff42d37d47f222,932afa1e708222e5821dac9cd5db4cae,26525,nilopolis,RJ,6514b8ad8028c9f2cc2374ded245783f,1,Credit Card,3,75.1600037,75.1600037,1,4520766ec412348b8d4caa5e8a18c464,16090f2ca825584b5a147ab24aa30c86,2017-05-22T13:22:11Z,59.99,15.17,4520766ec412348b8d4caa5e8a18c464,automotivo,59.0,956.0,1.0,50,16,16,17,Small,16090f2ca825584b5a147ab24aa30c86,12940,atibaia,SP
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23,2017-01-25T02:50:47Z,2017-01-26T14:16:31Z,2017-02-02T14:08:10Z,2017-03-06T00:00:00Z,f54a9f0e6b351c431402b8461ea51999,39382392765b6dc74812866ee5ee92a7,99655,faxinalzinho,RS,76c6e866289321a7c93b82b54852dc33,1,boleto,1,35.9500008,35.9500008,1,ac1789e492dcd698c5c10b97a671243a,63b9ae557efed31d1f7687917d248a8d,2017-01-27T18:29:09Z,19.9,16.05,ac1789e492dcd698c5c10b97a671243a,moveis_decoracao,41.0,432.0,2.0,300,35,35,15,Small,63b9ae557efed31d1f7687917d248a8d,13720,sao jose do rio pardo,SP
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29,2017-07-29T12:05:32Z,2017-08-10T19:45:24Z,2017-08-16T17:14:30Z,2017-08-23T00:00:00Z,31ad1d1b63eb9962463f764d4e6e0c9d,299905e3934e9e181bfb2e164dd4b4f8,18075,sorocaba,SP,e69bfb5eb88e0ed6a785585b27e16dbf,1,Credit Card,1,8.34000015,8.34000015,1,9a78fb9862b10749a117f7fc3c31f051,7c67e1448b00f6e969d365cea6b010ab,2017-08-11T12:05:32Z,149.99,19.77,9a78fb9862b10749a117f7fc3c31f051,moveis_escritorio,45.0,527.0,1.0,9750,42,41,42,Large,7c67e1448b00f6e969d365cea6b010ab,8577,itaquaquecetuba,SP


In [0]:
final_df.columns

['order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'customer_id',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state',
 'order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value',
 'payment_value_imputed',
 'order_item_id',
 'product_id',
 'seller_id',
 'shipping_limit_date',
 'price',
 'freight_value',
 'product_id',
 'product_category_name',
 'product_name_lenght',
 'product_description_lenght',
 'product_photos_qty',
 'product_weight_g',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm',
 'product_size_category',
 'seller_id',
 'seller_zip_code_prefix',
 'seller_city',
 'seller_state']

In [0]:
# Joining the final dataframe with the Mongodb dataframe

final_df = final_df.join(mongo_spark_df,"product_category_name", "left")

In [0]:
final_df.show(5)

+---------------------+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+--------------------+------------------------+--------------------+--------------+--------------------+------------------+------------+--------------------+-------------+---------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+--------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+---------------------+--------------------+----------------------+---------------+------------+-----------------------------+
|product_category_name|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimate

In [0]:
final_df.columns

['product_category_name',
 'order_id',
 'customer_id',
 'order_status',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'order_estimated_delivery_date',
 'customer_id',
 'customer_unique_id',
 'customer_zip_code_prefix',
 'customer_city',
 'customer_state',
 'order_id',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value',
 'payment_value_imputed',
 'order_item_id',
 'product_id',
 'seller_id',
 'shipping_limit_date',
 'price',
 'freight_value',
 'product_id',
 'product_name_lenght',
 'product_description_lenght',
 'product_photos_qty',
 'product_weight_g',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm',
 'product_size_category',
 'seller_id',
 'seller_zip_code_prefix',
 'seller_city',
 'seller_state',
 'product_category_name_english']


#### Copying the tranformed data to Silver Layer in ADLS

In [0]:
# Function to remove duplicate columns before copying the data to Silver Layer
def remove_duplicate_columns(df):
    columns = df.columns

    seen_columns=set()
    columns_to_drop=[]

    for column in columns:
        if column in seen_columns:
            columns_to_drop.append(column)
        else:
            seen_columns.add(column)

    display("Columns to drop are:" ,columns_to_drop)

    df_cleaned = df.drop(*columns_to_drop)
    return df_cleaned

final_df = remove_duplicate_columns(final_df)

'Columns to drop are:'

['customer_id', 'order_id', 'product_id', 'seller_id']


## Ingest the processed data to Silver Layer

In [0]:
# Write the final_df to Silver Layer
final_df.write.mode("overwrite").parquet("abfss://olistdata@olistdatastorageaccountd.dfs.core.windows.net/Silver")