In [None]:
## Merge in Python: Which One Has the Nicest Syntax and Is the Fastest?
# https://databrickster.medium.com/merge-in-python-which-one-has-the-nicest-syntax-and-is-the-fastest-845799729c23
https://databrickster.medium.com/merge-in-python-which-one-has-the-nicest-syntax-and-is-the-fastest-845799729c23

In [1]:
import time
import pyspark.sql.functions as F
from delta.tables import DeltaTable

# ------------------------------------------------------------------
# 0.  TEST RUNTIME SETTINGS
#      – session-level Spark settings, applied before any data is built
# ------------------------------------------------------------------
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)   # -1 turns off automatic broadcast joins, forcing shuffle joins
spark.conf.set("spark.databricks.optimizer.dynamicFilePruning", "true")   # NOTE(review): Databricks-specific key judging by its name; confirm the runtime in use honours it

# ------------------------------------------------------------------
# 1.  BUILD WIDE, COMPLEX, SKEWED DATAFRAMES
#      – many columns
#      – nested/array column
#      – heavy computed column
#      – 90 % skew on tenant_id
# ------------------------------------------------------------------
NUM_SOURCE_ROWS  = 100_000_000           # source ids are built as 1 .. NUM_SOURCE_ROWS
NUM_TARGET_ROWS  = 200_000_000           # target ids start at 50 M: ~50 M overlap the source + ~150 M exist only in the target
SKEW_FRACTION    = 0.90                  # probability that a row is assigned the single hot tenant_id ("TenantA")

def build_df(start: int, end: int):
    """Generate a wide synthetic DataFrame with one row per id in ``[start, end)``.

    The ids come from ``spark.range(start, end)`` and are repartitioned into
    512 partitions. Columns are then appended one by one, in the order listed
    below (later columns may read earlier ones, e.g. ``status_flag`` reads
    ``status``).

    The random columns use unseeded ``rand()``, so separate calls are not
    expected to produce the same values.

    Args:
        start: First id (inclusive).
        end: Last id (exclusive).

    Returns:
        The DataFrame with ``id`` plus all generated columns.
    """
    ids = spark.range(start, end).repartition(512)            # plenty of partitions

    # (column name, expression) pairs, applied strictly in this order.
    column_specs = [
        # today minus a random whole number of days in 0..99
        ("date", F.expr("current_date() - cast(rand()*100 as int)")),
        # roughly even split between the two states
        ("status", F.when(F.rand() > 0.5, "active").otherwise("inactive")),
        # SKEW_FRACTION of the rows share "TenantA"; the rest get "Tenant<0..999>"
        ("tenant_id",
         F.when(F.rand() < SKEW_FRACTION, F.lit("TenantA"))
          .otherwise(F.concat(F.lit("Tenant"), F.expr("cast(rand()*1000 as int)")))),
        # --- width boosters ---
        ("value1", F.rand()*1000),
        ("value2", F.col("id")*5 + F.lit(123)),
        ("status_flag", F.when(F.col("status") == "active", 1).otherwise(0)),
        ("id_str", F.concat(F.lit("ID-"), F.col("id"))),
        # constant nested struct
        ("address",
         F.struct(F.lit("123 Main St").alias("street"),
                  F.lit("Metropolis").alias("city"))),
        # array of three independent random doubles
        ("random_values", F.array(F.rand(), F.rand(), F.rand())),
        # deliberately expensive arithmetic: id^2 * ln(id + 1)
        ("complex_calc",
         F.pow(F.col("id").cast("double"), 2) * F.log(F.col("id")+1)),
        # starts out NULL; the merges later set it on matched rows
        ("last_updated", F.lit(None).cast("timestamp")),
    ]

    wide = ids
    for col_name, col_expr in column_specs:
        wide = wide.withColumn(col_name, col_expr)
    return wide

# NOTE(review): the merge functions visible in this notebook each call
# build_df themselves, so source_df looks unused here; kept in case a later
# cell reads it.
source_df  = build_df(1, NUM_SOURCE_ROWS + 1)
target_df  = build_df(50_000_000, 50_000_000 + NUM_TARGET_ROWS)

# ------------------------------------------------------------------
# 2.  (RE)CREATE TARGET TABLES
#      – one table per merge pattern, each written from target_df
#      – written explicitly as Delta: the OPTIMIZE below and the
#        DeltaTable / MERGE INTO calls later on only work on Delta
#        tables, and the session's default table format is not
#        guaranteed to be Delta outside Databricks
# ------------------------------------------------------------------
for tbl in ("target1", "target2", "target3"):
    spark.sql(f"DROP TABLE IF EXISTS {tbl}")
    target_df.write.format("delta").mode("overwrite").saveAsTable(tbl)
    spark.sql(f"OPTIMIZE {tbl}")                          # compact the freshly written files before timing

