## Setup and Config

In [0]:
# Session-level Spark SQL settings for the rest of the notebook.
# The adaptive query execution toggle below is intentionally left commented out:
# spark.conf.set("spark.sql.adaptive.enabled","true")         # AQE ON
# Fix the shuffle partition count explicitly.
spark.conf.set("spark.sql.shuffle.partitions", "200")      # tune as needed

In [0]:
# Shell escape: list the contents of the current working directory.
!ls

## Bronze Layer - Data Ingestion

In [0]:
bronze_base = "/Volumes/workspace/olist/delta/bronze/"


def create_df_delta(file):
    """Load one raw CSV, persist it as a bronze Delta table, and return it.

    The table name is the file name without its extension and, when present,
    without the ``olist_`` prefix and ``_dataset`` suffix, so
    ``olist_order_items_dataset.csv`` is written to ``<bronze_base>order_items``.

    Args:
        file: Path to a CSV file that has a header row.

    Returns:
        The DataFrame read from ``file`` (column types come from ``inferSchema``).
    """
    name = file.split("/")[-1].split(".")[0]
    # Strip the naming wrapper instead of picking the second "_"-separated
    # token: the token approach truncated multi-word names
    # ("olist_order_items_dataset" -> "order") and failed on names without "_".
    if name.startswith("olist_"):
        name = name[len("olist_"):]
    if name.endswith("_dataset"):
        name = name[:-len("_dataset")]

    # One read only; an earlier header-only read was immediately overwritten.
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(file)

    delta_loc = bronze_base + name
    print(f"Writing to Delta {delta_loc}")
    (df
    .write
    .format("delta")
    .mode("overwrite")             # or "append" for incremental loads
    .option("overwriteSchema", "true")
    .save(delta_loc))
    return df

In [0]:
from pyspark.sql.functions import to_date, date_format, col

# Raw orders CSV: header row present, column types inferred by Spark.
orders_path = "/Volumes/workspace/olist/data/olist_orders_dataset.csv"

# Read the file and, in the same chain, derive the "yyyy-MM" purchase month
# that is used as the partition column when the table is written.
orders_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(orders_path)
    .withColumn(
        "purchase_yyyy_mm",
        date_format(to_date(col("order_purchase_timestamp")), "yyyy-MM"),
    )
)


In [0]:
# Render the orders DataFrame, including the derived purchase_yyyy_mm column.
display(orders_df)

In [0]:
from pyspark.sql.functions import to_date, date_format, col

# Persist orders to the bronze layer as a Delta table: one partition per
# purchase month, capped at 250,000 records per output file.
orders_loc = bronze_base + "orders"
orders_writer = (
    orders_df.write
    .format("delta")
    .mode("overwrite")
    .options(mergeSchema="true", overwriteSchema="true", maxRecordsPerFile=250000)
    .partitionBy("purchase_yyyy_mm")
)
orders_writer.save(orders_loc)


In [0]:
from pathlib import Path
# Walk the raw data directory recursively and print every CSV file found.
base_path = Path("/Volumes/workspace/olist/data")
for csv_file in base_path.rglob("*.csv"):
    print(csv_file)


In [0]:
# Ingest the remaining raw CSVs into bronze. create_df_delta writes each Delta
# table and returns the DataFrame it loaded; results are unpacked in the same
# order as the paths below (customers, products, sellers, order items).
customers_df, products_df, sellers_df, order_items_df = map(
    create_df_delta,
    (
        "/Volumes/workspace/olist/data/olist_customers_dataset.csv",
        "/Volumes/workspace/olist/data/olist_products_dataset.csv",
        "/Volumes/workspace/olist/data/olist_sellers_dataset.csv",
        "/Volumes/workspace/olist/data/olist_order_items_dataset.csv",
    ),
)

## Silver Layer

Cleanse and standardize text fields

In [0]:
from pyspark.sql.functions import lower, trim

# Add lower-cased, whitespace-trimmed copies of the customer city and state;
# the original columns are left untouched.
customers_df = (
    customers_df
    .withColumn("customer_city_norm", lower(trim(col("customer_city"))))
    .withColumn("customer_state_norm", lower(trim(col("customer_state"))))
)

In [0]:
# Render the customers DataFrame with the added *_norm columns.
display(customers_df)

Deduplicate customers and sellers

In [0]:
# Keep a single row per customer_unique_id, then print the row count before
# and after deduplication (one number per line).
# NOTE(review): later joins match on customer_id, not customer_unique_id;
# confirm that rows dropped here do not cause orders to fall out of those joins.
customers_dedup_df = customers_df.dropDuplicates(["customer_unique_id"])
print(customers_df.count(), customers_dedup_df.count(), sep="\n")

