#### Ingest the multi-line pit stops JSON file

- Read the JSON file
- Rename columns and add new columns
- Drop unwanted columns
- Write the output to the silver container

In [0]:
# Declare the "data_source" notebook parameter (default "testing") and read
# back whatever value the caller supplied for this run.
dbutils.widgets.text(name="data_source", defaultValue="testing")
value_data_source = dbutils.widgets.get("data_source")

In [0]:
%run "../../constants/configuration"

In [0]:
%run "../../utils/common_functions"

In [0]:
from pyspark.sql.functions import current_timestamp, col, concat, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

In [0]:
# Explicit schema for pit_stops.json, assembled field by field.
# Only raceId is declared non-nullable; every other field allows nulls.
pit_stops_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("driverId", IntegerType(), True)
    .add("stop", StringType(), True)
    .add("lap", IntegerType(), True)
    .add("time", StringType(), True)
    .add("duration", StringType(), True)
    .add("milliseconds", IntegerType(), True)
)

In [0]:
# Load pit_stops.json from the bronze container using the explicit schema.
# The "multiline" option is enabled because each JSON record in this file
# spans several lines.
pit_stops_df = (
    spark.read
    .schema(pit_stops_schema)
    .option("multiline", "true")
    .json(f"{bronze_container_path}/pit_stops.json")
)

In [0]:
# Pass the raw pit stops DataFrame through the add_ingestion_date helper.
# The helper is not defined in this notebook (presumably it comes from the
# %run of utils/common_functions and appends an ingestion timestamp column
# -- TODO confirm against the helper's definition).
pit_stops_with_ingestion_date_df = add_ingestion_date(pit_stops_df)

##### Rename columns and add new columns

In [0]:
# Switch the camelCase id columns to snake_case and tag every row with the
# data_source value received through the notebook widget.
pit_stops_final_df = (
    pit_stops_with_ingestion_date_df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumn("data_source", lit(value_data_source))
)

In [0]:
# This cell used to overwrite the Delta table motor_dev.silver.pit_stops in
# full (write.mode("overwrite").format("delta").saveAsTable(...)); it now hands
# the DataFrame to merge_delta_data together with a row-matching condition.
#
# A target row (tgt) matches a source row (src) when race, driver and stop
# all agree. The original condition repeated "tgt.race_id = src.race_id" a
# second time at the end; that conjunct was redundant and has been removed.
merge_condition = (
    "tgt.race_id = src.race_id "
    "AND tgt.driver_id = src.driver_id "
    "AND tgt.stop = src.stop"
)

# NOTE(review): the trailing 'race_id' argument is presumably the partition
# column expected by merge_delta_data -- confirm against the helper's signature.
merge_delta_data(pit_stops_final_df, 'silver', 'pit_stops', merge_condition, 'race_id')