In [0]:
%run "../includes/common_functions"

In [0]:
%sql
-- Create the f1_demo database if it does not already exist, with its
-- managed location set to the root of the "temp" storage container.
CREATE DATABASE IF NOT EXISTS f1_demo
  MANAGED LOCATION "abfss://temp@dbvsarthi.dfs.core.windows.net"

In [0]:
# Load the 2021-03-28 results file into a DataFrame.
# The JSON reader infers the schema itself when none is supplied, and
# 'header' / 'inferSchema' are CSV reader options that it does not use,
# so no reader options are set here.
# NOTE(review): raw_folder_path is presumably defined by the %run include at
# the top of the notebook -- confirm.
results_df = spark.read.json(f"{raw_folder_path}/2021-03-28/results.json")

In [0]:
# Save the results as Delta table f1_demo.results_managed, replacing any
# existing contents.
(
    results_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_demo.results_managed")
)

In [0]:
# Write the same results as Delta files to an explicit storage path
# (no table is registered by this call), replacing anything already there.
(
    results_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://temp@dbvsarthi.dfs.core.windows.net/results_external")
)

In [0]:
# Read the Delta files back straight from the storage path and render them.
df = (
    spark.read
    .format("delta")
    .load("abfss://temp@dbvsarthi.dfs.core.windows.net/results_external")
)
df.display()

In [0]:
%sql
-- Inspect the rows of the results_managed table.
SELECT *
  FROM f1_demo.results_managed

In [0]:
%sql
-- Register f1_demo.results_external over the Delta files at the
-- results_external storage path (skipped if the table already exists).
CREATE TABLE IF NOT EXISTS f1_demo.results_external
  USING DELTA
  LOCATION 'abfss://temp@dbvsarthi.dfs.core.windows.net/results_external'

In [0]:
%sql
-- Inspect the rows of the results_external table.
SELECT *
  FROM f1_demo.results_external

In [0]:
# Load the Delta data at the results_external path again and display it.
df = (
    spark.read
    .format("delta")
    .load("abfss://temp@dbvsarthi.dfs.core.windows.net/results_external")
)
df.display()

In [0]:
%sql
-- Re-score the rows with position 1..10: position 1 receives 10 points,
-- position 10 receives 1 point. Other rows are left untouched.
UPDATE f1_demo.results_managed
   SET points = 11 - position
 WHERE position <= 10;

-- Show the table after the update.
SELECT *
  FROM f1_demo.results_managed

In [0]:
from delta.tables import DeltaTable

# Python-API counterpart of the SQL UPDATE on f1_demo.results_managed, applied
# to the Delta table stored at the results_external path.
# The assignment targets 'points' (as the SQL version does). Writing the
# expression into 'position' would overwrite the very column the predicate
# filters on and destroy the recorded finishing order.
# Rows with position 1..10 receive 20..11 points respectively.
deltaTable = DeltaTable.forPath(
    spark,
    "abfss://temp@dbvsarthi.dfs.core.windows.net/results_external",
)
deltaTable.update(
    condition="position <= 10",
    set={"points": "21 - position"},
)

In [0]:
from pyspark.sql.functions import col

# Day-1 batch: drivers with driverId <= 10, with the nested name struct
# flattened into forename / surname columns.
# No reader options are set: 'header' / 'inferSchema' are CSV options that the
# JSON reader does not use, and it infers the schema when none is supplied.
drivers_day1 = (
    spark.read
    .json(f"{raw_folder_path}/2021-03-28/drivers.json")
    .filter("driverId<=10")
    .select(
        "driverId",
        "dob",
        col("name.forename").alias("forename"),
        col("name.surname").alias("surname"),
    )
)

drivers_day1.display()

In [0]:
from pyspark.sql.functions import upper

# Day-2 batch: drivers 6..15 with forename / surname upper-cased.
# Ids 6..10 overlap the day-1 batch (driverId <= 10); ids 11..15 do not.
# No reader options are set: 'header' / 'inferSchema' are CSV options that the
# JSON reader does not use, and it infers the schema when none is supplied.
driver_day_2 = (
    spark.read
    .json(f"{raw_folder_path}/2021-03-28/drivers.json")
    .filter("driverId BETWEEN 6 AND 15")
    .select(
        "driverId",
        "dob",
        upper("name.forename").alias("forename"),
        upper("name.surname").alias("surname"),
    )
)
driver_day_2.display()

