In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.functions import current_timestamp, lit

In [0]:
# Ingestion-date parameter. It selects which raw-data date folder is read and is
# stamped onto every row as the file_date column. "2021-03-28" is the widget's
# default value when no parameter is supplied.
FILE_DATE_WIDGET = "p_file_date"
dbutils.widgets.text(FILE_DATE_WIDGET, "2021-03-28")
v_file_date = dbutils.widgets.get(FILE_DATE_WIDGET)

In [0]:
%run "../includes/configuration"

In [0]:
# Explicit schema for the raw results JSON, using the source's camelCase column names.
# Each entry is (source column name, Spark type, nullable); only resultId is
# declared non-nullable.
results_column_specs = [
    ("resultId", IntegerType(), False),
    ("raceId", IntegerType(), True),
    ("driverId", IntegerType(), True),
    ("constructorId", IntegerType(), True),
    ("number", IntegerType(), True),
    ("grid", IntegerType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("positionOrder", IntegerType(), True),
    ("points", FloatType(), True),
    ("laps", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
    ("fastestLap", IntegerType(), True),
    ("rank", IntegerType(), True),
    ("fastestLapTime", StringType(), True),
    ("fastestLapSpeed", StringType(), True),
    ("statusId", IntegerType(), True),
]

results_schema = StructType(
    [StructField(name, data_type, nullable) for name, data_type, nullable in results_column_specs]
)

In [0]:
# Read the raw results file for the selected file date using the explicit schema
# (no schema inference). raw_folder_path is presumably defined by the
# %run "../includes/configuration" cell above.
# "header" is a CSV-only reader option and is ignored by the JSON source, so it is
# intentionally not set here.
df_results = (
    spark.read
    .schema(results_schema)
    .json(f"{raw_folder_path}/{v_file_date}/results.json")
)

In [0]:
# Map the raw camelCase column names to the snake_case names used in the processed
# layer. Columns not listed here keep their names. statusId is also not listed;
# it is dropped in the next cell.
results_column_renames = {
    "resultId": "result_id",
    "raceId": "race_id",
    "driverId": "driver_id",
    "constructorId": "constructor_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "fastestLap": "fastest_lap",
    "fastestLapTime": "fastest_lap_time",
    "fastestLapSpeed": "fastest_lap_speed",
}

# Stamp each row with the load time, then apply the renames one by one.
# Rebinding starts from df_results, so re-running this cell is safe.
df_results_renamed = df_results.withColumn("ingestion_date", current_timestamp())
for source_name, target_name in results_column_renames.items():
    df_results_renamed = df_results_renamed.withColumnRenamed(source_name, target_name)

In [0]:
# Drop statusId, which is not kept in the processed table. Then record the
# p_file_date value that produced each row.
df_results_final = (
    df_results_renamed
    .drop("statusId")
    .withColumn("file_date", lit(v_file_date))
)

Method 1 - Drop affected partitions, then append (inefficient: runs one ALTER TABLE ... DROP PARTITION per race_id before writing)

In [0]:
# for race_id_row in df_results_final.select("race_id").distinct().collect():
#     if (spark.catalog.tableExists("f1_processed.results")):
#         spark.sql(f"ALTER TABLE f1_processed.results DROP IF EXISTS PARTITION (race_id = {race_id_row.race_id})")

In [0]:
# df_results_final.write.mode("append").partitionBy("race_id").format("parquet").saveAsTable("f1_processed.results")

Method 2 - More efficient than Method 1: dynamic partition overwrite replaces only the partitions present in the incoming data

In [0]:
# spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

In [0]:
# df_results_final = df_results_final.select("result_id",
#     "driver_id",
#     "constructor_id",
#     "number",
#     "grid",
#     "position",
#     "position_text",
#     "position_order",
#     "points",
#     "laps",
#     "time",
#     "milliseconds",
#     "fastest_lap",
#     "rank",
#     "fastest_lap_time",
#     "fastest_lap_speed",
#     "ingestion_date",
#     "file_date",
#     "race_id")

In [0]:
# if spark.catalog.tableExists("f1_processed.results"):
#     df_results_final.write.mode("overwrite").insertInto("f1_processed.results")
# else:
#     df_results_final.write.mode("overwrite").partitionBy("race_id").format("parquet").saveAsTable("f1_processed.results")

In [0]:
# %run "../includes/common_functions"

In [0]:
# apply_incremental_load(df_results_final, "f1_processed.results", "race_id") # Method 2 as a Function

Method 3 - For Delta Lake (Delta tables), using an upsert (MERGE)

In [0]:
# Reference copy of the Method 3 upsert helper, kept commented out for documentation.
# The version actually called below presumably comes from %run "../includes/common_functions".
# NOTE(review): the else-branch of this copy originally wrote the global df_results_final
# instead of the input_df parameter. On a first load, that would write the
# non-deduplicated frame. It is fixed below; check the live copy in common_functions
# for the same bug.
# from delta.tables import DeltaTable

# def merge_delta_data(input_df, target_db_name, target_table_name, target_folder_path, merge_condition, partition_column):
#     """Upsert input_df into the Delta table target_db_name.target_table_name.
#
#     If the table already exists, MERGE on merge_condition (target aliased "tgt",
#     source aliased "src"): update all columns on a match, insert on no match.
#     Otherwise create it as a Delta table partitioned by partition_column.
#     """
#     # Enable Databricks dynamic partition pruning (intended to let the MERGE skip
#     # target partitions not referenced by the source).
#     spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning.enabled", "True")

#     if spark.catalog.tableExists(f"{target_db_name}.{target_table_name}"):
#         # NOTE(review): assumes the table is stored at {target_folder_path}/{target_table_name};
#         # confirm this matches the table's LOCATION.
#         delta_table = DeltaTable.forPath(spark, f"{target_folder_path}/{target_table_name}")

#         delta_table.alias("tgt").merge(
#             input_df.alias("src"),
#             merge_condition) \
#             .whenNotMatchedInsertAll() \
#             .whenMatchedUpdateAll() \
#             .execute()
#     else:
#         # First load: create the table from the caller's DataFrame (not a notebook global).
#         input_df.write.mode("overwrite").partitionBy(partition_column).format("delta").saveAsTable(f"{target_db_name}.{target_table_name}")

In [0]:
# Keep a single row per (race_id, driver_id). Presumably this guards against
# duplicate driver entries in the raw file. Spark does not specify which of the
# duplicate rows survives.
# NOTE(review): this dedupe key differs from the MERGE key (result_id, race_id)
# used below; confirm that is intended.
df_results_deduped = df_results_final.dropDuplicates(subset=["race_id", "driver_id"])

In [0]:
%run "../includes/common_functions"

In [0]:
# Upsert the deduplicated results into f1_processed.results, partitioned by race_id.
# The join includes race_id alongside result_id, presumably so the MERGE can target
# only the partitions present in this load.
results_merge_condition = "tgt.result_id = src.result_id AND tgt.race_id = src.race_id"
merge_delta_data(
    df_results_deduped,
    "f1_processed",
    "results",
    processed_folder_path,
    results_merge_condition,
    "race_id",
)