In [0]:
%run "../includes/configuration"

In [0]:
# Day-1 sample: rows with driverId <= 10, with the nested name struct
# flattened into top-level forename / surname columns.
# NOTE(review): "inferSchema" is normally a CSV reader option; the JSON reader
# presumably infers the schema either way — confirm before relying on it.
drivers_day1_df = (
    spark.read
    .option("inferSchema", "true")
    .json(f"{incraw_folder_path}/2021-03-28/drivers.json")
    .filter("driverId<=10")
    .select("driverId", "dob", "name.forename", "name.surname")
)

In [0]:
# Register the day-1 sample as temp view "drivers_day1" so the %sql merge cell can use it as its source.
drivers_day1_df.createOrReplaceTempView("drivers_day1")

In [0]:
# Render the day-1 sample for a visual check before merging.
display(drivers_day1_df)

driverId,dob,forename,surname
1,1985-01-07,Lewis,Hamilton
2,1977-05-10,Nick,Heidfeld
3,1985-06-27,Nico,Rosberg
4,1981-07-29,Fernando,Alonso
5,1981-10-19,Heikki,Kovalainen
6,1985-01-11,Kazuki,Nakajima
7,1979-02-28,Sébastien,Bourdais
8,1979-10-17,Kimi,Räikkönen
9,1984-12-07,Robert,Kubica
10,1982-03-18,Timo,Glock


In [0]:
from pyspark.sql.functions import upper

# Day-2 sample: driverId 6 through 15 from the same 2021-03-28 file, with
# forename and surname upper-cased. Ids 6-10 overlap the day-1 sample (and now
# differ in case); ids 11-15 are new.
drivers_day2_df = (
    spark.read
    .option("inferSchema", "true")
    .json(f"{incraw_folder_path}/2021-03-28/drivers.json")
    .filter("driverId between 6 and 15")
    .select(
        "driverId",
        "dob",
        upper("name.forename").alias("forename"),
        upper("name.surname").alias("surname"),
    )
)

In [0]:
# Register the day-2 sample as temp view "drivers_day2" so the %sql merge cell can use it as its source.
drivers_day2_df.createOrReplaceTempView("drivers_day2")

In [0]:
# Render the day-2 sample for a visual check before merging.
display(drivers_day2_df)

driverId,dob,forename,surname
6,1985-01-11,KAZUKI,NAKAJIMA
7,1979-02-28,SÉBASTIEN,BOURDAIS
8,1979-10-17,KIMI,RÄIKKÖNEN
9,1984-12-07,ROBERT,KUBICA
10,1982-03-18,TIMO,GLOCK
11,1977-01-28,TAKUMA,SATO
12,1985-07-25,NELSON,PIQUET JR.
13,1981-04-25,FELIPE,MASSA
14,1971-03-27,DAVID,COULTHARD
15,1974-07-13,JARNO,TRULLI


In [0]:
from pyspark.sql.functions import upper

# Day-3 sample: driverId 1 through 5 plus 16 through 20 from the same
# 2021-03-28 file, with forename and surname upper-cased.
drivers_day3_df = (
    spark.read
    .option("inferSchema", "true")
    .json(f"{incraw_folder_path}/2021-03-28/drivers.json")
    .filter("driverId between 1 and 5 or driverId between 16 and 20")
    .select(
        "driverId",
        "dob",
        upper("name.forename").alias("forename"),
        upper("name.surname").alias("surname"),
    )
)

In [0]:
%sql
-- Target Delta table for the merge (upsert) cells in this notebook.
-- createdDate and updatedDate are DATE columns, so values written to them
-- keep only the calendar date.
CREATE TABLE IF NOT EXISTS f1_delta_lake_demo.drivers_merge (
  driverId    INT,
  dob         DATE,
  forename    STRING,
  surname     STRING,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA

In [0]:
%sql
-- Upsert the drivers_day1 temp view into the Delta table, keyed on driverId.
--   matched rows : overwrite dob / forename / surname and stamp updatedDate
--   new rows     : insert with createdDate stamped; updatedDate is not set
MERGE INTO f1_delta_lake_demo.drivers_merge tgt
USING drivers_day1 upd
  ON tgt.driverId = upd.driverId
WHEN MATCHED THEN
  UPDATE SET
    tgt.dob = upd.dob,
    tgt.forename = upd.forename,
    tgt.surname = upd.surname,
    tgt.updatedDate = current_timestamp
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forename, surname, createdDate)
  VALUES (driverId, dob, forename, surname, current_timestamp)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
10,0,0,10


In [0]:
%sql
-- Inspect the table contents after the day-1 merge.
SELECT *
FROM f1_delta_lake_demo.drivers_merge

