In [0]:
from pyspark.sql.functions import *

# ADLS Gen2 storage account used for both input and output containers.
storage_account_name = "saazlearner"

# Container roots: "source" is read from, "silver" is written to.
src_path = f"abfss://source@{storage_account_name}.dfs.core.windows.net"
trgt_path = f"abfss://silver@{storage_account_name}.dfs.core.windows.net"

# Fetch the account access key from the Databricks secret scope and register
# it with the Spark session so abfss:// paths on this account can be accessed.
storage_account_key = dbutils.secrets.get(scope="olist_secret_scope", key="sa-access-key")
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key,
)


In [0]:
# Explicit dataset config with desired keys.
#   key         -> name the DataFrame is stored under in `dfs`
#   folder_name -> directory under `src_path` that is read
#   file_name   -> CSV name used in the failure message (the whole folder is read)
datasets = [
    {"key": "customers", "file_name": "olist_customers_dataset.csv", "folder_name": "olist_customers"},
    {"key": "geolocation", "file_name": "olist_geolocation_dataset.csv", "folder_name": "olist_geolocation"},
    {"key": "order_items", "file_name": "olist_order_items_dataset.csv", "folder_name": "olist_order_items"},
    {"key": "order_payments", "file_name": "olist_order_payments_dataset.csv", "folder_name": "olist_order_payments"},
    {"key": "order_reviews", "file_name": "olist_order_reviews_dataset.csv", "folder_name": "olist_order_reviews"},
    {"key": "orders", "file_name": "olist_orders_dataset.csv", "folder_name": "olist_orders"},
    {"key": "products", "file_name": "olist_products_dataset.csv", "folder_name": "olist_products"},
    {"key": "sellers", "file_name": "olist_sellers_dataset.csv", "folder_name": "olist_sellers"},
    {"key": "product_category_name_translation", "file_name": "product_category_name_translation.csv", "folder_name": "product_category_name_translation"}
]

dfs = {}            # dataset key -> raw DataFrame
failed_loads = {}   # dataset key -> error text, so every failure is reported at once

for dataset in datasets:
    try:
        df = (spark.read.format("csv")
              .option("header", "true")
              .option("inferSchema", "true")
              .option("mergeSchema", "true")
              .load(f"{src_path}/{dataset['folder_name']}")
        )

        dfs[dataset["key"]] = df
        print(f"✅ {dataset['key']} DataFrame created successfully")

    except Exception as e:
        failed_loads[dataset["key"]] = str(e)
        print(f"❌ Failed to load {dataset['file_name']}: {str(e)}")

# Every dataset is required below. Fail here with the full list instead of
# letting a later dfs[...] lookup die with an uninformative KeyError.
if failed_loads:
    raise RuntimeError(
        "Could not load dataset(s): " + ", ".join(sorted(failed_loads))
    )

# Step 2: Break into individual variables
cust_df     = dfs["customers"]
orders_df   = dfs["orders"]
products_df = dfs["products"]
sellers_df  = dfs["sellers"]
geoloc_df   = dfs["geolocation"]
order_items_df     = dfs["order_items"]
order_payments_df  = dfs["order_payments"]
order_reviews_df   = dfs["order_reviews"]
prod_cat_trans_df  = dfs["product_category_name_translation"]

In [0]:
def limit_show(df):
    """Render a preview of ``df`` capped at 100 rows.

    Calls ``df.limit(100)`` and then ``display()`` on the result; returns None.
    """
    preview = df.limit(100)
    preview.display()

def dropDupColRows(df):
    """Return ``df`` without duplicate rows and without all-null rows.

    Duplicates are removed first (``dropDuplicates()``), then rows in which
    every column is null are dropped (``dropna("all")``).
    """
    deduplicated = df.dropDuplicates()
    return deduplicated.dropna("all")

def distinct_cols(df):
    """Return the unique column names of ``df`` in first-appearance order.

    Args:
        df: any object exposing a ``columns`` sequence of names.

    Returns:
        list: each name from ``df.columns`` once, in the order it first occurs.

    A ``set`` would also remove repeats but yields an arbitrary order, so the
    column layout of anything selected with this list could change between
    runs. ``dict.fromkeys`` keeps insertion order.
    """
    return list(dict.fromkeys(df.columns))

