### Ingest pit_stops.json file

##### Read the multi-line JSON file using the Spark DataFrame reader

In [0]:
# Free-text widget naming the upstream data source; its value is stamped into
# the `datasource` column of every ingested row.
data_source_widget_name = "p_data_source"
dbutils.widgets.text(data_source_widget_name, "")
v_data_source = dbutils.widgets.get(data_source_widget_name)

In [0]:
# Widget selecting which dated raw folder to ingest (default "2021-03-21");
# the value is used in the input path and stored in the `file_date` column.
file_date_widget_name = "p_file_date"
dbutils.widgets.text(file_date_widget_name, "2021-03-21")
v_file_date = dbutils.widgets.get(file_date_widget_name)

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, FloatType, TimestampType
from pyspark.sql.functions import col, current_timestamp, lit, to_timestamp,concat

In [0]:
# Explicit schema for pit_stops.json so the reader does not have to infer types.
# Every field is nullable; `stop`, `time` and `duration` are read as strings.
pitstops_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("raceId", IntegerType()),
        ("driverId", IntegerType()),
        ("stop", StringType()),
        ("lap", IntegerType()),
        ("time", StringType()),
        ("duration", StringType()),
        ("milliseconds", IntegerType()),
    ]
])

In [0]:
# Load this run's pit_stops.json from the dated raw folder. multiLine is enabled
# because the file's JSON records span multiple lines; the explicit schema is
# applied instead of inference.
pitstops_source_path = f"{raw_inc_folder_path}/{v_file_date}/pit_stops.json"
pitstops_df = spark.read.json(
    pitstops_source_path,
    schema=pitstops_schema,
    multiLine=True,
)

##### Rename columns and add audit columns

In [0]:
# Rename the camelCase source keys to snake_case, then append audit columns:
# load timestamp, data-source widget value and file-date widget value.
pitstops_final_df = (
    pitstops_df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("datasource", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

Write the result as a Parquet file (full load, currently disabled)

In [0]:
# Disabled: full-load variant that overwrote Parquet output under processed_folder_path.
# This notebook now loads incrementally via merge_delta_data further down.
#pitstops_final_df.write.mode("overwrite").parquet(f"{processed_folder_path}/pitstops")

##### Incremental Load
Helpers such as `merge_delta_data` are defined in the `../includes` notebooks run at the top.

In [0]:
# Disabled: earlier incremental approach using the overwrite_partition helper
# (presumably overwrites the race_id partitions present in this batch — confirm in
# the includes); superseded by the merge_delta_data upsert below.
#overwrite_partition(pitstops_final_df, "f1_processed_incremental", "pitstops", "race_id")

In [0]:
# Upsert this batch into f1_processed_incremental.pitstops via the shared helper.
# A source row matches a target row when race_id, driver_id and stop all agree;
# each key appears exactly once in the generated condition.
pitstops_merge_keys = ["race_id", "driver_id", "stop"]
merge_condition = " AND ".join(f"tgt.{key} = src.{key}" for key in pitstops_merge_keys)
# NOTE(review): the trailing "race_id" argument is presumably the partition column
# used by merge_delta_data — confirm against ../includes/common_functions.
merge_delta_data(pitstops_final_df, "f1_processed_incremental", "pitstops", processed_inc_folder_path, merge_condition, "race_id")

In [0]:
%sql
-- Inspect the merged table through the metastore.
SELECT *
  FROM f1_processed_incremental.pitstops

In [0]:
# Read the merged table directly from its Delta location (not via the metastore)
# to cross-check the SQL view of the same data.
pitstops_delta_path = f"{processed_inc_folder_path}/pitstops"
display(spark.read.load(pitstops_delta_path, format="delta"))

In [0]:
%sql
-- Invalidate cached data/metadata for the table so the query sees the latest merge.
REFRESH TABLE f1_processed_incremental.pitstops;

-- Pit stops per race, highest race_id first. COUNT(*) rather than COUNT(race_id)
-- so rows with a NULL race_id are counted in their group instead of showing 0;
-- ORDER BY makes the output order deterministic.
SELECT race_id, COUNT(*) AS pitstop_count
FROM f1_processed_incremental.pitstops
GROUP BY race_id
ORDER BY race_id DESC;

In [0]:
# Hand a status string back to whoever invoked this notebook (e.g. dbutils.notebook.run).
notebook_exit_status = "Success"
dbutils.notebook.exit(notebook_exit_status)