# PySpark Code to Move Data from Silver to Gold

This is an example of how to work with the medallion architecture: moving data from the Silver layer to the Gold layer.

## Working with 2020orders_silver information

In [13]:
from pyspark.sql.types import *
import pyspark.sql.functions 
from pyspark.sql import *
from pyspark.sql.functions import sum


StatementMeta(, 8c75f6dc-9a98-446c-957a-b0fa1e9d3fa5, 15, Finished, Available, Finished)

In [14]:
# Load the 2020 orders Delta table from the Silver lakehouse and preview its first two rows.
orders_silver_path = "abfss://fabric_medallion_arch_demo@onelake.dfs.fabric.microsoft.com/cleansed_test_Silver.Lakehouse/Tables/2020orders_silver"
df_cleansed_2020orders = spark.read.load(orders_silver_path, format="delta")

df_cleansed_2020orders.head(2)

StatementMeta(, 8c75f6dc-9a98-446c-957a-b0fa1e9d3fa5, 16, Finished, Available, Finished)

[Row(ID='SO45376', Count=1, Date='1/5/2020', Name='Edgar Mehta', Style='Mountain-100 Silver, 38', price=3399.99, tax=271.9992),
 Row(ID='SO45381', Count=1, Date='1/6/2020', Name='Jordan Long', Style='Mountain-100 Silver, 38', price=3399.99, tax=271.9992)]

In [15]:
# Replace the "tax" column with an integer-typed version of itself.
# NOTE(review): the Silver preview shows fractional tax values (e.g. 271.9992);
# an integer cast does not keep the fractional part — confirm this is intended.
tax_as_int = df_cleansed_2020orders["tax"].cast("int")
df_cleansed_2020orders = df_cleansed_2020orders.withColumn("tax", tax_as_int)

StatementMeta(, 8c75f6dc-9a98-446c-957a-b0fa1e9d3fa5, 17, Finished, Available, Finished)

In [16]:
# Print the DataFrame schema (column names, data types, nullability).
df_cleansed_2020orders.printSchema()

StatementMeta(, 8c75f6dc-9a98-446c-957a-b0fa1e9d3fa5, 18, Finished, Available, Finished)

root
 |-- ID: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- price: double (nullable = true)
 |-- tax: integer (nullable = true)



In [17]:
# Sum the "price" column per "Style", then preview the first ten groups.
# Spark's sum is referenced through its module so it cannot be mistaken for Python's built-in sum.
total_price = pyspark.sql.functions.sum("price").alias("total_price_vehicles")
df_aggregated = df_cleansed_2020orders.groupBy("Style").agg(total_price)
df_aggregated.show(n=10, truncate=False)
print(df_aggregated)

StatementMeta(, 8c75f6dc-9a98-446c-957a-b0fa1e9d3fa5, 19, Finished, Available, Finished)

+-----------------------+--------------------+
|Style                  |total_price_vehicles|
+-----------------------+--------------------+
|Mountain-200 Black, 42 |196713.42720000003  |
|Mountain-100 Silver, 42|71399.78999999998   |
|Mountain-200 Silver, 42|159499.30919999976  |
|Mountain-100 Silver, 44|71399.78999999998   |
|Road-550-W Yellow, 40  |37016.1875          |
|Mountain-100 Silver, 38|64599.80999999997   |
|Road-250 Red, 48       |256551.75000000044  |
|Road-250 Red, 52       |217458.15000000034  |
|Road-650 Red, 52       |20301.81200000001   |
|Road-250 Black, 52     |253061.25           |
+-----------------------+--------------------+
only showing top 10 rows

DataFrame[Style: string, total_price_vehicles: double]


In [18]:
# Overwrite the "2020orders_gold" Delta table in the curated_Gold lakehouse with the aggregated totals.
orders_gold_path = "abfss://fabric_medallion_arch_demo@onelake.dfs.fabric.microsoft.com/curated_Gold.Lakehouse/Tables/2020orders_gold"
df_aggregated.write.save(orders_gold_path, format="delta", mode="overwrite")

StatementMeta(, 8c75f6dc-9a98-446c-957a-b0fa1e9d3fa5, 20, Finished, Available, Finished)

## Working with products_silver information

In [19]:
# Copy the products Delta table from the Silver lakehouse to the Gold lakehouse unchanged.
products_silver_path = "abfss://fabric_medallion_arch_demo@onelake.dfs.fabric.microsoft.com/cleansed_test_Silver.Lakehouse/Tables/products_silver"
# NOTE(review): the target keeps the "products_silver" table name and spells the lakehouse
# "curated_gold", whereas the orders cell writes to "curated_Gold" / "2020orders_gold" —
# confirm both are intended before relying on this table.
products_gold_path = "abfss://fabric_medallion_arch_demo@onelake.dfs.fabric.microsoft.com/curated_gold.Lakehouse/Tables/products_silver"

# Read data from the Silver layer
silver_df = spark.read.load(products_silver_path, format="delta")
# No transformations are applied at this stage; the data is written exactly as read.
# Write data to the Gold layer, replacing any existing contents
silver_df.write.save(products_gold_path, format="delta", mode="overwrite")

StatementMeta(, 8c75f6dc-9a98-446c-957a-b0fa1e9d3fa5, 21, Finished, Available, Finished)