### Imports

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import json

In [0]:
tb_name = "order_line_items"
silver_tb_name = "silver_order_line_items"

### Read Raw Tables

In [0]:
order_line_items_schema = StructType([
    StructField("line_item_id", IntegerType(), True),                
    StructField("po_id", IntegerType(), True),          
    StructField("item_id", IntegerType(), True),            
    StructField("item_description", StringType(), True),              
    StructField("item_category", StringType(), True),             
    StructField("quantity_ordered", IntegerType(), True),         
    StructField("unit_price", DoubleType(), True),        
    StructField("total_price", DoubleType(), True),      
    StructField("expected_delivery_date", DateType(), True),     
    StructField("unit_of_measure", StringType(), True),           
    StructField("line_status", StringType(), True),      
    StructField("item_weight_kg", DoubleType(), True),     
    StructField("item_volume_cm3", DoubleType(), True),          
    StructField("discount_applied", StringType(), True),       
    StructField("batch_code", StringType(), True)   
])

audit_schema = StructType([
    StructField("env", StringType(), True),
    StructField("table_name", StringType(), True),
    StructField("source_path", StringType(), True),
    StructField("target_path", StringType(), True),
    StructField("quarantine_path", StringType(), True),
    StructField("load_timestamp", TimestampType(), True),
    StructField("total_records", LongType(), True),
    StructField("passed_records", LongType(), True),
    StructField("quarantine_records", LongType(), True),
    StructField("status", StringType(), True),
    StructField("message", StringType(), True)
])



In [0]:
dbutils.widgets.text("env", "dev")
env = dbutils.widgets.get("env").strip().lower()

In [0]:
with open("/Workspace/Users/avadhootd.business@gmail.com/sum/SCM/config/config.json", "r") as f:
    config = json.load(f)

In [0]:
table_name = config[env][tb_name]["pass_target"]
df_raw = spark.read.table(table_name)

In [0]:
df_raw.display()

