In [0]:
# Mount the ADLS Gen2 container through a service principal (OAuth 2.0 client-credentials flow).
# The client secret is read from a Databricks secret scope instead of being hardcoded. A secret that was
# ever committed in a notebook (or its exports) must be treated as leaked — rotate it in Entra ID.
# NOTE(review): SECRET_SCOPE / SECRET_KEY are assumed names; create them (Databricks CLI) or adjust.
SECRET_SCOPE = "lufthansataskdata"
SECRET_KEY = "sp-client-secret"
TENANT_ID = "d3591539-e9b7-44f4-9ff2-a98d01a5e5ac"
CLIENT_ID = "114c4fc7-466c-4e9d-afac-ef442c986e0b"
MOUNT_POINT = "/mnt/lufthansataskdata"

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": CLIENT_ID,
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope=SECRET_SCOPE, key=SECRET_KEY),
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token",
}

# dbutils.fs.mount fails if the mount point already exists, which would break "Run All" on a warm cluster.
if not any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://lufthansataskdata@lufthansataskdata.dfs.core.windows.net/",  # container@storage account
        mount_point=MOUNT_POINT,
        extra_configs=configs,
    )

In [0]:
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.functions import coalesce, lit
from pyspark.sql.functions import col, sum as spark_sum, datediff
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import avg,sum

In [0]:
# Read the raw Olist extracts from the bronze layer. Every file uses the same reader options,
# so they live in one helper instead of being repeated on every line.
BRONZE_PATH = "/mnt/lufthansataskdata/bronze"


def read_bronze_csv(file_stem):
    """Load ``<BRONZE_PATH>/<file_stem>.csv`` using its header row and letting Spark infer column types."""
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{BRONZE_PATH}/{file_stem}.csv")
    )


customers = read_bronze_csv("olist_customers_dataset")
geolocation = read_bronze_csv("olist_geolocation_dataset")
order_items = read_bronze_csv("olist_order_items_dataset")
order_payments = read_bronze_csv("olist_order_payments_dataset")
order_reviews = read_bronze_csv("olist_order_reviews_dataset")
orders = read_bronze_csv("olist_orders_dataset")
products = read_bronze_csv("olist_products_dataset")
sellers = read_bronze_csv("olist_sellers_dataset")
product_category_name_translation = read_bronze_csv("product_category_name_translation")

# **CUSTOMERS**

In [0]:
# Preview the first five customer rows.
customers.show(n=5)

+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
+--------------------+--------------------+------------------------+--------------------+--------------+
only showing top 5 rows



In [0]:
# Print the customers schema: column names and the types inferred by the CSV reader (inferSchema=true).
customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [0]:
# Shape of customers as (row count, column count).
customers.count(), len(customers.schema.fields)

(99441, 5)

In [0]:
# Count missing values per column. count() skips nulls, so count(when(cond, ...)) tallies rows where cond holds.
# A NaN check only makes sense for floating-point columns. Applying isnan to string columns forces an implicit
# string->double cast that may raise under ANSI mode, so the NaN test is limited to float/double columns.
customers_missing_values = customers.select([
    count(
        when(col(c).isNull() | isnan(c), c) if dtype in ("float", "double") else when(col(c).isNull(), c)
    ).alias(c)
    for c, dtype in customers.dtypes
])
display(customers_missing_values)

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,0,0,0,0


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
customers_duplicates = (
    customers.groupBy(*customers.columns)
    .count()
    .where(col("count") > 1)
)
display(customers_duplicates)

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,count


# **ORDER_ITEMS**

In [0]:
# Print the order_items schema: column names and the types inferred by the CSV reader.
order_items.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)



In [0]:
# Shape of order_items as (row count, column count).
order_items.count(), len(order_items.schema.fields)

(112650, 7)

In [0]:
# Per-column null tally: count() skips nulls, so wrapping the null test in when() counts only missing cells.
order_items_missing_values = order_items.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in order_items.columns
])
display(order_items_missing_values)

