In [1]:
# Transformation in Silver Layer
# Import libraries
import traceback

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 3, Finished, Available, Finished)

In [2]:
# Fabric workspace and lakehouse names used to build the OneLake path of the silver table.
Workspace, Lakehouse = "NewRetailCompany", "Staging_Zone"

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 4, Finished, Available, Finished)

In [3]:
# Schema is not enforced on the bronze layer (its tables were created during the first copy),
# so column names and types are assigned here.

lz_sales_df = spark.read.table("lz_sales_transaction")

# Target column names, in the same positional order as the bronze table's columns.
col_names = [
  "SaleTransactionID","SalesRepID","SalesRepName","SalesRepCountry",
  "SalesRepCountryCode","SalesRepRegion","SalesCountryCode","CustomerID",
  "CustomerName","CustomerCountry","CustomerRegion","CustomerCountryCode",
  "UnitQuantity","SaleAmount","SaleDatetime","ProductDetails",
  "ProductCode","SaleLocation","CurrencyCode"
]

# Rename positionally in a single projection. A width mismatch is treated as an error:
# a per-column rename loop would raise IndexError on extra columns and would silently
# leave target names unassigned when bronze columns are missing.
if len(lz_sales_df.columns) != len(col_names):
    raise ValueError(
        f"lz_sales_transaction has {len(lz_sales_df.columns)} columns; "
        f"expected {len(col_names)}: {col_names}"
    )
lz_sales_df = lz_sales_df.toDF(*col_names)

# Cast the quantity, amount and timestamp columns.
# Transformations are lazy, so this try only catches analysis-time errors (e.g. a column that
# cannot be resolved); value-level cast problems only show up when an action runs.
# Errors are logged and re-raised so later cells never run with sz_sales_df undefined or stale.
try:
    sz_sales_df = lz_sales_df \
        .withColumn("UnitQuantity", F.col("UnitQuantity").cast(IntegerType())) \
        .withColumn("SaleAmount", F.col("SaleAmount").cast(DecimalType(18,2))) \
        .withColumn("SaleDatetime", F.to_timestamp("SaleDatetime"))

except AnalysisException as e:
    print("Spark AnalysisException occurred:", str(e))
    raise

except Exception as e:
    print("Unexpected error occurred:", str(e))
    raise


StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 5, Finished, Available, Finished)

In [4]:
# Confirm the casts produced the intended types; fail the run instead of relying on eyeballing.
sz_sales_df.printSchema()
expected_types = {"UnitQuantity": "int", "SaleAmount": "decimal(18,2)", "SaleDatetime": "timestamp"}
actual_types = dict(sz_sales_df.dtypes)
type_mismatches = {
    col: actual_types.get(col)
    for col, expected in expected_types.items()
    if actual_types.get(col) != expected
}
assert not type_mismatches, f"unexpected column types: {type_mismatches}"

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 6, Finished, Available, Finished)

root
 |-- SaleTransactionID: string (nullable = true)
 |-- SalesRepID: string (nullable = true)
 |-- SalesRepName: string (nullable = true)
 |-- SalesRepCountry: string (nullable = true)
 |-- SalesRepCountryCode: string (nullable = true)
 |-- SalesRepRegion: string (nullable = true)
 |-- SalesCountryCode: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- CustomerCountry: string (nullable = true)
 |-- CustomerRegion: string (nullable = true)
 |-- CustomerCountryCode: string (nullable = true)
 |-- UnitQuantity: integer (nullable = true)
 |-- SaleAmount: decimal(18,2) (nullable = true)
 |-- SaleDatetime: timestamp (nullable = true)
 |-- ProductDetails: string (nullable = true)
 |-- ProductCode: string (nullable = true)
 |-- SaleLocation: string (nullable = true)
 |-- CurrencyCode: string (nullable = true)



Silver Layer
✅ Schema enforcement → correct types (integer, decimal, timestamp).

✅ Null handling → replaces blanks "" with NULL.

✅ Standardization → uppercase codes (ISO-like), trim all string fields.

✅ Deduplication → keeps only the latest record for each SaleTransactionID.

**Convert blank strings to NULL**

In [5]:
# Handle missing values: replace empty or whitespace-only strings with NULL.
# - Whitespace is trimmed only for the comparison, so "   " is treated as blank even though
#   the general trim step runs later; non-blank values are kept as they are here.
# - Only string columns are tested. Comparing int/decimal/timestamp columns with "" relies on an
#   implicit cast of "" that never matches and may raise under ANSI mode.
sz_sales_df = sz_sales_df.select([
    F.when(F.trim(F.col(c)) == "", None).otherwise(F.col(c)).alias(c)
    if dtype == "string" else F.col(c)
    for c, dtype in sz_sales_df.dtypes
])

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 7, Finished, Available, Finished)

In [6]:
# Normalise country and currency codes to upper case; every other column passes through as-is.
upper_case_cols = {"SalesRepCountryCode", "SalesCountryCode", "CustomerCountryCode", "CurrencyCode"}
sz_sales_df = sz_sales_df.select([
    F.upper(F.col(name)).alias(name) if name in upper_case_cols else F.col(name)
    for name in sz_sales_df.columns
])

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 8, Finished, Available, Finished)

