In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_timestamp

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("DIFProcessedHelperTest").getOrCreate()

# Mock the DIFLogger class to simply print logs
class DIFLogger:
    def __init__(self):
        pass

    def info(self, message):
        print(f"INFO: {message}")

    def debug(self, message):
        print(f"DEBUG: {message}")

    def error(self, message):
        print(f"ERROR: {message}")

    def warn(self, message):
        print(f"WARNING: {message}")


# Mock Asset with Fields and Schema
mockAsset = {
    "Processed_Table_Name": "mock_processed_table",
    "Load_Type_Code": "T1-All-Replace",  # Example load type
    "BusinessDate": "2025-04-17",  # Example business date
    "IngestFile": "file1.csv",  # Example source file name
    "Fields": [
        {"Field_Sequence_Number": 1, "Target_Field_Name": "SalesDate", "IsKey_Indicator": "Y", "Target_Data_Type_Code": "STRING"},
        {"Field_Sequence_Number": 2, "Target_Field_Name": "SKU", "IsKey_Indicator": "Y", "Target_Data_Type_Code": "STRING"},
        {"Field_Sequence_Number": 3, "Target_Field_Name": "Units", "IsKey_Indicator": "N", "Target_Data_Type_Code": "INTEGER"},
        {"Field_Sequence_Number": 4, "Target_Field_Name": "DIFSourceFile", "IsKey_Indicator": "N", "Target_Data_Type_Code": "STRING"},
        {"Field_Sequence_Number": 5, "Target_Field_Name": "BusinessDate", "IsKey_Indicator": "N", "Target_Data_Type_Code": "STRING"}
    ]
}

# Create the target DataFrame (dfTarget) to simulate the Processed Table (for two days of data)
data_target = [
    ("2024-01-12", "A", 100, "file1.csv", "2024-01-12"),
    ("2024-01-12", "B", 200, "file1.csv", "2024-01-12"),
    ("2024-01-13", "A", 150, "file2.csv", "2024-01-13"),
    ("2024-01-13", "B", 250, "file2.csv", "2024-01-13")
]

# Define schema based on mockAsset["Fields"]
schema_target = StructType([
    StructField("SalesDate", StringType(), True),
    StructField("SKU", StringType(), True),
    StructField("Units", IntegerType(), True),
    StructField("DIFSourceFile", StringType(), True),
    StructField("BusinessDate", StringType(), True)
])

dfTarget = spark.createDataFrame(data_target, schema_target)

# Create the incoming DataFrame (dfIncoming) to simulate the new incoming data (for two days of data)
data_incoming = [
    ("2024-01-12", "A", 110, "file1.csv", "2024-01-12"),
    ("2024-01-13", "B", 220, "file1.csv", "2024-01-13"),
    ("2024-01-14", "C", 300, "file2.csv", "2024-01-14")
]

# Define schema for incoming data
schema_incoming = StructType([
    StructField("SalesDate", StringType(), True),
    StructField("SKU", StringType(), True),
    StructField("Units", IntegerType(), True),
    StructField("DIFSourceFile", StringType(), True),
    StructField("BusinessDate", StringType(), True)
])

dfIncoming = spark.createDataFrame(data_incoming, schema_incoming)

