## Ingest results.json file

### Step 1. Read the JSON file using the Spark DataFrame reader

In [1]:
import findspark
findspark.init('/opt/spark')
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.functions import col, current_timestamp

In [2]:
# Obtain the notebook's SparkSession (an existing session is reused if there
# is one), with Hive support enabled on the builder.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Explicit schema for results.json, written as (column name, Spark type, nullable)
# triples in the order the columns should appear in the DataFrame.
# NOTE(review): statusId is typed as a string while the other *Id columns are
# integers; the column is dropped in Step 3, so this looks harmless — confirm.
results_field_specs = [
    ("resultId", IntegerType(), False),
    ("raceId", IntegerType(), True),
    ("driverId", IntegerType(), True),
    ("constructorId", IntegerType(), True),
    ("number", IntegerType(), True),
    ("grid", IntegerType(), True),
    ("position", IntegerType(), True),
    ("positionText", StringType(), True),
    ("positionOrder", IntegerType(), True),
    ("points", FloatType(), True),
    ("laps", IntegerType(), True),
    ("time", StringType(), True),
    ("milliseconds", IntegerType(), True),
    ("fastestLap", IntegerType(), True),
    ("rank", IntegerType(), True),
    ("fastestLapTime", StringType(), True),
    ("fastestLapSpeed", FloatType(), True),
    ("statusId", StringType(), True),
]

results_schema = StructType(
    [
        StructField(field_name, field_type, is_nullable)
        for field_name, field_type, is_nullable in results_field_specs
    ]
)

In [4]:
# Load the raw results file, applying the explicit schema defined above
# instead of letting Spark infer one.
results_df = spark.read.json(
    "/user/jupyter/formula1/raw/results.json",
    schema=results_schema,
)

### Step 2. Rename columns and add new columns

In [5]:
# camelCase source column -> snake_case target column.
# Columns not listed here (e.g. number, grid, points, statusId) keep their names.
results_column_renames = {
    "resultId": "result_id",
    "raceId": "race_id",
    "driverId": "driver_id",
    "constructorId": "constructor_id",
    "positionText": "position_text",
    "positionOrder": "position_order",
    "fastestLap": "fastest_lap",
    "fastestLapTime": "fastest_lap_time",
    "fastestLapSpeed": "fastest_lap_speed",
}

# Apply the renames one at a time, always starting from results_df so the
# cell can be re-run without depending on its own previous output.
results_renamed_df = results_df
for source_name, target_name in results_column_renames.items():
    results_renamed_df = results_renamed_df.withColumnRenamed(source_name, target_name)

# Add the ingestion_date audit column, populated from current_timestamp().
results_with_new_columns_df = results_renamed_df.withColumn(
    "ingestion_date", current_timestamp()
)

### Step 3. Drop the unwanted column

In [6]:
# statusId is not carried into the processed dataset; drop it by name.
results_final_df = results_with_new_columns_df.drop("statusId")

### Step 4. Save the transformed data in HDFS as Parquet

In [7]:
# Write the final DataFrame as Parquet, partitioned by race_id.
# "overwrite" mode replaces whatever already exists at the target path.
(
    results_final_df.write
    .mode("overwrite")
    .partitionBy("race_id")
    .parquet("/user/jupyter/formula1/processed/results")
)

                                                                                

In [9]:
# Stop the SparkSession created at the top of this notebook now that the
# write has finished; cells using `spark` cannot be re-run after this
# without re-creating the session.
spark.stop()