In [0]:
# Preview the category-name translation frame (limit_show caps it at 100 rows).
limit_show(prod_cat_trans_df)

### Data Cleaning

In [0]:
# Data-quality check: show the orders whose order_status is null.
orders_df.filter(col("order_status").isNull()).display()

In [0]:
# Rebuild every working frame from the raw load held in `dfs`, removing
# duplicate rows and rows in which every column is null.
(
    cust_df,
    orders_df,
    products_df,
    sellers_df,
    geoloc_df,
    order_items_df,
    order_payments_df,
    order_reviews_df,
    prod_cat_trans_df,
) = (
    dropDupColRows(dfs[dataset_key])
    for dataset_key in (
        "customers",
        "orders",
        "products",
        "sellers",
        "geolocation",
        "order_items",
        "order_payments",
        "order_reviews",
        "product_category_name_translation",
    )
)

In [0]:
# Print the reviews frame's schema to check the inferred column types.
order_reviews_df.printSchema()

In [0]:

# Convert each order timestamp column to a date, in place (same column name).
for _order_date_col in (
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
):
    orders_df = orders_df.withColumn(
        _order_date_col, to_date(col(_order_date_col), "yyyy-MM-dd HH:mm:ss")
    )

# Day counts from purchase to actual / estimated delivery.
orders_df = orders_df.withColumn(
    "actual_delivery_time",
    datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp")),
)
orders_df = orders_df.withColumn(
    "estimated_delivery_time",
    datediff(col("order_estimated_delivery_date"), col("order_purchase_timestamp")),
)

# Flag orders whose actual delivery took more days than estimated.
orders_df = orders_df.withColumn(
    "order_delay_flag",
    col("actual_delivery_time") > col("estimated_delivery_time"),
)


In [0]:
# Render the order-items frame for inspection.
order_items_df.display()

In [0]:
# Replace shipping_limit_date with its date value.
order_items_df = order_items_df.withColumn(
    "shipping_limit_date",
    to_date(col("shipping_limit_date"), "yyyy-MM-dd HH:mm:ss"),
)

In [0]:
# Replace review_creation_date with its date value.
order_reviews_df = order_reviews_df.withColumn(
    "review_creation_date",
    to_date(col("review_creation_date"), "yyyy-MM-dd HH:mm:ss"),
)


In [0]:
# Render the order items sorted by order_id.
order_items_df.orderBy("order_id").display()

### Data Joining

In [0]:
# Switch this session's datetime pattern parsing to the legacy policy.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Orders enriched with customer attributes; a left join keeps every order.
orders_cust_df = (orders_df.alias("o").join(cust_df.alias("c"), "customer_id","left")
)

# Widen the order/customer frame with payments, items, products, sellers and
# the category-name translation. All joins are left joins on a shared column
# name, so each join key appears once in the result.
# NOTE(review): payments and items are both joined on order_id. If an order can
# have several rows in each, the result holds one row per payment x item pair
# and per-payment values repeat across item rows; confirm that downstream
# aggregations account for this.
ord_cust_pymnt_df = (orders_cust_df.alias("o").join(order_payments_df.alias("p"), "order_id","left")
                     .join(order_items_df.alias("i"), "order_id","left")
                     .join(products_df.alias("pr"), "product_id","left")
                     .join(sellers_df.alias("s"), "seller_id","left")
                     .join(prod_cat_trans_df.alias("pc"), "product_category_name","left")

)
# Re-select the columns by the de-duplicated list of their names.
ord_cust_pymnt_df = ord_cust_pymnt_df.select(distinct_cols(ord_cust_pymnt_df))




In [0]:
# Print the joined frame's schema to verify the columns produced by the joins.
ord_cust_pymnt_df.printSchema()

In [0]:
# Mark the joined frame for caching. Spark fills the cache lazily, on the
# first action that evaluates the frame.
ord_cust_pymnt_df.cache()