# Define the DIFProcessedTableHelperTest class (for testing)
class DIFProcessedTableHelperTest:
    def __init__(self, pEnvConfig, pAssetGroupConfig, pLogger, pSpark, pAsset):
        self.aEnvConfig = pEnvConfig
        self.aAssetGroupConfig = pAssetGroupConfig
        self.aLogger = pLogger
        self.aSpark = pSpark
        self.aAsset = pAsset
        self.aInstructions = {"Description": "T1-All-Replace", "Load_Type_Code": "T1-All-Replace", "FromToColumns": True, "Delete": "All"}

    def applyTempRecordsToTTable(self, pTimeStamp, dfIncoming, dfTarget):
        try:
            self.aLogger.info("applyTempRecordsToTTable.Start")

            # Get the main processed table (T Table)
            mainTable = self.aAsset["Processed_Table_Name"]
            self.aLogger.info("Processing table: " + mainTable)

            # Add CDC_LOAD_CODE and LOAD_TS to incoming data before processing
            dfIncoming = dfIncoming.withColumn("CDC_LOAD_CODE", lit("I"))\
                .withColumn("LOAD_TS", to_timestamp(lit(pTimeStamp), 'yyyy-MM-dd HH:mm:ss'))

            # Handle From/To and Updates (if applicable)
            if self.aInstructions["FromToColumns"]:
                # Update the records where SalesDate and SKU match
                dfUpdates = dfIncoming.join(dfTarget, on=["SKU", "SalesDate"], how="inner")\
                    .filter(dfTarget["Units"] != dfIncoming["Units"])

                dfUpdates = dfUpdates.withColumn("CDC_LOAD_CODE", lit("U"))\
                    .withColumn("SalesDate", lit(self.aAsset["BusinessDate"]))\
                    .withColumn("LOAD_TS", to_timestamp(lit(pTimeStamp), 'yyyy-MM-dd HH:mm:ss'))

                # Remove old data and add the updated data
                dfTarget = dfTarget.subtract(dfUpdates)  # Remove old data that's being updated
                dfTarget = dfTarget.union(dfUpdates)  # Add updated data

            # Handle From/To + Deletes (if applicable)
            if self.aInstructions["FromToColumns"]:
                # Mark records for deletion where CDC_LOAD_CODE = 'I' and END_TS = '2999-12-31'
                dfDeletes = dfIncoming.join(dfTarget, on=["SKU", "SalesDate"], how="left_anti")

                dfDeletes = dfDeletes.withColumn("CDC_LOAD_CODE", lit("D"))\
                    .withColumn("SalesDate", lit(self.aAsset["BusinessDate"]))\
                    .withColumn("LOAD_TS", to_timestamp(lit(pTimeStamp), 'yyyy-MM-dd HH:mm:ss'))

                # Remove deleted records from T Table
                dfTarget = dfTarget.subtract(dfDeletes)  # Remove records to be deleted
                dfTarget = dfTarget.union(dfDeletes)  # Add deleted records

            # Handle Missing Keys: Mark as Deleted (if applicable)
            if self.aInstructions["Delete"] == "MissingKeys":
                missingKeys = dfTarget.join(dfIncoming, on=["SKU"], how="left_anti")

                missingKeys = missingKeys.withColumn("CDC_LOAD_CODE", lit("D"))\
                    .withColumn("LOAD_TS", to_timestamp(lit(pTimeStamp), 'yyyy-MM-dd HH:mm:ss'))\
                    .withColumn("SalesDate", lit(self.aAsset["BusinessDate"]))

                # Update T Table for missing keys (soft delete)
                dfTarget = dfTarget.subtract(missingKeys)  # Remove missing keys
                dfTarget = dfTarget.union(missingKeys)  # Add missing keys as deleted

            # Finally, add the new data to the T Table (Insert)
            dfInserts = dfIncoming.withColumn("CDC_LOAD_CODE", lit("I"))\
                .withColumn("LOAD_TS", to_timestamp(lit(pTimeStamp), 'yyyy-MM-dd HH:mm:ss'))

            # Add the new data to T Table
            dfTarget = dfTarget.union(dfInserts)

            self.aLogger.info("applyTempRecordsToTTable.End")
            return dfTarget

        except Exception as ex:
            self.aLogger.error("applyTempRecordsToTTable.Error: " + str(ex))
            raise Exception(str(ex))


# Initialize the mock logger
mockLogger = DIFLogger()

# Instantiate the helper class with mock configurations
helper = DIFProcessedTableHelperTest(mockEnvConfig, mockAssetGroupConfig, mockLogger, spark, mockAsset)

