## Transforming all tables (Bronze → Silver)


In [0]:
# Configure OAuth (service-principal / client-credentials) access to the
# awssaledatalake ADLS Gen2 account for this Spark session.
#
# SECURITY: the client secret was previously pasted into this cell in plain
# text. A secret that has been saved in a notebook must be treated as leaked:
# rotate it, store the new value in a Databricks secret scope, and read it at
# run time as done below.
# TODO(review): "adls-sp" / "client-secret" are placeholder scope/key names —
# replace them with the scope and key that actually exist in this workspace.
client_secret = dbutils.secrets.get(scope="adls-sp", key="client-secret")

spark.conf.set("fs.azure.account.auth.type.awssaledatalake.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.awssaledatalake.dfs.core.windows.net", 
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
# Application (client) id of the service principal — an identifier, not a secret.
spark.conf.set("fs.azure.account.oauth2.client.id.awssaledatalake.dfs.core.windows.net", 
               "3fd7ad3c-3291-46b6-9ce0-c360057c95c1")
spark.conf.set("fs.azure.account.oauth2.client.secret.awssaledatalake.dfs.core.windows.net", 
               client_secret)
# Token endpoint of the tenant that owns the service principal.
spark.conf.set("fs.azure.account.oauth2.client.endpoint.awssaledatalake.dfs.core.windows.net", 
               "https://login.microsoftonline.com/07acb355-56bc-489b-b98c-8fea440460e8/oauth2/token")

In [0]:

# Root folder of the Sales schema in the `brozen` container; each child
# folder is read as one table.
path = "abfss://brozen@awssaledatalake.dfs.core.windows.net/Sales/"
folders = dbutils.fs.ls(path)

# table name -> DataFrame read from that table's folder
sales_dfs = {}

for entry in folders:
    # Listing names carry a trailing "/" for folders; strip it to get the key.
    table_name = entry.name.replace("/", "")
    try:
        sales_dfs[table_name] = spark.read.load(entry.path, format="parquet")
    except Exception as e:
        # Best effort: report the table that failed and continue with the rest.
        print(f"load faill '{table_name}': {e}")


✅ Loaded bảng 'CountryRegionCurrency' vào sales_dfs['CountryRegionCurrency']
✅ Loaded bảng 'CreditCard' vào sales_dfs['CreditCard']
✅ Loaded bảng 'Currency' vào sales_dfs['Currency']
✅ Loaded bảng 'CurrencyRate' vào sales_dfs['CurrencyRate']
✅ Loaded bảng 'Customer' vào sales_dfs['Customer']
✅ Loaded bảng 'PersonCreditCard' vào sales_dfs['PersonCreditCard']
✅ Loaded bảng 'SalesOrderDetail' vào sales_dfs['SalesOrderDetail']
✅ Loaded bảng 'SalesOrderHeader' vào sales_dfs['SalesOrderHeader']
✅ Loaded bảng 'SalesOrderHeaderSalesReason' vào sales_dfs['SalesOrderHeaderSalesReason']
✅ Loaded bảng 'SalesPerson' vào sales_dfs['SalesPerson']
✅ Loaded bảng 'SalesPersonQuotaHistory' vào sales_dfs['SalesPersonQuotaHistory']
✅ Loaded bảng 'SalesReason' vào sales_dfs['SalesReason']
✅ Loaded bảng 'SalesTaxRate' vào sales_dfs['SalesTaxRate']
✅ Loaded bảng 'SalesTerritory' vào sales_dfs['SalesTerritory']
✅ Loaded bảng 'SalesTerritoryHistory' vào sales_dfs['SalesTerritoryHistory']
✅ Loaded bảng 'Shopping

In [0]:
from pyspark.sql.functions import col, to_date, trim
from pyspark.sql.functions import lit

# Clean every Sales table held in `sales_dfs` and write it to the Silver
# location as a Delta table (one folder per table, overwritten on each run).
for name in sales_dfs:
    df = sales_dfs[name]

    # 1. Normalize column names: put "_" in front of every upper-case letter
    #    that is not preceded by another upper-case letter, e.g.
    #    "SalesOrderID" -> "Sales_Order_ID". Letter case itself is kept.
    #    The `i > 0` guard stops the first character from being compared with
    #    old_col_name[-1] (the LAST character); lstrip("_") still removes any
    #    leading underscore, so the resulting names are unchanged.
    renamed_columns = [
        "".join(
            "_" + char
            if i > 0 and char.isupper() and not old_col_name[i - 1].isupper()
            else char
            for i, char in enumerate(old_col_name)
        ).lstrip("_")
        for old_col_name in df.columns
    ]
    # Positional rename of all columns at once instead of one
    # withColumnRenamed call per column.
    df = df.toDF(*renamed_columns)

    # 2. Remove exact duplicate rows
    df = df.dropDuplicates()

    # 3. Standardize string columns
    #    NOTE(review): the date/time test is a plain substring match on the
    #    column name and to_date yields a DATE (no time-of-day) using the
    #    pattern yyyy-MM-dd — confirm this is what is wanted for "time" columns.
    for c, t in df.dtypes:
        # Convert string columns that look like date/time
        if t == "string" and ("date" in c.lower() or "time" in c.lower()):
            df = df.withColumn(c, to_date(col(c), "yyyy-MM-dd"))
        # Trim extra spaces in string columns
        elif t == "string":
            df = df.withColumn(c, trim(col(c)))

    # 4. Handle null values: strings -> "N/A", numeric columns -> 0.
    #    Spark reports decimal columns as "decimal(p,s)", so they are matched
    #    by prefix; an exact comparison with "decimal" never succeeds and left
    #    decimal columns unfilled.
    #    NOTE: other integer widths (smallint, tinyint) are not in the list
    #    and are left untouched, as before.
    numeric_types = ("int", "bigint", "double", "float")
    fill_values = {}
    for col_name, dtype in df.dtypes:
        if dtype == "string":
            fill_values[col_name] = "N/A"
        elif dtype in numeric_types or dtype.startswith("decimal"):
            fill_values[col_name] = 0
    # One fillna call for the whole table instead of one per column.
    if fill_values:
        df = df.fillna(fill_values)

    # 5. Save to Silver (Delta format)
    output_path = f"abfss://sliver@awssaledatalake.dfs.core.windows.net/Sales/{name}/"
    (
        df.write
          .format("delta")
          .mode("overwrite")
          .save(output_path)
    )

    print(f" Processed and saved table {name} into Silver")


✅ Processed and saved table CountryRegionCurrency into Silver
✅ Processed and saved table CreditCard into Silver
✅ Processed and saved table Currency into Silver
✅ Processed and saved table CurrencyRate into Silver
✅ Processed and saved table Customer into Silver
✅ Processed and saved table PersonCreditCard into Silver
✅ Processed and saved table SalesOrderDetail into Silver
✅ Processed and saved table SalesOrderHeader into Silver
✅ Processed and saved table SalesOrderHeaderSalesReason into Silver
✅ Processed and saved table SalesPerson into Silver
✅ Processed and saved table SalesPersonQuotaHistory into Silver
✅ Processed and saved table SalesReason into Silver
✅ Processed and saved table SalesTaxRate into Silver
✅ Processed and saved table SalesTerritory into Silver
✅ Processed and saved table SalesTerritoryHistory into Silver
✅ Processed and saved table ShoppingCartItem into Silver
✅ Processed and saved table SpecialOffer into Silver
✅ Processed and saved table SpecialOfferProduct i

In [0]:

# Root folder of the Production schema in the `brozen` container; each child
# folder is read as one table.
path = "abfss://brozen@awssaledatalake.dfs.core.windows.net/Production/"
folders = dbutils.fs.ls(path)

# table name -> DataFrame read from that table's folder
Production_dfs = {}

for entry in folders:
    # Listing names carry a trailing "/" for folders; strip it to get the key.
    table_name = entry.name.replace("/", "")
    try:
        Production_dfs[table_name] = spark.read.load(entry.path, format="parquet")
    except Exception as e:
        # Best effort: report the table that failed and continue with the rest.
        print(f"load faill '{table_name}': {e}")


✅ Loaded bảng 'BillOfMaterials' vào Production_dfs['BillOfMaterials']
✅ Loaded bảng 'Location' vào Production_dfs['Location']
✅ Loaded bảng 'Product' vào Production_dfs['Product']
✅ Loaded bảng 'ProductCategory' vào Production_dfs['ProductCategory']
✅ Loaded bảng 'ProductCostHistory' vào Production_dfs['ProductCostHistory']
✅ Loaded bảng 'ProductSubcategory' vào Production_dfs['ProductSubcategory']
✅ Loaded bảng 'ScrapReason' vào Production_dfs['ScrapReason']
✅ Loaded bảng 'TransactionHistory' vào Production_dfs['TransactionHistory']
✅ Loaded bảng 'TransactionHistoryArchive' vào Production_dfs['TransactionHistoryArchive']
✅ Loaded bảng 'UnitMeasure' vào Production_dfs['UnitMeasure']
✅ Loaded bảng 'WorkOrder' vào Production_dfs['WorkOrder']
✅ Loaded bảng 'WorkOrderRouting' vào Production_dfs['WorkOrderRouting']


In [0]:
from pyspark.sql.functions import col, to_date, trim
from pyspark.sql.functions import lit

# Clean every Production table held in `Production_dfs` and write it to the
# Silver location as a Delta table (one folder per table, overwritten on each run).
for name in Production_dfs:
    df = Production_dfs[name]   # Load each DataFrame from Bronze

    # 1. Normalize column names: put "_" in front of every upper-case letter
    #    that is not preceded by another upper-case letter, e.g.
    #    "ProductID" -> "Product_ID". Letter case itself is kept.
    #    The `i > 0` guard stops the first character from being compared with
    #    old_col_name[-1] (the LAST character); lstrip("_") still removes any
    #    leading underscore, so the resulting names are unchanged.
    renamed_columns = [
        "".join(
            "_" + char
            if i > 0 and char.isupper() and not old_col_name[i - 1].isupper()
            else char
            for i, char in enumerate(old_col_name)
        ).lstrip("_")
        for old_col_name in df.columns
    ]
    # Positional rename of all columns at once instead of one
    # withColumnRenamed call per column.
    df = df.toDF(*renamed_columns)

    # 2. Remove exact duplicate rows
    df = df.dropDuplicates()

    # 3. Standardize string columns
    #    NOTE(review): the date/time test is a plain substring match on the
    #    column name and to_date yields a DATE (no time-of-day) using the
    #    pattern yyyy-MM-dd — confirm this is what is wanted for "time" columns.
    for c, t in df.dtypes:
        # Convert string columns that look like date/time
        if t == "string" and ("date" in c.lower() or "time" in c.lower()):
            df = df.withColumn(c, to_date(col(c), "yyyy-MM-dd"))
        # Trim extra spaces in string columns
        elif t == "string":
            df = df.withColumn(c, trim(col(c)))

    # 4. Handle null values: strings -> "N/A", numeric columns -> 0.
    #    Spark reports decimal columns as "decimal(p,s)", so they are matched
    #    by prefix; an exact comparison with "decimal" never succeeds and left
    #    decimal columns unfilled.
    #    NOTE: other integer widths (smallint, tinyint) are not in the list
    #    and are left untouched, as before.
    numeric_types = ("int", "bigint", "double", "float")
    fill_values = {}
    for col_name, dtype in df.dtypes:
        if dtype == "string":
            fill_values[col_name] = "N/A"
        elif dtype in numeric_types or dtype.startswith("decimal"):
            fill_values[col_name] = 0
    # One fillna call for the whole table instead of one per column.
    if fill_values:
        df = df.fillna(fill_values)

    # 5. Save to Silver (Delta format)
    output_path = f"abfss://sliver@awssaledatalake.dfs.core.windows.net/Production/{name}/"
    (
        df.write
          .format("delta")
          .mode("overwrite")
          .save(output_path)
    )

    print(f" Processed and saved table {name} into Silver")


✅ Processed and saved table BillOfMaterials into Silver
✅ Processed and saved table Location into Silver
✅ Processed and saved table Product into Silver
✅ Processed and saved table ProductCategory into Silver
✅ Processed and saved table ProductCostHistory into Silver
✅ Processed and saved table ProductSubcategory into Silver
✅ Processed and saved table ScrapReason into Silver
✅ Processed and saved table TransactionHistory into Silver
✅ Processed and saved table TransactionHistoryArchive into Silver
✅ Processed and saved table UnitMeasure into Silver
✅ Processed and saved table WorkOrder into Silver
✅ Processed and saved table WorkOrderRouting into Silver


In [0]:

# Root folder of the Person schema in the `brozen` container; each child
# folder is read as one table.
path = "abfss://brozen@awssaledatalake.dfs.core.windows.net/Person/"
folders = dbutils.fs.ls(path)

# table name -> DataFrame read from that table's folder
Person_dfs = {}

for entry in folders:
    # Listing names carry a trailing "/" for folders; strip it to get the key.
    table_name = entry.name.replace("/", "")
    try:
        Person_dfs[table_name] = spark.read.load(entry.path, format="parquet")
    except Exception as e:
        # Best effort: report the table that failed and continue with the rest.
        print(f"load faill '{table_name}': {e}")


✅ Loaded bảng 'CountryRegion' vào Person_dfs['CountryRegion']
✅ Loaded bảng 'Person' vào Person_dfs['Person']


In [0]:
from pyspark.sql.functions import col, to_date, trim
from pyspark.sql.functions import lit

# Clean every Person table held in `Person_dfs` and write it to the Silver
# location as a Delta table (one folder per table, overwritten on each run).
for name in Person_dfs:
    df = Person_dfs[name]   # Load each DataFrame from Bronze

    # 1. Normalize column names: put "_" in front of every upper-case letter
    #    that is not preceded by another upper-case letter, e.g.
    #    "BusinessEntityID" -> "Business_Entity_ID". Letter case itself is kept.
    #    The `i > 0` guard stops the first character from being compared with
    #    old_col_name[-1] (the LAST character); lstrip("_") still removes any
    #    leading underscore, so the resulting names are unchanged.
    renamed_columns = [
        "".join(
            "_" + char
            if i > 0 and char.isupper() and not old_col_name[i - 1].isupper()
            else char
            for i, char in enumerate(old_col_name)
        ).lstrip("_")
        for old_col_name in df.columns
    ]
    # Positional rename of all columns at once instead of one
    # withColumnRenamed call per column.
    df = df.toDF(*renamed_columns)

    # 2. Remove exact duplicate rows
    df = df.dropDuplicates()

    # 3. Standardize string columns
    #    NOTE(review): the date/time test is a plain substring match on the
    #    column name and to_date yields a DATE (no time-of-day) using the
    #    pattern yyyy-MM-dd — confirm this is what is wanted for "time" columns.
    for c, t in df.dtypes:
        # Convert string columns that look like date/time
        if t == "string" and ("date" in c.lower() or "time" in c.lower()):
            df = df.withColumn(c, to_date(col(c), "yyyy-MM-dd"))
        # Trim extra spaces in string columns
        elif t == "string":
            df = df.withColumn(c, trim(col(c)))

    # 4. Handle null values: strings -> "N/A", numeric columns -> 0.
    #    Spark reports decimal columns as "decimal(p,s)", so they are matched
    #    by prefix; an exact comparison with "decimal" never succeeds and left
    #    decimal columns unfilled.
    #    NOTE: other integer widths (smallint, tinyint) are not in the list
    #    and are left untouched, as before.
    numeric_types = ("int", "bigint", "double", "float")
    fill_values = {}
    for col_name, dtype in df.dtypes:
        if dtype == "string":
            fill_values[col_name] = "N/A"
        elif dtype in numeric_types or dtype.startswith("decimal"):
            fill_values[col_name] = 0
    # One fillna call for the whole table instead of one per column.
    if fill_values:
        df = df.fillna(fill_values)

    # 5. Save to Silver (Delta format)
    output_path = f"abfss://sliver@awssaledatalake.dfs.core.windows.net/Person/{name}/"
    (
        df.write
          .format("delta")
          .mode("overwrite")
          .save(output_path)
    )

    print(f" Processed and saved table {name} into Silver")


✅ Processed and saved table CountryRegion into Silver
✅ Processed and saved table Person into Silver