In [0]:
# Release the cached joined frame.
# NOTE(review): when the notebook runs top to bottom, no action is executed
# between cache() and this call, so the frame is never served from cache and
# every later display/count/write recomputes the joins. Consider running this
# only after the final write.
ord_cust_pymnt_df.unpersist()

In [0]:
# Render the joined frame for inspection.
# Alternative capped at 100 rows: limit_show(ord_cust_pymnt_df)
ord_cust_pymnt_df.display()

In [0]:
# Count the rows that remain after removing exact duplicate rows.
ord_cust_pymnt_df.dropDuplicates().count()

In [0]:
# Print the joined frame's column names.
print(ord_cust_pymnt_df.columns)


In [0]:
# Persist the joined frame as files under "<silver container>/transformed",
# replacing any previous output. A DataFrameWriter chain only configures the
# write; nothing is written until a terminal call such as save() runs, so the
# chain must end in save().
ord_cust_pymnt_df.write.mode("overwrite").save(trgt_path + "/transformed")

In [0]:
# Publish the joined frame as the silver table, replacing both the existing
# rows and the existing schema.
(
    ord_cust_pymnt_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("adb_az_learner.silver.olist_joined")
)

In [0]:
# Show the joined frame's column names as the cell output.
ord_cust_pymnt_df.columns

In [0]:
from pyspark.sql.functions import date_format, col, sum as sum_f

# Read the joined silver table back from the catalog as the input for the
# aggregations below.
olist_joined = spark.read.table("adb_az_learner.silver.olist_joined")





In [0]:
# Two-digit month number -> short month name, used to label the aggregated rows.
month_pairs = [
    ("01", "Jan"), ("02", "Feb"), ("03", "Mar"), ("04", "Apr"), ("05", "May"), ("06", "Jun"),
    ("07", "Jul"), ("08", "Aug"), ("09", "Sep"), ("10", "Oct"), ("11", "Nov"), ("12", "Dec")
]

# Kept under a separate name from the list above so `month_map` is always a DataFrame.
month_map = spark.createDataFrame(month_pairs, ["month", "month_name"])

# The joined table is built by joining payments and items to orders on order_id,
# so an order with several items repeats each payment row once per item. Summing
# payment_value directly would count those payments several times; keep each
# payment once before aggregating.
# Assumes olist_joined carries payment_sequential (the per-order payment index
# from the payments dataset) - TODO confirm against the table schema.
payments_once = (
    olist_joined
    .withColumn("order_purchase_month", date_format(col("order_purchase_timestamp"), "MM"))
    .select("order_id", "payment_sequential", "order_purchase_month", "payment_value")
    .dropDuplicates(["order_id", "payment_sequential"])
)

# Total payment value per calendar month. The key is the month number only, so
# the same month of different years is summed together.
monthly_sales = (
    payments_once
    .groupBy("order_purchase_month")
    .agg(sum_f("payment_value").alias("total_sales"))
)

# Attach the month label and sort once, after the join (a sort before the join
# is not preserved by it).
monthly_sales = (
    monthly_sales
    .join(month_map, monthly_sales["order_purchase_month"] == month_map["month"], "inner")
    .select("order_purchase_month", "month_name", "total_sales")
    .orderBy("order_purchase_month")
)


In [0]:
# Optional inspection: monthly_sales.display()

# Register the aggregate as a session-scoped temp view so SQL cells can query
# it by the name "monthly_sales".
monthly_sales.createOrReplaceTempView("monthly_sales")

# Disabled alternative that would publish the result as a catalog view:
# spark.sql("""
#           create or replace view adb_az_learner.gold.vw_monthly_sales
#           as
# select * from monthly_sales
#           """)


In [0]:
%sql

-- Materialize the monthly_sales temp view as a gold-layer table.
-- NOTE(review): the object is created as a table although its name carries
-- the "vw_" prefix; confirm the naming is intentional.
create or replace table adb_az_learner.gold.vw_monthly_sales
as
select * from monthly_sales

In [0]:
mon