driverId,dob,forename,surname,createdDate,updatedDate
1,1985-01-07,Lewis,Hamilton,2024-12-20,
2,1977-05-10,Nick,Heidfeld,2024-12-20,
3,1985-06-27,Nico,Rosberg,2024-12-20,
4,1981-07-29,Fernando,Alonso,2024-12-20,
5,1981-10-19,Heikki,Kovalainen,2024-12-20,
6,1985-01-11,Kazuki,Nakajima,2024-12-20,
7,1979-02-28,Sébastien,Bourdais,2024-12-20,
8,1979-10-17,Kimi,Räikkönen,2024-12-20,
9,1984-12-07,Robert,Kubica,2024-12-20,
10,1982-03-18,Timo,Glock,2024-12-20,


In [0]:
%sql
-- Upsert the drivers_day2 temp view into the Delta table, keyed on driverId.
--   matched rows : overwrite dob / forename / surname and stamp updatedDate
--   new rows     : insert with createdDate stamped; updatedDate is not set
MERGE INTO f1_delta_lake_demo.drivers_merge tgt
USING drivers_day2 upd
  ON tgt.driverId = upd.driverId
WHEN MATCHED THEN
  UPDATE SET
    tgt.dob = upd.dob,
    tgt.forename = upd.forename,
    tgt.surname = upd.surname,
    tgt.updatedDate = current_timestamp
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forename, surname, createdDate)
  VALUES (driverId, dob, forename, surname, current_timestamp)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
10,5,0,5


In [0]:
%sql
-- Inspect the table contents after the day-2 merge.
SELECT *
FROM f1_delta_lake_demo.drivers_merge

driverId,dob,forename,surname,createdDate,updatedDate
6,1985-01-11,KAZUKI,NAKAJIMA,2024-12-20,2024-12-20
7,1979-02-28,SÉBASTIEN,BOURDAIS,2024-12-20,2024-12-20
8,1979-10-17,KIMI,RÄIKKÖNEN,2024-12-20,2024-12-20
9,1984-12-07,ROBERT,KUBICA,2024-12-20,2024-12-20
10,1982-03-18,TIMO,GLOCK,2024-12-20,2024-12-20
11,1977-01-28,TAKUMA,SATO,2024-12-20,
12,1985-07-25,NELSON,PIQUET JR.,2024-12-20,
13,1981-04-25,FELIPE,MASSA,2024-12-20,
14,1971-03-27,DAVID,COULTHARD,2024-12-20,
15,1974-07-13,JARNO,TRULLI,2024-12-20,


In [0]:
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

# Resolve the merge target by its metastore name, the same name every %sql cell
# in this notebook uses, instead of a hard-coded mount path. This keeps the
# Python merge pointed at the same table even if the database location changes.
deltaTable = DeltaTable.forName(spark, "f1_delta_lake_demo.drivers_merge")

# Upsert the day-3 sample into the Delta table, keyed on driverId.
#   matched rows : overwrite dob / forename / surname and stamp updatedDate
#   new rows     : insert with createdDate stamped; updatedDate is not set
# The dict values are SQL expression strings evaluated by the merge.
(
    deltaTable.alias("tgt")
    .merge(
        drivers_day3_df.alias("upd"),
        "tgt.driverId = upd.driverId",
    )
    .whenMatchedUpdate(
        set={
            "dob": "upd.dob",
            "forename": "upd.forename",
            "surname": "upd.surname",
            "updatedDate": "current_timestamp()",
        }
    )
    .whenNotMatchedInsert(
        values={
            "driverId": "upd.driverId",
            "dob": "upd.dob",
            "forename": "upd.forename",
            "surname": "upd.surname",
            "createdDate": "current_timestamp()",
        }
    )
    .execute()
)

In [0]:
%sql
-- Inspect the table contents after the day-3 merge.
SELECT *
FROM f1_delta_lake_demo.drivers_merge

driverId,dob,forename,surname,createdDate,updatedDate
6,1985-01-11,KAZUKI,NAKAJIMA,2024-12-20,2024-12-20
7,1979-02-28,SÉBASTIEN,BOURDAIS,2024-12-20,2024-12-20
8,1979-10-17,KIMI,RÄIKKÖNEN,2024-12-20,2024-12-20
9,1984-12-07,ROBERT,KUBICA,2024-12-20,2024-12-20
10,1982-03-18,TIMO,GLOCK,2024-12-20,2024-12-20
11,1977-01-28,TAKUMA,SATO,2024-12-20,
12,1985-07-25,NELSON,PIQUET JR.,2024-12-20,
13,1981-04-25,FELIPE,MASSA,2024-12-20,
14,1971-03-27,DAVID,COULTHARD,2024-12-20,
15,1974-07-13,JARNO,TRULLI,2024-12-20,
