In [0]:
from pyspark.sql.functions import col, when, lit

print('--------> Loading data -------------------------------------------------')
# One table per entry listed under the bronze Sales folder; the table name is
# the part of the listing name that precedes the first '/'.
table_names = [entry.name.partition('/')[0] for entry in dbutils.fs.ls('/mnt/bronze/Sales/')]

for table_name in table_names:
    try:
        source_path = f"/mnt/bronze/Sales/{table_name}/{table_name}.parquet"

        # Read the parquet file, remove exact duplicate rows, and rename the
        # ModifiedDate column to Date.
        df = (
            spark.read.format('parquet')
            .load(source_path)
            .dropDuplicates()
            .withColumnRenamed("ModifiedDate", "Date")
        )

        # Publish the frame as a module-level variable named "<table>_df";
        # the transform and save phases look tables up under that name.
        globals()[f"{table_name}_df"] = df
        print(f"Data loaded for {table_name}:")

    except Exception as e:
        # A failure for one table is reported and the loop moves on to the next.
        print(f"Error processing {table_name}: {e}")



print('--------> Transforming Data -------------------------------------------------')
# Customer: remove StoreID, keep only rows that have a PersonID, and cast
# PersonID to int.
Customer_df = (
    Customer_df
    .drop('StoreID')
    .filter(col('PersonID').isNotNull())
    .withColumn("PersonID", col("PersonID").cast("int"))
)

# SalesOrderDetail: keep existing tracking numbers and substitute 'UNKNOWN'
# where the value is null.
SalesOrderDetail_df = SalesOrderDetail_df.withColumn(
    'CarrierTrackingNumber',
    when(col("CarrierTrackingNumber").isNotNull(), col("CarrierTrackingNumber"))
    .otherwise(lit('UNKNOWN'))
)

# SalesOrderHeader: remove four columns, keep only rows that have a
# CreditCardID, and cast CreditCardID to int.
SalesOrderHeader_df = (
    SalesOrderHeader_df
    .drop("PurchaseOrderNumber", "SalesPersonID", "Comment", "CurrencyRateID")
    .filter(col("CreditCardID").isNotNull())
    .withColumn("CreditCardID", col("CreditCardID").cast("int"))
)

# SalesPerson: keep only rows that have a TerritoryID, then cast TerritoryID
# to int and SalesQuota to double.
SalesPerson_df = (
    SalesPerson_df
    .filter(col('TerritoryID').isNotNull())
    .withColumn("TerritoryID", col("TerritoryID").cast("int"))
    .withColumn("SalesQuota", col("SalesQuota").cast("double"))
)

# SalesTerritoryHistory: remove the EndDate column.
SalesTerritoryHistory_df = SalesTerritoryHistory_df.drop('EndDate')

# SpecialOffer: remove the MaxQty column.
SpecialOffer_df = SpecialOffer_df.drop("MaxQty")



print('--------> Saving Data To Gold -------------------------------------------------')
for table_name in table_names:
    try:
        # Frames are looked up under the module-level name "<table>_df".
        df = globals().get(f"{table_name}_df")

        if df is None:
            # No frame exists under that name; report it and go to the next table.
            print(f"No DataFrame found for {table_name}")
            continue

        # Write the frame in Delta format, replacing whatever is at the target path.
        output_path = f"/mnt/gold/Sales/{table_name}/"
        writer = df.write.format("delta").mode("overwrite")
        writer.save(output_path)

        print(f"DataFrame {table_name} has been saved to Gold at {output_path}")

    except Exception as e:
        # A failed write is reported and does not stop the remaining tables.
        print(f"Error while writing {table_name} to Gold: {e}")


--------> Loading data -------------------------------------------------
Data loaded for CountryRegionCurrency:
Data loaded for CreditCard:
Data loaded for Currency:
Data loaded for CurrencyRate:
Data loaded for Customer:
Data loaded for PersonCreditCard:
Data loaded for SalesOrderDetail:
Data loaded for SalesOrderHeader:
Data loaded for SalesOrderHeaderSalesReason:
Data loaded for SalesPerson:
Data loaded for SalesPersonQuotaHistory:
Data loaded for SalesReason:
Data loaded for SalesTaxRate:
Data loaded for SalesTerritory:
Data loaded for SalesTerritoryHistory:
Data loaded for ShoppingCartItem:
Data loaded for SpecialOffer:
Data loaded for SpecialOfferProduct:
Data loaded for Store:
--------> Transforming Data -------------------------------------------------
--------> Saving Data To Gold -------------------------------------------------
DataFrame CountryRegionCurrency has been saved to Gold at /mnt/gold/Sales/CountryRegionCurrency/
DataFrame CreditCard has been saved to Gold at /mnt/