In [0]:
from delta.tables import *

# Three-part names of the staging table (source of new/changed orders) and the
# target table that the stage rows are loaded/merged into further down.
stage_table_name = "incremental_load.default.orders_stage"
target_table_name = "incremental_load.default.orders_target"

In [0]:
# Read the stage table into a DataFrame; it is the source for creating or
# merging into the target table in the cells below.
stage_df = spark.read.table(stage_table_name)

In [0]:

# Previous approach, superseded by the MERGE-based SCD1 cell below (kept for reference):
# if not spark._jsparkSession.catalog().tableExists(target_table_name):
#     stage_df.write.format("delta").saveAsTable(target_table_name)

# else:
#     # Emulate an upsert on tracking_num: delete matching target rows, then append all stage rows
#     target_table = DeltaTable.forName(spark, target_table_name)

#     # Define the merge condition based on the tracking_num column
#     merge_condition = "stage.tracking_num = target.tracking_num"

#     # Execute the merge operation
#     target_table.alias("target") \
#         .merge(stage_df.alias("stage"), merge_condition) \
#         .whenMatchedDelete() \
#         .execute()

#     stage_df.write.format("delta").mode("append").saveAsTable(target_table_name)

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.utils import AnalysisException


def table_exists(table_name: str) -> bool:
    """Report whether ``table_name`` can be resolved by the Spark session.

    Resolution is attempted with ``spark.table``. An ``AnalysisException``
    (unresolvable name) is reported as "does not exist"; any other exception
    propagates to the caller.
    """
    try:
        spark.table(table_name)
    except AnalysisException:
        return False
    else:
        return True

def is_delta_table(table_name: str) -> bool:
    """Return True if the catalog table ``table_name`` is a Delta table.

    ``DeltaTable.forName`` resolves the name through the catalog and raises
    ``AnalysisException`` when the table is missing or is not a Delta table;
    that case is reported as ``False``.

    Any other exception propagates: the caller drops the table when this
    returns ``False``, so an unexpected failure must not be mistaken for
    "not Delta".
    """
    # NOTE: DeltaTable.isDeltaTable(spark, identifier) interprets `identifier`
    # as a storage *path*, so a catalog name like "catalog.schema.table" is not
    # checked as a table and yields False even for a real Delta table.
    try:
        DeltaTable.forName(spark, table_name)
    except AnalysisException:
        return False
    return True

# Main logic: bring the target table in sync with the stage data.
#   * target missing            -> create it as a Delta table from stage_df
#   * target present, not Delta -> drop it and recreate it as Delta (destructive)
#   * target present, Delta     -> SCD1 upsert keyed on tracking_num
target_present = table_exists(target_table_name)

if target_present and is_delta_table(target_table_name):
    print("Target table exists and is a Delta table. Performing SCD1 merge...")
    target_table = DeltaTable.forName(spark, target_table_name)

    # Rows with the same tracking_num are overwritten in place; new ones are inserted.
    # NOTE(review): a Delta MERGE fails if several stage rows match the same
    # target row — confirm stage_df holds at most one row per tracking_num.
    merge_condition = "stage.tracking_num = target.tracking_num"

    (
        target_table.alias("target")
        .merge(stage_df.alias("stage"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
elif target_present:
    print("Target table exists but is NOT a Delta table. Dropping and recreating...")
    spark.sql(f"DROP TABLE IF EXISTS {target_table_name}")
    stage_df.write.format("delta").saveAsTable(target_table_name)
else:
    print("Target table does not exist. Creating as Delta table...")
    stage_df.write.format("delta").saveAsTable(target_table_name)

