# 0. Get configuration parameters and functions

In [0]:
%run "../includes/configuration"

In [0]:
# Echo the raw-layer root path (presumably defined by ../includes/configuration)
# to confirm the configuration notebook ran before it is used to build the input path.
raw_folder_path

'abfss://raw@aubdbcourse.dfs.core.windows.net'

In [0]:
%run "../includes/common_functions"

In [0]:
# Notebook parameter: label of the data source, stamped onto every row later on.
# The widget defaults to an empty string when the caller passes nothing.
data_source_widget = 'p_data_source'
dbutils.widgets.text(data_source_widget, '')
param_data_source = dbutils.widgets.get(data_source_widget)

In [0]:
# Notebook parameter: the file date, used both as the sub-folder of the raw path
# that is read and as the value of the file_date column. Defaults to '2021-03-21'.
file_date_widget = 'p_file_date'
dbutils.widgets.text(file_date_widget, '2021-03-21')
param_file_date = dbutils.widgets.get(file_date_widget)

# 1. Read data from results.json

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [0]:
# Explicit schema for results.json, written as (column name, Spark type, nullable) rows.
# NOTE: the printSchema() output further down reports every column as nullable = true,
# so the nullable=False flags here document intent only; they are not enforced on read.
# 'time' stays a string: sample values include '+22.000' and '\N'.
result_columns = [
    ('resultId', IntegerType(), False),
    ('raceId', IntegerType(), False),
    ('driverId', IntegerType(), False),
    ('constructorId', IntegerType(), False),
    ('number', IntegerType(), True),
    ('grid', IntegerType(), False),
    ('position', IntegerType(), True),
    ('positionText', StringType(), False),
    ('positionOrder', IntegerType(), False),
    ('points', FloatType(), False),
    ('laps', IntegerType(), False),
    ('time', StringType(), True),
    ('milliseconds', IntegerType(), True),
    ('fastestLap', IntegerType(), True),
    ('rank', IntegerType(), True),
    ('fastestLapTime', StringType(), True),
    ('fastestLapSpeed', StringType(), True),
    ('statusId', StringType(), False),
]

results_schema = StructType([
    StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in result_columns
])

In [0]:
# Load results.json for the requested file date from the raw layer, applying the
# explicit results_schema so Spark does not have to infer column types.
# No 'inferSchema' option is set: it is a CSV reader option and has no effect on a
# JSON read that is given a schema.
results_df = (
    spark.read
    .schema(results_schema)
    .json(f'{raw_folder_path}/{param_file_date}/results.json')
)

In [0]:
# Show the schema Spark actually applied. The output reports nullable = true for
# every column, including the ones declared with nullable=False in results_schema.
results_df.printSchema()

root
 |-- resultId: integer (nullable = true)
 |-- raceId: integer (nullable = true)
 |-- driverId: integer (nullable = true)
 |-- constructorId: integer (nullable = true)
 |-- number: integer (nullable = true)
 |-- grid: integer (nullable = true)
 |-- position: integer (nullable = true)
 |-- positionText: string (nullable = true)
 |-- positionOrder: integer (nullable = true)
 |-- points: float (nullable = true)
 |-- laps: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- milliseconds: integer (nullable = true)
 |-- fastestLap: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- fastestLapTime: string (nullable = true)
 |-- fastestLapSpeed: string (nullable = true)
 |-- statusId: string (nullable = true)



In [0]:
# Preview 10 rows exactly as read, before any columns are renamed or added.
display(results_df.limit(10))

resultId,raceId,driverId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId
24986,1053,830,9,33,3,1,1,1,25.0,63,2:02:34.598,7354598,60,2,1:17.524,227.96,1
24987,1053,1,131,44,1,2,2,2,19.0,63,+22.000,7376598,60,1,1:16.702,230.403,1
24988,1053,846,1,4,7,3,3,3,15.0,63,+23.702,7378300,63,3,1:18.259,225.819,1
24989,1053,844,6,16,4,4,4,4,12.0,63,+25.579,7380177,60,6,1:18.379,225.473,1
24990,1053,832,6,55,11,5,5,5,10.0,63,+27.036,7381634,60,7,1:18.490,225.154,1
24991,1053,817,1,3,6,6,6,6,8.0,63,+51.220,7405818,54,12,1:19.341,222.739,1
24992,1053,842,213,10,5,7,7,7,6.0,63,+52.818,7407416,52,9,1:18.994,223.718,1
24993,1053,840,117,18,10,8,8,8,4.0,63,+56.909,7411507,59,8,1:18.782,224.32,1
24994,1053,839,214,31,9,9,9,9,2.0,63,+65.704,7420302,62,15,1:19.422,222.512,1
24995,1053,4,214,14,15,10,10,10,1.0,63,+66.561,7421159,62,14,1:19.417,222.526,1


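# 2. Reshape the schema:
- rename the id, position and fastest-lap columns to snake_case
- add the `data_source`, `file_date` and `ingestion_date` columns
- drop duplicate rows for the same (`race_id`, `driver_id`)
- drop the `statusId` column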

In [0]:
from pyspark.sql.functions import current_timestamp, lit, desc

In [0]:
# camelCase source columns and their snake_case replacements. Columns not listed
# here (number, grid, position, points, laps, time, milliseconds, rank) keep their names.
column_renames = {
    'resultId': 'result_id',
    'raceId': 'race_id',
    'driverId': 'driver_id',
    'constructorId': 'constructor_id',
    'positionText': 'position_text',
    'positionOrder': 'position_order',
    'fastestLap': 'fastest_lap',
    'fastestLapTime': 'fastest_lap_time',
    'fastestLapSpeed': 'fastest_lap_speed',
}

# Work on a temporary name so results_df is only rebound once the whole pipeline is built.
renamed_df = results_df
for source_name, target_name in column_renames.items():
    renamed_df = renamed_df.withColumnRenamed(source_name, target_name)

# Stamp each row with the two notebook parameters and the ingestion time, keep one row
# per (race_id, driver_id), then discard statusId.
# NOTE(review): nothing here decides which of several duplicate rows is the one kept.
results_df = (
    renamed_df
    .withColumn('data_source', lit(param_data_source))
    .withColumn('file_date', lit(param_file_date))
    .withColumn('ingestion_date', current_timestamp())
    .dropDuplicates(['race_id', 'driver_id'])
    .drop('statusId')
)

In [0]:
# Preview 10 rows after the renames, the added columns and the de-duplication.
display(results_df.limit(10))

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,data_source,file_date,ingestion_date
24987,1053,1,131,44,1,2.0,2,2,19.0,63,+22.000,7376598.0,60,1,1:16.702,230.403,,2021-04-18,2024-09-06T08:25:50.69Z
24995,1053,4,214,14,15,10.0,10,10,1.0,63,+66.561,7421159.0,62,14,1:19.417,222.526,,2021-04-18,2024-09-06T08:25:50.69Z
24998,1053,8,51,7,16,13.0,13,13,0.0,63,+94.773,7449371.0,62,5,1:18.353,225.548,,2021-04-18,2024-09-06T08:25:50.69Z
25000,1053,20,117,5,0,15.0,15,15,0.0,61,\N,,59,10,1:19.074,223.491,,2021-04-18,2024-09-06T08:25:50.69Z
24996,1053,815,9,11,2,11.0,11,11,0.0,63,+67.151,7421749.0,62,13,1:19.396,222.585,,2021-04-18,2024-09-06T08:25:50.69Z
24991,1053,817,1,3,6,6.0,6,6,8.0,63,+51.220,7405818.0,54,12,1:19.341,222.739,,2021-04-18,2024-09-06T08:25:50.69Z
25003,1053,822,131,77,8,,R,18,0.0,30,\N,,30,19,1:28.485,199.721,,2021-04-18,2024-09-06T08:25:50.69Z
24986,1053,830,9,33,3,1.0,1,1,25.0,63,2:02:34.598,7354598.0,60,2,1:17.524,227.96,,2021-04-18,2024-09-06T08:25:50.69Z
24990,1053,832,6,55,11,5.0,5,5,10.0,63,+27.036,7381634.0,60,7,1:18.490,225.154,,2021-04-18,2024-09-06T08:25:50.69Z
24994,1053,839,214,31,9,9.0,9,9,2.0,63,+65.704,7420302.0,62,15,1:19.422,222.512,,2021-04-18,2024-09-06T08:25:50.69Z


# 3. Upsert into the Delta table and read it back

In [0]:
# Source and target rows are the same record when both result_id and race_id agree.
merge_condition = (
    'target.result_id = source.result_id '
    'and target.race_id = source.race_id'
)

# upsert_to_delta_table is not defined in this notebook; presumably it comes from
# ../includes/common_functions and merges results_df into f1_processed.results.
# TODO confirm the meaning of the last argument ('race_id') against the helper.
upsert_to_delta_table(
    'f1_processed',
    'results',
    processed_folder_path,
    results_df,
    merge_condition,
    'race_id',
)

In [0]:
# Read the processed table back and show the row count per race, highest race_id
# first, as a quick check of what the table holds after the upsert.
results_df = spark.read.table('f1_processed.results')
rows_per_race = results_df.groupBy('race_id').count()
display(rows_per_race.orderBy(desc('race_id')))

race_id,count
1053,20


In [0]:
# Finish the notebook run, passing the string 'success' as its exit value.
dbutils.notebook.exit('success')