In [0]:
# Keep a single row per seller_id, then print the seller columns followed by
# the row count before and after deduplication (one item per line).
sellers_dedup_df = sellers_df.dropDuplicates(["seller_id"])
print(sellers_df.columns, sellers_df.count(), sellers_dedup_df.count(), sep="\n")

In [0]:
# Silver layer targets for the deduplicated customer and seller tables.
silver_base = "/Volumes/workspace/olist/delta/silver/"
customers_dedup_loc = f"{silver_base}customers_dedup"
sellers_dedup_loc = f"{silver_base}sellers_dedup"

# Overwrite each silver Delta table with its deduplicated DataFrame.
for dedup_df, dedup_loc in (
    (customers_dedup_df, customers_dedup_loc),
    (sellers_dedup_df, sellers_dedup_loc),
):
    dedup_df.write.format("delta").mode("overwrite").save(dedup_loc)

## Joins

Size of each Delta table on disk

In [0]:
import pandas as pd

# Root folder of the Delta tables written by the earlier cells, and the
# tables (relative to that root) whose on-disk size should be reported.
base_path = "/Volumes/workspace/olist/delta/"
tables = [
    "bronze/orders",
    "bronze/order_items",
    "bronze/customers",
    "bronze/products",
    "bronze/sellers",
    "silver/customers_dedup",
    "silver/sellers_dedup"
]


def _size_on_disk_mb(table_path_suffix):
    """Return ``(suffix, size in MB)`` for one Delta table.

    The size comes from the ``sizeInBytes`` column of ``DESCRIBE DETAIL``.
    If the lookup fails for any reason the error is printed and the size is
    reported as 0.0 so the remaining tables are still processed.
    """
    full_path = base_path + table_path_suffix
    try:
        size_in_bytes = (
            spark.sql(f"DESCRIBE DETAIL '{full_path}'")
            .select("sizeInBytes")
            .first()[0]
        )
        return (table_path_suffix, size_in_bytes / (1024 * 1024))
    except Exception as e:
        print(f"Could not get size for {full_path}: {e}")
        return (table_path_suffix, 0.0)


table_sizes = [_size_on_disk_mb(suffix) for suffix in tables]

# Show the results as a DataFrame, largest table first.
sizes_df = spark.createDataFrame(table_sizes, ["table", "size_mb (on disk)"])
display(sizes_df.orderBy("size_mb (on disk)", ascending=False))

Salting the skewed seller join

In [0]:
from pyspark.sql.functions import col, concat, lit, rand, broadcast

# Number of salt buckets each seller_id is spread across.
SALT_FACTOR = 5

# Order-items side: append a random bucket in [0, SALT_FACTOR) to seller_id so
# rows of a heavily used seller are split across several join keys.
salted_order_items_df = order_items_df.withColumn(
    "salted_seller_id",
    concat(col("seller_id"), lit("_"), (rand() * SALT_FACTOR).cast("int"))
)

# Sellers side: replicate every seller once per bucket so each salted key on
# the order-items side has exactly one matching seller row.
salt_df = spark.range(SALT_FACTOR).withColumnRenamed("id", "salt_value")

salted_sellers_df = sellers_dedup_df.crossJoin(broadcast(salt_df)).withColumn(
    "salted_seller_id",
    concat(col("seller_id"), lit("_"), col("salt_value"))
).drop("salt_value")

print("Executing the final join with salted tables...")

sales_df = (orders_df
    # Order lines for each order
    .join(salted_order_items_df, "order_id")

    # Product attributes
    .join(products_df, "product_id")

    # Skewed join on the salted key. seller_id is listed as a join key too:
    # both inputs carry that column, and joining on salted_seller_id alone
    # leaves two seller_id columns in the result (ambiguous to reference and
    # rejected as duplicates when written). The salted key already embeds
    # seller_id, so the matched rows are unchanged.
    .join(salted_sellers_df, ["salted_seller_id", "seller_id"])

    # Customer attributes
    .join(customers_dedup_df, "customer_id")

    # The salted key is only a join helper; remove it from the result
    .drop("salted_seller_id")
)

# Display the schema of the final joined DataFrame
print("Final DataFrame schema:")
sales_df.printSchema()

# You can now proceed to write sales_df to your silver layer

In [0]:
# Print the first 5 rows of the joined sales DataFrame.
sales_df.show(5)

Caching sales table

In [0]:
# Mark the joined sales DataFrame for caching.
# NOTE(review): sales_df is rebuilt from the source DataFrames in the Gold
# Layer cell, so confirm that anything actually reads this cached version.
sales_df.cache()

In [0]:
from pyspark.sql.functions import broadcast
# Reload the bronze sellers and products tables from their Delta locations.
sellers_loc = bronze_base + "sellers"
products_loc = bronze_base + "products"
sellers = spark.read.load(sellers_loc, format="delta")
products = spark.read.load(products_loc, format="delta")