# Run the test example
processedFinal = helper.applyTempRecordsToTTable("2024-01-15 00:00:00", dfIncoming, dfTarget)

# Show the processed result
processedFinal.show()

INFO: applyTempRecordsToTTable.Start
INFO: Processing table: mock_processed_table
ERROR: applyTempRecordsToTTable.Error: [NUM_COLUMNS_MISMATCH] EXCEPT can only be performed on inputs with the same number of columns, but the first input has 5 columns and the second input has 10 columns.;
'Except false
:- LogicalRDD [SalesDate#97, SKU#98, Units#99, DIFSourceFile#100, BusinessDate#101], false
+- Project [SKU#108, SalesDate#153, Units#109, DIFSourceFile#110, BusinessDate#111, CDC_LOAD_CODE#142, to_timestamp(2024-01-15 00:00:00, Some(yyyy-MM-dd HH:mm:ss), TimestampType, Some(Asia/Kolkata), false) AS LOAD_TS#164, Units#177, DIFSourceFile#178, BusinessDate#179]
   +- Project [SKU#108, 2025-04-17 AS SalesDate#153, Units#109, DIFSourceFile#110, BusinessDate#111, CDC_LOAD_CODE#142, LOAD_TS#124, Units#177, DIFSourceFile#178, BusinessDate#179]
      +- Project [SKU#108, SalesDate#107, Units#109, DIFSourceFile#110, BusinessDate#111, U AS CDC_LOAD_CODE#142, LOAD_TS#124, Units#177, DIFSourceFile#178,

Exception: [NUM_COLUMNS_MISMATCH] EXCEPT can only be performed on inputs with the same number of columns, but the first input has 5 columns and the second input has 10 columns.;
'Except false
:- LogicalRDD [SalesDate#97, SKU#98, Units#99, DIFSourceFile#100, BusinessDate#101], false
+- Project [SKU#108, SalesDate#153, Units#109, DIFSourceFile#110, BusinessDate#111, CDC_LOAD_CODE#142, to_timestamp(2024-01-15 00:00:00, Some(yyyy-MM-dd HH:mm:ss), TimestampType, Some(Asia/Kolkata), false) AS LOAD_TS#164, Units#177, DIFSourceFile#178, BusinessDate#179]
   +- Project [SKU#108, 2025-04-17 AS SalesDate#153, Units#109, DIFSourceFile#110, BusinessDate#111, CDC_LOAD_CODE#142, LOAD_TS#124, Units#177, DIFSourceFile#178, BusinessDate#179]
      +- Project [SKU#108, SalesDate#107, Units#109, DIFSourceFile#110, BusinessDate#111, U AS CDC_LOAD_CODE#142, LOAD_TS#124, Units#177, DIFSourceFile#178, BusinessDate#179]
         +- Filter NOT (Units#177 = Units#109)
            +- Project [SKU#108, SalesDate#107, Units#109, DIFSourceFile#110, BusinessDate#111, CDC_LOAD_CODE#117, LOAD_TS#124, Units#177, DIFSourceFile#178, BusinessDate#179]
               +- Join Inner, ((SKU#108 = SKU#176) AND (SalesDate#107 = SalesDate#175))
                  :- Project [SalesDate#107, SKU#108, Units#109, DIFSourceFile#110, BusinessDate#111, CDC_LOAD_CODE#117, to_timestamp(2024-01-15 00:00:00, Some(yyyy-MM-dd HH:mm:ss), TimestampType, Some(Asia/Kolkata), false) AS LOAD_TS#124]
                  :  +- Project [SalesDate#107, SKU#108, Units#109, DIFSourceFile#110, BusinessDate#111, I AS CDC_LOAD_CODE#117]
                  :     +- LogicalRDD [SalesDate#107, SKU#108, Units#109, DIFSourceFile#110, BusinessDate#111], false
                  +- LogicalRDD [SalesDate#175, SKU#176, Units#177, DIFSourceFile#178, BusinessDate#179], false
