# Using widgets

In [0]:
# Declare a text widget named "data_source_param" (default value: empty string)
# so the data source label can be supplied when the notebook is run.
dbutils.widgets.text("data_source_param", "")
# Read the widget's current value; it is written into the "data_source" column
# of the processed results further down in this notebook.
data_source_val = dbutils.widgets.get("data_source_param")

# Run helper notebooks

In [0]:
%run "../helpers/configuration"

In [0]:
%run "../helpers/functions"

# Ingest and process raw data

In [0]:
# Explicit schema for the results JSON file.
# A single record of the file looks like this:
# {"resultId":1,"raceId":18,"driverId":1,"constructorId":1,"number":22,"grid":1,"position":1,"positionText":1,"positionOrder":1,"points":10,"laps":58,"time":"1:34:50.616","milliseconds":5690616,"fastestLap":39,"rank":2,"fastestLapTime":"1:27.452","fastestLapSpeed":218.3,"statusId":1}

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# One entry per source column: (JSON field name, Spark type, nullable flag),
# listed in the order the columns appear in the resulting DataFrame.
# NOTE(review): fastestLapSpeed is declared as a string although the sample
# record above carries a numeric value (218.3) — confirm this is intentional.
results_field_specs = [
    ("resultId", IntegerType(), False),
    ("raceId", IntegerType(), False),
    ("driverId", IntegerType(), False),
    ("constructorId", IntegerType(), False),
    ("number", IntegerType(), True),
    ("grid", IntegerType(), False),
    ("position", IntegerType(), False),
    ("positionText", StringType(), False),
    ("positionOrder", IntegerType(), False),
    ("points", FloatType(), False),
    ("laps", IntegerType(), False),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
    ("fastestLap", IntegerType(), True),
    ("rank", IntegerType(), True),
    ("fastestLapTime", StringType(), True),
    ("fastestLapSpeed", StringType(), True),
    ("statusId", IntegerType(), False),
]

schema = StructType(fields=[
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in results_field_specs
])

In [0]:
# Load the raw results JSON file, applying the explicit schema defined earlier
# in this notebook.
# raw_folder_path is not defined in this notebook; presumably it is provided by
# the configuration helper notebook — TODO confirm.
results_source_path = f"{raw_folder_path}/results.json"
results_df = spark.read.schema(schema).json(results_source_path)
# To preview the loaded rows, run: display(results_df)

In [0]:
# Process data: rename camelCase columns to snake_case, tag every row with the
# data source, and drop the statusId column.

from pyspark.sql.functions import col, lit

# Source column name -> processed column name
column_renames = {
    "resultId": "result_id",
    "raceId": "race_id",
    "driverId": "driver_id",
    "constructorId": "constructor_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "fastestLap": "fastest_lap",
    "fastestLapTime": "fastest_lap_time",
    "fastestLapSpeed": "fastest_lap_speed",
}

# Apply the renames one at a time, starting from the raw DataFrame
renamed_df = results_df
for source_name, target_name in column_renames.items():
    renamed_df = renamed_df.withColumnRenamed(source_name, target_name)

# Attach the widget-supplied data source as a constant column, then remove statusId
tagged_df = renamed_df.withColumn("data_source", lit(data_source_val))
results_processed = tagged_df.drop(col("statusId"))

# To preview the processed rows, run: display(results_processed)

In [0]:
# Add ingestion date
# add_ingestion_date is not defined in this notebook; presumably it comes from
# one of the helper notebooks run above and returns the DataFrame with an
# ingestion-date column appended — TODO confirm against the helper's definition.
results_final = add_ingestion_date(results_processed)

In [0]:
# Persist the final DataFrame as parquet under the processed folder,
# partitioned by race_id; the "overwrite" mode replaces any existing output
# at the target path.
results_output_path = f"{processed_folder_path}/results"
(
    results_final.write
    .mode("overwrite")
    .partitionBy("race_id")
    .parquet(results_output_path)
)

In [0]:
# Sanity check: read the parquet output back from the processed folder and
# display it to confirm the write succeeded.
written_results_df = spark.read.parquet(f"{processed_folder_path}/results")
display(written_results_df)