In [0]:
from itertools import chain
from pyspark.sql.functions import to_date, date_format, coalesce, col,count, when, create_map, lit
from pyspark.sql.types import DateType
import os

Creating the Spark session

In [0]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook under the given application name
spark = (
    SparkSession.builder
    .appName("DeltaTableCheckAndAppend")
    .getOrCreate()
)

# Customer

In [0]:
# Define paths: bronze_path is the source customer CSV (bronze container),
# silver_path is the Delta output location (silver container)
bronze_path = "abfss://bronze@scmdataset2025.dfs.core.windows.net/Static_Data/Customer_Data.csv"
silver_path = "abfss://silver@scmdataset2025.dfs.core.windows.net/Customer"

# Function to check if Delta table exists
def delta_table_exists(path):
    """Report whether `path` directly contains a `_delta_log/` entry.

    Args:
        path: Storage location to list with `dbutils.fs.ls`.

    Returns:
        True if one of the listed entries is named exactly "_delta_log/".
        False if no such entry exists, or if listing the path raised any
        exception (the error is printed, not propagated).
    """
    try:
        for entry in dbutils.fs.ls(path):
            if entry.name == "_delta_log/":
                return True
    except Exception as e:
        # A failed listing (e.g. the path is absent) is reported as "no table"
        print(f"Failed to access path: {e}")
    return False

# Build the silver Customer table only when it is not there yet
if not delta_table_exists(silver_path):
    # Load the bronze CSV (header row, inferred column types), keep one row
    # per Customer_ID and discard the Customer_Name column
    df_customers = (
        spark.read.format("csv")
        .options(header="true", inferSchema="true")
        .load(bronze_path)
        .dropDuplicates(["Customer_ID"])
        .drop("Customer_Name")
    )

    # Persist the result as a Delta table in the silver layer
    (
        df_customers.write.format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .save(silver_path)
    )

    # Run Delta OPTIMIZE on the freshly written table
    spark.sql(f"OPTIMIZE delta.`{silver_path}`")

    print("File saved in the silver layer")
    print("Data processed successfully.")
else:
    # Table is already present: leave it untouched
    print("Delta table already exists in the silver layer.")


# Product

In [0]:

# Define the Delta table path (silver-container location of the Product table)
delta_path = "abfss://silver@scmdataset2025.dfs.core.windows.net/Product"

# Check if a _delta_log directory exists directly under the given path.
# NOTE(review): this re-defines (and shadows) the identical helper declared in
# the Customer cell; consider keeping a single definition.
def delta_table_exists(path):
    """Return True when listing `path` shows an entry named "_delta_log/".

    Any exception raised while listing is printed and treated as
    "table does not exist" (returns False).
    """
    try:
        entry_names = [entry.name for entry in dbutils.fs.ls(path)]
        return "_delta_log/" in entry_names
    except Exception as e:
        print(f"Failed to access path: {e}")
        return False
    
# Build the silver Product table only when it does not exist yet
if delta_table_exists(delta_path):
    # If Delta table already exists, do nothing
    print("Delta table already exists. Skipping data processing.")
else:
    # Load the bronze CSV with a header row and inferred column types
    df_product = (
        spark.read.format("csv")
        .options(header="true", inferSchema="true")
        .load("abfss://bronze@scmdataset2025.dfs.core.windows.net/Static_Data/Product_Data.csv")
    )

    # Expose the data as loaded (before de-duplication) as a temporary view
    df_product.createOrReplaceTempView("temp_product_view")

    # Keep a single row per Product_ID and show the result
    df_product = df_product.dropDuplicates(["Product_ID"])
    display(df_product)

    # Persist the de-duplicated products as a Delta table
    print("Writing data to Delta table...")
    (
        df_product.write.format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .save(delta_path)
    )

    # Run Delta OPTIMIZE on the freshly written table
    spark.sql(f"OPTIMIZE delta.`{delta_path}`")

    print("Data processed successfully.")

# Warehouse

In [0]:
base_path = "abfss://bronze@scmdataset2025.dfs.core.windows.net/Stream_Data/Warehouse/"

