In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

#### Ingest results.json file

##### Step 1 - Read the JSON file using the spark dataframe reader API

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [0]:
# Explicit schema for results.json so Spark does not have to infer column types.
# Only resultId is declared non-nullable; every other field relies on
# StructType.add's default of nullable=True.
# NOTE(review): fastestLapSpeed is read as a string although the sample values
# look numeric — confirm this is intentional.
results_schema = (
    StructType()
    .add("resultId", IntegerType(), False)
    .add("raceId", IntegerType())
    .add("driverId", IntegerType())
    .add("constructorId", IntegerType())
    .add("number", IntegerType())
    .add("grid", IntegerType())
    .add("position", IntegerType())
    .add("positionText", StringType())
    .add("positionOrder", IntegerType())
    .add("points", FloatType())
    .add("laps", IntegerType())
    .add("time", StringType())
    .add("milliseconds", IntegerType())
    .add("fastestLap", IntegerType())
    .add("rank", IntegerType())
    .add("fastestLapTime", StringType())
    .add("fastestLapSpeed", StringType())
    .add("statusId", IntegerType())
)

In [0]:
# Load results.json from the raw folder, applying the explicit schema above
# instead of letting Spark infer one.
results_df = (
    spark.read
    .schema(results_schema)
    .json(f"{raw_folder_path}/results.json")
)

In [0]:
# Render the freshly read DataFrame as a table for a visual check of the parsed data.
display(results_df)

resultId,raceId,driverId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId
1,18,1,1,22,1,1.0,1,1,10.0,58,1:34:50.616,5690616.0,39.0,2.0,1:27.452,218.3,1
2,18,2,2,3,5,2.0,2,2,8.0,58,+5.478,5696094.0,41.0,3.0,1:27.739,217.586,1
3,18,3,3,7,7,3.0,3,3,6.0,58,+8.163,5698779.0,41.0,5.0,1:28.090,216.719,1
4,18,4,4,5,11,4.0,4,4,5.0,58,+17.181,5707797.0,58.0,7.0,1:28.603,215.464,1
5,18,5,1,23,3,5.0,5,5,4.0,58,+18.014,5708630.0,43.0,1.0,1:27.418,218.385,1
6,18,6,3,8,13,6.0,6,6,3.0,57,\N,,50.0,14.0,1:29.639,212.974,11
7,18,7,5,14,17,7.0,7,7,2.0,55,\N,,22.0,12.0,1:29.534,213.224,5
8,18,8,6,1,15,8.0,8,8,1.0,53,\N,,20.0,4.0,1:27.903,217.18,5
9,18,9,2,4,2,,R,9,0.0,47,\N,,15.0,9.0,1:28.753,215.1,4
10,18,10,7,12,18,,R,10,0.0,43,\N,,23.0,13.0,1:29.558,213.166,3


##### Step 2 - Rename the columns and add the ingestion_date column
1. resultId renamed to result_id
2. raceId renamed to race_id
3. driverId renamed to driver_id
4. constructorId renamed to constructor_id
5. positionText renamed to position_text
6. positionOrder renamed to position_order
7. fastestLap renamed to fastest_lap
8. fastestLapTime renamed to fastest_lap_time
9. fastestLapSpeed renamed to fastest_lap_speed
10. ingestion_date (new)

In [0]:
from pyspark.sql.functions import current_timestamp

In [0]:
# Convert the camelCase source column names to snake_case, then pass the result
# through add_ingestion_date. That helper is not defined in this notebook
# (presumably it comes from ../includes/common_functions — confirm); the
# displayed output shows an extra ingestion_date column afterwards.
results_with_columns_df = add_ingestion_date(
    results_df
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
)

In [0]:
# Render the renamed DataFrame to check the new column names and the ingestion_date column.
display(results_with_columns_df)

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,statusId,ingestion_date
1,18,1,1,22,1,1.0,1,1,10.0,58,1:34:50.616,5690616.0,39.0,2.0,1:27.452,218.3,1,2022-08-01T03:30:37.323+0000
2,18,2,2,3,5,2.0,2,2,8.0,58,+5.478,5696094.0,41.0,3.0,1:27.739,217.586,1,2022-08-01T03:30:37.323+0000
3,18,3,3,7,7,3.0,3,3,6.0,58,+8.163,5698779.0,41.0,5.0,1:28.090,216.719,1,2022-08-01T03:30:37.323+0000
4,18,4,4,5,11,4.0,4,4,5.0,58,+17.181,5707797.0,58.0,7.0,1:28.603,215.464,1,2022-08-01T03:30:37.323+0000
5,18,5,1,23,3,5.0,5,5,4.0,58,+18.014,5708630.0,43.0,1.0,1:27.418,218.385,1,2022-08-01T03:30:37.323+0000
6,18,6,3,8,13,6.0,6,6,3.0,57,\N,,50.0,14.0,1:29.639,212.974,11,2022-08-01T03:30:37.323+0000
7,18,7,5,14,17,7.0,7,7,2.0,55,\N,,22.0,12.0,1:29.534,213.224,5,2022-08-01T03:30:37.323+0000
8,18,8,6,1,15,8.0,8,8,1.0,53,\N,,20.0,4.0,1:27.903,217.18,5,2022-08-01T03:30:37.323+0000
9,18,9,2,4,2,,R,9,0.0,47,\N,,15.0,9.0,1:28.753,215.1,4,2022-08-01T03:30:37.323+0000
10,18,10,7,12,18,,R,10,0.0,43,\N,,23.0,13.0,1:29.558,213.166,3,2022-08-01T03:30:37.323+0000


