# Ingest race results data

In [0]:
# Declare a text widget named "env" (default value "dev", label "Environment")
# so the target environment can be supplied as a notebook parameter.
dbutils.widgets.text("env", "dev", "Environment")

# Read the widget's current value; later cells use `env` to decide whether to
# render preview tables.
env = dbutils.widgets.get("env")

In [0]:
%run ../config $env=$env

In [0]:
%run ../utils

In [0]:
# Location of the raw results extract. `raw_data_folder_path` is not defined in
# this notebook; it is expected to come from a notebook pulled in via %run
# (presumably ending with a path separator — verify against the config).
csv_file_path = raw_data_folder_path + "results.csv"

# Read the CSV using its first row as column names and let Spark infer the
# column types from the data.
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Preview the raw rows only when running in the dev environment.
if env == "dev":
    df.display()

In [0]:
import pyspark.sql.functions as F

def _rename_results_columns(frame):
    """Return `frame` with the camelCase source columns renamed to snake_case.

    The renames are applied one at a time, in the order listed below, via
    `withColumnRenamed`.
    """
    for source_name, target_name in (
        ("raceId", "race_id"),
        ("resultId", "result_id"),
        ("driverId", "driver_id"),
        ("constructorId", "constructor_id"),
        ("fastestLap", "fastest_lap"),
        ("fastestLapSpeed", "fastest_lap_speed"),
        ("fastestLapTime", "fastest_lap_time"),
        ("positionOrder", "position_order"),
        ("positionText", "position_text"),
    ):
        frame = frame.withColumnRenamed(source_name, target_name)
    return frame


# Rename the columns, stamp each row with the ingestion time, and discard the
# `statusId` column.
df_transformed = (
    _rename_results_columns(df)
    .withColumn("ingestion_date", F.current_timestamp())
    .drop("statusId")
)

# Preview the result only when running in the dev environment.
if env == "dev":
    df_transformed.display()

In [0]:
# Apply the shared `fillna_str` helper with the literal marker "\N".
# NOTE(review): the helper is defined outside this notebook (presumably in the
# utils notebook run above); confirm exactly how it treats the marker value.
df_transformed = fillna_str(df_transformed, r"\N")

# Preview the result only when running in the dev environment.
if env == "dev":
    df_transformed.display()

In [0]:
df_transformed = (
    df_transformed
    .withColumn("number", F.col("number").cast("integer"))
    .withColumn("position", F.col("position").cast("integer"))
    .withColumn("milliseconds", F.col("milliseconds").cast("integer"))
    .withColumn("fastest_lap", F.col("fastest_lap").cast("integer"))
    .withColumn("rank", F.col("rank").cast("integer"))
    .withColumn("points", F.col("points").cast("integer"))
    .withColumn("fastest_lap_speed", F.col("fastest_lap_speed").cast("float"))
)

df_transformed.display()  if env == "dev" else None

In [0]:
df_transformed.write.format("delta").saveAsTable("bronze_tbl_results", mode="overwrite")