In [0]:
from pyspark.sql.functions import col, expr, rand, when, lit

In [0]:
# Size of the synthetic destination table: 1 billion rows.
num_rows = 1_000_000_000
# Fixed seed so the generated salaries are reproducible across notebook runs.
USERS_SEED = 42

# Destination DataFrame:
#   - partition_col: id % 10, i.e. values 0..9 (used as the Delta partition column)
#   - name:          'User_' || (id % 100000)
#   - department:    'HR' / 'IT' / 'Finance', cycling on id % 3
#   - salary:        pseudo-random integer in [0, 10000)
df = (
    spark.range(0, num_rows)
    .withColumn("partition_col", col("id") % 10)
    .withColumn("name", expr("concat('User_', id % 100000)"))
    .withColumn(
        "department",
        expr("CASE WHEN id % 3 = 0 THEN 'HR' WHEN id % 3 = 1 THEN 'IT' ELSE 'Finance' END"),
    )
    .withColumn("salary", (rand(USERS_SEED) * 10000).cast("int"))
)

# Shuffle into 500 in-memory partitions (presumably to control output file
# sizing — confirm), then write a Delta table partitioned by partition_col.
# mode("overwrite") replaces the table's previous contents when re-run.
(
    df.repartition(500)
    .write
    .partitionBy("partition_col")
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.users")
)

# Independent copy of the destination for the partition-pruned MERGE below.
# OR REPLACE keeps this cell re-runnable: a plain CREATE TABLE fails once
# users_2 exists, and the clone must be refreshed so both tables start from
# the same data before each experiment.
spark.sql("CREATE OR REPLACE TABLE default.users_2 DEEP CLONE default.users")

In [0]:
# Fixed seed so the source salaries are reproducible across notebook runs.
MERGE_SOURCE_SEED = 7

# MERGE source: 10,000,000 rows.
#   - id:            0, 100, 200, ... (generated id scaled by 100)
#   - partition_col: 1, 3 or 7 depending on id % 3
#   - name:          'Updated_User_' || (id % 50000)
#   - department:    'Sales' when id % 4 = 0, otherwise 'HR'
#   - salary:        pseudo-random integer in [0, 20000)
#
# NOTE(review): because every id is a multiple of 100, id % 4 is always 0, so
# every row gets 'Sales' and the 'HR' branch never fires. In addition, id % 50000
# takes only 500 distinct values. Confirm this distribution is intended.
merge_df = (
    spark.range(0, 10_000_000)
    # Replaces the generated id; the columns derived below see the scaled value.
    .withColumn("id", (col("id") * 100).cast("long"))
    .withColumn(
        "partition_col",
        when(col("id") % 3 == 0, lit(1))
        .when(col("id") % 3 == 1, lit(3))
        .otherwise(lit(7)),
    )
    .withColumn("name", expr("concat('Updated_User_', id % 50000)"))
    .withColumn("department", expr("CASE WHEN id % 4 = 0 THEN 'Sales' ELSE 'HR' END"))
    .withColumn("salary", (rand(MERGE_SOURCE_SEED) * 20000).cast("int"))
)

# Expose the source to the SQL MERGE statements in the following cells.
merge_df.createOrReplaceTempView('merge_source')

In [0]:
# Baseline MERGE: a 10,000,000-row source into the 1,000,000,000-row
# destination, matching only on the join keys. There is no literal partition
# predicate, so nothing restricts which destination partitions are considered.
#
# NOTE(review): with the data generated above, every source id is a multiple
# of 100 and therefore lives in destination partition_col = id % 10 = 0, while
# source rows carry partition_col 1, 3 or 7. On a freshly written table this ON
# clause matches no destination row, so every source row takes the INSERT branch.
# Confirm this insert-only workload is what the experiment intends.
baseline_merge_sql = """
MERGE INTO default.users AS dest
USING merge_source AS source
ON dest.id = source.id 
  AND dest.partition_col = source.partition_col
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT (id, partition_col, name, department, salary) 
      VALUES (source.id, source.partition_col, 
        source.name, source.department, source.salary
    )
"""

baseline_merge_result = spark.sql(baseline_merge_sql)
baseline_merge_result.display()

In [0]:
# Distinct (non-NULL) partition values present in the source. They are inlined
# as literals so the destination scan can be restricted to those partitions.
partition_values = sorted(
    row["partition_col"]
    for row in spark.sql(
        "SELECT DISTINCT partition_col FROM merge_source WHERE partition_col IS NOT NULL"
    ).collect()
)
partitions = ",".join(str(value) for value in partition_values)

# An empty IN () list is invalid SQL. With no non-NULL source partitions, no
# source row can satisfy dest.partition_col = source.partition_col, so FALSE
# is the logically equivalent predicate (all source rows are then inserted).
partition_filter = f"dest.partition_col IN ({partitions})" if partition_values else "FALSE"

# Partition-pruned MERGE: same 10,000,000-row source, into the
# 1,000,000,000-row clone default.users_2.
# The ON clause keeps dest.partition_col = source.partition_col so the join
# semantics are identical to the baseline MERGE. The literal partition filter is
# an extra, redundant restriction whose only purpose is to enable pruning.
spark.sql(f"""
MERGE INTO default.users_2 AS dest
USING merge_source AS source
ON {partition_filter}
  AND dest.id = source.id
  AND dest.partition_col = source.partition_col
WHEN MATCHED THEN
    UPDATE SET *
WHEN NOT MATCHED THEN
    INSERT (id, partition_col, name, department, salary)
      VALUES (source.id, source.partition_col, source.name,
        source.department, source.salary
      )
""").display()