In [0]:
# List the entries under the bronze zone's SalesLT folder (one sub-folder per table).
# This call only lists the path; it does not mount anything.
dbutils.fs.ls("/mnt/bronze/SalesLT/")

[FileInfo(path='dbfs:/mnt/bronze/SalesLT/Address/', name='Address/', size=0, modificationTime=1727245580000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/Customer/', name='Customer/', size=0, modificationTime=1727245589000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/CustomerAddress/', name='CustomerAddress/', size=0, modificationTime=1727245589000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/Product/', name='Product/', size=0, modificationTime=1727245581000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/ProductCategory/', name='ProductCategory/', size=0, modificationTime=1727245581000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/ProductDescription/', name='ProductDescription/', size=0, modificationTime=1727245606000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/ProductModel/', name='ProductModel/', size=0, modificationTime=1727245606000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/ProductModelProductDescription/', name='ProductModelProductDescription/', size=0, modificationTime=1727245581000),
 FileInf

In [0]:
# List the entries under the silver zone's SalesLT folder (one sub-folder per table).
# This call only lists the path; it does not mount anything.
dbutils.fs.ls("/mnt/silver/SalesLT/")

[FileInfo(path='dbfs:/mnt/silver/SalesLT/Address/', name='Address/', size=0, modificationTime=1727760039000),
 FileInfo(path='dbfs:/mnt/silver/SalesLT/Customer/', name='Customer/', size=0, modificationTime=1727760056000),
 FileInfo(path='dbfs:/mnt/silver/SalesLT/CustomerAddress/', name='CustomerAddress/', size=0, modificationTime=1727760060000),
 FileInfo(path='dbfs:/mnt/silver/SalesLT/Product/', name='Product/', size=0, modificationTime=1727760064000),
 FileInfo(path='dbfs:/mnt/silver/SalesLT/ProductCategory/', name='ProductCategory/', size=0, modificationTime=1727760067000),
 FileInfo(path='dbfs:/mnt/silver/SalesLT/ProductDescription/', name='ProductDescription/', size=0, modificationTime=1727760070000),
 FileInfo(path='dbfs:/mnt/silver/SalesLT/ProductModel/', name='ProductModel/', size=0, modificationTime=1727760073000),
 FileInfo(path='dbfs:/mnt/silver/SalesLT/ProductModelProductDescription/', name='ProductModelProductDescription/', size=0, modificationTime=1727760076000),
 FileInf

Doing the same for all tables where a date column is stored together with a timestamp.

In [0]:
# Importing all the packages 
from pyspark.sql.functions import col, regexp_replace, from_utc_timestamp, date_format, round, when, lit, isnan
from pyspark.sql.types import TimestampType

In [0]:
# Performing all the transformations

# Columns removed from every table (when present) before any other step.
col_drop_list = ['NameStyle', 'PasswordHash', 'PasswordSalt', 'rowguid', 'ThumbNailPhoto', 'ThumbnailPhotoFileName']

# `DataFrame.dtypes` type strings whose nulls are replaced with 0.
# Spark reports 8/16/64-bit integers as 'tinyint'/'smallint'/'bigint' (never
# 'short'/'long'), so the small integer types must be spelled this way to match.
numeric_fill_types = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')


def _camel_to_snake(name):
    """Insert '_' before every uppercase letter that follows a non-uppercase character.

    Letter case is preserved and leading underscores are stripped, e.g.
    'ModifiedDate' -> 'Modified_Date', 'ProductID' -> 'Product_ID'.
    """
    pieces = []
    for i, char in enumerate(name):
        # `i > 0` avoids comparing the first character with name[-1] (index wrap-around).
        if i > 0 and char.isupper() and not name[i - 1].isupper():
            pieces.append("_")
        pieces.append(char)
    return "".join(pieces).lstrip("_")


def process_table(df):
    """Clean one bronze table and return the transformed DataFrame.

    Steps, applied in this order:
      1. drop every column listed in `col_drop_list` that exists in `df`;
      2. rename every column with `_camel_to_snake` (case is preserved);
      3. rewrite every column whose renamed name contains "date"
         (case-insensitive) as a 'yyyy-MM-dd' string;
      4. round decimal columns to 2 decimal places;
      5. replace nulls by type: boolean -> False, integer/float/double -> 0
         (float/double NaN -> 0 as well), string -> "NA". Other types
         (decimal, timestamp, date, binary, ...) keep their nulls.

    NOTE(review): every step calls `withColumn` once per column; for very wide
    tables a single `select` keeps the query plan smaller.

    Args:
        df: Spark DataFrame read from the bronze zone.

    Returns:
        A new DataFrame; `df` itself is not modified.
    """
    # 1. Drop unwanted columns (only those actually present).
    to_drop = [name for name in col_drop_list if name in df.columns]
    if to_drop:
        df = df.drop(*to_drop)

    # 2. Rename columns; `df.columns` is evaluated once, before any rename happens.
    for old_name in df.columns:
        df = df.withColumnRenamed(old_name, _camel_to_snake(old_name))

    # 3. Date-named columns -> 'yyyy-MM-dd' strings. Converting from UTC to 'UTC'
    # leaves the instant unchanged. The result is a string column, so its nulls
    # are filled with "NA" in step 5.
    # NOTE(review): substring matching also catches names such as 'Updated_By'.
    for col_name in df.columns:
        if "date" in col_name.lower():
            as_ts = df[col_name].cast(TimestampType())
            df = df.withColumn(col_name, date_format(from_utc_timestamp(as_ts, 'UTC'), 'yyyy-MM-dd'))

    # 4. Round decimals. `round` is pyspark.sql.functions.round here: the import
    # cell brings it in and it shadows Python's builtin round.
    for col_name, col_type in df.dtypes:
        if "decimal" in col_type:
            df = df.withColumn(col_name, round(df[col_name], 2))

    # 5. Null replacement by column type.
    for col_name, col_type in df.dtypes:
        if col_type == 'boolean':
            df = df.withColumn(col_name, when(col(col_name).isNull(), lit(False)).otherwise(col(col_name)))
        elif col_type in numeric_fill_types:
            # Cast the default to the column's own type so tinyint/smallint
            # columns are not widened to int by the CASE WHEN type coercion.
            zero = lit(0).cast(col_type)
            df = df.withColumn(col_name, when(col(col_name).isNull() | isnan(col(col_name)), zero).otherwise(col(col_name)))
        elif col_type == 'string':
            df = df.withColumn(col_name, when(col(col_name).isNull(), lit("NA")).otherwise(col(col_name)))

    return df



In [0]:
# Discover every table folder in the bronze zone, clean it with `process_table`,
# and write the result as Delta to both the silver and gold zones.
BRONZE_ROOT = '/mnt/bronze/SalesLT'
SILVER_ROOT = '/mnt/silver/SalesLT'
GOLD_ROOT = '/mnt/gold/SalesLT'

# One entry per sub-folder of the bronze zone. Folder names come back with a
# trailing '/' (e.g. 'Address/'), which is stripped; plain files are skipped
# because they cannot hold a <table>/<table>.parquet layout.
table_names = [
    file_info.name.rstrip('/')
    for file_info in dbutils.fs.ls(BRONZE_ROOT)
    if file_info.name.endswith('/')
]

# Process each table
for table_name in table_names:
    # Bronze layout: <BRONZE_ROOT>/<table>/<table>.parquet
    path = f'{BRONZE_ROOT}/{table_name}/{table_name}.parquet'
    df = spark.read.format('parquet').load(path)

    df_processed = process_table(df)

    # Silver zone: cleaned table, fully overwritten on every run.
    output_path = f'{SILVER_ROOT}/{table_name}/'
    df_processed.write.format('delta').mode('overwrite').save(output_path)

    # Gold zone: the same cleaned table, fully overwritten on every run.
    # NOTE(review): gold currently receives exactly the same data as silver, and
    # `df_processed` is re-evaluated from bronze for this second write. Confirm
    # whether gold should instead hold curated/aggregated data.
    output_path = f'{GOLD_ROOT}/{table_name}/'
    df_processed.write.format('delta').mode('overwrite').save(output_path)