### Ingest Results File

In [0]:
# Declare the 'p_data_source' text widget (empty-string default) and read its
# current value; v_data_source is later written onto every row as the
# data_source column.
_data_source_widget = 'p_data_source'
dbutils.widgets.text(_data_source_widget, '')
v_data_source = dbutils.widgets.get(_data_source_widget)

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

#### Step 1: Read the results JSON file using an explicitly declared schema.

In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType

# Explicit schema for results.json, passed to the reader so column types are
# not inferred.
#
# `points` is FloatType, not IntegerType: F1 results can award fractional
# points (e.g. half points for a shortened race, such as 12.5 or 4.5). An
# integer column cannot represent those values, and Spark would fail to parse
# them, silently losing data in the default permissive read mode.
#
# NOTE(review): Spark's file-based readers generally treat the supplied schema
# as nullable, so the `False` on resultId is documentation rather than an
# enforced constraint.
results_schema = StructType(fields=[
    StructField("resultId", IntegerType(), False),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("grid", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("positionText", StringType(), True),
    StructField("positionOrder", IntegerType(), True),
    StructField("points", FloatType(), True),
    StructField("laps", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
    StructField("fastestLap", IntegerType(), True),
    StructField("rank", IntegerType(), True),
    StructField("fastestLapTime", StringType(), True),
    StructField("fastestLapSpeed", StringType(), True),
    StructField("statusId", StringType(), True)
])

In [0]:
# Load the raw results file from the configured raw folder, applying the
# explicit results_schema instead of letting Spark infer column types.
results_df = spark.read.json(
    f'{raw_folder_path}/results.json',
    schema=results_schema,
)

#### Step 2: Drop statusId

In [0]:
# Columns that are not carried forward into the processed layer.
_unused_columns = ('statusId',)
results_dropped_df = results_df.drop(*_unused_columns)

#### Step 3: Rename columns to snake_case and add the data_source column

In [0]:
from pyspark.sql.functions import lit

# camelCase source column -> snake_case target column, applied in this order.
results_column_renames = {
    'resultId': 'result_id',
    'raceId': 'race_id',
    'driverId': 'driver_id',
    'constructorId': 'constructor_id',
    'positionText': 'position_text',
    'positionOrder': 'position_order',
    'fastestLap': 'fastest_lap',
    'fastestLapTime': 'fastest_lap_time',
    'fastestLapSpeed': 'fastest_lap_speed',
}

results_renamed_df = results_dropped_df
for source_name, target_name in results_column_renames.items():
    results_renamed_df = results_renamed_df.withColumnRenamed(source_name, target_name)

# Tag every row with the value of the p_data_source notebook parameter.
results_renamed_df = results_renamed_df.withColumn('data_source', lit(v_data_source))


#### Step 4: Add ingestion date

In [0]:
# add_ingestion_date is not defined in this notebook; presumably it comes from
# the %run "../includes/common_functions" cell and appends an ingestion
# timestamp column — TODO confirm its exact column name/type there.
results_final_df = add_ingestion_date(results_renamed_df)

#### Step 5: Save data to the f1_processed.results table (Parquet, partitioned by race_id)

In [0]:
# Full reload: replace the f1_processed.results table with this DataFrame,
# stored as Parquet and physically partitioned by race_id.
(
    results_final_df.write
    .format('parquet')
    .partitionBy('race_id')
    .mode('overwrite')
    .saveAsTable('f1_processed.results')
)


In [0]:
%fs
ls '/mnt/formula1dl244/processed/results'

In [0]:
# End the run and report "Success" as the notebook's exit value (visible to a
# parent notebook if this one was invoked via dbutils.notebook.run).
dbutils.notebook.exit("Success")