# Stream the warehouse CSV files from the bronze layer via the cloudFiles source
df_warehouse = (
    spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": "abfss://table@scmdataset2025.dfs.core.windows.net/warehouse_schema/",
        "cloudFiles.inferColumnTypes": "true",
    })
    .load(base_path)
)

# Register the stream as read (before any transformation) as a temporary view
df_warehouse.createOrReplaceTempView("temp_warehouse_view")

# Normalise the 'Date' column to a real DateType value.
# The code relies on each to_date() call yielding NULL when its pattern does
# not match, so coalesce() keeps the first pattern that parses. A value that
# satisfies more than one pattern (e.g. both "yyyy-MM-dd" and "yyyy-dd-MM")
# resolves to the pattern listed first.
# The parsed date is stored as-is. Formatting it back to a "dd-MM-yyyy" string
# and then casting that string to DateType turned every row into NULL, because
# the string-to-date cast expects a year-first value.
df_warehouse = df_warehouse.withColumn(
    "Date",
    coalesce(
        to_date(col("Date"), "dd-MM-yyyy"),
        to_date(col("Date"), "dd MM yyyy"),
        to_date(col("Date"), "yyyy-MM-dd"),
        to_date(col("Date"), "yyyy-dd-MM"),
        to_date(col("Date"), "yyyy_dd_MM"),
        to_date(col("Date"), "dd_MM_yyyy")
    )
)


# Reference location for every known Warehouse_ID
warehouse_mapping = dict(
    W001="New_Delhi",
    W002="Lucknow",
    W003="Chandigarh",
    W004="Mumbai",
    W005="Ahmedabad",
    W006="Jaipur",
    W007="Kolkata",
    W008="Bhubaneswar",
    W009="Patna",
    W010="Bhopal",
    W011="Raipur",
    W012="Ranchi",
    W013="Bangalore",
    W014="Hyderabad",
    W015="Chennai",
)

# Flatten the (id, location) pairs into alternating key/value literals and
# build a Spark map column from them
mapping_expr = create_map(
    [lit(token) for pair in warehouse_mapping.items() for token in pair]
)

# Enforce the reference location for each known Warehouse_ID.
# The map lookup is NULL when Warehouse_ID is NULL or not present in the
# mapping; coalesce() then keeps the incoming Warehouse_Location unchanged.
# The previous `!=` comparison evaluated to NULL whenever Warehouse_Location
# itself was NULL, so a missing location for a known ID was never filled in;
# coalesce() covers that case as well.
df_warehouse = df_warehouse.withColumn(
    "Warehouse_Location",
    coalesce(
        mapping_expr.getItem(col("Warehouse_ID")),
        col("Warehouse_Location")
    )
)

# Write the cleaned warehouse stream to the silver Delta table, process the
# data currently available (availableNow trigger) and block until the query ends
(
    df_warehouse.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://table@scmdataset2025.dfs.core.windows.net/warehouse_load/")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .start("abfss://silver@scmdataset2025.dfs.core.windows.net/Warehouse/")
    .awaitTermination()
)


# Supply dataset

In [0]:
base_path = "abfss://bronze@scmdataset2025.dfs.core.windows.net/Stream_Data/Supply/"

# Stream the supply CSV files from the bronze layer via the cloudFiles source
df_supply = (
    spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": "abfss://table@scmdataset2025.dfs.core.windows.net/Supply_schema/",
        "cloudFiles.inferColumnTypes": "true",
    })
    .load(base_path)
)

# Register the stream as read (before any transformation) as a temporary view
df_supply.createOrReplaceTempView("temp_view_supply")

# The Supplier_Name column is not carried forward
df_1 = df_supply.drop("Supplier_Name")

