# Transforming Raw Data from Bronze Container

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
dbutils.fs.ls('mnt/bronze/SalesLT/')

[FileInfo(path='dbfs:/mnt/bronze/SalesLT/Address/', name='Address/', size=0, modificationTime=1734963059000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/Customer/', name='Customer/', size=0, modificationTime=1734963071000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/CustomerAddress/', name='CustomerAddress/', size=0, modificationTime=1734963052000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/Product/', name='Product/', size=0, modificationTime=1734963064000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/ProductCategory/', name='ProductCategory/', size=0, modificationTime=1734963055000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/ProductDescription/', name='ProductDescription/', size=0, modificationTime=1734963080000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/ProductModel/', name='ProductModel/', size=0, modificationTime=1734963077000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/ProductModelProductDescription/', name='ProductModelProductDescription/', size=0, modificationTime=1734963071000),
 FileInf

In [0]:
dbutils.fs.ls('mnt/silver/')

[]

In [0]:
df_sales_details = spark.read.format('parquet').load('/mnt/bronze/SalesLT/SalesOrderDetail/')
df_sales_details.limit(10).display()

SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
71774,110562,1,836,356.898,0.0,356.898,e3a1994c-7a68-4ce8-96a3-77fdd3bbd730,2008-06-01T00:00:00Z
71774,110563,1,822,356.898,0.0,356.898,5c77f557-fdb6-43ba-90b9-9a7aec55ca32,2008-06-01T00:00:00Z
71776,110567,1,907,63.9,0.0,63.9,6dbfe398-d15d-425e-aa58-88178fe360e5,2008-06-01T00:00:00Z
71780,110616,4,905,218.454,0.0,873.816,377246c9-4483-48ed-a5b9-e56f005364e0,2008-06-01T00:00:00Z
71780,110617,2,983,461.694,0.0,923.388,43a54bcd-536d-4a1b-8e69-24d083507a14,2008-06-01T00:00:00Z
71780,110618,6,988,112.998,0.4,406.7928,12706fab-f3a2-48c6-b7c7-1ccde4081f18,2008-06-01T00:00:00Z
71780,110619,2,748,818.7,0.0,1637.4,b12f0d3b-5b4e-4f1f-b2f0-f7cde99dd826,2008-06-01T00:00:00Z
71780,110620,1,990,323.994,0.0,323.994,f117a449-039d-44b8-a4b2-b12001dacc01,2008-06-01T00:00:00Z
71780,110621,1,926,149.874,0.0,149.874,92e5052b-72d0-4c91-9a8c-42591803667e,2008-06-01T00:00:00Z
71780,110622,1,743,809.76,0.0,809.76,8bd33bed-c4f6-4d44-84fb-a7d04afcd794,2008-06-01T00:00:00Z


In [0]:
df_address = spark.read.format('parquet').load('/mnt/bronze/SalesLT/Address/')
df_address.limit(10).display()

AddressID,AddressLine1,AddressLine2,City,StateProvince,CountryRegion,PostalCode,rowguid,ModifiedDate
9,8713 Yosemite Ct.,,Bothell,Washington,United States,98011,268af621-76d7-4c78-9441-144fd139821a,2006-07-01T00:00:00Z
11,1318 Lasalle Street,,Bothell,Washington,United States,98011,981b3303-aca2-49c7-9a96-fb670785b269,2007-04-01T00:00:00Z
25,9178 Jumping St.,,Dallas,Texas,United States,75201,c8df3bd9-48f0-4654-a8dd-14a67a84d3c6,2006-09-01T00:00:00Z
28,9228 Via Del Sol,,Phoenix,Arizona,United States,85004,12ae5ee1-fc3e-468b-9b92-3b970b169774,2005-09-01T00:00:00Z
32,26910 Indela Road,,Montreal,Quebec,Canada,H1Y 2H5,84a95f62-3ae8-4e7e-bbd5-5a6f00cd982d,2006-08-01T00:00:00Z
185,2681 Eagle Peak,,Bellevue,Washington,United States,98004,7bccf442-2268-46cc-8472-14c44c14e98c,2006-09-01T00:00:00Z
297,7943 Walnut Ave,,Renton,Washington,United States,98055,52410da4-2778-4b1d-a599-95746625ce6d,2006-08-01T00:00:00Z
445,6388 Lake City Way,,Burnaby,British Columbia,Canada,V5A 3A6,53572f25-9133-4a8b-a065-102ff35416ee,2006-09-01T00:00:00Z
446,52560 Free Street,,Toronto,Ontario,Canada,M4B 1V7,801a1dfc-5125-486b-aa84-ccbd2ec57ca4,2005-08-01T00:00:00Z
447,22580 Free Street,,Toronto,Ontario,Canada,M4B 1V7,88cee379-dbb8-433b-b84e-a35e09435500,2006-08-01T00:00:00Z


The date column needs transformation

In [0]:
# Lets first load all the table names
table_name = []

for i in dbutils.fs.ls('mnt/bronze/SalesLT/'):
    table_name.append(i.name.split('/')[0])

table_name

['Address',
 'Customer',
 'CustomerAddress',
 'Product',
 'ProductCategory',
 'ProductDescription',
 'ProductModel',
 'ProductModelProductDescription',
 'SalesOrderDetail',
 'SalesOrderHeader']

In [0]:
# Iterate through each table name in the list
for i in table_name:
    # Construct the input path for the parquet file
    # Format: /mnt/bronze/SalesLT/[table_name]/[table_name].parquet
    path = '/mnt/bronze/SalesLT/' + i + '/' + i + '.parquet'
    
    # Read the parquet file into a Spark DataFrame
    df = spark.read.format('parquet').load(path)
    
    # Get list of all columns in the DataFrame
    column = df.columns
    
    # Iterate through each column in the DataFrame
    for col in column:
        # Check if column name contains 'Date' or 'date'
        if "Date" in col or "date" in col:
            # For date columns, perform the following transformations:
            # 1. Cast the column to TimestampType
            # 2. Convert from UTC timestamp
            # 3. Format the date as 'yyyy-MM-dd'
            df = df.withColumn(
                col, 
                date_format(
                    from_utc_timestamp(
                        df[col].cast(TimestampType()), 
                        "UTC"
                    ), 
                    "yyyy-MM-dd"
                )
            )
    
    # Construct the output path in the silver layer
    output_path = '/mnt/silver/SalesLT/' + i + '/'
    
    # Write the transformed DataFrame to Delta format
    # 'overwrite' mode will replace any existing data in the destination
    df.write.format('delta').mode('overwrite').save(output_path)