##### Step 3 - Drop unwanted columns
1. statusId

In [0]:
# Remove the statusId column; it is not carried into the processed output.
results_dropped_df = results_with_columns_df.drop("statusId")

In [0]:
# Expose the trimmed DataFrame under the name that the write step uses.
results_final_df = results_dropped_df

In [0]:
# Render the final DataFrame for a last visual check before it is written out.
display(results_final_df)

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,ingestion_date
1,18,1,1,22,1,1.0,1,1,10.0,58,1:34:50.616,5690616.0,39.0,2.0,1:27.452,218.3,2022-08-01T03:30:38.322+0000
2,18,2,2,3,5,2.0,2,2,8.0,58,+5.478,5696094.0,41.0,3.0,1:27.739,217.586,2022-08-01T03:30:38.322+0000
3,18,3,3,7,7,3.0,3,3,6.0,58,+8.163,5698779.0,41.0,5.0,1:28.090,216.719,2022-08-01T03:30:38.322+0000
4,18,4,4,5,11,4.0,4,4,5.0,58,+17.181,5707797.0,58.0,7.0,1:28.603,215.464,2022-08-01T03:30:38.322+0000
5,18,5,1,23,3,5.0,5,5,4.0,58,+18.014,5708630.0,43.0,1.0,1:27.418,218.385,2022-08-01T03:30:38.322+0000
6,18,6,3,8,13,6.0,6,6,3.0,57,\N,,50.0,14.0,1:29.639,212.974,2022-08-01T03:30:38.322+0000
7,18,7,5,14,17,7.0,7,7,2.0,55,\N,,22.0,12.0,1:29.534,213.224,2022-08-01T03:30:38.322+0000
8,18,8,6,1,15,8.0,8,8,1.0,53,\N,,20.0,4.0,1:27.903,217.18,2022-08-01T03:30:38.322+0000
9,18,9,2,4,2,,R,9,0.0,47,\N,,15.0,9.0,1:28.753,215.1,2022-08-01T03:30:38.322+0000
10,18,10,7,12,18,,R,10,0.0,43,\N,,23.0,13.0,1:29.558,213.166,2022-08-01T03:30:38.322+0000


##### Step 4 - Write the output to the processed container in Parquet format, partitioned by race_id

In [0]:
# Persist the final DataFrame as Parquet under the processed folder.
# mode("overwrite") replaces any existing output at that path, and
# partitionBy('race_id') produces one race_id=<value> sub-directory per race.
(
    results_final_df.write
    .mode("overwrite")
    .partitionBy('race_id')
    .parquet(f"{processed_folder_path}/results")
)

In [0]:
# List the files written by the previous step. The location is derived from
# processed_folder_path (the same variable the write used) instead of a
# hard-coded DBFS path, so this listing always matches where the data was written.
display(dbutils.fs.ls(f"{processed_folder_path}/results"))

path,name,size,modificationTime
dbfs:/FileStore/tables/processed/results/_SUCCESS,_SUCCESS,0,1659325315000
dbfs:/FileStore/tables/processed/results/race_id=1/,race_id=1/,0,1659325307000
dbfs:/FileStore/tables/processed/results/race_id=10/,race_id=10/,0,1659325314000
dbfs:/FileStore/tables/processed/results/race_id=100/,race_id=100/,0,1659325305000
dbfs:/FileStore/tables/processed/results/race_id=1000/,race_id=1000/,0,1659325313000
dbfs:/FileStore/tables/processed/results/race_id=1001/,race_id=1001/,0,1659325309000
dbfs:/FileStore/tables/processed/results/race_id=1002/,race_id=1002/,0,1659325305000
dbfs:/FileStore/tables/processed/results/race_id=1003/,race_id=1003/,0,1659325314000
dbfs:/FileStore/tables/processed/results/race_id=1004/,race_id=1004/,0,1659325310000
dbfs:/FileStore/tables/processed/results/race_id=1005/,race_id=1005/,0,1659325304000


In [0]:
# Read the Parquet output back to verify the write. In the read-back result the
# partition column race_id appears as the last column.
display(spark.read.parquet(f"{processed_folder_path}/results"))

result_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,ingestion_date,race_id
19232,657,113,14,19,1.0,1,1,8.0,200,3:49:17.27,13757270.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19233,525,114,9,3,2.0,2,2,6.0,200,+1:09.95,13827220.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19234,658,113,2,1,3.0,3,3,5.0,200,+1:19.73,13837000.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19235,526,113,34,11,4.0,4,4,1.5,200,+2:52.68,13929950.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19236,673,113,73,14,5.0,5,5,2.0,200,+3:24.55,13961820.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19237,615,113,77,24,6.0,6,6,0.0,200,+3:47.55,13984820.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19238,528,109,7,6,7.0,7,7,0.0,200,+4:13.35,14010620.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19239,555,113,5,32,8.0,8,8,0.0,200,+5:01.17,14058440.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19240,674,113,28,25,9.0,9,9,0.0,200,+7:07.24,14184510.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
19241,655,129,24,13,10.0,10,10,0.0,200,+7:07.69,14184960.0,,,\N,\N,2022-08-01T03:39:44.302+0000,800
