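# Ingest pit_stops.json
#### Multi-line JSON

Reads the multi-line `pit_stops.json` file for the selected file date, renames the ID columns to snake_case, adds `data_source`, `file_date` and ingestion-date columns, and writes the result to `f1_processed.pit_stops` via `overwrite_partition` on `race_id`.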

In [0]:
# Optional full refresh of the target table.
# The table is NOT dropped by default. overwrite_partition (from
# ../includes/common_functions) is expected to replace only the race_id
# partitions present in this file date's batch (TODO confirm against the helper).
# Dropping the table unconditionally on every run would discard all previously
# ingested file dates and make the "table already exists" write path unreachable.
# Set the p_full_refresh widget to "true" to rebuild the table from scratch.
dbutils.widgets.text("p_full_refresh", "false")
if dbutils.widgets.get("p_full_refresh").strip().lower() == "true":
    spark.sql("DROP TABLE IF EXISTS f1_processed.pit_stops")

In [0]:
# Notebook parameter: free-text label for the origin of this batch.
# The value is stamped into the data_source column further down.
DATA_SOURCE_PARAM = "p_data_source"
dbutils.widgets.text(DATA_SOURCE_PARAM, "")
v_data_source = dbutils.widgets.get(DATA_SOURCE_PARAM)

In [0]:
# Notebook parameter: the dated sub-folder of the raw path to ingest
# (default 2021-03-21). The value is also stamped into the file_date column.
FILE_DATE_PARAM = "p_file_date"
dbutils.widgets.text(FILE_DATE_PARAM, "2021-03-21")
v_file_date = dbutils.widgets.get(FILE_DATE_PARAM)

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, IntegerType, StructField, DateType, StringType, FloatType

In [0]:
# Explicit read schema for pit_stops.json, so Spark does not have to infer types.
# stop, time and duration are kept as strings, as in the original schema.
# NOTE(review): Spark file sources generally read every field as nullable, so
# the False on raceId may not be enforced at read time; confirm if it matters.
pit_stops_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("driverId", IntegerType(), True)
    .add("stop", StringType(), True)
    .add("lap", IntegerType(), True)
    .add("time", StringType(), True)
    .add("duration", StringType(), True)
    .add("milliseconds", IntegerType(), True)
)

In [0]:
# The raw file is multi-line JSON (a document that spans many lines), so the
# reader needs multiline=True. Applying the explicit schema skips type inference.
pit_stops_source_path = f"{raw_folder_path}/{v_file_date}/pit_stops.json"

pit_stops_df = (
    spark.read
    .schema(pit_stops_schema)
    .option("multiline", True)
    .json(pit_stops_source_path)
)
display(pit_stops_df)

In [0]:
# add_ingestion_date is not defined in this notebook. It is presumably provided by
# the %run of ../includes/common_functions and appends an ingestion timestamp
# column (TODO confirm the column name there).
pit_stops_ingest_df = add_ingestion_date(pit_stops_df)

In [0]:
# Preview the frame returned by add_ingestion_date (debug aid).
display(pit_stops_ingest_df)

In [0]:
from pyspark.sql.functions import lit, col

In [0]:
# Rename the camelCase source keys to snake_case and stamp the batch metadata
# columns. Keep the withColumn order stable: appended columns land at the end in
# this order, which matters if the frame is later written positionally.
pit_stops_renamed_df = (
    pit_stops_ingest_df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)
display(pit_stops_renamed_df)

In [0]:
# Column names in their current order (debug aid). Order matters if the write
# below ends up using a positional insert.
pit_stops_renamed_df.columns

In [0]:
# Print column names, types and nullability of the frame about to be written (debug aid).
pit_stops_renamed_df.printSchema()

In [0]:
# def re_arrange_partition_column(input_df, partition_column):
#   column_list = []
#   for column_name in input_df.schema.names:
#     if column_name != partition_column:
#       column_list.append(column_name)
#   column_list.append(partition_column)
#   output_df = input_df.select(column_list)
#   return output_df

In [0]:
# def overwrite_partition(input_df, db_name, table_name, partition_column):
#   output_df = re_arrange_partition_column(input_df, partition_column)
#   spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
#   if (spark._jsparkSession.catalog().tableExists(f"{db_name}.{table_name}")):
#     output_df.write.mode("overwrite").insertInto(f"{db_name}.{table_name}")
#   else:
#     output_df.write.mode("overwrite").partitionBy(partition_column).format("parquet").saveAsTable(f"{db_name}.{table_name}")

In [0]:
# partition_column = 'race_id'
# column_list = []
# for column_name in pit_stops_renamed_df.schema.names:
#     if column_name != partition_column:
#         column_list.append(column_name)
# column_list.append(partition_column)
# output_df = pit_stops_renamed_df.select(column_list)
# display(output_df)

In [0]:
# spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

In [0]:
# db_name = 'f1_processed'
# table_name = 'pit_stops'

# if (spark._jsparkSession.catalog().tableExists('f1_processed.pit_stops')):
#     output_df.write.mode("overwrite").insertInto('f1_processed.pit_stops')
# else:
#     output_df.write.mode("overwrite").partitionBy(partition_column).format("parquet").saveAsTable('f1_processed.pit_stops')

In [0]:
# Write the batch to f1_processed.pit_stops, partitioned on race_id.
# overwrite_partition comes from ../includes/common_functions. It is assumed to
# match the commented-out reference copy earlier in this notebook: it moves the
# partition column last and does a dynamic partition overwrite. TODO confirm.
overwrite_partition(
    pit_stops_renamed_df,
    "f1_processed",
    "pit_stops",
    "race_id",
)

In [0]:
%sql
-- Sanity check: inspect the target table after the write above.
SELECT ps.*
FROM f1_processed.pit_stops AS ps;

In [0]:
# End the notebook and return "Success" as its exit value, e.g. to a parent
# notebook that launched it with dbutils.notebook.run.
dbutils.notebook.exit("Success")