order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,0,0,0,0,0,0


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
order_items_duplicates = (
    order_items.groupBy(*order_items.columns)
    .count()
    .where(col("count") > 1)
)
display(order_items_duplicates)

order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value,count


# **ORDER_PAYMENTS**

In [0]:
# Print the order_payments schema: column names and the types inferred by the CSV reader.
order_payments.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)



In [0]:
# Shape of order_payments as (row count, column count).
order_payments.count(), len(order_payments.schema.fields)

(103886, 5)

In [0]:
# Per-column null tally: count() skips nulls, so wrapping the null test in when() counts only missing cells.
order_payments_missing_values = order_payments.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in order_payments.columns
])
display(order_payments_missing_values)

order_id,payment_sequential,payment_type,payment_installments,payment_value
0,0,0,0,0


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
order_payments_duplicates = (
    order_payments.groupBy(*order_payments.columns)
    .count()
    .where(col("count") > 1)
)
display(order_payments_duplicates)

order_id,payment_sequential,payment_type,payment_installments,payment_value,count


# **SELLERS**

In [0]:
# Print the sellers schema: column names and the types inferred by the CSV reader.
sellers.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: integer (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [0]:
# Shape of sellers as (row count, column count).
sellers.count(), len(sellers.schema.fields)

(3095, 4)

In [0]:
# Per-column null tally: count() skips nulls, so wrapping the null test in when() counts only missing cells.
sellers_missing_values = sellers.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in sellers.columns
])
display(sellers_missing_values)

seller_id,seller_zip_code_prefix,seller_city,seller_state
0,0,0,0


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
sellers_duplicates = (
    sellers.groupBy(*sellers.columns)
    .count()
    .where(col("count") > 1)
)
display(sellers_duplicates)

seller_id,seller_zip_code_prefix,seller_city,seller_state,count


# **PRODUCT_CATEGORY_NAME_TRANSLATION**

In [0]:
# Print the product_category_name_translation schema: column names and types inferred by the CSV reader.
product_category_name_translation.printSchema()

root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)



In [0]:
# Shape of product_category_name_translation as (row count, column count).
product_category_name_translation.count(), len(product_category_name_translation.schema.fields)

(71, 2)

In [0]:
# Per-column null tally: count() skips nulls, so wrapping the null test in when() counts only missing cells.
product_category_name_translation_missing_values = product_category_name_translation.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in product_category_name_translation.columns
])
display(product_category_name_translation_missing_values)

product_category_name,product_category_name_english
0,0


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
product_category_name_translation_duplicates = (
    product_category_name_translation.groupBy(*product_category_name_translation.columns)
    .count()
    .where(col("count") > 1)
)
display(product_category_name_translation_duplicates)

product_category_name,product_category_name_english,count


# **ORDERS**

In [0]:
# Print the orders schema: column names and the types inferred by the CSV reader (timestamps are parsed here).
orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [0]:
# Shape of orders as (row count, column count).
orders.count(), len(orders.schema.fields)

(99441, 8)

In [0]:
# Per-column null tally: count() skips nulls, so wrapping the null test in when() counts only missing cells.
orders_missing_values = orders.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in orders.columns
])
display(orders_missing_values)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,0,0,0,160,1783,2965,0


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
orders_duplicates = (
    orders.groupBy(*orders.columns)
    .count()
    .where(col("count") > 1)
)
display(orders_duplicates)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,count


In [0]:
# Keep only orders with every column populated (how="any" over all columns).
# NOTE(review): this removes orders lacking approval/carrier/delivery timestamps (2980 rows per the cell
# output) — presumably undelivered or canceled orders; confirm that excluding them is intended.
orders = orders.dropna(how="any")
orders.count(), len(orders.schema.fields)

(96461, 8)

# **ORDER REVIEWS**

