### Ingest results data from bronze to silver

0. Run the configuration notebook and define widgets

In [0]:
%run ../Includes/Configuration

In [0]:
# Create a "Data Source" text widget (empty by default) and capture its value;
# the captured value is stamped onto every output row as `data_source`.
dbutils.widgets.text(name='p_data_source', defaultValue='', label='Data Source')
v_data_source = dbutils.widgets.get('p_data_source')

1. Imports

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.functions import col, current_timestamp, lit

2. Create schema

In [0]:
# Explicit schema for results.json: (column name, Spark type, nullable flag).
# Supplying it avoids a schema-inference pass over the file and pins column types.
# NOTE(review): Spark file readers typically relax fields to nullable on read, so the
# False flags likely document intent rather than enforce it — confirm if relied upon.
results_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("resultId", IntegerType(), False),
        ("raceId", IntegerType(), False),
        ("driverId", IntegerType(), False),
        ("constructorId", IntegerType(), False),
        ("number", IntegerType(), True),
        ("grid", IntegerType(), False),
        ("position", IntegerType(), True),
        ("positionText", StringType(), False),
        ("positionOrder", IntegerType(), False),
        ("points", FloatType(), False),
        ("laps", IntegerType(), False),
        ("time", StringType(), True),
        ("milliseconds", IntegerType(), True),
        ("fastestLap", IntegerType(), True),
        ("rank", IntegerType(), True),
        ("fastestLapTime", StringType(), True),
        ("fastestLapSpeed", StringType(), True),
        ("statusId", IntegerType(), False),
    )
])

3. Read the results.json file

In [0]:
# Load the raw results file from the bronze container using the explicit schema
# defined above (no type inference).
results_raw_df = (
    spark.read
    .schema(results_schema)
    .json(f"{bronze_container_path}/results.json")
)

4. Transform the results dataframe

In [0]:
# Shape the raw results for the silver layer:
#   * rename camelCase source columns to snake_case (unchanged names pass through),
#   * append audit columns: ingestion_date (current timestamp at query execution)
#     and data_source (value of the p_data_source widget).
# statusId is part of the read schema but is not carried into the output.
# NOTE(review): confirm that dropping statusId is intentional.
results_df = (
    results_raw_df
    .drop("statusId")
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("data_source", lit(v_data_source))
)

5. Write the dataframe to the silver layer

In [0]:
# Persist the results to the silver container as Parquet, laid out in one
# race_id=<value> sub-directory per race. mode("overwrite") makes each run a full reload.
# NOTE(review): if the session sets spark.sql.sources.partitionOverwriteMode=dynamic
# (e.g. in the Configuration notebook), only partitions present in results_df are
# replaced rather than the whole path — confirm the expected setting.
(
    results_df.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("race_id")
    .save(f"{silver_container_path}/results")
)