# Ingest results.json

#### Requirements
 - rename cols into snake case
 - drop status_id
 - create ingestion date col
 - partition the data on write by race_id

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col, current_timestamp

#### Step 1 - Read the data

In [None]:
# Explicit schema for results.json, built field by field as
# (column name, Spark type, nullable). Only resultId is declared non-nullable.
results_schema = (
    StructType()
    .add("resultId", IntegerType(), False)
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("constructorId", IntegerType(), True)
    .add("number", IntegerType(), True)
    .add("grid", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("positionOrder", IntegerType(), True)
    .add("points", DoubleType(), True)
    .add("laps", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
    .add("fastestLap", IntegerType(), True)
    .add("rank", IntegerType(), True)
    .add("fastestLapTime", StringType(), True)
    .add("fastestLapSpeed", StringType(), True)
    .add("statusId", IntegerType(), True)
)

In [None]:
# Load the raw results file with the explicit results_schema so Spark does not
# have to infer column types. No "header" option is passed: that is a CSV-reader
# option and has no effect on the JSON reader.
results_df = spark.read.schema(results_schema).json('/mnt/formula1lgdl/raw/results.json')

#### Step 2 - Rename cols to snake case and drop status_id

In [None]:
# Rename the camelCase columns to snake_case; columns whose names are already
# lower-case single words (number, grid, position, points, laps, time,
# milliseconds, rank) are carried over unchanged. statusId is dropped.
results_df_renamed = (
    results_df
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .drop("statusId")
)

#### Step 3 - create ingestion date col

In [None]:
# Add an ingestion_date column holding the timestamp at which this load ran.
results_df_final = results_df_renamed.withColumn(
    colName="ingestion_date",
    col=current_timestamp(),
)

#### Step 4 - partition the data on write by race_id

In [None]:
# Write the processed results as Parquet, one sub-directory per race_id,
# replacing whatever is already at the target path.
results_df_final.write.parquet(
    "/mnt/formula1lgdl/processed/results",
    mode="overwrite",
    partitionBy="race_id",
)