In [0]:
# Print the order_reviews schema. Note the CSV header yields review_comment_title3/4 for the comment
# columns, and the two date columns are inferred as strings rather than timestamps.
order_reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title3: string (nullable = true)
 |-- review_comment_title4: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)



In [0]:
# Shape of order_reviews as (row count, column count).
order_reviews.count(), len(order_reviews.schema.fields)

(99224, 7)

In [0]:
# Per-column null tally: count() skips nulls, so wrapping the null test in when() counts only missing cells.
order_reviews_missing_values = order_reviews.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in order_reviews.columns
])
display(order_reviews_missing_values)

review_id,order_id,review_score,review_comment_title3,review_comment_title4,review_creation_date,review_answer_timestamp
0,0,0,87656,87656,0,0


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
order_reviews_duplicates = (
    order_reviews.groupBy(*order_reviews.columns)
    .count()
    .where(col("count") > 1)
)
display(order_reviews_duplicates)

review_id,order_id,review_score,review_comment_title3,review_comment_title4,review_creation_date,review_answer_timestamp,count


In [0]:
# Fill missing review text with placeholders. The raw header exposes the comment columns as
# review_comment_title3 / review_comment_title4. Their values (nulls replaced) are copied into
# review_comment_title / review_comment_message, and the raw columns are dropped.
# NOTE(review): review_creation_date / review_answer_timestamp are still strings (e.g. "1/18/2018 0:00");
# consider parsing them to timestamps before the silver write.
order_reviews = (
    order_reviews
    .withColumn("review_comment_title", coalesce(col("review_comment_title3"), lit("No title")))
    .withColumn("review_comment_message", coalesce(col("review_comment_title4"), lit("No comment")))
    .drop("review_comment_title3", "review_comment_title4")
)

display(order_reviews)

review_id,order_id,review_score,review_creation_date,review_answer_timestamp,review_comment_title,review_comment_message
7bc2406110b926393aa56f80a40eba40,73fc7af87114b39712e6da79b0a377eb,4,1/18/2018 0:00,1/18/2018 21:46,No title,No comment
80e641a11e56f04c1ad469d5645fdfde,a548910a1c6147796b98fdf73dbeba33,5,3/10/2018 0:00,3/11/2018 3:05,No title,No comment
228ce5500dc1d8e020d8d1322874b6f0,f9e4b658b201a9f2ecdecbb34bed034b,5,2/17/2018 0:00,2/18/2018 14:36,No title,No comment
e64fb393e7b32834bb789ff8bb30750e,658677c97b385a9be170737859d3511b,5,4/21/2017 0:00,4/21/2017 22:02,No title,No comment
f7c4243c7fe1938f181bec41a392bdeb,8e6bfb81e283fa7e4f11123a3fb894f1,5,3/1/2018 0:00,3/2/2018 10:26,No title,No comment
15197aa66ff4d0650b5434f1b46cda19,b18dcdf73be66366873cd26c5724d1dc,1,4/13/2018 0:00,4/16/2018 0:39,No title,No comment
07f9bee5d1b850860defd761afa7ff16,e48aa0d2dcec3a2e87348811bcfdf22b,5,7/16/2017 0:00,7/18/2017 19:30,No title,No comment
7c6400515c67679fbee952a7525281ef,c31a859e34e3adac22f376954e19b39d,5,8/14/2018 0:00,8/14/2018 21:36,No title,No comment
a3f6f7f6f433de0aefbb97da197c554c,9c214ac970e84273583ab523dfafd09b,5,5/17/2017 0:00,5/18/2017 12:05,No title,No comment
8670d52e15e00043ae7de4c01cc2fe06,b9bf720beb4ab3728760088589c62129,4,5/22/2018 0:00,5/23/2018 16:45,recomendo,recomendo


# **PRODUCTS**

In [0]:
# Print the products schema: column names and the types inferred by the CSV reader.
# ("lenght" is the spelling used by the source columns and is referenced verbatim downstream.)
products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [0]:
# Shape of products as (row count, column count).
products.count(), len(products.schema.fields)