# Normalise the 'Date' column to a real DateType value.
# The code relies on each to_date() call yielding NULL when its pattern does
# not match, so coalesce() keeps the first pattern that parses. A value that
# satisfies more than one pattern (e.g. both "yyyy-MM-dd" and "yyyy-dd-MM")
# resolves to the pattern listed first.
# The parsed date is stored as-is. Formatting it back to a "dd-MM-yyyy" string
# and then casting that string to DateType turned every row into NULL, because
# the string-to-date cast expects a year-first value.
df_supply = df_1.withColumn(
    "Date",
    coalesce(
        to_date(col("Date"), "dd-MM-yyyy"),
        to_date(col("Date"), "dd MM yyyy"),
        to_date(col("Date"), "yyyy-MM-dd"),
        to_date(col("Date"), "yyyy-dd-MM"),
        to_date(col("Date"), "yyyy_dd_MM"),
        to_date(col("Date"), "dd_MM_yyyy")
    )
)



# Reference Supplier_State for every known Supplier_ID
correct_mapping = dict(
    S001="Delhi",
    S002="Maharashtra",
    S003="West_Bengal",
    S004="Madhya_Pradesh",
    S005="Karnataka",
)

# Materialise the mapping as a two-column lookup DataFrame
mapping_df = spark.createDataFrame(
    list(correct_mapping.items()),
    schema=["Supplier_ID", "Correct_Supplier_State"],
)

# Attach the reference state for each Supplier_ID (left join keeps every
# supply row) and enforce it on 'Supplier_State'.
# Rows whose Supplier_ID has no entry in the mapping get a NULL reference
# value, so coalesce() leaves their Supplier_State untouched. The previous
# `!=` comparison evaluated to NULL whenever Supplier_State itself was NULL,
# so a missing state for a known supplier was never filled in; coalesce()
# covers that case as well.
df_check = (
    df_supply.join(mapping_df, on="Supplier_ID", how="left")
    .withColumn(
        "Supplier_State",
        coalesce(col("Correct_Supplier_State"), col("Supplier_State"))
    )
    .drop("Correct_Supplier_State")  # Remove the helper column
)



# Write the corrected supply stream to the silver Delta table, process the
# data currently available (availableNow trigger) and block until the query ends
(
    df_check.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://table@scmdataset2025.dfs.core.windows.net/Supply_load/")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .start("abfss://silver@scmdataset2025.dfs.core.windows.net/Supply/")
    .awaitTermination()
)



# Orders

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, StructType, StructField, StringType, IntegerType, DoubleType
import json

base_path = "abfss://bronze@scmdataset2025.dfs.core.windows.net/Stream_Data/Orders/"

# Stream the order CSV files from the bronze layer via the cloudFiles source
df_orders = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", "abfss://table@scmdataset2025.dfs.core.windows.net/Orders_schema/")
      .option("cloudFiles.inferColumnTypes", "true")
      .load(base_path)
     )

# Register the raw orders stream under its own view name. It was previously
# registered as "temp_view_supply", which replaced the view created for the
# supply stream with order data.
df_orders.createOrReplaceTempView("temp_view_orders")

# Cast Order_Date to DateType, then drop rows that are duplicated across all columns
df_orders = (
    df_orders
    .withColumn("Order_Date", col("Order_Date").cast(DateType()))
    .dropDuplicates()
)

# Write the orders stream to the silver Delta table, process the data
# currently available (availableNow trigger) and block until the query ends
(
    df_orders.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://table@scmdataset2025.dfs.core.windows.net/Orders_load/")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .start("abfss://silver@scmdataset2025.dfs.core.windows.net/Orders/")
    .awaitTermination()
)

# Logistics

In [0]:
base_path = "abfss://bronze@scmdataset2025.dfs.core.windows.net/Stream_Data/Logistics/"

# Stream the logistics CSV files from the bronze layer via the cloudFiles source
df_Logistics = (
    spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": "abfss://table@scmdataset2025.dfs.core.windows.net/Logistics_schema/",
        "cloudFiles.inferColumnTypes": "true",
    })
    .load(base_path)
)


# Write the logistics stream unchanged to the silver Delta table, process the
# data currently available (availableNow trigger) and block until the query ends
(
    df_Logistics.writeStream
    .format("delta")
    .option("checkpointLocation", "abfss://table@scmdataset2025.dfs.core.windows.net/Logistics_load/")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .start("abfss://silver@scmdataset2025.dfs.core.windows.net/Logistics/")
    .awaitTermination()
)