# Prepare and transform data in the Lakehouse


In [3]:
# Session-level Spark settings, applied before any table is written.
# The keys enable the parquet V-Order option and the Delta optimize-write
# option, and set the optimize-write bin size.
# NOTE(review): 1073741824 == 2**30; presumably a size in bytes (1 GiB) — confirm.
for conf_key, conf_value in (
    ("spark.sql.parquet.vorder.enabled", "true"),
    ("spark.microsoft.delta.optimizeWrite.enabled", "true"),
    ("spark.microsoft.delta.optimizeWrite.binSize", "1073741824"),
):
    spark.conf.set(conf_key, conf_value)

StatementMeta(, 6a6c6c0d-6a3b-46c0-871d-509544a1a67b, 5, Finished, Available, Finished)

In [4]:
# Load the fact_sale parquet files into a DataFrame and preview the first row.
fact_sale_parquet_path = 'Files/imported_files/WideWorldImportersDW/parquet/full/fact_sale_1y_full'
parquet_reader = spark.read.format("parquet")
df = parquet_reader.load(fact_sale_parquet_path)
# Keep head() as the cell's final expression so the notebook renders the Row.
df.head()


StatementMeta(, 6a6c6c0d-6a3b-46c0-871d-509544a1a67b, 6, Finished, Available, Finished)

