In [0]:
# Root folders used by every batch-ingestion cell in this notebook.
# Each dataset appends its own sub-folder name to these prefixes.
silver_output_path = "/mnt/realtimedeai/silver/batch/"  # destination of the cleaned Delta output
bronze_batch_path = "/mnt/realtimedeai/bronze/batch/"   # location of the raw Parquet input


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, LongType
from pyspark.sql.functions import col, current_timestamp, to_timestamp

In [0]:



# Schema applied when reading the products dataset from Bronze.
# Each entry is (column name, Spark type, nullable flag).
# NOTE(review): Spark's file-based readers generally relax a user-supplied schema to
# nullable, so the False flags may not reject nulls at read time -- confirm on this runtime.
products_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("product_id", LongType(), False),
        ("product_name", StringType(), False),
        ("category", StringType(), True),
        ("base_price", DoubleType(), False),
        ("stock", LongType(), False),
    )
])




In [0]:

# Load the raw products batch from the Bronze layer.
bronze_products_path = f"{bronze_batch_path}products/"

# Read the Parquet files using the explicit products schema instead of the
# schema stored in the files.
products_df = spark.read.schema(products_schema).parquet(bronze_products_path)

display(products_df)
print("Products data loaded successfully from Bronze.")


In [0]:


# Clean the products batch: validate rows, de-duplicate by key, then add an audit timestamp.
#
# Validation runs BEFORE de-duplication on purpose: dropDuplicates keeps an arbitrary
# row per product_id (not "the first"), so de-duplicating first could retain an invalid
# row and then lose the product entirely once that row is filtered out.

# Keep only rows with a non-null key and name, a strictly positive base_price and a
# non-negative stock. The comparisons also drop rows where base_price or stock is null,
# because a comparison against null is not true.
products_filtered_df = (
    products_df
    .filter(
        (col("product_id").isNotNull()) &
        (col("product_name").isNotNull()) &
        (col("base_price") > 0) &
        (col("stock") >= 0)
    )
)

print("Invalid rows removed (no negative prices or stock)!")


# Collapse to one row per product_id; which of several valid duplicates survives is
# not deterministic.
products_deduped_df = products_filtered_df.dropDuplicates(["product_id"])

print("Duplicates removed successfully!")



# Add processed_time column (time at which this batch was processed)
products_ready_df = products_deduped_df.withColumn("processed_time", current_timestamp())

print("processed_time added successfully!")
display(products_ready_df)




In [0]:
# Target folder of the products table inside the Silver layer.
products_silver_output_path = f"{silver_output_path}products"

# Persist the cleaned products as a Delta table; "overwrite" replaces whatever
# was previously stored at this path.
products_ready_df.write.save(
    products_silver_output_path,
    format="delta",
    mode="overwrite",
)

print("Products batch data successfully written to the Silver layer in Delta format!")


In [0]:
# Schema applied when reading the stores dataset from Bronze.
# Each entry is (column name, Spark type, nullable flag).
# NOTE(review): Spark's file-based readers generally relax a user-supplied schema to
# nullable, so the False flags may not reject nulls at read time -- confirm on this runtime.
stores_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("store_id", LongType(), False),
        ("store_name", StringType(), False),
        ("region", StringType(), True),
        ("city", StringType(), True),
        ("size", StringType(), True),
    )
])

In [0]:
# Location of the raw stores batch inside the Bronze layer.
bronze_stores_path = f"{bronze_batch_path}stores"

# Load the Parquet files using the explicit stores schema instead of the
# schema stored in the files.
stores_df = spark.read.schema(stores_schema).parquet(bronze_stores_path)

stores_df.printSchema()
print("Stores data loaded successfully with schema enforcement!")


In [0]:


# Clean the stores batch: drop rows with missing identifiers, de-duplicate by key,
# then add an audit timestamp.
#
# The null filter runs BEFORE de-duplication on purpose: dropDuplicates keeps an
# arbitrary row per store_id, so de-duplicating first could retain a row with a null
# store_name and then lose the store entirely once that row is filtered out.

# Filter out rows where store_id or store_name is null
stores_filtered_df = (
    stores_df
    .filter(
        (col("store_id").isNotNull()) &
        (col("store_name").isNotNull())
    )
)

print("Null rows removed successfully from stores!")


# Collapse to one row per store_id; which of several valid duplicates survives is
# not deterministic.
stores_deduped_df = stores_filtered_df.dropDuplicates(["store_id"])

print("Duplicates removed successfully from stores data!")




# Add processed_time column (time at which this batch was processed)
stores_ready_df = stores_deduped_df.withColumn("processed_time", current_timestamp())

print("processed_time added successfully to stores data!")
display(stores_ready_df)



In [0]:

# Persist the cleaned stores data to the Silver layer (Delta format).

# Target folder of the stores table inside the Silver layer.
stores_silver_output_path = f"{silver_output_path}stores"

# "overwrite" replaces whatever was previously stored at this path.
stores_ready_df.write.save(
    stores_silver_output_path,
    format="delta",
    mode="overwrite",
)

print("Stores batch data written successfully to the Silver layer!")


In [0]:

# Schema applied when reading the promotions dataset from Bronze.
# Each entry is (column name, Spark type, nullable flag); both dates are read as strings.
# NOTE(review): Spark's file-based readers generally relax a user-supplied schema to
# nullable, so the False flags may not reject nulls at read time -- confirm on this runtime.
promotions_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("promo_id", LongType(), False),
        ("product_id", LongType(), False),
        ("discount_percent", LongType(), False),
        ("start_date", StringType(), False),
        ("end_date", StringType(), False),
    )
])


In [0]:
# Location of the raw promotions batch inside the Bronze layer.
bronze_promotions_path = f"{bronze_batch_path}promotions"

