# Bronze layer - Fact table

This notebook implements the Bronze layer load for fact tables. It is reusable: the table name parameter selects which fact table to load, and the year parameter restricts each run to a single year of data.

Parameters used by the pipeline to control execution:

* **in_parameter_run_id**: unique identifier of the pipeline run
* **in_parameter_process_date**: execution date in `yyyy-MM-dd` format, stored for lineage
* **in_parameter_table_name**: name of the fact table to load
* **in_parameter_columns**: comma-separated list of the columns to select
* **in_parameter_year**: year partition to process
* **out_parameter_count_processed**: number of rows processed, returned to the pipeline when the notebook exits

In [None]:
in_parameter_run_id = 0
in_parameter_process_date = ""
in_parameter_table_name = ""
in_parameter_columns = ""
in_parameter_year = 2010
out_parameter_count_processed = 0
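
Because the table name and year end up in a file path, a table identifier, and a filter predicate, it can help to check them before any data is read. The following is a minimal sketch, not part of the original notebook: `validate_parameters` and the identifier pattern are hypothetical, and the rules should be adapted to the naming conventions actually in use.

```python
import re

# Hypothetical helper: the pattern below is an assumption about what a
# valid table name looks like, not a rule taken from the pipeline.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def validate_parameters(table_name: str, columns: str, year: int) -> None:
    """Raise ValueError when a parameter is unusable for the load."""
    # The table name is interpolated into a path and a table identifier,
    # so restrict it to a plain identifier.
    if not _IDENTIFIER.fullmatch(table_name):
        raise ValueError(f"invalid table name: {table_name!r}")
    # An empty or blank column list would select nothing useful.
    if not columns.strip():
        raise ValueError("column list is empty")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(year, bool) or not isinstance(year, int):
        raise ValueError(f"year must be an integer, got {year!r}")
```

Calling `validate_parameters(in_parameter_table_name, in_parameter_columns, in_parameter_year)` right after the parameter cell would stop the run early on the empty defaults shown above.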

## 1. Load validation rules

In [None]:
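# Import only the names used below; a wildcard import from
# pyspark.sql.functions would shadow Python builtins such as sum and max.
from pyspark.sql.functions import col, lit, to_date
from pyspark.sql.types import (
    DateType,
    DecimalType,
    IntegerType,
    ShortType,
    StringType,
    TimestampType,
)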

In [None]:
# Load the rules for this table only; the result is small enough to collect into pandas.
df_validation_rules = (
    spark.read.table("control.validation_rules")
    .filter(col("table_name") == in_parameter_table_name)
    .toPandas()
)

## 2. Extract data

Read the Parquet files for the selected table and year, then keep only the requested columns.

In [None]:
df = spark.read.parquet(f"Files/adventure_works_dw_2022/{in_parameter_table_name}/year={in_parameter_year}/*.parquet")
df = df.select(in_parameter_columns.split(","))
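
A plain `split(",")` keeps any whitespace around the names, so a parameter such as `"a, b"` yields `" b"`, which is unlikely to resolve as a column. The sketch below shows one way to normalise the list and build the source path; `parse_columns` and `source_path` are hypothetical helpers, and the column names in the usage note are only examples.

```python
def parse_columns(columns: str) -> list[str]:
    """Split a comma-separated column list, trimming blanks and dropping empty entries."""
    return [name.strip() for name in columns.split(",") if name.strip()]


def source_path(table_name: str, year: int) -> str:
    """Build the per-year Parquet glob in the folder layout the notebook reads."""
    return f"Files/adventure_works_dw_2022/{table_name}/year={year}/*.parquet"
```

For example, `parse_columns("SalesOrderNumber, OrderDate,")` returns `['SalesOrderNumber', 'OrderDate']`, which can be passed directly to `df.select(...)`.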

Apply the schema by casting each column to the type recorded in the validation rules, so that data types in the Bronze layer are consistent and driven by metadata. Any type without an explicit mapping is cast to string.

In [None]:
for _, row in df_validation_rules.iterrows():
    column_name = row["column_name_original"]
    column_type = row["column_type"]
    if column_type == "INT":
        target_type = IntegerType()
    elif column_type in ("TINYINT", "SMALLINT"):
        target_type = ShortType()
    elif column_type == "DECIMAL":
        # pandas may return floats or NumPy integers here, so coerce to int.
        target_type = DecimalType(int(row["column_size"]), int(row["column_size_scale"]))
    elif column_type == "DATETIME":
        target_type = TimestampType()
    elif column_type == "DATE":
        target_type = DateType()
    else:
        # Any other source type is kept as a string.
        target_type = StringType()
    df = df.withColumn(column_name, col(column_name).cast(target_type))
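
The same mapping can also be expressed as a lookup that returns a Spark SQL type name, which `Column.cast` accepts in place of a `DataType` object. This is a sketch under assumptions: `spark_type_name` is a hypothetical helper, and the upper-casing and trimming of `column_type` is an added tolerance that the original comparison does not have.

```python
# Source types that map to a fixed Spark SQL type name.
_SIMPLE_TYPES = {
    "INT": "int",
    "TINYINT": "smallint",
    "SMALLINT": "smallint",
    "DATETIME": "timestamp",
    "DATE": "date",
}


def spark_type_name(column_type: str, size=None, scale=None) -> str:
    """Return the Spark SQL type name for a source column type."""
    kind = column_type.strip().upper()
    if kind == "DECIMAL":
        # Precision and scale may arrive as floats from pandas.
        return f"decimal({int(size)},{int(scale)})"
    # Unmapped types fall back to string, as in the loop above.
    return _SIMPLE_TYPES.get(kind, "string")
```

Inside the loop this would be used as `col(name).cast(spark_type_name(row["column_type"], row["column_size"], row["column_size_scale"]))`. Keeping the mapping in a dictionary makes it a one-line change to support another source type.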

Add the year column and the run metadata, so that each row records which partition it belongs to, when it was loaded, and which run loaded it.

In [None]:
df = df.withColumn("year", lit(in_parameter_year))
# Parse the process date string into a date column in a single step.
df = df.withColumn("process_date", to_date(lit(in_parameter_process_date), "yyyy-MM-dd"))
df = df.withColumn("run_id", lit(in_parameter_run_id))
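
With ANSI mode off, `to_date` typically yields null rather than an error when the string does not match the pattern, so a malformed or empty process date would silently produce rows without lineage. A minimal sketch of a fail-fast check in plain Python follows; `parse_process_date` is a hypothetical helper, and Python's `%Y-%m-%d` is only an approximation of Spark's `yyyy-MM-dd` pattern.

```python
from datetime import date, datetime


def parse_process_date(value: str) -> date:
    """Parse a yyyy-MM-dd string, raising ValueError if it is malformed."""
    return datetime.strptime(value, "%Y-%m-%d").date()
```

Calling `parse_process_date(in_parameter_process_date)` before the metadata cell would stop the run on the empty default instead of writing null dates.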

## 3. Load data

Save the data, overwriting only the rows for the selected year (`replaceWhere`), so the load can be rerun without creating duplicates.

In [None]:
(
    df.write.format("delta")
    .mode("overwrite")
    # Replace only the rows that match the predicate; other years are left untouched.
    .option("replaceWhere", f"year = {in_parameter_year}")
    .saveAsTable(f"bronze.{in_parameter_table_name}")
)
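
Why this makes the load rerunnable can be seen in a small in-memory model. The sketch below is an illustration of the semantics only, not Delta's implementation: `replace_where_year` is a hypothetical function, and the check that rejects rows from other years is modelled on the constraint check Delta is understood to apply by default.

```python
def replace_where_year(table: list[dict], new_rows: list[dict], year: int) -> list[dict]:
    """Model an overwrite restricted to one year: drop that year's rows, then append."""
    # Reject rows that fall outside the predicate being replaced.
    stray = [row for row in new_rows if row["year"] != year]
    if stray:
        raise ValueError(f"{len(stray)} row(s) do not match year = {year}")
    # Rows for every other year are kept untouched.
    kept = [row for row in table if row["year"] != year]
    return kept + new_rows


# Made-up sample data for the illustration.
table = [{"year": 2010, "amount": 5}, {"year": 2011, "amount": 7}]
batch = [{"year": 2011, "amount": 8}, {"year": 2011, "amount": 9}]

once = replace_where_year(table, batch, 2011)
twice = replace_where_year(once, batch, 2011)  # rerun of the same load
```

Here `once` and `twice` are equal and contain three rows: the 2010 row is preserved and the 2011 rows are replaced rather than appended, which is the property the load relies on.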

Count the rows processed and return the total to the pipeline as the notebook exit value.

In [None]:
out_parameter_count_processed = df.count()

In [None]:
# The exit value is handed back to the caller as a string, so convert explicitly.
mssparkutils.notebook.exit(str(out_parameter_count_processed))