
## Ingest results file


#### Step 1 - Read the JSON file using the Spark DataFrame reader API

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [0]:
# Explicit schema for raw/results.json, so Spark does not infer types from the data.
# Field names match the raw JSON keys; they are renamed to snake_case in Step 2.
# NOTE(review): Spark's file-based readers may not enforce nullable=False on read;
# confirm before relying on resultId being non-null downstream.
results_schema = StructType(fields=[
    StructField("resultId", IntegerType(), nullable=False),
    StructField("raceId", IntegerType(), nullable=True),
    StructField("driverId", IntegerType(), nullable=True),
    StructField("constructorId", IntegerType(), nullable=True),
    StructField("number", IntegerType(), nullable=True),
    StructField("grid", IntegerType(), nullable=True),
    StructField("position", IntegerType(), nullable=True),
    StructField("positionText", StringType(), nullable=True),
    StructField("positionOrder", IntegerType(), nullable=True),
    StructField("points", FloatType(), nullable=True),
    StructField("laps", IntegerType(), nullable=True),
    StructField("time", StringType(), nullable=True),
    StructField("milliseconds", IntegerType(), nullable=True),
    StructField("fastestLap", IntegerType(), nullable=True),
    StructField("rank", IntegerType(), nullable=True),
    StructField("fastestLapTime", StringType(), nullable=True),
    StructField("fastestLapSpeed", FloatType(), nullable=True),
    # Typed as IntegerType to match the other *Id key columns above.
    # This column is dropped in Step 3, so the output table is unaffected.
    StructField("statusId", IntegerType(), nullable=True),
])

In [0]:
# Load the raw results JSON with the explicit schema defined above.
results_df = (
    spark.read
    .schema(results_schema)
    .json("/mnt/tideformula1dl/raw/results.json")
)


#### Step 2 - Rename columns and add new columns

In [0]:
from pyspark.sql.functions import current_timestamp

# Raw camelCase column name -> processed-layer snake_case name.
# Columns not listed here keep their raw names.
results_column_renames = {
    "resultId": "result_id",
    "raceId": "race_id",
    "driverId": "driver_id",
    "constructorId": "constructor_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "fastestLap": "fastest_lap",
    "fastestLapTime": "fastest_lap_time",
    "fastestLapSpeed": "fastest_lap_speed",
}

renamed_results_df = results_df
for raw_name, snake_name in results_column_renames.items():
    renamed_results_df = renamed_results_df.withColumnRenamed(raw_name, snake_name)

# Add an audit column holding current_timestamp() for this load.
results_with_columns_df = renamed_results_df.withColumn("ingestion_date", current_timestamp())


#### Step 3 - Drop the unwanted column

In [0]:
from pyspark.sql.functions import col

# statusId is not carried into the processed table.
status_id_column = col("statusId")
results_final_df = results_with_columns_df.drop(status_id_column)


#### Step 4 - Write output to the processed container in Parquet format

In [0]:
# Replace the managed table f1_processed.results with Parquet files partitioned by race_id.
(
    results_final_df.write
    .format("parquet")
    .partitionBy("race_id")
    .mode("overwrite")
    .saveAsTable("f1_processed.results")
)

In [0]:
# End the notebook and hand the string "Success" back to whatever invoked it.
# Presumably an orchestrating notebook or job checks this value; confirm.
dbutils.notebook.exit("Success")