### Ingest pit_stops.json file

In [0]:
# Notebook parameter: label of the source system. It defaults to an empty string
# and is written to the `data_source` column of the output further down.
data_source_widget = "p_data_source"
dbutils.widgets.text(data_source_widget, "")
v_data_source = dbutils.widgets.get(data_source_widget)

In [0]:
# Notebook parameter: date of the file drop to ingest. It selects the sub-folder
# that is read and is also written to the `file_date` column of the output.
file_date_widget = "p_file_date"
dbutils.widgets.text(file_date_widget, "2021-03-21")
v_file_date = dbutils.widgets.get(file_date_widget)

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

##### Step 1 - Read the JSON file using the spark dataframe reader

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
# Schema applied when reading pit_stops.json.
# Each entry is (source column name, Spark type, nullable).
pit_stop_columns = [
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), True),
    ("stop", IntegerType(), True),
    ("lap", IntegerType(), True),
    ("time", StringType(), True),
    ("duration", StringType(), True),
    ("milliseconds", IntegerType(), True),
]
fields = [
    StructField(name, dtype, nullable)
    for name, dtype, nullable in pit_stop_columns
]
pit_stops_schema = StructType(fields=fields)

In [0]:
# Location of this run's file: <raw folder>/<file date>/pit_stops.json.
# `raw_folder_path` is not defined in this notebook (presumably it comes from
# ../includes/configuration — confirm).
pit_stops_path = f"{raw_folder_path}/{v_file_date}/pit_stops.json"

# Read with the explicit schema; the multiLine option is enabled for this file.
pit_stops_df = (
    spark.read
    .schema(pit_stops_schema)
    .option("multiLine", True)
    .json(pit_stops_path)
)

##### Step 2 - Rename and add columns

In [0]:
from pyspark.sql.functions import lit

In [0]:
# Rename the camelCase id columns to snake_case and tag every row with the
# data source and file date supplied through the notebook widgets.
pit_stops_renamed_df = (
    pit_stops_df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

# `add_ingestion_date` is defined outside this notebook (presumably in
# ../includes/common_functions — confirm what column it appends).
pit_stops_final_df = add_ingestion_date(pit_stops_renamed_df)

##### Step 3 - Write dataframe in delta format

In [0]:
# Rows are matched between target and source on these three columns.
merge_keys = ["race_id", "driver_id", "stop"]
merge_condition = " and ".join(
    f"target.{key} = source.{key}" for key in merge_keys
)

# `merge_delta_data` is defined outside this notebook (presumably in
# ../includes/common_functions). The final argument looks like the partition
# column — TODO confirm against the helper's signature.
merge_delta_data(
    pit_stops_final_df,
    "f1_processed",
    "pit_stops",
    processed_folder_path,
    merge_condition,
    "race_id",
)

In [0]:
# End the notebook run here and hand "Success" back to the caller as the exit value.
dbutils.notebook.exit("Success")

Success

In [0]:
%sql
-- Disabled check (query is commented out): row count per race_id in
-- f1_processed.pit_stops, highest race_id first.
/*select race_id, count(1) 
from f1_processed.pit_stops
group by race_id
order by race_id desc*/

race_id,count(1)
1053,56
1052,40
1047,23
1046,39
1045,57
1044,38
1043,30
1042,25
1041,33
1040,24