In [7]:
# Trim leading/trailing spaces from every string column in a single projection.
# Calling withColumn once per column in a loop adds one projection per call, which the PySpark
# docs warn can produce oversized plans; one select does the same work in a single step.
sz_sales_df = sz_sales_df.select([
    F.trim(F.col(name)).alias(name) if dtype == "string" else F.col(name)
    for name, dtype in sz_sales_df.dtypes
])

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 9, Finished, Available, Finished)

In [8]:
# Deduplication: for each SaleTransactionID keep only the row with the latest SaleDatetime.
# A NULL SaleDatetime sorts last, so such a row survives only if its ID has no dated row.
# Rows with a NULL SaleTransactionID are not duplicates of one another, but partitionBy would
# group them all into one NULL partition and keep just one. They are passed through unchanged.
# NOTE(review): ties on SaleDatetime within one ID are broken arbitrarily, so which duplicate
# survives is not guaranteed to be stable across runs. Add a tie-break column if one exists.
latest_first = Window.partitionBy("SaleTransactionID").orderBy(F.col("SaleDatetime").desc_nulls_last())
sz_sales_df = (
    sz_sales_df
    .withColumn("row_num", F.row_number().over(latest_first))
    .filter((F.col("row_num") == 1) | F.col("SaleTransactionID").isNull())
    .drop("row_num")
)

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 10, Finished, Available, Finished)

In [9]:
# Inspect the schema after blank handling, standardisation and deduplication.
# None of those steps add or remove columns, so the column list must still equal the
# target names assigned when the bronze table was read.
sz_sales_df.printSchema()
assert sz_sales_df.columns == col_names, f"unexpected columns after cleaning: {sz_sales_df.columns}"


StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 11, Finished, Available, Finished)

root
 |-- SaleTransactionID: string (nullable = true)
 |-- SalesRepID: string (nullable = true)
 |-- SalesRepName: string (nullable = true)
 |-- SalesRepCountry: string (nullable = true)
 |-- SalesRepCountryCode: string (nullable = true)
 |-- SalesRepRegion: string (nullable = true)
 |-- SalesCountryCode: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- CustomerCountry: string (nullable = true)
 |-- CustomerRegion: string (nullable = true)
 |-- CustomerCountryCode: string (nullable = true)
 |-- UnitQuantity: integer (nullable = true)
 |-- SaleAmount: decimal(18,2) (nullable = true)
 |-- SaleDatetime: timestamp (nullable = true)
 |-- ProductDetails: string (nullable = true)
 |-- ProductCode: string (nullable = true)
 |-- SaleLocation: string (nullable = true)
 |-- CurrencyCode: string (nullable = true)



In [10]:
# ProductDetails packs the product hierarchy into one pipe-delimited string:
# three category levels followed by the product name. Split it into an array column,
# then expose each position as its own column.
# NOTE(review): values with fewer than four parts yield NULL only under non-ANSI array indexing.
sz_sales_df = sz_sales_df.withColumn("ProductHierarchy", F.split(F.col("ProductDetails"), r"\|"))
for position, target_col in enumerate(["Category1", "Category2", "Category3", "ProductName"]):
    sz_sales_df = sz_sales_df.withColumn(target_col, F.col("ProductHierarchy")[position])



StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 12, Finished, Available, Finished)

In [11]:
# SaleLocation holds the sale's region (continent) and country as one pipe-delimited string.
# Split it into an array column, then expose the two parts as SaleRegion and SaleCountry.
sz_sales_df = sz_sales_df.withColumn("SaleLocationHierarchy", F.split(F.col("SaleLocation"), r"\|"))
location_parts = F.col("SaleLocationHierarchy")
sz_sales_df = (
    sz_sales_df
    .withColumn("SaleRegion", location_parts[0])
    .withColumn("SaleCountry", location_parts[1])
)
   



StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 13, Finished, Available, Finished)

In [12]:
# ProductCode packs the code hierarchy into one pipe-delimited string: three level codes
# followed by the specific product code. Split it into an array column, then expose each
# position as its own column.
sz_sales_df = sz_sales_df.withColumn("ProductCodeHierarchy", F.split(F.col("ProductCode"), r"\|"))
code_levels = ["Level1Code", "Level2Code", "Level3Code", "SpecificProductCode"]
for idx, level_col in enumerate(code_levels):
    sz_sales_df = sz_sales_df.withColumn(level_col, F.col("ProductCodeHierarchy")[idx])



StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 14, Finished, Available, Finished)

In [13]:
# Drop the temporary split arrays together with the packed source columns they came from.
sz_sales_df = sz_sales_df.drop(
    "ProductHierarchy", "ProductDetails",
    "SaleLocationHierarchy", "SaleLocation",
    "ProductCodeHierarchy", "ProductCode",
)


StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 15, Finished, Available, Finished)

