## Transforming Bronze to Silver Tables
###  This script performs transformations on all tables in the "bronze" layer and writes the processed data to the "silver" layer in Delta format. The transformations include:
### a.- Converting every column whose name contains "Date" or "date" to a string in the standard "yyyy-MM-dd" format.
### b.- Saving the transformed tables to their respective directories in the "silver" layer.

In [9]:
# Microsoft Fabric Notebook: Transforming Bronze to Silver Layer

from pyspark.sql.functions import from_utc_timestamp, date_format
from pyspark.sql.types import TimestampType


StatementMeta(, 0768609a-4c34-451a-9ac5-3c0732b5c3fe, 11, Finished, Available, Finished)

In [10]:
# Root folders of the SalesLT tables in the Lakehouse:
# the Silver folder receives the transformed output, the Bronze folder is the input
silver_path = "Files/silver/SalesLT/"
bronze_path = "Files/bronze/SalesLT/"

StatementMeta(, 0768609a-4c34-451a-9ac5-3c0732b5c3fe, 12, Finished, Available, Finished)

In [11]:
# Collect the name of every table folder found directly under the Bronze path.
# Only directories are kept: the transformation step reads
# "<bronze_path><name>/<name>.parquet", so a plain file sitting next to the
# table folders is not a table and would make that read fail.
table_name = [
    item.name.split('/')[0]  # keep only the part of the listed name before the first "/"
    for item in mssparkutils.fs.ls(bronze_path)
    if item.isDir
]

StatementMeta(, 0768609a-4c34-451a-9ac5-3c0732b5c3fe, 13, Finished, Available, Finished)

In [12]:
# Bronze -> Silver: rewrite each Bronze table as a Delta table in the Silver layer,
# with its date-named columns rendered in "yyyy-MM-dd" format
for table in table_name:
    # The Bronze data for a table is expected at <bronze_path><table>/<table>.parquet
    source_file = f"{bronze_path}{table}/{table}.parquet"
    df = spark.read.format("parquet").load(source_file)

    # Columns are picked purely by name: anything containing "Date" or "date".
    # NOTE(review): this substring test also matches names such as "LastUpdate";
    # confirm no such non-date columns exist in the Bronze tables.
    date_columns = [name for name in df.columns if "Date" in name or "date" in name]

    for name in date_columns:
        # Cast to timestamp, pass through from_utc_timestamp with the "UTC" zone,
        # then format as "yyyy-MM-dd" (the column is replaced in place).
        # NOTE(review): shifting from UTC to "UTC" looks like a zero offset — confirm
        # whether a different target time zone was intended.
        as_timestamp = df[name].cast(TimestampType())
        shifted = from_utc_timestamp(as_timestamp, "UTC")
        df = df.withColumn(name, date_format(shifted, "yyyy-MM-dd"))

    # Overwrite the table's Silver folder with the transformed data, stored as Delta
    target_dir = f"{silver_path}{table}/"
    df.write.format("delta").mode("overwrite").save(target_dir)

StatementMeta(, 0768609a-4c34-451a-9ac5-3c0732b5c3fe, 14, Finished, Available, Finished)

In [13]:
#df = spark.read.parquet("Files/bronze/SalesLT/Customer/Customer.parquet")
# df now is a Spark DataFrame containing parquet data from "Files/bronze/SalesLT/Customer/Customer.parquet".
#display(df)

StatementMeta(, 0768609a-4c34-451a-9ac5-3c0732b5c3fe, 15, Finished, Available, Finished)

In [17]:
#df = spark.read.format("parquet").load('Files/bronze/SalesLT/SalesOrderDetail')
#display(df)

StatementMeta(, 0768609a-4c34-451a-9ac5-3c0732b5c3fe, 19, Finished, Available, Finished)