In [None]:
import logging
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# Module-wide logger, named after this module; the root logger is then
# configured so INFO-and-above records are emitted.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Helper function to safely execute SQL and log errors
def execute_sql(query: str) -> DataFrame:
    """Run *query* through the Spark session and return the resulting DataFrame.

    The statement is logged at INFO level before it is submitted. Any failure
    is logged together with its traceback and then re-raised unchanged, so
    callers still see the original exception.

    NOTE(review): relies on a module-level ``spark`` session that this file
    does not define -- presumably injected by the notebook runtime; confirm.

    Args:
        query: SQL text handed verbatim to ``spark.sql``.

    Returns:
        The DataFrame produced by ``spark.sql(query)``.

    Raises:
        Exception: whatever ``spark.sql`` raised, re-raised as-is.
    """
    try:
        # Lazy %-style arguments: the message is only formatted if the
        # record is actually emitted.
        logger.info("Executing SQL: %s", query)
        return spark.sql(query)
    except Exception as e:
        # logger.exception logs at ERROR level *and* attaches the traceback,
        # which a plain logger.error call would drop.
        logger.exception("Error executing SQL: %s, Error: %s", query, e)
        raise

# Step 1: Load Data Sources
def _read_source_table(table_name: str):
    """Return all rows of ``catalog.source_db.<table_name>`` via ``execute_sql``."""
    return execute_sql(f"SELECT * FROM catalog.source_db.{table_name}")


try:
    # Every input is a full-table read from the same catalog/schema; the
    # tables are read in this fixed order.
    text_input_df = _read_source_table("text_input_channel")
    manual_date_df = _read_source_table("text_input_manual_date")
    oct_tc3_df = _read_source_table("dbfileinput_oct_tc3")

    # Five weekly inputs, one table per week.
    week1_df = _read_source_table("dynamic_input_week1")
    week2_df = _read_source_table("dynamic_input_week2")
    week3_df = _read_source_table("dynamic_input_week3")
    week4_df = _read_source_table("dynamic_input_week4")
    week5_df = _read_source_table("dynamic_input_week5")
except Exception as load_error:
    # Log a step-level summary, then let the failure propagate.
    logger.error(f"Error loading data sources: {str(load_error)}")
    raise

# Step 2: Apply Transformations
def _strip_sum_prefix(column_names):
    """Return *column_names* with leading ``Sum_`` prefixes removed, kept unique.

    Every leading ``Sum_`` is stripped. If the stripped name is already taken
    by an earlier column, one ``Sum_`` at a time is put back until the name is
    free, so ``Sum_Invoice_Lines`` and ``Sum_Sum_Invoice_Lines`` become
    ``Invoice_Lines`` and ``Sum_Invoice_Lines`` instead of two identical names.
    Names are compared case-insensitively (Spark's default column resolution
    ignores case).

    Args:
        column_names: column names in DataFrame order.

    Returns:
        A list of new names, positionally aligned with *column_names*.
    """
    prefix = "Sum_"
    taken = set()
    stripped_names = []
    for name in column_names:
        base = name
        depth = 0
        while base.startswith(prefix):
            base = base[len(prefix):]
            depth += 1
        candidate = name
        # Try the fully stripped name first, then re-add prefixes one by one.
        for kept in range(depth + 1):
            candidate = prefix * kept + base
            if candidate.lower() not in taken:
                break
        taken.add(candidate.lower())
        stripped_names.append(candidate)
    return stripped_names


