### Ingest results.json file

#### Step 1 - Read the JSON file using the Spark DataFrame reader

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Register the "p_data_source" notebook parameter (default: empty string) and read its value.
# The value is recorded on every output row as the data_source column in Step 2.
dbutils.widgets.text("p_data_source","")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Register the "p_file_date" notebook parameter and read its value. It selects the raw folder
# that is ingested ({raw_folder_path}/{v_file_date}/results.json) and is recorded as the
# file_date column. "2021-03-28" is only the widget's default value.
dbutils.widgets.text("p_file_date","2021-03-28")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
# Import the type classes explicitly so this cell runs on a fresh cluster, without relying on
# names that the %run notebooks may or may not leave in scope.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# Explicit schema for results.json: pins every column's type instead of inferring it from the
# data. Only resultId is declared non-nullable.
result_schema = StructType(fields=[StructField("resultId", IntegerType(), False),
                                   StructField("raceId", IntegerType(), True),
                                   StructField("driverId", IntegerType(), True),
                                   StructField("constructorId", IntegerType(), True),
                                   StructField("number", IntegerType(), True),
                                   StructField("grid", IntegerType(), True),
                                   StructField("position", IntegerType(), True),
                                   StructField("positionText", StringType(), True),
                                   StructField("positionOrder", IntegerType(), True),
                                   StructField("points", FloatType(), True),
                                   StructField("laps", IntegerType(), True),
                                   StructField("time", StringType(), True),
                                   StructField("milliseconds", IntegerType(), True),
                                   StructField("fastestLap", IntegerType(), True),
                                   StructField("rank", IntegerType(), True),
                                   StructField("fastestLapTime", StringType(), True),
                                   StructField("fastestLapSpeed", StringType(), True),
                                   StructField("statusId", IntegerType(), True)
])

In [0]:
# Load this run's results.json from the raw container (folder selected by p_file_date),
# applying the explicit schema defined above.
results_df = (
    spark.read
    .schema(result_schema)
    .json(f"{raw_folder_path}/{v_file_date}/results.json")
)

In [0]:
# Preview the raw rows as read with the explicit schema.
display(results_df)

#### Step 2 - Rename columns and add new columns
1. resultId renamed to result_id
2. raceId renamed to race_id
3. driverId renamed to driver_id
4. constructorId renamed to constructor_id
5. positionText renamed to position_text
6. positionOrder renamed to position_order
7. fastestLap renamed to fastest_lap
8. fastestLapTime renamed to fastest_lap_time
9. fastestLapSpeed renamed to fastest_lap_speed
10. add ingestion date column
11. add data_source and file_date columns (values of the p_data_source and p_file_date parameters)

In [0]:
# Import lit explicitly; it is used below but was never imported in this notebook.
from pyspark.sql.functions import lit

# Step 2 transformations:
#   - add_ingestion_date (not defined here; presumably provided by the %run of
#     ../includes/common_functions) adds the ingestion_date column,
#   - camelCase source columns are renamed to snake_case,
#   - the notebook parameters are recorded as data_source / file_date.
results_with_columns_df = (
    add_ingestion_date(results_df)
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

In [0]:
# Preview the renamed / enriched DataFrame before it is written.
display(results_with_columns_df)

#### Step 3 - Write output to the processed container in Parquet format (table f1_processed.results, partitioned by race_id)

##### Method 1 - Drop the affected race_id partitions, then append (kept for reference, commented out)

In [0]:
# Method 1, step 1 (disabled): if f1_processed.results already exists, drop every race_id
# partition present in this batch. The append in the next cell then presumably does not
# duplicate rows on a re-run.
# Note: collect() brings the distinct race_ids to the driver and issues one ALTER TABLE per race.
#  for race_id_list in results_with_columns_df.select("race_id").distinct().collect():
#      if (spark._jsparkSession.catalog().tableExists("f1_processed.results")):
#          spark.sql(f"ALTER TABLE f1_processed.results DROP IF EXISTS PARTITION (race_id= {race_id_list.race_id})")

In [0]:
# Method 1, step 2 (disabled): append this batch as a parquet table partitioned by race_id.
# This runs after the matching partitions were dropped by the previous cell.
# results_with_columns_df.write.mode("append").partitionBy('race_id').format("parquet").saveAsTable("f1_processed.results")

##### Method 2 - Dynamic partition overwrite with insertInto

In [0]:
%sql
-- Disabled: uncomment to drop f1_processed.results and force a full rebuild. When the table
-- does not exist, the write cell below recreates it with saveAsTable.
--DROP TABLE f1_processed.results;

In [0]:
# Session-level setting: in "dynamic" mode, an overwrite replaces only the partitions that
# receive data in this write; all other race_id partitions are left untouched. The
# insertInto(... overwrite) in Method 2 relies on this for incremental loads.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

In [0]:
# insertInto() matches columns by POSITION, not by name, and a table created with
# partitionBy("race_id") stores that partition column last. The columns are therefore listed
# in the table's order, with race_id at the end. statusId is not selected, so it is dropped.
results_final_df = results_with_columns_df.select(
    "result_id", "driver_id", "constructor_id", "number", "grid",
    "position", "position_text", "position_order", "points", "laps",
    "time", "milliseconds", "fastest_lap", "rank",
    "fastest_lap_time", "fastest_lap_speed",
    "data_source", "file_date", "ingestion_date",
    "race_id",  # partition column - must remain the last column
)

In [0]:
# First run: create the parquet table partitioned by race_id.
# Later runs: overwrite via insertInto. With dynamic partition overwrite enabled above, only
# the race_id partitions present in this batch are replaced.
if not spark._jsparkSession.catalog().tableExists("f1_processed.results"):
    (results_final_df.write
        .mode("overwrite")
        .partitionBy("race_id")
        .format("parquet")
        .saveAsTable("f1_processed.results"))
else:
    results_final_df.write.mode("overwrite").insertInto("f1_processed.results")

In [0]:
# Optional check (disabled): row count per race_id in f1_processed.results, highest race_id first.
# Useful for confirming that a re-run replaced a race's partition rather than duplicating it.
# %sql
# SELECT race_id, COUNT(1)
# FROM f1_processed.results
# GROUP BY race_id
# ORDER BY race_id DESC;

In [0]:
# Sanity check: read back the table that was just written. Reading it by name avoids assuming
# that the f1_processed database stores its files under {processed_folder_path}/results.
display(spark.read.table("f1_processed.results"))

In [0]:
# End the notebook and return "Success" to a caller that ran it via dbutils.notebook.run().
dbutils.notebook.exit("Success")