# Row counts of both tables (sellers is counted first, products printed first).
sellers_size, products_size = sellers.count(), products.count()
print("Products: ", products_size)
print("Sellers: ", sellers_size)




In [0]:
# Show the column names of the reloaded products and sellers tables.
for label, frame in (("Products columns: ", products), ("Sellers columns: ", sellers)):
    print(label, frame.columns)

## Gold Layer

In [0]:
from pyspark.sql.functions import col, to_date

gold_base = "/Volumes/workspace/olist/delta/gold/"

# Join orders with their items, products, sellers and customers.
sales_df = (orders_df
    .join(order_items_df, "order_id")
    .join(products_df, "product_id")
    .join(sellers_dedup_df, "seller_id")
    .join(customers_dedup_df, "customer_id")
)


# Daily revenue per product category and customer state.
# The purchase timestamp is reduced to a calendar date first: grouping and
# partitioning on the raw timestamp yields one group (and one partition
# directory) per distinct purchase instant instead of one per day.
daily_sales_df = (sales_df
  .withColumn("order_purchase_date", to_date(col("order_purchase_timestamp")))
  .groupBy("order_purchase_date", "product_category_name", "customer_state")
  .sum("price")
  .withColumnRenamed("sum(price)", "total_revenue")
)

sales_loc = gold_base + "daily_sales"
# Write to the Gold layer, partitioned by purchase date. Z-ordering is not
# done by this write; it is applied by the separate OPTIMIZE statement.
(daily_sales_df.write
  .format("delta")
  .mode("overwrite")
  # Lets the overwrite replace a table previously written with the
  # timestamp-based schema and partitioning.
  .option("overwriteSchema", "true")
  .partitionBy("order_purchase_date")
  .option("delta.autoOptimize.optimizeWrite", "true")
  .option("delta.autoOptimize.autoCompact", "true")
  .save(sales_loc)
)



Optimize

In [0]:
%sql
-- Return every row of the gold daily_sales Delta table, addressed by path.
SELECT * FROM delta.`/Volumes/workspace/olist/delta/gold/daily_sales`;


-- Run OPTIMIZE on the same table, Z-ordering by customer state and product category.
OPTIMIZE delta.`/Volumes/workspace/olist/delta/gold/daily_sales`
  ZORDER BY (customer_state, product_category_name);

Vacuum

In [0]:
import os
from pyspark.sql import SparkSession

# NOTE(review): earlier cells already use `spark`, so getOrCreate() presumably
# returns that existing session here; confirm whether this builder block is
# still needed.
spark = SparkSession.builder \
    .appName("DeltaMaintenanceAllLayers") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()


# Layer roots whose immediate sub-directories are candidate Delta tables.
base_paths = [
    "/Volumes/workspace/olist/delta/bronze/",
    "/Volumes/workspace/olist/delta/silver/",
    "/Volumes/workspace/olist/delta/gold/"
]

all_tables_to_vacuum = []
print("Discovering Delta tables...")

for path in base_paths:
    try:
        table_names = os.listdir(path)
    except FileNotFoundError:
        print(f"Warning: Directory not found, skipping: {path}")
        continue
    for table_name in table_names:
        full_path = os.path.join(path, table_name)
        # Only directories that contain a _delta_log folder are Delta tables;
        # anything else is skipped instead of being handed to VACUUM.
        if os.path.isdir(os.path.join(full_path, "_delta_log")):
            all_tables_to_vacuum.append(full_path)

print("\nFound the following tables to vacuum:")
for table in all_tables_to_vacuum:
    print(f"- {table}")


print("\nStarting VACUUM operations...")

# WARNING: RETAIN 0 HOURS removes every data file that the current table
# version no longer references, so older versions can no longer be read.
# The retention safety check has to be switched off for that to be accepted.
retention_check_key = "spark.databricks.delta.retentionDurationCheck.enabled"
previous_retention_check = spark.conf.get(retention_check_key, "true")
spark.conf.set(retention_check_key, "false")

try:
    for table_path in all_tables_to_vacuum:
        try:
            print(f"Vacuuming table: {table_path}...")
            spark.sql(f"VACUUM '{table_path}' RETAIN 0 HOURS")
            print("  -> Successfully vacuumed.")
        except Exception as e:
            # Best effort: report the failure and continue with the next table.
            print(f"  -> Error vacuuming {table_path}: {e}")
finally:
    # Always put the safety check back to its previous value, even when the
    # loop is interrupted (e.g. the cell is cancelled).
    spark.conf.set(retention_check_key, previous_retention_check)

print("\nAll maintenance operations complete.")