In [0]:
# Day-3 batch: drivers 1..5 and 16..20 with forename / surname upper-cased.
# Relies on `upper` having been imported by an earlier cell.
# No reader options are set: 'header' / 'inferSchema' are CSV options that the
# JSON reader does not use, and it infers the schema when none is supplied.
driver_day3 = (
    spark.read
    .json(f"{raw_folder_path}/2021-03-28/drivers.json")
    .filter("driverId BETWEEN 1 AND 5 OR driverId BETWEEN 16 AND 20")
    .select(
        "driverId",
        "dob",
        upper("name.forename").alias("forename"),
        upper("name.surname").alias("surname"),
    )
)
driver_day3.display()

In [0]:
# Expose the day-1 and day-2 DataFrames to the SQL cells as temporary views
# (the two registrations are independent of each other).
driver_day_2.createOrReplaceTempView("driver_day_2")
drivers_day1.createOrReplaceTempView("drivers_day1")

In [0]:
%sql
-- Target Delta table for the merge cells. createdDate / updatedDate are
-- populated by the MERGE statements that write into this table.
CREATE TABLE IF NOT EXISTS f1_demo.drivers_merge (
  driverId    INT,
  dob         DATE,
  forename    STRING,
  surname     STRING,
  createdDate DATE,
  updatedDate DATE
)
USING DELTA

In [0]:
%sql
-- Show the extended metadata of the drivers_merge table.
SHOW TABLE EXTENDED
  IN f1_demo
  LIKE 'drivers_merge'

In [0]:
%sql
-- Upsert the day-1 batch into drivers_merge, matching rows on driverId.
--   matched rows : refresh dob / forename / surname and stamp updatedDate
--   new rows     : insert the driver and stamp createdDate
--                  (updatedDate is not set on insert)
MERGE INTO f1_demo.drivers_merge AS tgt
USING drivers_day1 AS upd
   ON tgt.driverId = upd.driverId
WHEN MATCHED THEN
  UPDATE SET
    tgt.dob         = upd.dob,
    tgt.forename    = upd.forename,
    tgt.surname     = upd.surname,
    tgt.updatedDate = current_timestamp()
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forename, surname, createdDate)
  VALUES (upd.driverId, upd.dob, upd.forename, upd.surname, current_timestamp())


In [0]:
%sql
-- Inspect drivers_merge after the day-1 merge.
SELECT *
  FROM f1_demo.drivers_merge


In [0]:
%sql
-- Upsert the day-2 batch into drivers_merge, matching rows on driverId.
--   matched rows : refresh dob / forename / surname and stamp updatedDate
--   new rows     : insert the driver and stamp createdDate
--                  (updatedDate is not set on insert)
MERGE INTO f1_demo.drivers_merge AS tgt
USING driver_day_2 AS upd
   ON tgt.driverId = upd.driverId
WHEN MATCHED THEN
  UPDATE SET
    tgt.dob         = upd.dob,
    tgt.forename    = upd.forename,
    tgt.surname     = upd.surname,
    tgt.updatedDate = current_timestamp()
WHEN NOT MATCHED THEN
  INSERT (driverId, dob, forename, surname, createdDate)
  VALUES (upd.driverId, upd.dob, upd.forename, upd.surname, current_timestamp())

In [0]:
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

# Day-3 upsert through the DeltaTable Python API, matching rows on driverId:
#   matched rows : refresh dob / forename / surname and stamp updatedDate
#   new rows     : insert the driver and stamp createdDate
deltaTable = DeltaTable.forName(spark, "f1_demo.drivers_merge")

(
    deltaTable.alias("tgt")
    .merge(driver_day3.alias("upd"), "tgt.driverId = upd.driverId")
    .whenMatchedUpdate(
        set={
            "dob": "upd.dob",
            "forename": "upd.forename",
            "surname": "upd.surname",
            "updatedDate": current_timestamp(),
        }
    )
    .whenNotMatchedInsert(
        values={
            "driverId": "upd.driverId",
            "dob": "upd.dob",
            "forename": "upd.forename",
            "surname": "upd.surname",
            "createdDate": current_timestamp(),
        }
    )
    .execute()
)

In [0]:
%sql
-- List the operation history recorded for the drivers_merge Delta table.
DESCRIBE HISTORY
  f1_demo.drivers_merge