In [0]:
# Optional lineage tag; its value is written verbatim into the data_source column below.
dbutils.widgets.text(name="p_data_source", defaultValue="")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Date folder of the raw file to ingest: it is used in the raw input path
# ({raw_container_folder_path}/{v_file_date}/results.json) and stamped into the file_date column.
dbutils.widgets.text("p_file_date", "")
v_file_date = dbutils.widgets.get("p_file_date")

# Fail fast: with an empty value the read path collapses to "<raw>//results.json"
# and every row gets an empty file_date instead of the intended dated folder.
if not v_file_date.strip():
    raise ValueError("p_file_date widget must be set to the raw file date folder to ingest")

In [0]:
%run "../includes/paths"

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import *
# Explicit schema for the raw results.json file. Every column is nullable and the
# names match the source JSON (camelCase); several are renamed to snake_case in a later cell.
results_schema = (
    StructType()
    .add("constructorId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("fastestLap", IntegerType(), True)
    .add("fastestLapSpeed", StringType(), True)
    .add("fastestLapTime", StringType(), True)
    .add("grid", IntegerType(), True)
    .add("laps", IntegerType(), True)
    .add("milliseconds", IntegerType(), True)
    .add("number", IntegerType(), True)
    .add("points", FloatType(), True)
    .add("position", IntegerType(), True)
    .add("positionOrder", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("raceId", IntegerType(), True)
    .add("rank", IntegerType(), True)
    .add("resultId", IntegerType(), True)
    .add("statusId", IntegerType(), True)
    .add("time", StringType(), True)
)

In [0]:
# Read this run's results.json from the dated raw folder using the explicit schema above.
results_json_path = f"{raw_container_folder_path}/{v_file_date}/results.json"
results_df = spark.read.json(results_json_path, schema=results_schema)

In [0]:
from pyspark.sql.functions import *

# Rename the camelCase source columns to snake_case, add audit/lineage columns
# (ingestion timestamp, data source tag, file date), and drop statusId so it is
# not written downstream.
results_final_df = (
    results_df
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
    .drop("statusId")
)

In [0]:
# Keep a single row per (race_id, driver_id) before merging into the processed table.
# NOTE(review): Spark does not guarantee which duplicate row is kept — confirm that is acceptable.
result_no_dup = results_final_df.dropDuplicates(subset=["race_id", "driver_id"])

In [0]:
# #incremental load method 1 - with append mode. 
# # Repair the table to update the Hive metastore with partition information
# #spark.sql("MSCK REPAIR TABLE f1_processed_db.results")

# #iterating through the list of distinct race_id present in final df
# for race_id_list in results_final_df.select("race_id").distinct().collect(): 
#     # check whether the table already exists: on the very first run it does not exist yet, so skipping the DROP there avoids an unwanted error
#     if (spark._jsparkSession.catalog().tableExists("f1_processed_db.results")): 
#         # drop the existing partition for this race_id (if present); the next cell then recreates the partition and loads the new data
#         spark.sql(f"ALTER TABLE f1_processed_db.results DROP If exists PARTITION (race_id = {race_id_list.race_id})") 


In [0]:
# method 2 - using overwrite mode and insertInto instead of saveAsTable
# we use two write statements: a normal saveAsTable that loads the data the first time, then an incremental load using insertInto, i.e. the else block runs on a fresh run and, once the table has been created, the if block is executed
# the partition overwrite mode also needs to be set to dynamic

# if (spark._jsparkSession.catalog().tableExists("f1_processed_db.results")):
#     results_final_df.write.mode("overwrite").insertInto("f1_processed_db.results") # insertInto has no option to specify partitions and matches columns by position, with the table's partition column (race_id) last, so the df must be reordered with a select in an earlier cell so that race_id comes last
# else:
#     results_final_df.write.mode("overwrite").partitionBy("race_id").format("parquet").saveAsTable("f1_processed_db.results")

# this logic was moved into the common functions notebook; that function is called instead

In [0]:
# spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")
# from delta.tables import DeltaTable
# if (spark._jsparkSession.catalog().tableExists("f1_processed_db.results")):
#   deltaTable = DeltaTable.forPath(spark, f"{processed_container_folder_path}/results")
#   deltaTable.alias("target_tab")\
#     .merge(results_final_df.alias("source_tab"),\
#       "target_tab.result_id = source_tab.result_id AND target_tab.race_id = source_tab.race_id")\
#       .whenMatchedUpdateAll()\
#         .whenNotMatchedInsertAll()\
#           .execute()
# else:
#   results_final_df.write.mode("overwrite").partitionBy("race_id").format("delta").saveAsTable("f1_processed_db.results")

# the above code was converted into a function (in the common functions notebook) so that it can be reused in other notebooks
# the merge condition also checks the partition column (target_tab.race_id = source_tab.race_id) so Spark only has to look for matching result_id values within the relevant partitions, which improves performance

In [0]:
# Merge the deduplicated results into f1_processed_db.results through
# merge_incremental_data (loaded via %run "../includes/common_functions").
# Matching on race_id as well as result_id presumably lets the merge stay within
# the affected race_id partitions; "race_id" is passed as the partition column.
merge_condition = "target_tab.result_id = source_tab.result_id AND target_tab.race_id = source_tab.race_id"
merge_incremental_data(
    result_no_dup,
    processed_container_folder_path,
    merge_condition,
    "race_id",
    "f1_processed_db",
    "results",
)

In [0]:
# Sanity check: row count of the processed results Delta table after the merge.
spark.read.load(f"{processed_container_folder_path}/results", format="delta").count()

In [0]:
# End the notebook and hand "success" back to the caller
# (presumably an orchestrating notebook that invoked it via dbutils.notebook.run — confirm).
dbutils.notebook.exit("success")