Row(SaleKey=7250401, CityKey=64141, CustomerKey=0, BillToCustomerKey=0, StockItemKey=139, InvoiceDateKey=datetime.datetime(2000, 2, 18, 0, 0), DeliveryDateKey=datetime.datetime(2000, 2, 19, 0, 0), SalespersonKey=49, WWIInvoiceID=2999, Description='"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:"The Gu" red shirt XML tag t-shirt (White) L:', Package='Each', Quantity=48, UnitPrice=Decimal('18.00'), TaxRate=Decimal('15.000'), TotalExcludingTax=Decimal('864.00'), TaxAmount=Decimal('129.60'), Profit=Decimal('528.00'), TotalIncludingTax=Decimal('993.60

In [None]:
from pyspark.sql.functions import col, year, month, quarter

# Read the raw fact_sale parquet files into a DataFrame.
fact_sale_raw_df = spark.read.format("parquet").load(
    'Files/imported_files/WideWorldImportersDW/parquet/full/fact_sale_1y_full'
)

# --- Feature Engineering: Date-Part Columns ---
# SaleYear, SaleQuarter and SaleMonth are all derived from 'InvoiceDateKey'
# so that later steps can filter and group by period.
invoice_date = col("InvoiceDateKey")
fact_sale_transformed_df = (
    fact_sale_raw_df
    .withColumn("SaleYear", year(invoice_date))
    .withColumn("SaleQuarter", quarter(invoice_date))
    .withColumn("SaleMonth", month(invoice_date))
)

# --- Verification Step ---
# Print the schema so the three new columns can be checked before writing.
print("Schema of the transformed fact_sale table:")
fact_sale_transformed_df.printSchema()

# --- Write to Delta Table ---
# Overwrite the target (schema included) with a Delta table partitioned by
# SaleYear and SaleQuarter; queries that filter on those columns are the
# ones expected to benefit from the partitioning.
OUTPUT_TABLE_NAME = "fact_sale"
fact_sale_writer = (
    fact_sale_transformed_df.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .partitionBy("SaleYear", "SaleQuarter")
)
fact_sale_writer.save(f"Tables/{OUTPUT_TABLE_NAME}")

print(f"\nSuccessfully created and saved the partitioned '{OUTPUT_TABLE_NAME}' table.")

StatementMeta(, 6a6c6c0d-6a3b-46c0-871d-509544a1a67b, 7, Finished, Available, Finished)

In [None]:
from pyspark.sql.types import *

def process_and_load_dimension(table_name: str) -> None:
    """
    Load one raw dimension table from parquet and store it as a Delta table.

    The parquet data is read from the WideWorldImportersDW import folder,
    every column except 'Photo' is kept, and the result overwrites
    ``Tables/<table_name>`` (schema included). Progress is reported with
    print statements before and after the load.

    Args:
        table_name (str): Name of the dimension table. It is used both as the
                          source folder name and as the target table name.
    """
    print(f"--> Starting processing for dimension: '{table_name}'")

    # Read the raw parquet data for this dimension.
    raw_df = spark.read.format("parquet").load(
        f"Files/imported_files/WideWorldImportersDW/parquet/full/{table_name}"
    )

    # Keep every column except 'Photo', which is not needed for analysis.
    kept_columns = list(filter(lambda column_name: column_name != 'Photo', raw_df.columns))

    # Overwrite the target Delta table with the trimmed DataFrame.
    delta_writer = (
        raw_df.select(kept_columns)
        .write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .format("delta")
    )
    delta_writer.save(f"Tables/{table_name}")

    print(f"--- Successfully saved table: '{table_name}'")

StatementMeta(, 6a6c6c0d-6a3b-46c0-871d-509544a1a67b, 8, Finished, Available, Finished)

In [None]:
# --- Configuration: Dimension Tables ---
# Names of the dimension tables to load; each name is passed unchanged to
# process_and_load_dimension().
DIMENSION_TABLES_TO_PROCESS = [
    'dimension_city',
    'dimension_customer',
    'dimension_date',
    'dimension_employee',
    'dimension_stock_item',
]

# --- Execution: Process Each Dimension ---
# Per-table progress messages are printed by the function itself, so this
# loop only orchestrates the calls.
for dimension_name in DIMENSION_TABLES_TO_PROCESS:
    process_and_load_dimension(dimension_name)

print("\n All dimension tables have been successfully processed and loaded.")

StatementMeta(, 6a6c6c0d-6a3b-46c0-871d-509544a1a67b, 9, Finished, Available, Finished)

Loading data for 'dimension_city'...
--- Done loading 'dimension_city'.
Loading data for 'dimension_customer'...
--- Done loading 'dimension_customer'.
Loading data for 'dimension_date'...
--- Done loading 'dimension_date'.
Loading data for 'dimension_employee'...
--- Done loading 'dimension_employee'.
Loading data for 'dimension_stock_item'...
--- Done loading 'dimension_stock_item'.

All dimension tables have been loaded successfully!


Data Transformation - Business Aggregates (PySpark Approach)
The first business aggregation we need is a summary of sales totals for each city, by day. This will be created by joining the `fact_sale` table with the `dimension_date` and `dimension_city` tables.

In [None]:
# Read the three Lakehouse tables used by the city-level aggregate:
# the sales fact table plus the date and city dimensions.
fact_sale_df, dim_date_df, dim_city_df = (
    spark.read.table(qualified_name)
    for qualified_name in (
        "medium1.fact_sale",
        "medium1.dimension_date",
        "medium1.dimension_city",
    )
)

StatementMeta(, 6a6c6c0d-6a3b-46c0-871d-509544a1a67b, 10, Finished, Available, Finished)

Successfully loaded the three tables into DataFrames.


In [None]:
# --- Step 1: Join the fact table with its dimensions ---
# Each DataFrame gets an alias so that columns can be referenced as
# "sales.*", "date.*" and "city.*" in the join conditions and in later steps.
sales_aliased = fact_sale_df.alias("sales")
date_aliased = dim_date_df.alias("date")
city_aliased = dim_city_df.alias("city")

# Sales rows match a date row on the invoice date and a city row on CityKey.
invoice_date_matches = col("sales.InvoiceDateKey") == col("date.Date")
city_key_matches = col("sales.CityKey") == col("city.CityKey")

joined_df = (
    sales_aliased
    .join(date_aliased, invoice_date_matches, "inner")
    .join(city_aliased, city_key_matches, "inner")
)

# Optional: display this intermediate DataFrame to verify the join worked
# display(joined_df)

StatementMeta(, 6a6c6c0d-6a3b-46c0-871d-509544a1a67b, 11, Finished, Available, Finished)

Transformation complete. The aggregated data is now in the 'sale_by_date_city' DataFrame.


In [None]:
# --- Step 2: Group by date/city and calculate sums ---
# One output row per (Date, CalendarMonthLabel, City, StateProvince).
grouping_columns = [
    "date.Date",
    "date.CalendarMonthLabel",
    "city.City",
    "city.StateProvince",
]
summed_df = joined_df.groupBy(*grouping_columns).sum("sales.TotalIncludingTax", "sales.Profit")

# Rename the "sum(...)" aggregate columns to business-friendly names.
aggregate_renames = {
    "sum(TotalIncludingTax)": "TotalSales",
    "sum(Profit)": "TotalProfit",
}
for generated_name, friendly_name in aggregate_renames.items():
    summed_df = summed_df.withColumnRenamed(generated_name, friendly_name)

city_sales_agg_df = summed_df.orderBy("date.Date")

# --- Step 3: Display the final result before saving ---
# Lets us inspect the summary table before it is written out.
print("Displaying final aggregated data:")
display(city_sales_agg_df)

In [None]:
# --- Step 4: Save the final aggregated data as a new table ---
# Overwrites any existing Delta table at the target path, schema included.
city_aggregate_writer = (
    city_sales_agg_df.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
)
city_aggregate_writer.save("Tables/aggregate_sale_by_date_city")

print("Successfully saved the 'aggregate_sale_by_date_city' table.")

Create Employee-Level Sales Aggregate (Spark SQL Approach)
As an alternative approach, we can use Spark SQL to answer a different business question: "What are the total sales for each employee on each day?" This demonstrates the flexibility of using SQL for complex aggregations.

First, a temporary view is created to define the query logic.


In [None]:
%%sql
-- Temporary view: sales totals per employee per day.
-- fact_sale is joined to dimension_date on the invoice date and to
-- dimension_employee on the salesperson key; TotalIncludingTax and Profit
-- are then summed for each (date, employee) group.
-- Defining the query as a view keeps it separate from the save step.

CREATE OR REPLACE TEMPORARY VIEW view_employee_sales_by_date AS
SELECT
    -- Grouping attributes
    d.Date,
    d.CalendarMonthLabel,
    e.Employee,
    e.PreferredName,
    -- Aggregated metrics
    SUM(s.TotalIncludingTax) AS TotalSales,
    SUM(s.Profit)            AS TotalProfit
FROM medium1.fact_sale AS s
INNER JOIN medium1.dimension_date AS d
    ON s.InvoiceDateKey = d.Date
INNER JOIN medium1.dimension_employee AS e
    ON s.SalespersonKey = e.EmployeeKey
-- Every non-aggregated column in the SELECT list appears here
GROUP BY
    d.Date,
    d.CalendarMonthLabel,
    e.Employee,
    e.PreferredName

In [None]:
# --- Step 1: Build a DataFrame from the view ---
# The SQL cell only defined the temporary view; here its rows are selected
# into a DataFrame, ordered by Date and then PreferredName.
employee_sales_query = "SELECT * FROM view_employee_sales_by_date ORDER BY Date, PreferredName"
employee_sales_agg_df = spark.sql(employee_sales_query)


# --- Step 2: Save the final aggregated data as a new table ---
# Overwrites any existing Delta table at the target path, schema included.
employee_aggregate_writer = (
    employee_sales_agg_df.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
)
employee_aggregate_writer.save("Tables/aggregate_sale_by_date_employee")

print("Successfully saved the 'aggregate_sale_by_date_employee' table.")