### Ingesting the results file

##### Step 1 - Read the results.json file

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, FloatType

In [0]:
# Explicit schema for raw/results.json, one column per .add() call:
# (source column name, Spark type, nullable). The order of the calls fixes
# the column order of the DataFrame produced by the reader.
# NOTE(review): Spark file-based readers are generally documented to treat
# every column as nullable regardless of these flags -- confirm before relying
# on the non-nullable markers for validation.
results_schema = (
    StructType()
    .add('resultId', IntegerType(), nullable=False)
    .add('raceId', IntegerType(), nullable=False)
    .add('driverId', IntegerType(), nullable=False)
    .add('constructorId', IntegerType(), nullable=False)
    .add('number', IntegerType(), nullable=True)
    .add('grid', IntegerType(), nullable=False)
    .add('position', IntegerType(), nullable=True)
    .add('positionText', StringType(), nullable=False)
    .add('positionOrder', IntegerType(), nullable=True)
    .add('points', FloatType(), nullable=False)
    .add('laps', IntegerType(), nullable=False)
    .add('time', StringType(), nullable=True)
    .add('milliseconds', IntegerType(), nullable=False)
    .add('fastestLap', IntegerType(), nullable=False)
    .add('rank', IntegerType(), nullable=False)
    .add('fastestLapTime', StringType(), nullable=True)
    .add('fastestLapSpeed', StringType(), nullable=True)
    .add('statusId', IntegerType(), nullable=True)
)

In [0]:
# Root of the Unity Catalog volume; raw input and processed output live beneath it.
volume_dir = "/Volumes/formula1/default/f1_volume"

In [0]:
# Load the raw results JSON with the explicit schema above, so Spark does not
# infer the column types.
results_raw_df = (
    spark.read
    .schema(results_schema)
    .json(f'{volume_dir}/raw/results.json')
)

In [0]:
# Preview the raw DataFrame as read from raw/results.json (before renaming).
display(results_raw_df)

In [0]:
from pyspark.sql.functions import current_timestamp, col

In [0]:
# Rename the camelCase source columns to snake_case, then stamp every row with
# the time of ingestion.
# Each source column is renamed exactly once; renaming a column that no longer
# exists is a silent no-op in Spark, so duplicates only hide mistakes.
# Columns not listed (number, grid, position, points, laps, time, milliseconds,
# rank, statusId) keep their source names.
# NOTE(review): 'ingestion_date' holds current_timestamp(), i.e. a timestamp,
# not a date -- name kept unchanged for downstream consumers.
results_renamed_df = (
    results_raw_df
    .withColumnRenamed('resultId', 'result_id')
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('driverId', 'driver_id')
    .withColumnRenamed('constructorId', 'constructor_id')
    .withColumnRenamed('positionText', 'position_text')
    .withColumnRenamed('positionOrder', 'position_order')
    .withColumnRenamed('fastestLap', 'fastest_lap')
    .withColumnRenamed('fastestLapTime', 'fastest_lap_time')
    .withColumnRenamed('fastestLapSpeed', 'fastest_lap_speed')
    .withColumn('ingestion_date', current_timestamp())
)


In [0]:
# statusId is not carried into the processed layer.
results_final_df = results_renamed_df.drop('statusId')

In [0]:
# Persist as parquet under processed/results, one sub-directory per race_id,
# replacing any output from a previous run.
(
    results_final_df.write
    .mode('overwrite')
    .partitionBy("race_id")
    .parquet(f'{volume_dir}/processed/results')
)

In [0]:
# List the partition directories just written. Driven by volume_dir so the
# listing always inspects the same location the write above targeted.
# (The previous %fs cell hard-coded /mnt/formula19533dl/..., which is not
# where this notebook writes.)
display(dbutils.fs.ls(f'{volume_dir}/processed/results'))

In [0]:
# Read the processed parquet output back to check that the write round-trips.
display(spark.read.format('parquet').load(f'{volume_dir}/processed/results'))