In [ ]:
# Welcome to your new notebook
# Type here in the cell editor to add code!


In [1]:
# Load the bronze-layer raw material purchase extract into a Spark DataFrame.
# Only the header option is set (no schema, no inferSchema), so the first line
# supplies the column names; typing is handled by later cells.
df = (
    spark.read
    .option("header", "true")
    .csv("Files/Bronzebronze_raw_material_purchase/raw_material_purchase.csv")
)
display(df)

StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 3, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, 05102ca7-01e2-4a22-8090-755c22526f5d)

In [2]:
from pyspark.sql.functions import col, trim, upper
from pyspark.sql.types import IntegerType, DoubleType

StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 4, Finished, Available, Finished, False)

In [4]:
# Build the silver frame from the raw load:
#   * identifier columns are upper-cased and stripped of surrounding whitespace
#     so that the same key written with different casing/padding compares equal;
#   * measure columns are cast to double for the arithmetic in later cells.
df_silver = df

for key_col in ("PurchaseID", "PlantID", "SupplierID"):
    df_silver = df_silver.withColumn(key_col, trim(upper(col(key_col))))

for measure_col in ("QtyKG", "RatePerKG", "FreightCost"):
    df_silver = df_silver.withColumn(measure_col, col(measure_col).cast(DoubleType()))

StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 6, Finished, Available, Finished, False)

In [5]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 7, Finished, Available, Finished, False)

In [7]:
# De-duplicate purchases: within each PurchaseID, number the rows from the
# greatest Date downwards and keep only row 1.
# NOTE(review): "Date" is not cast anywhere in the visible cells, so the sort
# uses whatever type the column was loaded with — confirm the ordering is
# chronological for the source date format.
# NOTE(review): rows sharing the same PurchaseID and Date are tied; which one
# row_number() ranks first is not determined by this ordering.
window_ = Window.partitionBy("PurchaseID").orderBy(col("Date").desc())

ranked_purchases = df_silver.withColumn("rn", row_number().over(window_))
df_silver = ranked_purchases.where(col("rn") == 1).drop("rn")

StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 9, Finished, Available, Finished, False)

In [8]:
# Keep only rows with a strictly positive quantity. Rows whose QtyKG is null
# do not satisfy the predicate and are dropped as well.
df_silver = df_silver.where(col("QtyKG") > 0)

StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 10, Finished, Available, Finished, False)

In [9]:
# Landed cost per kg = (material cost + freight) spread over the quantity.
# The preceding cell keeps only QtyKG > 0, so the divisor is never zero here.
landed_cost_per_kg = (
    col("QtyKG") * col("RatePerKG") + col("FreightCost")
) / col("QtyKG")

df_silver = df_silver.withColumn("LandedCostPerKG", landed_cost_per_kg)


StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 11, Finished, Available, Finished, False)

In [10]:
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# For every (PlantID, SupplierID) pair, walk the purchases in ascending Date
# order and attach the RatePerKG of the immediately preceding row. The first
# row of each pair has no predecessor and gets null.
window_cost = (
    Window
    .partitionBy("PlantID", "SupplierID")
    .orderBy("Date")
)

previous_rate = lag("RatePerKG", 1).over(window_cost)
df_silver = df_silver.withColumn("PrevRatePerKG", previous_rate)


StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 12, Finished, Available, Finished, False)

In [11]:
from pyspark.sql.functions import when

# Flag a cost spike (1) when a previous rate exists and the current rate is
# more than 1.2x that previous rate; every other row gets 0.
has_previous_rate = col("PrevRatePerKG").isNotNull()
rate_exceeds_threshold = col("RatePerKG") > col("PrevRatePerKG") * 1.2

df_silver = df_silver.withColumn(
    "CostSpikeFlag",
    when(has_previous_rate & rate_exceeds_threshold, 1).otherwise(0),
)


StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 13, Finished, Available, Finished, False)

In [13]:
# Render the transformed silver DataFrame in the notebook for visual inspection
# before it is written out.
display(df_silver)

StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 15, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, 95e30ade-2aa6-480e-93bc-c61ae9f13260)

In [15]:
# Persist the silver DataFrame as parquet under the silver-layer folder.
# "overwrite" mode replaces any existing output at the target path.
silver_path = "abfss://Fabric_Test@onelake.dfs.fabric.microsoft.com/Test_Lakehouse.Lakehouse/Files/Silver Layer Files"
(
    df_silver.write
    .format("parquet")
    .mode("overwrite")
    .save(f"{silver_path}/silver_RawMaterialPurchase")
)


StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 17, Finished, Available, Finished, False)

In [16]:
# Read the parquet output back from the silver path and show it, as a
# round-trip check that the write produced readable data.
df_check = (
    spark.read
    .format("parquet")
    .load(f"{silver_path}/silver_RawMaterialPurchase")
)
display(df_check)

StatementMeta(, f79a412f-a051-4a77-a329-14e52e85cd69, 18, Finished, Available, Finished, False)

SynapseWidget(Synapse.DataFrame, f7939876-d1ae-47f2-b3a8-317ab551b0c1)