## Data Transformation for Silver Lakehouse

##### Create Customers_silver dataframe

In [2]:
# Notebook nb_Customers_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *

# define the schema for Customers table

table_name = 'Customers'
customers_schema=StructType([
 StructField('CustomerID', IntegerType(), True), 
 StructField('FirstName', StringType(), True), 
 StructField('LastName', StringType(), True), 
 StructField('FullName', StringType(), True), 
 StructField('DateInserted', StringType(), True)])

# Import the customer data from bronze folder of lakehouse
df = spark.read.format("csv").option("header", "true").schema(customers_schema).load("abfss://Sales_Medallion_Project@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/Bronze_raw_data/customers.csv")

# Adding new columns (IDateInserted, SourceFilename) for data validation for cleanup.
# DateInserted will contain the timestamp when the record was inserted
df = df.withColumn("DateInserted", current_timestamp()) 
df = df.withColumn("SourceFilename",input_file_name())

#Saving the customer data to our Sales_Silver Lakehouse
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/"+table_name)

# Display the first 10 rows of the dataframe to preview your data
display(df.head(10))

StatementMeta(, 487c4e95-2f84-42a6-95c1-08c5f56d537a, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0e4c1402-d8e4-48c1-b222-8ae51c71c9ab)

##### Create Employees_silver dataframe

In [3]:

from pyspark.sql.types import *
from pyspark.sql.functions import *

# Import the employees data from bronze folder of lakehouse
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Medallion_Project@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/Bronze_raw_data/employees.csv")

 # Create customer_silver dataframe
df = df.withColumn("EmployeeID", df["EmployeeID"].cast("integer"))  
df = df.withColumn("ManagerID", df["ManagerID"].cast("integer"))  
df = df.withColumn("FirstName", df["FirstName"].cast("string"))  
df = df.withColumn("LastName", df["LastName"].cast("string"))  
df = df.withColumn("FullName", df["FullName"].cast("string"))  
df = df.withColumn("JobTitle", df["JobTitle"].cast("string"))  
df = df.withColumn("OrganizationLevel", df["OrganizationLevel"].cast("integer"))  
df = df.withColumn("MaritalStatus", df["MaritalStatus"].cast("string"))  
df = df.withColumn("Gender", df["Gender"].cast("string"))  
df = df.withColumn("Territory", df["Territory"].cast("string"))  
df = df.withColumn("Country", df["Country"].cast("string"))  
df = df.withColumn("Group", df["Group"].cast("string")) 
df = df.withColumn("DateInserted", current_timestamp())  
df = df.withColumn("SourceFilename",input_file_name())

#Writing the employees data to our Sales_Silver Lakehouse
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/Employees")

# Display the first 5 rows of the dataframe to preview your data
display(df.head(5))

StatementMeta(, 487c4e95-2f84-42a6-95c1-08c5f56d537a, 5, Finished, Available, Finished)

##### Create Orders_silver dataframe

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import DateType
from pyspark.sql.functions import to_date

# Import the orders data from bronze folder of lakehouse
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Medallion_Project@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/Bronze_raw_data/orders.csv")

df = df.withColumn("SalesOrderID", df["SalesOrderID"].cast("integer"))  
df = df.withColumn("SalesOrderDetailID", df["SalesOrderDetailID"].cast("integer"))  
df = df.withColumn('OrderDate', to_date(col('OrderDate'), 'M/d/yyyy').alias('OrderDate').cast('date'))
df = df.withColumn('DueDate', to_date(col('DueDate'), 'M/d/yyyy').alias('DueDate').cast('date'))
df = df.withColumn('ShipDate', to_date(col('ShipDate'), 'M/d/yyyy').alias('ShipDate').cast('date'))
df = df.withColumn("EmployeeID", df["EmployeeID"].cast("integer"))  
df = df.withColumn("CustomerID", df["CustomerID"].cast("integer"))  
df = df.withColumn("SubTotal", df["SubTotal"].cast("double"))  
df = df.withColumn("TaxAmt", df["TaxAmt"].cast("double"))  
df = df.withColumn("Freight", df["Freight"].cast("double"))  
df = df.withColumn("TotalDue", df["TotalDue"].cast("double"))  
df = df.withColumn("ProductID", df["ProductID"].cast("integer"))  
df = df.withColumn("OrderQty", df["OrderQty"].cast("double"))  
df = df.withColumn("UnitPrice", df["UnitPrice"].cast("double"))  
df = df.withColumn("UnitPriceDiscount", df["UnitPriceDiscount"].cast("double"))  
df = df.withColumn("LineTotal", df["LineTotal"].cast("double"))
df = df.withColumn("DateInserted", current_timestamp())  
df = df.withColumn("SourceFilename",input_file_name())