# Builds the run-date helper frames, stacks and summarises the weekly inputs,
# enriches them with distribution-channel attributes, unions in two further
# tables, and finally projects the columns written out in Step 3.
try:
    # Generate current date (single-row frame; driver-local, naive timestamp)
    current_date_df = spark.createDataFrame([(datetime.now(),)], ["CurrentDate"])

    # Format current date
    formatted_date_df = current_date_df.withColumn("DateTime_Out", F.date_format("CurrentDate", "yyyy-MM-dd"))

    # Summarize data
    # NOTE(review): summarized_df is not consumed anywhere else in this block.
    summarized_df = oct_tc3_df.groupBy("BILL_DTE").agg(
        F.sum("Invoices").alias("Sum_Invoices"),
        F.sum("Invoice_Lines").alias("Sum_Invoice_Lines"),
        F.sum("LANDED_COST").alias("Sum_LANDED_COST"),
        F.sum("EXT_FINAL_PRICE").alias("Sum_EXT_FINAL_PRICE"),
        F.sum("Trans_Charge_Amt").alias("Sum_Trans_Charge_Amt"),
        F.sum("RESTOCK_Fee").alias("Sum_RESTOCK_Fee"),
        F.sum("Special_Hndl_Amt").alias("Sum_Special_Hndl_Amt"),
        F.sum("Vendor_Hndl_Amt").alias("Sum_Vendor_Hndl_Amt"),
        F.sum("MOC_Amt").alias("Sum_MOC_Amt"),
        F.sum("Fuel_Surcharge").alias("Sum_Fuel_Surcharge")
    )

    # Calculate date fields
    date_calculations_df = (
        formatted_date_df
        .withColumn("Prior_Week_Start", F.date_sub("CurrentDate", 7))
        .withColumn("Prior_Week_End", F.date_sub("CurrentDate", 1))
        .withColumn("Yesterday", F.date_sub("CurrentDate", 1))
        .withColumn("Today", F.current_date())
        .withColumn("P1M_Start", F.add_months("CurrentDate", -1))
        .withColumn("P1M_End", F.current_date())
        .withColumn("P2M_Start", F.add_months("CurrentDate", -2))
        .withColumn("P2M_End", F.add_months("CurrentDate", -1))
        .withColumn("P3M_Start", F.add_months("CurrentDate", -3))
        .withColumn("P3M_End", F.add_months("CurrentDate", -2))
    )

    # Select and rename fields
    selected_df = date_calculations_df.select(
        F.col("DateTime_Out"),
        F.col("Prior_Week_Start").alias("Start Date"),
        F.col("Prior_Week_End").alias("End Date")
    )

    # Convert date fields to text
    start_txt_df = selected_df.withColumn("StartTXT", F.date_format("Start Date", "yyyy-MM-dd"))
    # Chain from start_txt_df (not selected_df) so StartTXT is still present
    # when renamed_df selects it below.
    end_txt_df = start_txt_df.withColumn("EndTXT", F.date_format("End Date", "yyyy-MM-dd"))

    # Rename fields
    renamed_df = end_txt_df.select(
        F.col("DateTime_Out").alias("Run Date"),
        F.col("EndTXT"),
        F.col("StartTXT"),
        F.col("Start Date"),
        F.col("End Date")
    )

    # Calculate week start and end dates
    # renamed_df no longer carries CurrentDate, so the offsets are anchored on
    # "Run Date", which is that same date formatted as yyyy-MM-dd.
    # NOTE(review): the offsets are kept exactly as found; each END_* falls
    # before the preceding START_* -- confirm the intended week boundaries.
    run_date_col = F.to_date(F.col("Run Date"), "yyyy-MM-dd")
    week_calculations_df = (
        renamed_df
        .withColumn("END_1WK", F.date_sub(run_date_col, 7))
        .withColumn("START_2WK", F.date_sub(run_date_col, 14))
        .withColumn("END_2WK", F.date_sub(run_date_col, 21))
        .withColumn("START_3WK", F.date_sub(run_date_col, 28))
        .withColumn("END_3WK", F.date_sub(run_date_col, 35))
        .withColumn("START_4WK", F.date_sub(run_date_col, 42))
        .withColumn("End_4Wk", F.date_sub(run_date_col, 49))
        .withColumn("START_5WK", F.date_sub(run_date_col, 56))
    )

    # Union data streams
    # NOTE(review): union() matches columns by position; this assumes all five
    # weekly tables share the same column order -- confirm.
    union_df = week1_df.union(week2_df).union(week3_df).union(week4_df).union(week5_df)

    # Select fields
    final_selected_df = union_df.select(
        "BILL_DATE", "Sum_Invoices", "Sum_Invoice_Lines", "Sum_LANDED_COST", "Sum_EXT_FINAL_PRICE",
        "Sum_Trans_Charge_Amt", "Sum_RESTOCK_Fee", "Sum_Special_Hndl_Amt", "Sum_Vendor_Hndl_Amt",
        "Sum_MOC_Amt", "Sum_Fuel_Surcharge", "SO_AUDAT", "FKDAT", "WERKS", "VTWEG", "ZZFINCLASS",
        "BEZEK", "SOLDTO_KUNNR", "SHIPTO_KUNNR", "VGBEL", "Distinct_VBELN_Count", "BILL_ITM_COUNT",
        "UNIT_LAND_COST", "SO_NETWR", "SO_NETPR", "FKLMG", "FKIMG", "ZTV2", "ZSS2_M2", "ZSSH", "ZSM2",
        "ZTR2", "ZTR1", "ZSRF", "ZSRM", "ZH01", "ZTHM", "ZSMO", "ZSSM_F_ZTHM", "ZMT1", "ZVC12M1M3NM",
        "ZMGO", "ZMGB", "EXTND_LAND_CST", "XTND_INVOICE_PRICE_PER_ITM", "EXTND_FNL_PRICE",
        "EXTND_SERVC_FEE", "EXTND_SHPNG_HNDLNG", "XTND_STATE_TX", "EXTND_LCL_TX",
        "RF_TRNSCT_ABSRB_CHRGE_AMT_ZTV2", "VNDR_DRP_SHP_ABSRB_AMT_ZSS2_M2", "EX_HDNL_DRP_SHP_VAL_ZSSH",
        "RF_VENDOR_MOC_ABSRB_AMT_ZSM2", "RF_TRNSCT_ABSORB_CHARGE_AMT_ZTR2", "TRANS_CHRGS_FRT_ZTR1",
        "RESTOOCKING_FEE_ZSRF", "RESTOCK_FEE_MANUAL_ZSRM", "SPCL_HNDLNG_CHRG_FX_ZH01",
        "VNDR_HNDLNG_AMT_ZTHM", "MIN_ORDER_CHARGE_USD_ZSMO", "MOC_DROP_SHP_VAL_ZSSM",
        "FUEL_SURCHARGE_ZSDF", "FUEL_SURCHARGE_OVERIDE_ZSDO", "VENDR_TRANS_CHRG_FRT_ZTV1",
        "RF_VNDR_DRP_SHP_FEE_AMT_ZSSM_F_ZTHM", "MARKUP_VENDOR_TRANS_FEE_AMT_ZMT1", "MARK_UP_DND_ZMVT",
        "ADDTN_TRANS_FEE_ZSRH", "ADDTN_TRANS_FEE_OVRRIDE_ZSRO", "DROPSHIP_FEE_VALUE_ZSSF",
        "XTND_N_VLNK_SVC_FEE_ZVC12M1M3NM", "ONSITE_REP_FEE_DLR_ZMGO", "BULK_DIST_FEE_DLR_ZMGB",
        "HLDY_DLVRY_FEE_DLR_ZMGD", "LOW_UOM_DIST_FEE_DLR_ZMGL", "GOV_DIST_FEE_DLR_ZMGN",
        "BCF_RF_EXTND_VLINK_SVC_FEE", "lines", "AVG_INVOICE_PRICE", "EXTND_FNL_PRICE1", "VBRP_BRGEW",
        "VBRP_VOLUM", "RF_TRNSCT_CHARGE_AMT_ZTRM", "RF_SPECIAL_HNDLNG_CHARGE_AMT_ZH01",
        "RF_MIN_ORDER_CHARGE_AMT_ZSMO"
    )

    # Rename fields
    # The projection above already contains BILL_DATE, so renaming FKDAT to
    # BILL_DATE would yield two columns with that name and make every later
    # reference ambiguous. Fold both into one column instead.
    # NOTE(review): FKDAT takes precedence when both are populated, and the two
    # columns are assumed to have compatible types -- confirm.
    renamed_fields_df = (
        final_selected_df
        .withColumn("BILL_DATE", F.coalesce(F.col("FKDAT"), F.col("BILL_DATE")))
        .drop("FKDAT")
        .withColumnRenamed("SO_AUDAT", "SO_Date")
        .withColumnRenamed("WERKS", "Whs")
        .withColumnRenamed("VTWEG", "DIST_CHNL_ID")
        .withColumnRenamed("ZZFINCLASS", "FNC_ID")
        .withColumnRenamed("BEZEK", "FNC_DESC")
        .withColumnRenamed("SOLDTO_KUNNR", "SOLDTO")
        .withColumnRenamed("SHIPTO_KUNNR", "SHIPTO")
        .withColumnRenamed("VGBEL", "RFRNC_DOC_NUM")
        .withColumnRenamed("lines", "Invoice_lines")
    )

    # Apply formula to replace null or empty values
    # (fillna with a numeric value only touches numeric columns)
    formula_df = renamed_fields_df.fillna(0)

    # Transform fields to uppercase
    # Only string columns are upper-cased; applying upper() to every column
    # would cast the numeric measures to strings before they are summed.
    string_columns = {name for name, dtype in formula_df.dtypes if dtype == "string"}
    uppercase_df = formula_df.select(
        [F.upper(F.col(c)).alias(c) if c in string_columns else F.col(c) for c in formula_df.columns]
    )

    # Calculate rush order fee
    rush_fee_df = uppercase_df.withColumn("Rush_Order_Fee", F.col("ADDTN_TRANS_FEE_OVRRIDE_ZSRO"))

    # Perform summarization
    group_keys = [
        "SO_Date", "BILL_DATE", "Whs", "DIST_CHNL_ID", "FNC_ID", "FNC_DESC",
        "SOLDTO", "SHIPTO", "RFRNC_DOC_NUM",
    ]
    # Each of these is summed into a column named "Sum_<column>".
    summed_columns = [
        "Rush_Order_Fee",
        "AVG_INVOICE_PRICE",
        "EXTND_FNL_PRICE1",
        "VBRP_BRGEW",
        "VBRP_VOLUM",
        "RF_TRNSCT_ABSRB_CHRGE_AMT_ZTV2",
        "VNDR_DRP_SHP_ABSRB_AMT_ZSS2_M2",
        "EX_HDNL_DRP_SHP_VAL_ZSSH",
        "RF_VENDOR_MOC_ABSRB_AMT_ZSM2",
        "RF_TRNSCT_ABSORB_CHARGE_AMT_ZTR2",
        "RF_TRNSCT_CHARGE_AMT_ZTRM",
        "RF_SPECIAL_HNDLNG_CHARGE_AMT_ZH01",
        "RF_MIN_ORDER_CHARGE_AMT_ZSMO",
        "TRANS_CHRGS_FRT_ZTR1",
        "RESTOOCKING_FEE_ZSRF",
        "RESTOCK_FEE_MANUAL_ZSRM",
        "SPCL_HNDLNG_CHRG_FX_ZH01",
        "VNDR_HNDLNG_AMT_ZTHM",
        "MIN_ORDER_CHARGE_USD_ZSMO",
        "MOC_DROP_SHP_VAL_ZSSM",
        "FUEL_SURCHARGE_ZSDF",
        "FUEL_SURCHARGE_OVERIDE_ZSDO",
        "VENDR_TRANS_CHRG_FRT_ZTV1",
        "RF_VNDR_DRP_SHP_FEE_AMT_ZSSM_F_ZTHM",
        "MARKUP_VENDOR_TRANS_FEE_AMT_ZMT1",
        "MARK_UP_DND_ZMVT",
        "ADDTN_TRANS_FEE_ZSRH",
        "ADDTN_TRANS_FEE_OVRRIDE_ZSRO",
        "DROPSHIP_FEE_VALUE_ZSSF",
        "XTND_N_VLNK_SVC_FEE_ZVC12M1M3NM",
        "ONSITE_REP_FEE_DLR_ZMGO",
        "BULK_DIST_FEE_DLR_ZMGB",
        "HLDY_DLVRY_FEE_DLR_ZMGD",
        "LOW_UOM_DIST_FEE_DLR_ZMGL",
        "GOV_DIST_FEE_DLR_ZMGN",
        "BCF_RF_EXTND_VLINK_SVC_FEE",
        "Sum_Invoices",
        "Sum_Invoice_Lines",
        "Sum_LANDED_COST",
        "Sum_EXT_FINAL_PRICE",
        "Sum_Trans_Charge_Amt",
        "Sum_RESTOCK_Fee",
        "Sum_Special_Hndl_Amt",
        "Sum_Vendor_Hndl_Amt",
        "Sum_MOC_Amt",
        "Sum_Fuel_Surcharge",
    ]
    final_summarized_df = rush_fee_df.groupBy(*group_keys).agg(
        # Listed separately because its alias is not "Sum_" + the source
        # column name (capital "L" in the alias).
        F.sum("Invoice_lines").alias("Sum_Invoice_Lines"),
        *[F.sum(c).alias(f"Sum_{c}") for c in summed_columns]
    )

    # Remove prefix from fields
    # A plain replace("Sum_", "") maps both Sum_Invoice_Lines and
    # Sum_Sum_Invoice_Lines to "Invoice_Lines"; the helper keeps names unique.
    dynamic_rename_df = final_summarized_df.toDF(*_strip_sum_prefix(final_summarized_df.columns))

    # Append fields
    appended_df = (
        dynamic_rename_df
        .withColumn("BIA_SHIP_HNDL_AMT", F.col("Trans_Charge_Amt"))
        .withColumn("COE_SHIP_HNDL_AMT", F.col("Trans_Charge_Amt"))
    )

    # Calculate invoice sales
    invoice_sales_df = appended_df.withColumn("Invoice_Sales", F.col("EXT_FINAL_PRICE"))

    # Join data
    # The lookup table is read once and the same DataFrame is used for the
    # join; only the three columns needed downstream are kept so they cannot
    # collide with columns of invoice_sales_df. Joining on the column name
    # yields a single DIST_CHNL_ID column, and every invoice_sales_df column is
    # retained because later steps read FNC_ID, FNC_DESC, SO_Date, etc.
    dist_channel_df = execute_sql("SELECT * FROM catalog.source_db.dist_channel").select(
        "DIST_CHNL_ID", "DIST_CHNL", "DIST_CHNL_DESC"
    )
    joined_df = invoice_sales_df.join(dist_channel_df, on="DIST_CHNL_ID", how="inner")

    # Union data
    # NOTE(review): additional_data's schema is not visible here. Columns are
    # matched by name and missing ones are filled with null so the union does
    # not depend on column order or count -- confirm against the table.
    union_final_df = joined_df.unionByName(
        execute_sql("SELECT * FROM catalog.source_db.additional_data"),
        allowMissingColumns=True
    )

    # Apply formula for null identification
    null_identification_df = union_final_df.withColumn("null_yn", F.when(F.col("DIST_CHNL_ID").isNull(), 1).otherwise(0))

    # Filter data (keep rows that have a DIST_CHNL_ID)
    filtered_df = null_identification_df.filter(F.col("null_yn") == 0)

    # Update null values
    updated_null_df = (
        filtered_df
        .withColumn("FNC_ID", F.when(F.col("FNC_ID").isNull(), "UNKNOWN").otherwise(F.col("FNC_ID")))
        .withColumn("FNC_DESC", F.when(F.col("FNC_DESC").isNull(), "UNKNOWN").otherwise(F.col("FNC_DESC")))
    )

    # Union data by name
    # NOTE(review): final_data's schema is not visible here; missing columns on
    # either side are null-filled rather than failing the union -- confirm.
    union_by_name_df = updated_null_df.unionByName(
        execute_sql("SELECT * FROM catalog.source_db.final_data"),
        allowMissingColumns=True
    )

    # Select fields
    final_output_df = union_by_name_df.select(
        "DIST_CHNL_ID", "DIST_CHNL", "DIST_CHNL_DESC", "Invoice_Sales", "FNC_ID", "FNC_DESC"
    )

    # Select and rename fields
    # final_output_df holds only the six columns above, so the output
    # projection is taken from union_by_name_df, which still has all of them.
    # "Run Date" lives in the single-row renamed_df and is attached to every
    # row with a cross join.
    output_columns = [
        "SO_Date", "BILL_DATE", "Whs", "DIST_CHNL_ID", "FNC_ID", "FNC_DESC",
        "SOLDTO", "SHIPTO", "RFRNC_DOC_NUM", "Rush_Order_Fee", "COE_SHIP_HNDL_AMT",
    ]
    final_renamed_df = (
        union_by_name_df
        .select(*output_columns)
        .crossJoin(renamed_df.select("Run Date"))
        .select("Run Date", *output_columns)
    )
except Exception as e:
    logger.error(f"Error during transformations: {str(e)}")
    raise

# Step 3: Output Data
try:
    # Three-part name of the destination table.
    # NOTE(review): these values look like placeholders -- confirm the real target.
    target_catalog = "catalog_name"
    target_schema = "schema_name"
    target_table = "table_name"
    qualified_schema = f"{target_catalog}.{target_schema}"
    qualified_table = f"{qualified_schema}.{target_table}"

    # The schema must exist before the table can be saved into it.
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {qualified_schema}")
    logger.info(f"Schema {qualified_schema} ensured")

    # Save as a Delta table in overwrite mode.
    delta_writer = final_renamed_df.write.format("delta").mode("overwrite")
    delta_writer.saveAsTable(qualified_table)
    logger.info(f"Data written to {qualified_table}")
except Exception as write_error:
    # Log a step-level summary, then let the failure propagate.
    logger.error(f"Error writing output data: {str(write_error)}")
    raise
