
# Merge and Upsert


## Create the DataFrames for the demo

In [0]:
%python

# Day-1 source batch: drivers with driverId <= 10, with the nested
# name struct flattened into forename / surname columns.
drivers_day1_df = (
    spark.read
    .option("inferSchema", "true")
    .json("abfss://raw@alejandroauedevdl.dfs.core.windows.net/2021-03-28/drivers.json")
    .filter("driverId<=10")
    .select("driverId", "dob", "name.forename", "name.surname")
)


In [0]:
# Render the day-1 batch so its rows can be inspected before merging.
display(drivers_day1_df)

In [0]:
%python

from pyspark.sql.functions import upper

# Day-2 source batch: driverIds 6-15, with names upper-cased so that updates
# are visible in the target. Ids 6-10 overlap the day-1 batch (driverId<=10)
# and will be updated by the merge; ids 11-15 are new and will be inserted.
drivers_day2_df = (
    spark.read
    .option("inferSchema", "true")
    .json("abfss://raw@alejandroauedevdl.dfs.core.windows.net/2021-03-28/drivers.json")
    .filter("driverId BETWEEN 6 AND 15")
    .select(
        "driverId",
        "dob",
        upper("name.forename").alias("forename"),
        upper("name.surname").alias("surname"),
    )
)


In [0]:
# Render the day-2 batch so its rows can be inspected before merging.
display(drivers_day2_df)

In [0]:
%python

from pyspark.sql.functions import upper

# Day-3 source batch, used for the PySpark (DeltaTable API) merge example.
# Ids 1-5 already exist in the target and will be updated; ids 16-20 are new
# and will be inserted. Names are upper-cased, as in the day-2 batch.
drivers_day3_df = (
    spark.read
    .option("inferSchema", "true")
    .json("abfss://raw@alejandroauedevdl.dfs.core.windows.net/2021-03-28/drivers.json")
    .filter("driverId BETWEEN 1 AND 5 OR driverId BETWEEN 16 AND 20")
    .select(
        "driverId",
        "dob",
        upper("name.forename").alias("forename"),
        upper("name.surname").alias("surname"),
    )
)


## Create a managed table as the merge target (the Python DataFrames are the sources)

In [0]:
%sql

-- Managed Delta table used as the MERGE target throughout this notebook.
-- NOTE(review): the merge cells assign current_timestamp to createDate and
-- updatedDate, which are declared DATE here; confirm that dropping the time
-- portion is intended.
CREATE TABLE IF NOT EXISTS f1_demo.drivers_merge (
  driverId    INT,
  dob         DATE,
  forename    STRING,
  surname     STRING,
  createDate  DATE,
  updatedDate DATE
)
USING DELTA



## Create temporary views so SQL can reference the DataFrames

In [0]:
# Register each source DataFrame as a temporary view, named after its Python
# variable, so the %sql cells can reference it in their USING clauses.
for view_name, source_df in (
    ("drivers_day1_df", drivers_day1_df),
    ("drivers_day2_df", drivers_day2_df),
    ("drivers_day3_df", drivers_day3_df),
):
    source_df.createOrReplaceTempView(view_name)



## SQL MERGE statement (first example)

In [0]:
%sql
    

-- Upsert the day-1 batch into the Delta target, matching rows on driverId.
MERGE INTO f1_demo.drivers_merge AS tgt
USING drivers_day1_df AS upd
  ON tgt.driverId = upd.driverId
-- Driver already in the target: overwrite its attributes and stamp updatedDate.
WHEN MATCHED THEN
  UPDATE SET
    tgt.dob         = upd.dob,
    tgt.forename    = upd.forename,
    tgt.surname     = upd.surname,
    tgt.updatedDate = current_timestamp
-- Driver not yet in the target: insert it and stamp createDate
-- (updatedDate is not part of the insert column list).
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forename, surname, createDate)
  VALUES (upd.driverId, upd.dob, upd.forename, upd.surname, current_timestamp)

In [0]:
%sql
-- Inspect the target table after the day-1 merge.
SELECT * FROM f1_demo.drivers_merge;

In [0]:
       

# Same inspection through the DataFrame API: load the target table by name and render it.
display(spark.table("f1_demo.drivers_merge"))


## SQL MERGE statement (second example)

In [0]:
%sql
    
-- Upsert the day-2 batch into the Delta target, matching rows on driverId.
-- The target name is spelled exactly as in the CREATE TABLE cell
-- (it was previously written "driverS_merge").
MERGE INTO f1_demo.drivers_merge AS tgt
USING drivers_day2_df AS upd
  ON tgt.driverId = upd.driverId
-- Driver already in the target: overwrite its attributes and stamp updatedDate.
WHEN MATCHED THEN
  UPDATE SET
    tgt.dob         = upd.dob,
    tgt.forename    = upd.forename,
    tgt.surname     = upd.surname,
    tgt.updatedDate = current_timestamp
-- Driver not yet in the target: insert it and stamp createDate
-- (updatedDate is not part of the insert column list).
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forename, surname, createDate)
  VALUES (upd.driverId, upd.dob, upd.forename, upd.surname, current_timestamp)

In [0]:
%sql
-- Inspect the target table after the day-2 merge.
SELECT * FROM f1_demo.drivers_merge;


## Merge/upsert using PySpark (DeltaTable API)

In [0]:
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

# Upsert the day-3 batch through the DeltaTable Python API; this mirrors the
# SQL MERGE cells: match on driverId, update matched rows, insert the rest.
deltaTable = DeltaTable.forName(spark, 'f1_demo.drivers_merge')

# In both dicts the keys are target column names and the values are SQL
# expression strings evaluated against the aliased source ('upd') row.
(
    deltaTable.alias('tgt')
    .merge(
        drivers_day3_df.alias('upd'),
        'tgt.driverId = upd.driverId',
    )
    # Driver already in the target: overwrite its attributes and stamp updatedDate.
    .whenMatchedUpdate(
        set={
            "dob": "upd.dob",
            "forename": "upd.forename",
            "surname": "upd.surname",
            "updatedDate": "current_timestamp()",
        }
    )
    # Driver not yet in the target: insert it and stamp createDate.
    # (The "driverId" key used to appear twice in this dict; the duplicate
    # entry was redundant and has been removed.)
    .whenNotMatchedInsert(
        values={
            "driverId": "upd.driverId",
            "dob": "upd.dob",
            "forename": "upd.forename",
            "surname": "upd.surname",
            "createDate": "current_timestamp()",
        }
    )
    .execute()
)

In [0]:
%sql

-- Inspect the target table after the day-3 (PySpark) merge.
SELECT * FROM f1_demo.drivers_merge