### Ingest results.json file

In [0]:
# Notebook parameter: declare a text widget named "p_data_source" with an empty
# default, then read its current value. The value is later attached to the
# output as the literal `data_source` column.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

#### Step 1 - Read the JSON file using the Spark DataFrame reader API

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [0]:
# Explicit schema for results.json. Only the fields listed here are read from
# the file; anything not declared is silently left out of the DataFrame.
results_schema = StructType(fields=[
    StructField("resultId", IntegerType(), False),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("grid", IntegerType(), True),
    StructField("position", IntegerType(), True),
    StructField("positionText", StringType(), True),
    StructField("positionOrder", IntegerType(), True),
    StructField("points", FloatType(), True),
    StructField("laps", IntegerType(), True),
    StructField("time", StringType(), True),
    StructField("milliseconds", IntegerType(), True),
    StructField("fastestLap", IntegerType(), True),
    StructField("rank", IntegerType(), True),
    # Previously missing: the rename step maps fastestLapTime -> fastest_lap_time,
    # which was a no-op because the column was never read. Kept as a string like
    # `time` (assumes a formatted lap time, not a numeric value -- TODO confirm).
    StructField("fastestLapTime", StringType(), True),
    StructField("fastestLapSpeed", FloatType(), True),
    # Read only so it can be dropped explicitly in the next step.
    StructField("statusId", StringType(), True),
])

In [0]:
# Load the raw results file, applying the explicit schema instead of inferring one.
results_df = spark.read.json(
    f"{raw_folder_path}/results.json",
    schema=results_schema,
)

#### Step 2 - Drop the unwanted statusId column from the dataframe

In [0]:
from pyspark.sql.functions import col

In [0]:
# statusId is not carried forward into the processed output.
results_dropped_df = results_df.drop("statusId")

#### Step 3 - Rename columns and add the ingestion date and data source

In [0]:
from pyspark.sql.functions import current_timestamp, lit

In [0]:
# camelCase source column -> snake_case target column, applied in this order.
# Renaming a column that is absent from the DataFrame is a no-op in Spark.
results_column_renames = [
    ("resultId", "result_id"),
    ("raceId", "race_id"),
    ("driverId", "driver_id"),
    ("constructorId", "constructor_id"),
    ("positionText", "position_text"),
    ("positionOrder", "position_order"),
    ("fastestLap", "fastest_lap"),
    ("fastestLapTime", "fastest_lap_time"),
    ("fastestLapSpeed", "fastest_lap_speed"),
]

# Order matters for the output column layout: the add_ingestion_date helper runs
# first (presumably provided by ../includes/common_functions), then the renames,
# and the data_source literal is appended last.
results_final_df = add_ingestion_date(results_dropped_df)
for old_name, new_name in results_column_renames:
    results_final_df = results_final_df.withColumnRenamed(old_name, new_name)
results_final_df = results_final_df.withColumn("data_source", lit(v_data_source))

#### Step 4 - Write output to parquet file

In [0]:
# Replace any existing output and lay the parquet files out by race_id.
(
    results_final_df.write
    .mode("overwrite")
    .partitionBy("race_id")
    .parquet(f"{processed_folder_path}/results")
)

In [0]:
# End the notebook run and hand "Success" back to the caller as the exit value.
dbutils.notebook.exit("Success")