### Step 1 - Include the shared configurations and common functions, then start the Spark session

In [1]:
%run "../includes/configurations"

In [2]:
%run "../includes/common_functions"

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

# Obtain the Spark session for this notebook: getOrCreate() returns the
# already-active session when there is one, otherwise it builds a new one
# registered under the application name "ResultsIngestion".
spark = (
    SparkSession.builder
    .appName("ResultsIngestion")
    .getOrCreate()
)

23/12/29 11:30:49 WARN Utils: Your hostname, falcao-sys resolves to a loopback address: 127.0.1.1; using 192.168.11.185 instead (on interface wlx7898e8c12476)
23/12/29 11:30:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/29 11:30:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Step 2 - Read the JSON file using the Spark DataFrame Reader

In [4]:
# Explicit schema for the results data.
# Each entry below is (source column name, Spark data type, nullable flag);
# the comprehension turns every triple into a StructField, in this order.
results_schema = StructType(fields=[
    StructField(column_name, data_type, is_nullable)
    for column_name, data_type, is_nullable in (
        ("resultId", IntegerType(), False),
        ("raceId", IntegerType(), False),
        ("driverId", IntegerType(), False),
        ("constructorId", IntegerType(), False),
        ("number", IntegerType(), True),
        ("grid", IntegerType(), False),
        ("position", IntegerType(), True),
        ("positionText", StringType(), False),
        ("positionOrder", IntegerType(), False),
        ("points", FloatType(), False),
        ("laps", IntegerType(), False),
        ("time", StringType(), True),
        ("milliseconds", IntegerType(), True),
        ("fastestLap", IntegerType(), True),
        ("rank", IntegerType(), True),
        ("fastestLapTime", StringType(), True),
        ("fastestLapSpeed", FloatType(), True),
        ("statusId", IntegerType(), False),
    )
])

In [5]:
# Load the results JSON with the explicit schema (no type inference).
results_df = spark.read.json(
    "../data/results.json",
    schema=results_schema,
)

### Step 3 - Rename and drop columns, and add new columns

In [6]:
# Source (camelCase) column name -> target (snake_case) column name.
# Columns not listed here are not part of the mapping.
results_column_mapping = dict(
    resultId="result_id",
    raceId="race_id",
    driverId="driver_id",
    constructorId="constructor_id",
    positionText="position_text",
    positionOrder="position_order",
    fastestLap="fastest_lap",
    fastestLapTime="fastest_lap_time",
    fastestLapSpeed="fastest_lap_speed",
)

# rename_columns is not defined in this notebook section; presumably it comes
# from the included common_functions notebook (TODO confirm).
results_df = rename_columns(results_df, results_column_mapping)

In [7]:
# Columns to remove from the results DataFrame.
columns_to_drop = ["statusId"]

# drop_columns is not defined in this notebook section; presumably it comes
# from the included common_functions notebook (TODO confirm).
results_df = drop_columns(results_df, columns_to_drop)

In [8]:
# Pass the DataFrame through the add_ingestion_date helper and keep its result.
# NOTE(review): the helper is not defined in this notebook section; judging by
# its name it appends an ingestion-date column — confirm against its definition.
results_df = add_ingestion_date(results_df)

### Step 4 - Write the output to the processed data folder in Parquet format

In [9]:
# Persist the results as Parquet, one partition directory per race_id;
# "overwrite" replaces whatever already exists at the target path.
results_df.write.parquet(
    "../processed_data/results",
    mode="overwrite",
    partitionBy="race_id",
)

                                                                                