df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/Orders")

# Display the first 5 rows of the dataframe to preview your data
display(df.head(5))

StatementMeta(, 487c4e95-2f84-42a6-95c1-08c5f56d537a, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 314551e8-b9ba-4454-968f-525a07333ecd)

##### Create Products_silver dataframe

In [6]:
# nb_Product_Bronze_Silver
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Import the product data from bronze folder of lakehouse
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Medallion_Project@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/Bronze_raw_data/products.csv")


df = df.withColumn("ProductID", df["ProductID"].cast("integer"))   
df = df.withColumn("ProductNumber", df["ProductNumber"].cast("string"))   
df = df.withColumn("ProductName", df["ProductName"].cast("string"))   
df = df.withColumn("ModelName", df["ModelName"].cast("string"))   
df = df.withColumn("MakeFlag", df["MakeFlag"].cast("smallint"))   
df = df.withColumn("StandardCost", df["StandardCost"].cast("double"))   
df = df.withColumn("ListPrice", df["ListPrice"].cast("double"))  
df = df.withColumn("SubCategoryID", df["SubCategoryID"].cast("smallint"))  
df = df.withColumn("DateInserted", current_timestamp())  
df = df.withColumn("SourceFilename",input_file_name())

df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/Products")

# Display the first 5 rows of the dataframe to preview your data
display(df.head(5))

StatementMeta(, 487c4e95-2f84-42a6-95c1-08c5f56d537a, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ae926841-cee5-4ac4-975e-0fa2b69bb098)

##### Create ProductCategories_silver dataframe

In [7]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Import the ProductCategories data from bronze folder of lakehouse
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Medallion_Project@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/Bronze_raw_data/productcategories.csv")


df = df.withColumn("CategoryID", df["CategoryID"].cast("integer"))  
df = df.withColumn("CategoryName", df["CategoryName"].cast("string"))  
df = df.withColumn("DateInserted", current_timestamp())   
df = df.withColumn("SourceFilename",input_file_name())

df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/ProductCategories")

# Display the first 5 rows of the dataframe to preview your data
display(df.head(5))

StatementMeta(, 487c4e95-2f84-42a6-95c1-08c5f56d537a, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ec1c21f6-0ae7-435b-81d7-8151e94ead08)

##### Create ProductSubCategories_silver dataframe

In [8]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Import the ProductSubCategories data from bronze folder of lakehouse
df = spark.read.format("csv").option("header","true").load("abfss://Sales_Medallion_Project@onelake.dfs.fabric.microsoft.com/Sales_Bronze.Lakehouse/Files/Bronze_raw_data/productsubcategories.csv")
df = df.withColumn("SubCategoryID", df["SubCategoryID"].cast("integer"))  
df = df.withColumn("CategoryID", df["CategoryID"].cast("integer"))  
df = df.withColumn("DateInserted", current_timestamp())  
df = df.withColumn("SourceFilename",input_file_name())


df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("Tables/ProductSubCategories")

# Display the first 5 rows of the dataframe to preview your data
display(df.head(5))

StatementMeta(, 487c4e95-2f84-42a6-95c1-08c5f56d537a, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5ab2f484-a55c-4f8f-9076-5befb8689314)