(32951, 9)

In [0]:
# Per-column null tally: count() skips nulls, so wrapping the null test in when() counts only missing cells.
products_missing_values = products.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in products.columns
])
display(products_missing_values)

product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
0,610,610,610,610,2,2,2,2


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
products_duplicates = (
    products.groupBy(*products.columns)
    .count()
    .where(col("count") > 1)
)
display(products_duplicates)

product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,count


In [0]:
# Keep only products with every column populated (how="any" over all columns).
# Order lines referencing a dropped product get nulls from the later left join on product_id.
products = products.dropna(how="any")
products.count(), len(products.schema.fields)

(32340, 9)

# **GEOLOCATION**

In [0]:
# Print the geolocation schema: column names and the types inferred by the CSV reader.
geolocation.printSchema()

root
 |-- geolocation_zip_code_prefix: integer (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [0]:
# Shape of geolocation as (row count, column count).
geolocation.count(), len(geolocation.schema.fields)

(19015, 5)

In [0]:
# Per-column null tally: count() skips nulls, so wrapping the null test in when() counts only missing cells.
geolocation_missing_values = geolocation.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in geolocation.columns
])
display(geolocation_missing_values)

geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
0,0,0,0,0


In [0]:
# Exact duplicate rows: group on every column and keep groups seen more than once (empty => none).
geolocation_duplicates = (
    geolocation.groupBy(*geolocation.columns)
    .count()
    .where(col("count") > 1)
)
display(geolocation_duplicates)

geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state,count


In [0]:
# Enrich customers and sellers with coordinates/city/state looked up by zip-code prefix.
# The duplicate check above only rules out fully identical rows; a prefix can still appear on several rows
# (different lat/lng or city spellings). Joining on such a prefix would multiply customer/seller rows and
# inflate every downstream aggregate. So geolocation is collapsed to one row per prefix first: mean
# coordinates, and min() for city/state to keep the pick deterministic. When prefixes are already unique,
# this yields the same values as before.
geolocation_by_zip = geolocation.groupBy("geolocation_zip_code_prefix").agg(
    F.avg("geolocation_lat").alias("geolocation_lat"),
    F.avg("geolocation_lng").alias("geolocation_lng"),
    F.min("geolocation_city").alias("geolocation_city"),
    F.min("geolocation_state").alias("geolocation_state"),
)

customers = customers.join(
    geolocation_by_zip,
    customers.customer_zip_code_prefix == geolocation_by_zip.geolocation_zip_code_prefix,
    how="left",
).drop(geolocation_by_zip.geolocation_zip_code_prefix)

sellers = sellers.join(
    geolocation_by_zip,
    sellers.seller_zip_code_prefix == geolocation_by_zip.geolocation_zip_code_prefix,
    how="left",
).drop(geolocation_by_zip.geolocation_zip_code_prefix)

# **CREATING CALCULATED COLUMNS**

### Total Price: Sum of product price and freight value.

In [0]:
# total_price per order line = item price + freight value.
order_items = order_items.withColumn("total_price", col("price") + col("freight_value"))

### Delivery Time: Difference between the delivery date and the order purchase date.

In [0]:
# Attach order-level attributes to every order line (left join: lines whose order was removed during
# cleaning keep null order fields), then derive delivery_time in days = delivered date - purchase date.
order_items_orders = (
    order_items
    .join(orders, on="order_id", how="left")
    .withColumn(
        "delivery_time",
        datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp")),
    )
)

### Payment Count: Sum of payment installments for each order.

In [0]:
# payment_count = sum of payment_installments over all payment records of an order.
payment_count = order_payments.groupBy("order_id").agg(
    F.sum("payment_installments").alias("payment_count")
)