# ------------------------------------------------------------------
# 3.  DEFINE A HELPER TO TIME MERGES CLEANLY
# ------------------------------------------------------------------
def time_it(label, fn):
    """Run ``fn`` once, print how long it took, and return the duration.

    Args:
        label: Short name printed in front of the timing (left-aligned and
            padded to 12 characters).
        fn: Zero-argument callable to execute and time.

    Returns:
        float: Elapsed wall-clock seconds, measured with
        ``time.perf_counter``, so callers can collect and compare timings
        instead of parsing the printed line.

    Any exception raised by ``fn`` propagates unchanged; nothing is printed
    in that case.
    """
    start = time.perf_counter()
    fn()
    elapsed = time.perf_counter() - start
    print(f"{label:<12s}  {elapsed:,.1f}  seconds")
    return elapsed

# ------------------------------------------------------------------
# 4.  MERGE PATTERN 1  – DeltaTable API
# ------------------------------------------------------------------
def merge_pattern_1():
    """Upsert a freshly built source into ``target1`` with the DeltaTable API.

    Rows are matched on ``id``. Matched rows get the column updates listed
    in ``matched_updates``; unmatched source rows are inserted with all
    their columns.
    """
    # Column -> SQL expression applied to matched rows.
    matched_updates = {
        # heavy conditional update to stress CPU
        "status": "CASE WHEN t.status = 'inactive' AND s.status = 'active' "
                  "THEN 'reactivated' ELSE t.status END",
        "value1": "t.value1 + s.value1",
        "status_flag": "CASE WHEN s.status = 'active' THEN 1 ELSE 0 END",
        "last_updated": "current_timestamp()",
    }

    # Build the source anew on every call rather than reusing a shared DataFrame.
    fresh_source = build_df(1, NUM_SOURCE_ROWS + 1)
    target = DeltaTable.forName(spark, "target1").alias("t")

    (
        target
        .merge(fresh_source.alias("s"), "t.id = s.id")
        .whenMatchedUpdate(set=matched_updates)
        .whenNotMatchedInsertAll()
        .execute()
    )

# ------------------------------------------------------------------
# 5.  MERGE PATTERN 2  – DataFrame.mergeInto (MergeIntoWriter)
# ------------------------------------------------------------------
def merge_pattern_2():
    """Upsert a freshly built source into ``target2`` via ``DataFrame.mergeInto``.

    Rows are matched on ``id``. Matched rows get the same four column
    updates as the other merge patterns; unmatched source rows are inserted
    with all their columns. The source is aliased ``s`` and the target is
    referred to by its table name, ``target2``.
    """
    src = build_df(1, NUM_SOURCE_ROWS + 1)               # fresh source on every call
    (src.alias("s")
        .mergeInto("target2", F.expr("target2.id = s.id"))
        .whenMatched()
            .update({
                "status":      F.expr("CASE WHEN target2.status = 'inactive' "
                                      "AND s.status = 'active' "
                                      "THEN 'reactivated' ELSE target2.status END"),
                "value1":      F.expr("target2.value1 + s.value1"),
                # Resolve through the "s" alias like every other expression in
                # this merge: both sides have a `status` column, and the
                # unaliased DataFrame attribute would be the only reference
                # that bypasses the alias.
                "status_flag": F.when(F.col("s.status") == "active", 1).otherwise(0),
                "last_updated": F.current_timestamp()
            })
        .whenNotMatched()
            .insertAll()
        .merge())
    
# ------------------------------------------------------------------
# 6.  MERGE PATTERN 3  – SQL in PySpark
# ------------------------------------------------------------------
def merge_pattern_3():
    """Upsert a freshly built source into ``target3`` with a SQL ``MERGE INTO``.

    The source DataFrame is handed to ``spark.sql`` as the ``src_df`` keyword
    argument and referenced in the statement through the ``{src_df}``
    placeholder. Rows are matched on ``id``; matched rows are updated and
    unmatched source rows are inserted with all their columns.
    """
    merge_statement = """
        MERGE INTO target3 t
        USING {src_df} s
        ON   t.id = s.id
        WHEN MATCHED THEN UPDATE SET
            status       = CASE WHEN t.status = 'inactive' AND s.status = 'active'
                                THEN 'reactivated' ELSE t.status END,
            value1       = t.value1 + s.value1,
            status_flag  = CASE WHEN s.status = 'active' THEN 1 ELSE 0 END,
            last_updated = current_timestamp()
        WHEN NOT MATCHED THEN
            INSERT *
    """
    # The source is built anew for this call and bound to the placeholder.
    spark.sql(merge_statement, src_df=build_df(1, NUM_SOURCE_ROWS + 1))

ModuleNotFoundError: No module named 'delta'