In [0]:
from pyspark.sql.functions import lit
from delta.tables import DeltaTable

# Demo: how NULLs in a Delta MERGE condition behave under '=' versus the
# null-safe '<=>' operator. The same source rows are merged into a freshly
# written target table once per operator, and the result is displayed.

table_name = "merge_null_demo"

# Target rows; id 1 carries a NULL in the "value" column.
df1 = spark.createDataFrame(
    [
        (1, "A", None),
        (2, "B", "foo"),
        (3, "C", "bar"),
    ],
    ["id", "name", "value"],
)

# Source rows: id 1 repeats a target row (including its NULL), id 4 is new.
df2 = spark.createDataFrame(
    [
        (1, "A", None),
        (4, "D", "baz"),
    ],
    ["id", "name", "value"],
)

# Each entry pairs a merge condition with the heading printed above its result.
merge_cases = (
    (
        "t.id = s.id AND t.value = s.value",
        "After merge with '=' (nulls not matched):",
    ),
    (
        "t.id = s.id AND t.value <=> s.value",
        "After merge with '<=>' (null-safe equality):",
    ),
)

for merge_condition, heading in merge_cases:
    # Rewrite the target from df1 so every case starts from the same three rows,
    # then take a fresh handle on the just-written table.
    df1.write.format("delta").mode("overwrite").saveAsTable(table_name)
    delta_table = DeltaTable.forName(spark, table_name)

    # Insert-only merge: source rows with no match under the condition are added.
    merge_builder = delta_table.alias("t").merge(df2.alias("s"), merge_condition)
    merge_builder.whenNotMatchedInsertAll().execute()

    print(heading)
    display(spark.table(table_name))

After merge with '=' (nulls not matched):


id,name,value
1,A,
2,B,foo
3,C,bar
4,D,baz
1,A,


After merge with '<=>' (null-safe equality):


id,name,value
1,A,
2,B,foo
3,C,bar
4,D,baz
