#### Pass parameters to the notebook (text widgets)

In [0]:
# Notebook parameter "p_data_source": a free-text label that is written into
# the data_source column of every ingested row (empty string by default).
DATA_SOURCE_WIDGET = "p_data_source"
dbutils.widgets.text(DATA_SOURCE_WIDGET, "")
v_data_source = dbutils.widgets.get(DATA_SOURCE_WIDGET)

In [0]:
# Notebook parameter "p_file_date": name of the dated raw sub-folder to ingest.
# It is used to build the input path and is stamped into the file_date column.
dbutils.widgets.text("p_file_date", "")
v_file_date = dbutils.widgets.get("p_file_date")

# Fail fast when the parameter is missing. Otherwise the read path silently
# becomes "<raw_folder_path>//results.json" and every row gets file_date = "".
if not v_file_date.strip():
    raise ValueError("p_file_date must be set to the name of the dated raw folder to ingest")

#### Include the shared notebooks (configuration and common functions) with the %run command

In [0]:
%run "../Includes/configuration"

In [0]:
%run  "../Includes/common_functions"

In [0]:
# Explicit read schema for results.json (field name, Spark type, nullable),
# so Spark does not have to infer the types. resultId is the only field
# declared non-nullable.
# NOTE(review): "points" (and "statusId", which is dropped later) are read as
# strings. Confirm against the source data before changing them, because the
# target table's column types follow this schema.
# NOTE(review): StructType / IntegerType / StringType are not imported in this
# notebook; presumably they come from the %run includes. Confirm.
results_schema = (
    StructType()
    .add("resultId", IntegerType(), False)
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("constructorId", IntegerType(), True)
    .add("number", IntegerType(), True)
    .add("grid", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("positionOrder", IntegerType(), True)
    .add("points", StringType(), True)
    .add("laps", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
    .add("fastestLap", IntegerType(), True)
    .add("rank", IntegerType(), True)
    .add("fastestLapTime", StringType(), True)
    .add("fastestLapSpeed", StringType(), True)
    .add("statusId", StringType(), True)
)

In [0]:
# Read this run's results file (<raw_folder_path>/<file_date>/results.json)
# with the explicit schema defined above.
# The CSV-only "header" option was removed: the JSON data source does not use
# it, and leaving it in suggests the input is a CSV file with a header row.
result_df = spark.read \
    .schema(results_schema) \
    .json(f"{raw_folder_path}/{v_file_date}/results.json")

In [0]:
# camelCase source column -> snake_case target column. Columns not listed here
# (number, grid, position, points, laps, time, milliseconds, rank, statusId)
# keep their original names.
# NOTE(review): current_timestamp / lit are not imported in this notebook;
# presumably they come from the %run includes. Confirm.
RESULTS_COLUMN_RENAMES = {
    "resultId": "result_id",
    "raceId": "race_id",
    "driverId": "driver_id",
    "constructorId": "constructor_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "fastestLap": "fastest_lap",
    "fastestLapTime": "fastest_lap_time",
    "fastestLapSpeed": "fastest_lap_speed",
}

# Rename positionally (same column order as the source), drop statusId, then
# append the audit columns ingestion_date, data_source and file_date, in that order.
renamed_columns = [RESULTS_COLUMN_RENAMES.get(name, name) for name in result_df.columns]
results_final_df = (
    result_df.toDF(*renamed_columns)
    .drop("statusId")
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

In [0]:
# Keep a single row per (race_id, driver_id) pair. This frame, not
# results_final_df, is what the Delta merge cell below writes.
# NOTE(review): dropDuplicates does not specify which row of a duplicate group
# survives.
dedup_key_columns = ["race_id", "driver_id"]
results_deduped = results_final_df.dropDuplicates(dedup_key_columns)


##### Incremental load – first solution (drop the existing race_id partitions, then append)

In [0]:
#for race_id_list in results_final_df.select("race_id").distinct().collect():
#  if (spark._jsparkSession.catalog().tableExists("f1_processed.results")):
#    spark.sql(f"ALTER TABLE f1_processed.results DROP IF EXISTS PARTITION (race_id = {race_id_list.race_id})")
#results_final_df.write.mode("append").partitionBy("race_id").format("parquet").saveAsTable("f1_processed.results")


##### Incremental load – second solution (dynamic partition overwrite)
##### Note: the partition column must be the last column, and the partition overwrite mode must be set to dynamic.

In [0]:
# Alternative (parquet) incremental load using dynamic partition overwrite.
# Disabled by default. When enabled it writes f1_processed.results as a
# parquet table, but the Delta merge cell further down writes to the same
# table, so only one of the two paths should be active. It is kept for
# reference, like the commented-out solutions around it.
RUN_PARQUET_INCREMENTAL_LOAD = False

if RUN_PARQUET_INCREMENTAL_LOAD:
    # "dynamic" makes an overwrite replace only the race_id partitions that
    # appear in this batch instead of truncating the whole table.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    # insertInto matches columns by position, so race_id (the partition
    # column) must come last. This uses a new name instead of reassigning
    # results_final_df, so re-running the cell is safe.
    results_reordered_df = results_final_df.select(
        "result_id", "driver_id", "constructor_id", "number", "grid",
        "position", "position_text", "position_order", "points", "laps",
        "time", "milliseconds", "fastest_lap", "rank", "fastest_lap_time",
        "fastest_lap_speed", "ingestion_date", "data_source", "file_date",
        "race_id",
    )

    if spark._jsparkSession.catalog().tableExists("f1_processed.results"):
        results_reordered_df.write.mode("overwrite").insertInto("f1_processed.results")
    else:
        results_reordered_df.write.mode("overwrite").partitionBy("race_id").format("parquet").saveAsTable("f1_processed.results")


##### Incremental load – second solution, using the overwrite_partition helper function

In [0]:
#overwrite_partition(results_final_df, "f1_processed", "results", "race_id")


##### Incremental load for Delta tables (merge)

In [0]:
# Write the deduplicated results into f1_processed.results (partition column
# race_id). Target rows (tgt) and source rows (src) are paired on result_id
# and race_id.
# NOTE(review): merge_delta_data is not defined in this notebook; presumably it
# comes from ../Includes/common_functions and MERGEs into an existing table or
# creates it otherwise. Confirm.
merge_condition = "tgt.result_id = src.result_id AND tgt.race_id = src.race_id"
merge_delta_data(
    results_deduped,
    "f1_processed",
    "results",
    processed_folder_path,
    merge_condition,
    "race_id",
)

In [0]:
# End the notebook and hand this value back to whoever launched it
# (e.g. a parent notebook calling dbutils.notebook.run).
NOTEBOOK_EXIT_VALUE = "Success"
dbutils.notebook.exit(NOTEBOOK_EXIT_VALUE)