##### 3. Merge Statements


In [0]:
# Day-1 slice: drivers with driverId <= 10, with the nested `name` struct
# flattened into top-level `forename` / `surname` columns.
drivers_day1_df = (
    spark.read
    .option("inferSchema", True)
    .json('/mnt/formula1dl/raw/2021-03-28/drivers.json')
    .filter('driverId <= 10')
    .select('driverId', 'dob', 'name.forename', 'name.surname')
)

display(drivers_day1_df)

In [0]:
# Expose the day-1 slice to %sql cells under the view name `drivers_day1`.
drivers_day1_df.createOrReplaceTempView(name="drivers_day1")

In [0]:
# Day-2 slice: driverId 6-15. Ids 6-10 overlap the day-1 slice, while
# ids 11-15 are new relative to it.
drivers_day2_df = (
    spark.read
    .option("inferSchema", True)
    .json('/mnt/formula1dl/raw/2021-03-28/drivers.json')
    .filter('driverId between 6 and 15')
    .select('driverId', 'dob', 'name.forename', 'name.surname')
)

display(drivers_day2_df)

In [0]:
# Expose the day-2 slice to %sql cells under the view name `drivers_day2`.
drivers_day2_df.createOrReplaceTempView(name="drivers_day2")

In [0]:
from pyspark.sql.functions import upper

# Day-3 slice: driverId 1-5 and 16-20, with forename/surname upper-cased
# so that changed values are visible for ids that already exist in the target.
drivers_day3_df = (
    spark.read
    .option("inferSchema", True)
    .json('/mnt/formula1dl/raw/2021-03-28/drivers.json')
    .filter('driverId between 1 and 5 or driverId between 16 and 20')
    .select(
        'driverId',
        'dob',
        upper('name.forename').alias('forename'),
        upper('name.surname').alias('surname'),
    )
)

display(drivers_day3_df)

In [0]:
%sql
-- Target Delta table for the merge (upsert) demos in this section.
-- NOTE: the column is spelled `forname` (no "e"); the merge cells map the
-- source column `forename` into it, so the spelling must stay in sync.
CREATE TABLE IF NOT EXISTS f1_demo.drivers_merge (
  driverId    INT,
  dob         DATE,
  forname     STRING,
  surname     STRING,
  createDate  DATE,   -- set by the insert branch of the merges
  updatedDate DATE    -- set by the update branch of the merges
)
USING DELTA
LOCATION '/mnt/formula1dl/demo/drivers_merge'

In [0]:
%sql
-- Upsert the day-1 slice (temp view `drivers_day1`) into the Delta table,
-- keyed on driverId:
--   * matched rows     -> refresh dob/forname/surname and stamp updatedDate
--   * unmatched rows   -> insert with createDate stamped (updatedDate not set)
-- createDate / updatedDate are declared as DATE, so they are stamped with
-- current_date() instead of relying on an implicit TIMESTAMP -> DATE cast.
MERGE INTO f1_demo.drivers_merge tgt
USING drivers_day1 upd
  ON tgt.driverId = upd.driverId
WHEN MATCHED THEN
  UPDATE SET tgt.dob         = upd.dob,
             tgt.forname     = upd.forename,  -- target column is spelled `forname`
             tgt.surname     = upd.surname,
             tgt.updatedDate = current_date()
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forname, surname, createDate)
  VALUES (upd.driverId, upd.dob, upd.forename, upd.surname, current_date())

In [0]:
%sql
-- Inspect the target table after the day-1 merge.
SELECT * FROM f1_demo.drivers_merge

In [0]:
%sql
-- Upsert the day-2 slice (temp view `drivers_day2`) into the Delta table,
-- keyed on driverId:
--   * matched rows     -> refresh dob/forname/surname and stamp updatedDate
--   * unmatched rows   -> insert with createDate stamped (updatedDate not set)
-- createDate / updatedDate are declared as DATE, so they are stamped with
-- current_date() instead of relying on an implicit TIMESTAMP -> DATE cast.
MERGE INTO f1_demo.drivers_merge tgt
USING drivers_day2 upd
  ON tgt.driverId = upd.driverId
WHEN MATCHED THEN
  UPDATE SET tgt.dob         = upd.dob,
             tgt.forname     = upd.forename,  -- target column is spelled `forname`
             tgt.surname     = upd.surname,
             tgt.updatedDate = current_date()
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forname, surname, createDate)
  VALUES (upd.driverId, upd.dob, upd.forename, upd.surname, current_date())


In [0]:
%sql
-- Inspect the target table after the day-2 merge.
SELECT * FROM f1_demo.drivers_merge

In [0]:
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

# Same upsert as the SQL MERGE cells, expressed through the Delta Lake Python
# API and fed by the day-3 DataFrame. The table is addressed by its storage
# path rather than by its metastore name.
deltaTable = DeltaTable.forPath(spark, "/mnt/formula1dl/demo/drivers_merge")

# createDate / updatedDate are DATE columns in the target table, so they are
# stamped with current_date() rather than a timestamp that would be
# implicitly cast down to a date.
(
    deltaTable.alias("tgt")
    .merge(
        drivers_day3_df.alias("upd"),
        "tgt.driverId = upd.driverId",
    )
    # Existing driver: refresh attributes and stamp the update date.
    .whenMatchedUpdate(
        set={
            "dob": "upd.dob",
            "forname": "upd.forename",  # target column is spelled `forname`
            "surname": "upd.surname",
            "updatedDate": "current_date()",
        }
    )
    # New driver: insert and stamp the creation date (updatedDate not set).
    .whenNotMatchedInsert(
        values={
            "driverId": "upd.driverId",
            "dob": "upd.dob",
            "forname": "upd.forename",
            "surname": "upd.surname",
            "createDate": "current_date()",
        }
    )
    .execute()
)

In [0]:
%sql
-- Inspect the target table after the day-3 (Python API) merge.
SELECT * FROM f1_demo.drivers_merge