In [6]:
from pyspark.sql.functions import lit

# 1. Define the list of CSV files and their corresponding staging table names.
#    We will use a simple naming convention: Stg_{Table_Name}
files_to_process = {
    "olist_customers_dataset.csv": "Stg_Customers",
    "olist_orders_dataset.csv": "Stg_Orders",
    "olist_order_items_dataset.csv": "Stg_OrderItems",
    "olist_order_payments_dataset.csv": "Stg_OrderPayments",
    "olist_products_dataset.csv": "Stg_Products",
    "olist_sellers_dataset.csv": "Stg_Sellers",
    "olist_geolocation_dataset.csv": "Stg_Geolocation",
    "olist_order_reviews_dataset.csv": "Stg_OrderReviews",
    "product_category_name_translation.csv": "Stg_ProductCategoryTranslation"
}

# Define a single record source for all tables
record_source = "Kaggle_Olist"

# 2. Loop through each file, read it into a DataFrame, and save it as a table.
for file_name, table_name in files_to_process.items():
    print(f"Processing file: {file_name} -> Saving to table: {table_name}")
    try:
        # Construct the file path within the Lakehouse 'Files' section
        file_path = f"Files/{file_name}"

        # Read the CSV file into a DataFrame
        # The 'header' option is set to true because the first row contains column names.
        df_raw = spark.read.format("csv").option("header", "true").load(file_path)

        # Add the 'record_source' column to the DataFrame.
        # This is a key part of Data Vault for auditing and traceability.
        df_staged = df_raw.withColumn("Record_Source", lit(record_source))

        # Write the DataFrame to a Delta table in the Lakehouse 'Tables' section.
        # 'overwrite' mode ensures that if the table exists, it's recreated.
        df_staged.write.mode("overwrite").format("delta").saveAsTable(table_name)
        
        print(f"Successfully saved {table_name}.")
    except Exception as e:
        print(f"Error processing {file_name}: {e}")
        # Continue to the next file even if one fails.

print("\nAll files have been processed. You can now see the tables in your Lakehouse.")



StatementMeta(, eaea3dc9-14e9-4fab-a49a-ebe3ee7f9852, 8, Finished, Available, Finished)

Processing file: olist_customers_dataset.csv -> Saving to table: Stg_Customers
Error processing olist_customers_dataset.csv: [_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: 490c751b-0342-4c92-b769-b4818a74c3bd).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- customer_id: string (nullable = true)
-- customer_unique_id: string (nullable = true)
-- customer_zip_code_prefix: string (nullable = true)
-- customer_city: string (nullable = true)
-- customer_state: string (nullable = true)