# Load the Parquet files using the explicit promotions schema instead of the
# schema stored in the files.
promotions_df = spark.read.schema(promotions_schema).parquet(bronze_promotions_path)

promotions_df.printSchema()
print("Promotion data loaded successfully with schema enforcement!")


In [0]:

# Clean the promotions batch: drop incomplete or invalid rows, de-duplicate by key,
# then add an audit timestamp.
#
# Validation runs BEFORE de-duplication on purpose: dropDuplicates keeps an arbitrary
# row per promo_id, so de-duplicating first could retain an invalid row and then lose
# the promotion entirely once that row is filtered out.

# Every column of a promotion must be present
promotions_filtered_df = (
    promotions_df
    .filter(
        (col("promo_id").isNotNull()) &
        (col("product_id").isNotNull()) &
        (col("discount_percent").isNotNull()) &
        (col("start_date").isNotNull()) &
        (col("end_date").isNotNull())
    )
)


# Discounts cannot be negative.
# NOTE(review): no upper bound is applied, so values above 100 pass through -- confirm
# whether that is intended.
promotions_cleaned_df = promotions_filtered_df.filter(col("discount_percent") >= 0)


# Collapse to one row per promo_id; which of several valid duplicates survives is
# not deterministic.
promotions_deduped_df = promotions_cleaned_df.dropDuplicates(["promo_id"])


# Add processed_time column (time at which this batch was processed)
promotions_ready_df = promotions_deduped_df.withColumn("processed_time", current_timestamp())

print("Promotions data transformed successfully!")
display(promotions_ready_df)


In [0]:

# Persist the cleaned promotions data to the Silver layer (Delta format).

# Target folder of the promotions table inside the Silver layer.
promotions_silver_output_path = f"{silver_output_path}promotions"

# "overwrite" replaces whatever was previously stored at this path.
promotions_ready_df.write.save(
    promotions_silver_output_path,
    format="delta",
    mode="overwrite",
)

print("Promotions batch data written successfully to the Silver layer!")


In [0]:


# Schema applied when reading the sales transactions dataset from Bronze.
# Each entry is (column name, Spark type, nullable flag).
# NOTE(review): product_id and store_id are DoubleType here while the products and stores
# schemas declare those keys as LongType -- presumably this mirrors the physical Parquet
# type; confirm before joining on them.
# NOTE(review): Spark's file-based readers generally relax a user-supplied schema to
# nullable, so the False flags may not reject nulls at read time -- confirm on this runtime.
sales_transactions_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("transaction_id", LongType(), False),
        ("product_id", DoubleType(), False),      # FK to products table
        ("store_id", DoubleType(), False),        # FK to stores table
        ("quantity", LongType(), False),          # expected to be > 0
        ("price", DoubleType(), False),           # price of the item sold
        ("total_amount", DoubleType(), False),    # should equal quantity * price
        ("timestamp", StringType(), False),       # transaction time as a string, cast later
    )
])


In [0]:
# Location of the raw sales transactions batch inside the Bronze layer.
bronze_sales_transactions_path = f"{bronze_batch_path}sales_transactions"

# Load the Parquet files using the explicit sales transactions schema instead of
# the schema stored in the files.
sales_transactions_df = (
    spark.read
    .schema(sales_transactions_schema)
    .parquet(bronze_sales_transactions_path)
)

sales_transactions_df.printSchema()
print("sales_transactions data loaded successfully with schema enforcement!")


In [0]:


# Clean the sales transactions batch: drop incomplete or invalid rows, parse the
# timestamp, de-duplicate by key, then add an audit timestamp.
#
# All validation runs BEFORE de-duplication on purpose: dropDuplicates keeps an
# arbitrary row per transaction_id, so de-duplicating first could retain an invalid
# row and then lose the transaction entirely once that row is filtered out.

# Null handling: every column of a transaction must be present
sales_filtered_df = (
    sales_transactions_df
    .filter(
        (col("transaction_id").isNotNull()) &
        (col("product_id").isNotNull()) &
        (col("store_id").isNotNull()) &
        (col("quantity").isNotNull()) &
        (col("price").isNotNull()) &
        (col("total_amount").isNotNull()) &
        (col("timestamp").isNotNull())
    )
)

# Filter out invalid data: quantities and amounts must be strictly positive
sales_valid_df = (
    sales_filtered_df
    .filter(
        (col("quantity") > 0) &
        (col("price") > 0) &
        (col("total_amount") > 0)
    )
)

# Cast timestamp to proper TimestampType.
# A string that does not match the pattern can come back as null instead of raising
# (depends on the session's ANSI setting), which would bypass the null check above,
# so rows whose transaction_time could not be parsed are dropped here.
sales_timecasted_df = (
    sales_valid_df
    .withColumn(
        "transaction_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
    )
    .drop("timestamp")
    .filter(col("transaction_time").isNotNull())
)

# Deduplicate by transaction_id; which of several valid duplicates survives is
# not deterministic.
sales_deduped_df = sales_timecasted_df.dropDuplicates(["transaction_id"])



# Add processed_time column (for audit)
sales_ready_df = sales_deduped_df.withColumn("processed_time", current_timestamp())

print("Sales transactions data transformed successfully!")



In [0]:

# Persist the cleaned sales transactions data to the Silver layer (Delta format).

# Target folder of the sales transactions table inside the Silver layer.
sales_transactions_silver_output_path = f"{silver_output_path}sales_transactions"

# "overwrite" replaces whatever was previously stored at this path.
sales_ready_df.write.save(
    sales_transactions_silver_output_path,
    format="delta",
    mode="overwrite",
)

print("Sales_transactions batch data written successfully to the Silver layer!")
