#### Ingest lap_times folder

#### Step 1 - Read the CSV files using the Spark DataFrame reader API

In [0]:
%run "../includes/common_functions"

In [0]:
%run "../includes/configuration"


In [0]:
# Notebook parameter: free-text label later written to the data_source column.
_data_source_param = "p_data_source"
dbutils.widgets.text(_data_source_param, "")  # empty string when the caller passes nothing
v_data_source = dbutils.widgets.get(_data_source_param)

In [0]:
# Notebook parameter: selects the dated raw sub-folder to ingest and is also
# written to the file_date column (default shown in the widget: 2021-03-21).
_file_date_param = "p_file_date"
dbutils.widgets.text(_file_date_param, "2021-03-21")
v_file_date = dbutils.widgets.get(_file_date_param)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema for the lap_times CSV files. Order matters: the CSV reader
# assigns these fields to the file's columns by position.
# Each entry is (column name, Spark type, nullable); only raceId is non-nullable.
_lap_times_columns = [
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), True),
    ("lap", IntegerType(), True),
    ("position", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
]
lap_times_schema = StructType(
    fields=[StructField(col_name, col_type, is_nullable)
            for col_name, col_type, is_nullable in _lap_times_columns]
)
                                     

In [0]:
# Load every CSV file in the dated lap_times folder using the explicit schema
# (no schema inference).
# NOTE(review): this raw path is hard-coded, while the write step uses
# processed_folder_path from the configuration include — consider a configured raw path.
lap_times_source_path = f"/mnt/avinashprojectformula1dl/raw/{v_file_date}/lap_times"
lap_times_df = (
    spark.read
    .schema(lap_times_schema)
    .csv(lap_times_source_path)
)


#### Step 2 - Rename columns and add new columns
1. Rename driverId and raceId to driver_id and race_id
1. Add ingestion_date with the current timestamp
1. Add data_source and file_date from the notebook parameters

In [0]:
# Presumably adds an ingestion_date column via the shared helper loaded by the
# %run includes above — confirm against add_ingestion_date's definition.
# NOTE(review): this result is never used: final_df below is built from
# lap_times_df and adds ingestion_date itself. Either build final_df from this
# DataFrame or delete this cell.
lap_times_with_ingestion_date_df = add_ingestion_date(lap_times_df)

In [0]:
from pyspark.sql.functions import current_timestamp, lit

# Snake-case the id columns, then stamp lineage metadata onto every row:
# load time, the p_data_source label and the p_file_date folder being ingested.
final_df = (
    lap_times_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)



#### Step 3 - Merge the output into the processed lap_times table (via merge_delta_data)

In [0]:
# Target (tgt) and source (src) rows match when every key column is equal.
# Building the condition from one key list keeps each key compared exactly once.
merge_keys = ["race_id", "driver_id", "lap"]
merge_condition = " AND ".join(f"tgt.{key} = src.{key}" for key in merge_keys)
# Merge final_df into lap_times, matching rows with merge_condition.
# NOTE(review): the argument roles noted below are read from the call site
# only — confirm against merge_delta_data in the included helpers.
merge_delta_data(
    final_df,
    'f1_processed',         # presumably the target database
    'lap_times',            # presumably the target table
    processed_folder_path,  # from the configuration include
    merge_condition,
    'race_id',              # presumably the partition column
)


In [0]:
#overwrite_partition(final_df, 'f1_processed', 'lap_times', 'race_id')

In [0]:
# End the notebook run and hand this status string back to any calling notebook;
# cells after this one do not execute.
notebook_exit_status = "Success"
dbutils.notebook.exit(notebook_exit_status)

In [0]:
#final_df.write.mode("overwrite").formparquet("/mnt/avinashprojectformula1dl/processed/lap_times")