In [14]:
# Verify the final silver schema: the temporary split arrays and their packed source
# columns must no longer be present.
sz_sales_df.printSchema()
leftover_cols = {
    "ProductHierarchy", "ProductDetails",
    "SaleLocationHierarchy", "SaleLocation",
    "ProductCodeHierarchy", "ProductCode",
} & set(sz_sales_df.columns)
assert not leftover_cols, f"helper/source columns still present: {sorted(leftover_cols)}"

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 16, Finished, Available, Finished)

root
 |-- SaleTransactionID: string (nullable = true)
 |-- SalesRepID: string (nullable = true)
 |-- SalesRepName: string (nullable = true)
 |-- SalesRepCountry: string (nullable = true)
 |-- SalesRepCountryCode: string (nullable = true)
 |-- SalesRepRegion: string (nullable = true)
 |-- SalesCountryCode: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- CustomerCountry: string (nullable = true)
 |-- CustomerRegion: string (nullable = true)
 |-- CustomerCountryCode: string (nullable = true)
 |-- UnitQuantity: integer (nullable = true)
 |-- SaleAmount: decimal(18,2) (nullable = true)
 |-- SaleDatetime: timestamp (nullable = true)
 |-- CurrencyCode: string (nullable = true)
 |-- Category1: string (nullable = true)
 |-- Category2: string (nullable = true)
 |-- Category3: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- SaleRegion: string (nullable = true)
 |-- SaleCountry: string (nullable = true)
 |

In [15]:
# Row count before the SaleAmount outlier filter (displayed as the cell result).
rows_before_outlier_filter = sz_sales_df.count()
rows_before_outlier_filter

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 17, Finished, Available, Finished)

27000

In [16]:
# SaleAmount contains outliers (e.g. 300 billion for 4 units of xerox); drop rows above the cap.
# Rows with a NULL SaleAmount are kept explicitly. A bare `SaleAmount <= cap` evaluates to NULL
# for them, and `where` would silently discard them even though they are missing values, not outliers.
# NOTE(review): confirm the cap and whether NULL amounts should be retained in silver.
MAX_SALE_AMOUNT = 3000
sz_sales_df = sz_sales_df.where(
    (F.col("SaleAmount") <= MAX_SALE_AMOUNT) | F.col("SaleAmount").isNull()
)


StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 18, Finished, Available, Finished)

In [17]:
# Row count after the SaleAmount outlier filter (displayed as the cell result).
rows_after_outlier_filter = sz_sales_df.count()
rows_after_outlier_filter

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 19, Finished, Available, Finished)

26865

In [18]:

# Write the transformed data to the silver layer as a Delta table, overwriting it on every run.
# NOTE(review): Delta rejects an overwrite whose schema differs from the existing table unless
# .option("overwriteSchema", "true") is set. Confirm whether schema changes should be allowed.
sz_table_path = (
    f"abfss://{Workspace}@onelake.dfs.fabric.microsoft.com/"
    f"{Lakehouse}.Lakehouse/Tables/sz_sales_transaction"
)
try:
    sz_sales_df.write \
        .format("delta") \
        .mode("overwrite") \
        .save(sz_table_path)

    print(" Silver layer sales transformation completed successfully!")

except Exception as e:
    # Log and re-raise: swallowing the error would let the notebook finish without
    # error even though the silver table was not written.
    print("Unexpected error occurred:", str(e))
    raise
    

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 20, Finished, Available, Finished)

 Silver layer transformation completed successfully


In [19]:
# Smoke test: check that data was saved in the silver layer by reading the Delta table back
# and showing a few rows. A read failure is logged and re-raised so the run does not pass silently.
sz_table_path = (
    f"abfss://{Workspace}@onelake.dfs.fabric.microsoft.com/"
    f"{Lakehouse}.Lakehouse/Tables/sz_sales_transaction"
)
try:
    spark.read.format("delta") \
        .load(sz_table_path) \
        .show(5)

except Exception as e:
    print("Unexpected error occurred:", str(e))
    raise

StatementMeta(, ea0e78da-c707-4356-8525-a25b2096f02e, 21, Finished, Available, Finished)

+-----------------+----------+-----------------+---------------+-------------------+--------------+----------------+----------+-----------------+---------------+--------------+-------------------+------------+----------+-------------------+------------+---------+----------+-----------+--------------------+----------+-----------+----------+----------+----------+-------------------+
|SaleTransactionID|SalesRepID|     SalesRepName|SalesRepCountry|SalesRepCountryCode|SalesRepRegion|SalesCountryCode|CustomerID|     CustomerName|CustomerCountry|CustomerRegion|CustomerCountryCode|UnitQuantity|SaleAmount|       SaleDatetime|CurrencyCode|Category1| Category2|  Category3|         ProductName|SaleRegion|SaleCountry|Level1Code|Level2Code|Level3Code|SpecificProductCode|
+-----------------+----------+-----------------+---------------+-------------------+--------------+----------------+----------+-----------------+---------------+--------------+-------------------+------------+----------+------------