Data schema:
root
-- customer_id: string (nullable = true)
-- customer_unique_id: string (nullable = true)
-- customer_zip_code_prefix: string (nullable = true)
-- customer_city: string (nullabl

In [9]:
# This script creates all the Hub tables for the Data Vault model from the staged data.
# Hubs represent the core business entities of the e-commerce dataset.
# The script uses a consistent pattern to generate a unique hash key and add metadata
# for each hub, which is essential for data traceability and consistency.

from pyspark.sql.functions import sha2, lit, current_timestamp

# Define a single record source for all hub tables
record_source = "Kaggle_Olist"

# --- Create Hub_Customer ---
print("Creating Hub_Customer table...")
df_stg_customers = spark.read.table("Stg_Customers")
df_hub_customer = df_stg_customers.select(
    sha2("customer_unique_id", 256).alias("Customer_HKEY"),
    "customer_unique_id",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
).distinct()
df_hub_customer.write.mode("overwrite").format("delta").saveAsTable("Hub_Customer")
print("Hub_Customer created successfully.")

# --- Create Hub_Order ---
print("Creating Hub_Order table...")
df_stg_orders = spark.read.table("Stg_Orders")
df_hub_order = df_stg_orders.select(
    sha2("order_id", 256).alias("Order_HKEY"),
    "order_id",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
).distinct()
df_hub_order.write.mode("overwrite").format("delta").saveAsTable("Hub_Order")
print("Hub_Order created successfully.")

# --- Create Hub_Product ---
print("Creating Hub_Product table...")
# Note: olist_products_dataset.csv contains information about products
df_stg_products = spark.read.table("Stg_Products")
df_hub_product = df_stg_products.select(
    sha2("product_id", 256).alias("Product_HKEY"),
    "product_id",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
).distinct()
df_hub_product.write.mode("overwrite").format("delta").saveAsTable("Hub_Product")
print("Hub_Product created successfully.")

# --- Create Hub_Seller ---
print("Creating Hub_Seller table...")
# Note: olist_sellers_dataset.csv contains information about sellers
df_stg_sellers = spark.read.table("Stg_Sellers")
df_hub_seller = df_stg_sellers.select(
    sha2("seller_id", 256).alias("Seller_HKEY"),
    "seller_id",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
).distinct()
df_hub_seller.write.mode("overwrite").format("delta").saveAsTable("Hub_Seller")
print("Hub_Seller created successfully.")

print("\nAll hub tables have been created and are now available in your Lakehouse.")


StatementMeta(, eaea3dc9-14e9-4fab-a49a-ebe3ee7f9852, 11, Finished, Available, Finished)

Creating Hub_Customer table...
Hub_Customer created successfully.
Creating Hub_Order table...
Hub_Order created successfully.
Creating Hub_Product table...
Hub_Product created successfully.
Creating Hub_Seller table...
Hub_Seller created successfully.

All hub tables have been created and are now available in your Lakehouse.


In [10]:
# This script creates all the Satellite tables for the Data Vault model from the staged data.
# Satellites hold descriptive attributes and track the history of changes for each hub.
# The code joins the raw data with the corresponding hub table to link the attributes to the
# correct hash key and adds auditing metadata.

from pyspark.sql.functions import lit, current_timestamp

# Define a single record source for all satellite tables
record_source = "Kaggle_Olist"

# --- Create Sat_Customer_Details ---
print("Creating Sat_Customer_Details table...")
df_stg_customers = spark.read.table("Stg_Customers")
df_hub_customer = spark.read.table("Hub_Customer")

df_sat_customer = df_stg_customers.join(df_hub_customer, on="customer_unique_id").select(
    "Customer_HKEY",
    "customer_city",
    "customer_state",
    "customer_zip_code_prefix",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
)
df_sat_customer.write.mode("overwrite").format("delta").saveAsTable("Sat_Customer_Details")
print("Sat_Customer_Details created successfully.")

# --- Create Sat_Product_Details ---
print("Creating Sat_Product_Details table...")
df_stg_products = spark.read.table("Stg_Products")
df_hub_product = spark.read.table("Hub_Product")

df_sat_product = df_stg_products.join(df_hub_product, on="product_id").select(
    "Product_HKEY",
    "product_category_name",
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
)
df_sat_product.write.mode("overwrite").format("delta").saveAsTable("Sat_Product_Details")
print("Sat_Product_Details created successfully.")

# --- Create Sat_Seller_Details ---
print("Creating Sat_Seller_Details table...")
df_stg_sellers = spark.read.table("Stg_Sellers")
df_hub_seller = spark.read.table("Hub_Seller")

df_sat_seller = df_stg_sellers.join(df_hub_seller, on="seller_id").select(
    "Seller_HKEY",
    "seller_zip_code_prefix",
    "seller_city",
    "seller_state",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
)
df_sat_seller.write.mode("overwrite").format("delta").saveAsTable("Sat_Seller_Details")
print("Sat_Seller_Details created successfully.")

# --- Create Sat_Order_Details ---
print("Creating Sat_Order_Details table...")
df_stg_orders = spark.read.table("Stg_Orders")
df_hub_order = spark.read.table("Hub_Order")

df_sat_order = df_stg_orders.join(df_hub_order, on="order_id").select(
    "Order_HKEY",
    "customer_id", # customer_id here links to the order, not the unique customer
    "order_status",
    "order_purchase_timestamp",
    "order_approved_at",
    "order_delivered_carrier_date",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
)
df_sat_order.write.mode("overwrite").format("delta").saveAsTable("Sat_Order_Details")
print("Sat_Order_Details created successfully.")

# --- Create Sat_Order_Reviews ---
print("Creating Sat_Order_Reviews table...")
df_stg_reviews = spark.read.table("Stg_OrderReviews")
df_hub_order = spark.read.table("Hub_Order")

df_sat_reviews = df_stg_reviews.join(df_hub_order, on="order_id").select(
    "Order_HKEY",
    "review_id",
    "review_score",
    "review_comment_title",
    "review_comment_message",
    "review_creation_date",
    "review_answer_timestamp",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
)
df_sat_reviews.write.mode("overwrite").format("delta").saveAsTable("Sat_Order_Reviews")
print("Sat_Order_Reviews created successfully.")

print("\nAll satellite tables have been created and are now available in your Lakehouse.")


StatementMeta(, eaea3dc9-14e9-4fab-a49a-ebe3ee7f9852, 12, Finished, Available, Finished)

Creating Sat_Customer_Details table...
Sat_Customer_Details created successfully.
Creating Sat_Product_Details table...
Sat_Product_Details created successfully.
Creating Sat_Seller_Details table...
Sat_Seller_Details created successfully.
Creating Sat_Order_Details table...
Sat_Order_Details created successfully.
Creating Sat_Order_Reviews table...
Sat_Order_Reviews created successfully.

All satellite tables have been created and are now available in your Lakehouse.


In [13]:
# This script creates the Link and Link Satellite tables for the Data Vault model.
# Links model the relationships between hubs, while link satellites store descriptive
# attributes about those relationships.

from pyspark.sql.functions import sha2, lit, current_timestamp, concat_ws

# Define a single record source for all tables
record_source = "Kaggle_Olist"

# --- Read all necessary Hub and Staging tables ---
# We will read all the tables needed to build the links in one go.
print("Reading necessary Hub and Staging tables...")
df_hub_customer = spark.read.table("Hub_Customer")
df_hub_order = spark.read.table("Hub_Order")
df_hub_product = spark.read.table("Hub_Product")
df_hub_seller = spark.read.table("Hub_Seller")

df_stg_orders = spark.read.table("Stg_Orders")
df_stg_customers = spark.read.table("Stg_Customers")
df_stg_order_items = spark.read.table("Stg_OrderItems")
df_stg_order_reviews = spark.read.table("Stg_OrderReviews")


# --- Create Link_Customer_Order ---
# This link connects a unique customer to an order they placed.
# We first join Stg_Orders with Stg_Customers to get the customer_unique_id.
print("Creating Link_Customer_Order...")
df_link_customer_order = df_stg_orders.join(df_stg_customers, on="customer_id") \
                                     .join(df_hub_customer, on="customer_unique_id") \
                                     .join(df_hub_order, on="order_id") \
                                     .select(
                                        sha2(concat_ws("||", "customer_unique_id", "order_id"), 256).alias("Customer_Order_HKEY"),
                                        "Customer_HKEY",
                                        "Order_HKEY",
                                        lit(current_timestamp()).alias("Load_Date"),
                                        lit(record_source).alias("Record_Source")
                                     ).distinct()
df_link_customer_order.write.mode("overwrite").format("delta").saveAsTable("Link_Customer_Order")
print("Link_Customer_Order created successfully.")


# --- Create Link_Order_Review ---
# This link connects an order to its review. The original code was correct.
print("Creating Link_Order_Review...")
df_link_order_review = df_stg_order_reviews.join(df_hub_order, on="order_id") \
                                            .select(
                                                sha2("order_id", 256).alias("Order_Review_HKEY"),
                                                "Order_HKEY",
                                                lit(current_timestamp()).alias("Load_Date"),
                                                lit(record_source).alias("Record_Source")
                                            ).distinct()
df_link_order_review.write.mode("overwrite").format("delta").saveAsTable("Link_Order_Review")
print("Link_Order_Review created successfully.")


# --- Create Link_OrderItem_Product_Seller (and its satellite) ---
# This is a more complex link that connects an item from an order to a product and a seller.
print("Creating Link_OrderItem_Product_Seller and its satellite...")

# Step 1: Create a temporary DataFrame with all necessary keys and attributes.
# The 'df_joined_item_data' DataFrame will hold all the columns needed for both the link and its satellite.
df_joined_item_data = df_stg_order_items.join(df_hub_order, on="order_id") \
                                 .join(df_hub_product, on="product_id") \
                                 .join(df_hub_seller, on="seller_id") \
                                 .withColumn("OrderItem_Product_Seller_HKEY",
                                             sha2(concat_ws("||", "order_id", "product_id", "seller_id"), 256))

# Step 2: Create the Link table from the temporary DataFrame.
# The link table only contains the hash keys and metadata.
df_link_item = df_joined_item_data.select(
    "OrderItem_Product_Seller_HKEY",
    "Order_HKEY",
    "Product_HKEY",
    "Seller_HKEY",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
).distinct()

df_link_item.write.mode("overwrite").format("delta").saveAsTable("Link_OrderItem_Product_Seller")
print("Link_OrderItem_Product_Seller created successfully.")

# Step 3: Create the Link Satellite table from the temporary DataFrame.
# The satellite table contains the link's hash key and the descriptive attributes.
df_sat_link_item = df_joined_item_data.select(
    "OrderItem_Product_Seller_HKEY",
    "order_id",
    "order_item_id",
    "shipping_limit_date",
    "price",
    "freight_value",
    lit(current_timestamp()).alias("Load_Date"),
    lit(record_source).alias("Record_Source")
).distinct()

df_sat_link_item.write.mode("overwrite").format("delta").saveAsTable("Sat_Link_OrderItem_Product_Seller")
print("Sat_Link_OrderItem_Product_Seller created successfully.")

print("\nAll link tables and their satellites have been created and are now available in your Lakehouse.")


StatementMeta(, eaea3dc9-14e9-4fab-a49a-ebe3ee7f9852, 15, Finished, Available, Finished)

Reading necessary Hub and Staging tables...
Creating Link_Customer_Order...
Link_Customer_Order created successfully.
Creating Link_Order_Review...
Link_Order_Review created successfully.
Creating Link_OrderItem_Product_Seller and its satellite...
Link_OrderItem_Product_Seller created successfully.
Sat_Link_OrderItem_Product_Seller created successfully.

All link tables and their satellites have been created and are now available in your Lakehouse.


In [15]:
# This script creates a corrected Information Mart for customer spending.
# It now properly joins the Data Vault tables by connecting through the Link table
# that bridges the relationship between orders and their item details.

from pyspark.sql.functions import sum, desc

# --- Read the necessary Data Vault tables ---
# We need information about customers, orders, and the items in those orders.
print("Reading Data Vault tables...")
df_hub_customer = spark.read.table("Hub_Customer")
df_link_customer_order = spark.read.table("Link_Customer_Order")
# We need the Link table to bridge the relationship
df_link_order_item = spark.read.table("Link_OrderItem_Product_Seller")
df_sat_link_item = spark.read.table("Sat_Link_OrderItem_Product_Seller")


# --- Join the tables to get all the necessary data ---
# 1. Join Link_Customer_Order with the new Link_OrderItem_Product_Seller table on Order_HKEY.
#    This connects the customer to each specific item within their order.
df_combined = df_link_customer_order.join(df_link_order_item, on="Order_HKEY")

# 2. Join the result with the satellite (Sat_Link_OrderItem_Product_Seller) on its hash key.
#    This pulls in the price and freight value for each item.
df_final = df_combined.join(df_sat_link_item, on="OrderItem_Product_Seller_HKEY")

# 3. Join with Hub_Customer to get the customer's unique ID.
df_final_with_customer = df_final.join(df_hub_customer, on="Customer_HKEY")

# --- Aggregate the data to create the Information Mart ---
# Group by the unique customer ID and sum the total price and freight value.
print("Aggregating customer spending data...")
df_customer_spending_mart = df_final_with_customer.groupBy("customer_unique_id").agg(
    sum("price").alias("total_price"),
    sum("freight_value").alias("total_freight_value"),
    (sum("price") + sum("freight_value")).alias("total_spending")
).orderBy(desc("total_spending"))

# --- Save the Information Mart to the Lakehouse ---
# The final table is denormalized and ready for direct consumption by a reporting tool.
print("Saving Customer Spending Information Mart...")
df_customer_spending_mart.write.mode("overwrite").format("delta").saveAsTable("Mart_Customer_Spending")
print("Mart_Customer_Spending created successfully.")

print("\nInformation Mart created. The data is now ready for analysis!")


StatementMeta(, eaea3dc9-14e9-4fab-a49a-ebe3ee7f9852, 17, Finished, Available, Finished)

Reading Data Vault tables...
Aggregating customer spending data...
Saving Customer Spending Information Mart...
Mart_Customer_Spending created successfully.

Information Mart created. The data is now ready for analysis!
