# Import the files from BronzeLayer to a DataFrame 

In [1]:
 from pyspark.sql.types import *
    
 # Create the schema for the table
 orderSchema = StructType([
     StructField("SalesOrderNumber", StringType()),
     StructField("SalesOrderLineNumber", IntegerType()),
     StructField("OrderDate", DateType()),
     StructField("CustomerName", StringType()),
     StructField("Email", StringType()),
     StructField("Item", StringType()),
     StructField("Quantity", IntegerType()),
     StructField("UnitPrice", FloatType()),
     StructField("Tax", FloatType())
     ])
    
 # Import all files from bronze folder of lakehouse
 df = spark.read.format("csv").option("header", "true").schema(orderSchema)\
    .load("abfss://MicrosoftLearn@onelake.dfs.fabric.microsoft.com/MicrosoftLearn_LH.Lakehouse/Files/BronzeLayer/*.csv")
    
 # Display the first 10 rows of the dataframe to preview your data
 display(df.head(10))

 # The code you ran loaded the data from the CSV files in the bronze folder into a Spark dataframe, 
 # and then displayed the first few rows of the dataframe.

StatementMeta(, 874c0a50-d076-494e-b222-39872533e77d, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2e874884-e36b-4338-adda-d3bcc10822d8)

In [2]:
 # imports the necessary functions from PySpark
 from pyspark.sql.functions import when, lit, col, current_timestamp, input_file_name
    
 # Add columns IsFlagged, CreatedTS and ModifiedTS
 # the first line is adding new column to the dataframe so you can track the source file name,
 # the secondline is putting a condition to check whether the order was flagged as being a before the fiscal year of interest or after,
 #the 3rd line is and when the row was created and modified.

 df = df.withColumn("FileName", input_file_name()) \
     .withColumn("IsFlagged", when(col("OrderDate") < '2019-08-01',True).otherwise(False)) \
     .withColumn("CreatedTS", current_timestamp()).withColumn("ModifiedTS", current_timestamp())
    
 # Update CustomerName to "Unknown" if CustomerName null or empty
 df = df.withColumn("CustomerName", when((col("CustomerName").isNull() | (col("CustomerName")=="")),lit("Unknown")).otherwise(col("CustomerName")))

StatementMeta(, 874c0a50-d076-494e-b222-39872533e77d, 4, Finished, Available)

In [7]:
 display(df.head(10))

StatementMeta(, 874c0a50-d076-494e-b222-39872533e77d, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, c5b88500-aa7c-4fb4-a312-96d238b1ce0e)

In [9]:
# Load the transformed data into a Delta table
table_name = "SalesSilverLayer"  # Replace with your desired table name

df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    
# Display results
display(df.limit(25))

StatementMeta(, 874c0a50-d076-494e-b222-39872533e77d, 11, Finished, Available)

SynapseWidget(Synapse.DataFrame, ef59af9e-f15b-438e-8912-36a52a5ee8de)

# define the schema for the sales_silver table in the sales database using Delta Lake format.

In [11]:
 # Define the schema for the sales_silver table
    
 from pyspark.sql.types import *
 from delta.tables import *
    
 DeltaTable.createIfNotExists(spark) \
     .tableName("MicrosoftLearn_LH.sales_silver") \
     .addColumn("SalesOrderNumber", StringType()) \
     .addColumn("SalesOrderLineNumber", IntegerType()) \
     .addColumn("OrderDate", DateType()) \
     .addColumn("CustomerName", StringType()) \
     .addColumn("Email", StringType()) \
     .addColumn("Item", StringType()) \
     .addColumn("Quantity", IntegerType()) \
     .addColumn("UnitPrice", FloatType()) \
     .addColumn("Tax", FloatType()) \
     .addColumn("FileName", StringType()) \
     .addColumn("IsFlagged", BooleanType()) \
     .addColumn("CreatedTS", DateType()) \
     .addColumn("ModifiedTS", DateType()) \
     .execute()

StatementMeta(, 874c0a50-d076-494e-b222-39872533e77d, 13, Finished, Available)

<delta.tables.DeltaTable at 0x7ee5a2467400>

In [12]:
# Update existing records and insert new ones based on a condition defined by the columns SalesOrderNumber, OrderDate, CustomerName, and Item.

from delta.tables import *
    
deltaTable = DeltaTable.forPath(spark, 'Tables/sales_silver')
    
dfUpdates = df
    
deltaTable.alias('silver') \
  .merge(
    dfUpdates.alias('updates'),
    'silver.SalesOrderNumber = updates.SalesOrderNumber and silver.OrderDate = updates.OrderDate and silver.CustomerName = updates.CustomerName and silver.Item = updates.Item'
  ) \
   .whenMatchedUpdate(set =
    {
          
    }
  ) \
 .whenNotMatchedInsert(values =
    {
      "SalesOrderNumber": "updates.SalesOrderNumber",
      "SalesOrderLineNumber": "updates.SalesOrderLineNumber",
      "OrderDate": "updates.OrderDate",
      "CustomerName": "updates.CustomerName",
      "Email": "updates.Email",
      "Item": "updates.Item",
      "Quantity": "updates.Quantity",
      "UnitPrice": "updates.UnitPrice",
      "Tax": "updates.Tax",
      "FileName": "updates.FileName",
      "IsFlagged": "updates.IsFlagged",
      "CreatedTS": "updates.CreatedTS",
      "ModifiedTS": "updates.ModifiedTS"
    }
  ) \
  .execute()

StatementMeta(, 874c0a50-d076-494e-b222-39872533e77d, 14, Finished, Available)