In [0]:
df_parsed = df_raw \
    .withColumn("line_item_id", when(col("line_item_id").rlike("^[0-9]+$"), col("line_item_id").cast(IntegerType())).otherwise(None)) \
     .withColumn("po_id", when(col("po_id").rlike("^[0-9]+$"), col("po_id").cast(IntegerType())).otherwise(None)) \
    .withColumn("quantity_ordered",when(col("quantity_ordered").rlike(r"^\d*\.?\d+$"), col("quantity_ordered").cast(DoubleType())).otherwise(None))\
    .withColumn("unit_price",when(col("unit_price").rlike(r"^\d*\.?\d+$"), col("unit_price").cast(DoubleType())).otherwise(None))\
    .withColumn("total_price",when(col("total_price").rlike(r"^\d*\.?\d+$"), col("total_price").cast(DoubleType())).otherwise(None))\
    .withColumn("expected_delivery_date", when(col("expected_delivery_date").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}$"), col("expected_delivery_date").cast(DateType())).  otherwise(None)) \
    .withColumn("item_weight_kg",when(col("item_weight_kg").rlike(r"^\d*\.?\d+$"), col("item_weight_kg").cast(DoubleType())).otherwise(None))\
    .withColumn("item_volume_cm3",when(col("item_volume_cm3").rlike(r"^\d*\.?\d+$"), col("item_volume_cm3").cast(DoubleType())).otherwise(None))\
    .drop("failed_raw", "failed_reasons", "all_reasons")


### Load DQ rules

In [0]:
with open("/Workspace/Users/avadhootd.business@gmail.com/sum/SCM/config/dq_rules.json", "r") as f:
    dq_config = json.load(f)
rules = dq_config.get(tb_name, {}).get("silver_rules", {})
quarantine_rules = dq_config.get(tb_name, {}).get("quarantine_rules", [])

In [0]:
%run /Workspace/Users/avadhootd.business@gmail.com/sum/SCM/utils/bronze_rule_validation

In [0]:
df_clean, df_quarantine = validate_silver_rules(df_parsed, rules, quarantine_rules)
df_clean = df_clean.drop("quarantine_raw", "quarantine_reasons", "reasons_str")

df_clean.limit(10).display()

In [0]:
df_clean = df_clean.drop("quarantine_raw", "quarantine_reasons", "reasons_str", "payment_terms")\
                   .withColumn("item_category", initcap(lower(col("item_category"))))\
                   .withColumn(
                         "quantity_category",
                         when(col("quantity_ordered") <= 50, "Very Low")
                         .when(col("quantity_ordered") <= 150, "Low")
                         .when(col("quantity_ordered") <= 300, "Medium")
                         .when(col("quantity_ordered") <= 500, "High")
                         .otherwise("Bulk")
                    )\
                    .withColumn(
                         "unit_price_category",
                         when(col("unit_price") < 200, "Cheap")
                         .when(col("unit_price") < 500, "Moderate")
                         .when(col("unit_price") < 800, "Premium")
                         .otherwise("Luxury")
                    )\
                   .withColumn("year", month(col("expected_delivery_date")))\
                   .withColumn("month", month(col("expected_delivery_date")))\
                   .withColumn("day", dayofmonth(col("expected_delivery_date")))\
                   .withColumn(
                         "unit_of_measure_std",
                         when(lower(col("unit_of_measure")).isin("l", "liter", "litre"), "L")
                         .when(lower(col("unit_of_measure")).isin("kg", "kilogram"), "KG")
                         .when(lower(col("unit_of_measure")).isin("unit", "units"), "UNIT")
                         .otherwise(None)
                    )\
                   .withColumn(
                         "line_status_std",
                         when(lower(col("line_status")) == "open", "Open")
                         .when(lower(col("line_status")).isin("cancel", "cancelled"), "Cancelled")
                         .when(lower(col("line_status")) == "close", "Closed")
                         .otherwise(None)
                    )\
                   .withColumn(
                         "item_weight_category",
                         when(col("item_weight_kg") >= 40, "Very Heavy")
                         .when((col("item_weight_kg") >= 20) & (col("item_weight_kg") < 40), "Heavy")
                         .when((col("item_weight_kg") >= 10) & (col("item_weight_kg") < 20), "Medium")
                         .when((col("item_weight_kg") >= 1) & (col("item_weight_kg") < 10), "Light")
                         .otherwise("Unknown")
                    )\
                   .withColumn(
                         "item_volume_category",
                         when(col("item_volume_cm3") >= 1500, "Very Large")
                         .when((col("item_volume_cm3") >= 1000) & (col("item_volume_cm3") < 1500), "Large")
                         .when((col("item_volume_cm3") >= 500) & (col("item_volume_cm3") < 1000), "Medium")
                         .when((col("item_volume_cm3") >= 1) & (col("item_volume_cm3") < 500), "Small")
                         .otherwise("Unknown")
                    )\
                   .withColumn(
                         "discount_applied_clean",
                         when(col("discount_applied").isin("Yes", "Y"), "Yes")
                         .when(col("discount_applied").isin("No", "N"), "No")
                         .otherwise("Unknown") 
                    )\
                   .withColumn("batch_code", upper(col("batch_code"))) \
                         .withColumn("batch_code_valid", 
                                   when(
                                        col("batch_code").rlike(r"^BATCH-[A-Z0-9]{4}$"),
                                        lit(True)
                                   ).otherwise(lit(False)))

In [0]:
df_clean = df_clean.withColumn(
    "watermark_column",
    sha2(concat_ws("||", col("line_item_id").cast("string"), col("po_id"), col("expected_delivery_date").cast("string")), 256)
)

In [0]:
catalog_name = config[env]["catalog"]
silver_table = f"{catalog_name}.silver.silver_order_line_items"
silver_quarantine_table = f"{catalog_name}.silver.silver_order_line_items_quarantine"
silver_supplier_audit_table = f"{catalog_name}.silver.silver_order_line_items_audit"
if spark.catalog.tableExists(silver_table):
    existing_df = spark.table(silver_table).select("watermark_column")
    df_new = df_clean.join(existing_df, on="watermark_column", how="left_anti")
else:
    df_new = df_clean



In [0]:
df_new.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(silver_table)
df_quarantine.write.format("delta").mode("overwrite").saveAsTable(silver_quarantine_table)


In [0]:
# %run /Workspace/Users/avadhootd.business@gmail.com/SCM/utils/send_alert_email


In [0]:
# failed_table = config[env][p_file_name]["failed_target"]
# quarantine_table = config[env][p_file_name]["quarantine_target"]

In [0]:
# app_password = app_password.strip()

# if df_quarantine.count() > 0 or df_failed.count() > 0:
#     send_gmail_email(
#     app_password="qfswthychmohjkim",
#     from_email="avadhootdarbhe@gmail.com",
#     to_email="avadhootd.in@mouritech.com",
#     tables=[failed_table, quarantine_table]
#     )


### Audit Logs

In [0]:
%run /Workspace/Users/avadhootd.business@gmail.com/sum/SCM/utils/audit_status



In [0]:
total = df_parsed.count()
passed = df_clean.count()
quarantine = df_quarantine.count()

status, message = get_audit_status_and_message(total, passed, 0, quarantined=quarantine)

print(status, message)
print(f"Total Records: {total}")
print(f"Passed Records: {passed}")
print(f"Quarantine Records: {quarantine}")

In [0]:
audit_data = [{
    "env": env,
    "table_name": silver_tb_name,
    "source_path": config[env][tb_name]["pass_target"],
    "target_path": silver_table,
    "quarantine_path": silver_quarantine_table,
    "load_timestamp": datetime.now(),
    "total_records": df_parsed.count(),
    "passed_records": df_clean.count(),
    "quarantine_records": df_quarantine.count(),
    "status": status,
    "message": message
}]

audit_df = spark.createDataFrame(audit_data, audit_schema)

audit_df.write.format("delta").mode("append").saveAsTable(silver_supplier_audit_table)