In [0]:
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [0]:
# TODO: Check that the data types written to the output are correct.
# TODO: Check that the write to Parquet works as expected.

In [0]:
# Explicit schema applied when reading results.json, so Spark does not have to
# infer column types from the data.
#
# Nullability: only the record key (resultId) is declared non-nullable; every
# other field is nullable so that the declaration matches records that lack a
# value for a column.
# NOTE(review): Spark's file-based readers are understood to treat a
# user-supplied schema as nullable regardless of these flags, so this is
# expected to be a documentation-level correction — confirm on the target
# runtime.
results_schema = StructType(
    [
        StructField("resultId", IntegerType(), False),
        StructField("raceId", IntegerType(), True),
        StructField("driverId", IntegerType(), True),
        StructField("constructorId", IntegerType(), True),
        StructField("number", IntegerType(), True),
        StructField("grid", IntegerType(), True),
        StructField("position", IntegerType(), True),
        StructField("positionText", StringType(), True),
        StructField("positionOrder", IntegerType(), True),
        # Float rather than integer: championship points can be fractional
        # (e.g. half points), and an integer field cannot represent them.
        # NOTE(review): confirm against the values present in results.json.
        StructField("points", FloatType(), True),
        StructField("laps", IntegerType(), True),
        StructField("time", StringType(), True),
        StructField("milliseconds", IntegerType(), True),
        StructField("fastestLap", IntegerType(), True),
        StructField("rank", IntegerType(), True),
        StructField("fastestLapTime", StringType(), True),
        StructField("fastestLapSpeed", FloatType(), True),
        StructField("statusId", IntegerType(), True),
    ]
)

In [0]:
# Load the results JSON file using the explicit schema declared above
# instead of letting Spark infer the column types.
results_df = (
    spark.read
    .schema(results_schema)
    .json("/mnt/sanformula1dl/raw/results.json")
)

In [0]:
# Render the freshly read DataFrame in the notebook output so the parsed
# rows can be inspected before any transformation is applied.
display(results_df)

In [0]:
# Keep every column except statusId and rename the camelCase source columns
# to snake_case. Each pair is (source column, output column); columns whose
# name is unchanged map to themselves. Output order follows the pair order.
results_selected_df = results_df.select(
    *(
        col(source_name).alias(output_name)
        for source_name, output_name in (
            ("resultId", "result_id"),
            ("raceId", "race_id"),
            ("driverId", "driver_id"),
            ("constructorId", "constructor_id"),
            ("number", "number"),
            ("grid", "grid"),
            ("position", "position"),
            ("positionText", "position_text"),
            ("positionOrder", "position_order"),
            ("points", "points"),
            ("laps", "laps"),
            ("time", "time"),
            ("milliseconds", "milliseconds"),
            ("fastestLap", "fastest_lap"),
            ("rank", "rank"),
            ("fastestLapTime", "fastest_lap_time"),
            ("fastestLapSpeed", "fastest_lap_speed"),
        )
    )
)

In [0]:
# Append audit columns: the timestamp at which the data was ingested and a
# constant label identifying the data source.
results_final_df = (
    results_selected_df
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("data_source", lit("manual"))
)

In [0]:
# Render the final DataFrame (renamed columns plus ingestion_date and
# data_source) in the notebook output for a visual check before writing.
display(results_final_df)

In [0]:
# Persist the final DataFrame as Parquet, one directory per race_id value,
# replacing whatever already exists at the target path.
(
    results_final_df.write
    .mode("overwrite")
    .partitionBy("race_id")
    .parquet("/mnt/sanformula1dl/processed/results")
)