# Attach it to each order line. The join on the column name already leaves a single order_id column.
# The former `.drop(order_payments.order_id)` referenced a DataFrame not in this join and was a
# misleading no-op, so it has been removed.
order_items_orders_payments = order_items_orders.join(payment_count, on="order_id", how="left")

### **Profit Margin: Subtract freight value from product price to calculate a rough profit estimate.**

In [0]:
# Rough profit estimate per order line: item price minus freight value.
order_items_orders_payments = order_items_orders_payments.withColumn(
    "profit_margin",
    col("price") - col("freight_value"),
)

# **Using Window Functions Over Partitions (PySpark)**

### **Total Sales per Customer: A running total of total price (product price + freight value) for each customer, partitioned by customer ID**

In [0]:
# Add customer (and customer geolocation) attributes to each order line via customer_id.
order_items_orders_payments_customers = order_items_orders_payments.join(customers, "customer_id", "left")

In [0]:
# Running total of total_price per customer, accumulated in chronological purchase order.
# Ordering by order_id alone (a hash) made the accumulation order arbitrary. order_id / order_item_id are now
# tie-breakers only, keeping the ordering deterministic. The explicit ROWS frame adds exactly one row per step.
# NOTE(review): the section heading speaks of "product price"; this sums total_price (price + freight_value).
window_spec = (
    Window.partitionBy("customer_id")
    .orderBy("order_purchase_timestamp", "order_id", "order_item_id")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

order_items_orders_payments_customers = order_items_orders_payments_customers.withColumn(
    "total_sales_per_customer",
    F.sum("total_price").over(window_spec),
)
order_items_orders_payments_customers.show(5)

+-----------+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+-----------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------+------------------+------------------+------------------------+-------------+--------------+---------------+---------------+----------------+-----------------+------------------------+
|customer_id|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|      total_price|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|delivery_time|payment_count|     profit_margin|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|geolocation_lat|geolocation_lng|geolocation_city|geolocation_state|total_sales_pe

### **Average Delivery Time per Product Category: A rolling average of delivery time partitioned by product category.**

In [0]:
# Add product attributes to each order line via product_id (left join keeps lines for dropped products).
order_items_orders_payments_customers_products = order_items_orders_payments_customers.join(
    products, "product_id", "left"
)

In [0]:
# Rolling mean of delivery_time per product category over the current order line and the
# ROLLING_WINDOW_ROWS - 1 preceding ones, in chronological purchase order.
# Order by purchase time, not by delivery_time: ordering by the averaged column itself would only smooth
# a sorted list of values instead of tracking recent orders.
ROLLING_WINDOW_ROWS = 5

window_spec = (
    Window.partitionBy("product_category_name")
    .orderBy("order_purchase_timestamp", "order_id", "order_item_id")
    .rowsBetween(-(ROLLING_WINDOW_ROWS - 1), Window.currentRow)
)

# mean() ignores nulls, so rows without a delivery date do not count toward the average.
order_items_orders_payments_customers_products = order_items_orders_payments_customers_products.withColumn(
    "avg_delivery_time_per_product_category",
    F.mean("delivery_time").over(window_spec),
)

In [0]:
# Drop every row with a null in any column (how="any" across all columns), then preview it.
# NOTE(review): this also removes order lines that merely lack geolocation (left join on zip prefix) or
# product attributes; consider a `subset` of key/measure columns if those lines should be kept.
order_items_orders_payments_customers_products = order_items_orders_payments_customers_products.na.drop(how="any")
order_items_orders_payments_customers_products.show(n=5)

+--------------------+--------------------+--------------------+-------------+--------------------+-------------------+-----+-------------+------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------+-------------+------------------+--------------------+------------------------+---------------+--------------+---------------+---------------+----------------+-----------------+------------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+--------------------------------------+
|          product_id|         customer_id|            order_id|order_item_id|           seller_id|shipping_limit_date|price|freight_value|       total_price|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_

# **Write transformed data to the silver layer**

In [0]:
# Persist the cleaned/enriched frames to the silver layer as headered CSV output, replacing earlier runs.
silver_outputs = {
    "/mnt/lufthansataskdata/silver/customers.csv": customers,
    "/mnt/lufthansataskdata/silver/sellers.csv": sellers,
    "/mnt/lufthansataskdata/silver/product_category_name_translation.csv": product_category_name_translation,
    "/mnt/lufthansataskdata/silver/order_reviews.csv": order_reviews,
    "/mnt/lufthansataskdata/silver/geolocation.csv": geolocation,
    "/mnt/lufthansataskdata/silver/order_items_orders_payments_customers_products.csv": order_items_orders_payments_customers_products,
}
for silver_path, silver_frame in silver_outputs.items():
    silver_frame.write.mode("overwrite").option("header", "true").csv(silver_path)

# **Prepare the gold layer: dimension and fact tables**

In [0]:
# Gold dimensions: distinct attribute combinations projected from the enriched order-line frame.
# The seller dimension is projected from the geolocation-enriched `sellers` frame back to seller attributes.
customer_dimension = order_items_orders_payments_customers_products.select(
    ["customer_id", "customer_zip_code_prefix", "customer_city", "customer_state"]
).distinct()

product_dimension = order_items_orders_payments_customers_products.select([
    "product_id",
    "product_category_name",
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
]).distinct()

seller_dimension = sellers.select(
    ["seller_id", "seller_zip_code_prefix", "seller_city", "seller_state"]
).distinct()

# NOTE(review): despite its name, this is keyed by order_id and holds each order's purchase/delivery timestamps.
date_dimension = order_items_orders_payments_customers_products.select(
    ["order_id", "order_purchase_timestamp", "order_delivered_customer_date"]
).distinct()

In [0]:
# Fact table: one row per row of the enriched order-line frame, with its dimension keys and derived measures.
order_items_fact = order_items_orders_payments_customers_products.select([
    # keys
    "order_id", "order_item_id", "product_id", "seller_id", "customer_id",
    # measures
    "total_price", "delivery_time", "payment_count", "profit_margin",
    "total_sales_per_customer", "avg_delivery_time_per_product_category",
])

In [0]:
# Persist each gold table as headered CSV output, replacing any previous run's files.
gold_outputs = {
    "/mnt/lufthansataskdata/gold/customers_dimension.csv": customer_dimension,
    "/mnt/lufthansataskdata/gold/sellers_dimension.csv": seller_dimension,
    "/mnt/lufthansataskdata/gold/date_dimension.csv": date_dimension,
    "/mnt/lufthansataskdata/gold/product_dimension.csv": product_dimension,
    "/mnt/lufthansataskdata/gold/order_items_fact.csv": order_items_fact,
}
for gold_path, gold_frame in gold_outputs.items():
    gold_frame.write.mode("overwrite").option("header", "true").csv(gold_path)

# **SQL Validation and Reporting**

In [0]:
%sql
-- Create the gold database if missing and make it the current database for this Spark session,
-- so unqualified table names in later cells resolve into it.
create database if not exists `lufthansatask-gold-db`;
use `lufthansatask-gold-db`;

In [0]:
# Register the gold tables in the metastore.
# mode("overwrite") keeps the cell re-runnable: the default save mode (errorifexists) fails on the second
# run because the tables already exist. Names are database-qualified (and the database is ensured here) so
# the cell does not depend on the %sql cell's `use` having run earlier in this session.
GOLD_DB = "`lufthansatask-gold-db`"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {GOLD_DB}")

gold_tables = {
    "customer_dimension": customer_dimension,
    "seller_dimension": seller_dimension,
    "date_dimension": date_dimension,
    "product_dimension": product_dimension,
    "order_items_fact": order_items_fact,
}
for table_name, table_frame in gold_tables.items():
    table_frame.write.mode("overwrite").saveAsTable(f"{GOLD_DB}.{table_name}")