In [None]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, FloatType, DoubleType, DateType
from pyspark.sql.functions import current_timestamp, to_timestamp, col, concat, lit

from delta.tables import DeltaTable

In [None]:
# Job parameters exposed as Databricks text widgets (defaults shown here).
dbutils.widgets.text("p_data_source", "s3://<bucket>/raw")
dbutils.widgets.text("p_file_date", "2021-03-28")

# Read back the current widget values for use throughout the notebook.
v_data_source = dbutils.widgets.get("p_data_source")
v_file_date = dbutils.widgets.get("p_file_date")

In [None]:
%run "includes/configuration"
%run "includes/utils"

In [None]:
%sql
-- Create the processed-layer database at an explicit storage location.
-- LOCATION is part of the same statement, so the terminating semicolon
-- must come after it (a ';' before LOCATION leaves an orphan clause).
CREATE DATABASE IF NOT EXISTS f1_processed
LOCATION "s3://<bucket>/processed";

### 1. ingest circuit file - CSV

In [None]:
# Explicit schema for circuits.csv so columns are typed on read;
# circuitID is the only field declared non-nullable.
circuit_schema = StructType(
    [
        StructField("circuitID", IntegerType(), False),
        StructField("circuitRef", StringType(), True),
        StructField("name", StringType(), True),
        StructField("location", StringType(), True),
        StructField("country", StringType(), True),
        StructField("lat", DoubleType(), True),
        StructField("lng", DoubleType(), True),
        StructField("alt", IntegerType(), True),
        StructField("url", StringType(), True),
    ]
)

In [None]:
# Load circuits.csv from the selected file-date folder; the first row is a header.
circuit_df = (
    spark.read
    .option("header", True)
    .schema(circuit_schema)
    .csv(f"{v_data_source}/{v_file_date}/circuits.csv")
)

In [None]:
# Normalise column names to snake_case and stamp every row with the load time.
circuit_df = (
    circuit_df
    .withColumnRenamed("circuitID", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("ingestion_date", current_timestamp())
)

In [None]:
# Persist circuits as a Delta table in f1_processed, replacing previous contents.
(
    circuit_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_processed.circuits")
)

In [None]:
%sql
-- Sanity check: preview the freshly written circuits table.
SELECT *
FROM f1_processed.circuits;

### 2. ingest races file - CSV

In [None]:
# Explicit schema for races.csv; date and time stay strings here and are
# combined into a timestamp in a later cell.
race_schema = StructType(
    [
        StructField("raceId", IntegerType(), False),
        StructField("year", IntegerType(), True),
        StructField("round", IntegerType(), True),
        StructField("circuitId", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("date", StringType(), True),
        StructField("time", StringType(), True),
    ]
)

In [None]:
# Load races.csv (header row present).
# NOTE(review): unlike circuits/results/pit_stops this reads from
# raw_folder_path without the v_file_date sub-folder — confirm intended.
race_df = (
    spark.read
    .option("header", True)
    .schema(race_schema)
    .csv(f"{raw_folder_path}/races.csv")
)

In [None]:
# Merge the separate date and time strings into a single timestamp column,
# then stamp every row with the load time.
race_df = (
    race_df
    .withColumn(
        "race_timestamp",
        to_timestamp(concat(col("date"), lit(" "), col("time")), "yyyy-MM-dd HH:mm:ss"),
    )
    .withColumn("ingestion_date", current_timestamp())
)

In [None]:
# Keep only the columns needed downstream, renamed to snake_case.
race_df = race_df.select(
    col("raceId").alias("race_id"),
    col("year").alias("race_year"),
    col("round"),
    col("circuitId").alias("circuit_id"),
    col("name"),
    col("race_timestamp"),
    col("ingestion_date"),
)

In [None]:
# write to s3 by partitioning
# race_df.write.mode("overwrite") \
#     .partitionBy("race_year") \
#     .parquet(f"{processed_folder_path}/races.parquet")

In [None]:
# Persist races as a parquet-format table in f1_processed, replacing previous contents.
(
    race_df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("f1_processed.races")
)

### 3. ingest constructors file - JSON

In [None]:
# DDL-style schema string for constructors.json.
constructor_schema = (
    "constructorId INT, constructorRef STRING, "
    "name STRING, nationality STRING, url STRING"
)

In [None]:
# Load constructors.json using the DDL schema defined above.
constructor_df = (
    spark.read
    .schema(constructor_schema)
    .json(f"{raw_folder_path}/constructors.json")
)

In [None]:
# The url column is not needed downstream.
constructor_df = constructor_df.drop("url")

In [None]:
# snake_case identifiers plus a load-time stamp.
constructor_df = (
    constructor_df
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("constructorRef", "constructor_ref")
    .withColumn("ingestion_date", current_timestamp())
)

In [None]:
# write to s3
# constructor_df.write.mode("overwrite") \
#     .parquet(f"{processed_folder_path}/constructors.parquet")

In [None]:
# Persist constructors as a parquet-format table in f1_processed (overwrite).
(
    constructor_df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("f1_processed.constructors")
)

### 4. ingest drivers file - JSON

In [None]:
# Nested struct holding a driver's first and last name.
name_schema = StructType(
    [
        StructField("forename", StringType(), True),
        StructField("surname", StringType(), True),
    ]
)

In [None]:
# Schema for drivers.json; "name" is the nested name_schema struct.
driver_schema = StructType(
    [
        StructField("driverId", IntegerType(), False),
        StructField("driverRef", StringType(), True),
        StructField("number", IntegerType(), True),
        StructField("code", StringType(), True),
        StructField("name", name_schema, True),
        StructField("dob", DateType(), True),
        StructField("nationality", StringType(), True),
        StructField("url", StringType(), True),
    ]
)

In [None]:
# Load drivers.json with the nested schema above.
driver_df = (
    spark.read
    .schema(driver_schema)
    .json(f"{raw_folder_path}/drivers.json")
)

In [None]:
# snake_case ids, flatten the name struct into "forename surname",
# and stamp every row with the load time.
driver_df = (
    driver_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("driverRef", "driver_ref")
    .withColumn("name", concat(col("name.forename"), lit(" "), col("name.surname")))
    .withColumn("ingestion_date", current_timestamp())
)

In [None]:
# The url column is not needed downstream.
driver_df = driver_df.drop("url")

In [None]:
# write to s3
# driver_df.write.mode("overwrite") \
#     .parquet(f"{processed_folder_path}/drivers.parquet")

In [None]:
# Persist drivers as a parquet-format table in f1_processed (overwrite).
(
    driver_df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("f1_processed.drivers")
)

### 5. ingest results file - JSON

In [None]:
# Schema for results.json (one result row per driver per race).
result_schema = StructType(
    [
        StructField("resultId", IntegerType(), False),
        StructField("raceId", IntegerType(), True),
        StructField("driverId", IntegerType(), True),
        StructField("constructorId", IntegerType(), True),
        StructField("number", IntegerType(), True),
        StructField("grid", IntegerType(), True),
        StructField("position", IntegerType(), True),
        StructField("positionText", StringType(), True),
        StructField("positionOrder", IntegerType(), True),
        StructField("points", FloatType(), True),
        StructField("laps", IntegerType(), True),
        StructField("time", StringType(), True),
        StructField("milliseconds", IntegerType(), True),
        StructField("fastestLap", IntegerType(), True),
        StructField("rank", IntegerType(), True),
        StructField("fastestLapTime", StringType(), True),
        StructField("fastestLapSpeed", FloatType(), True),
        StructField("statusId", StringType(), True),
    ]
)

In [None]:
# Load results.json from the selected file-date folder.
result_df = (
    spark.read
    .schema(result_schema)
    .json(f"{v_data_source}/{v_file_date}/results.json")
)

In [None]:
# Rename to snake_case, record the data source / file date of this load,
# and drop the unused statusId column.
# NOTE: withColumnRenamed is a silent no-op when the source column does not
# exist, so the source names here must match result_schema exactly
# (previously "drivertId" left driverId un-renamed).
result_df = (
    result_df
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
    .drop(col("statusId"))
)

In [None]:
# Merge this batch into f1_processed.results, matching rows on
# (result_id, race_id); the trailing "race_id" argument is presumably the
# partition column expected by merge_dalta_data (see includes/utils).
# NOTE(review): this call uses a concrete bucket path while the other merges
# use "s3://<bucket>/processed" — confirm which is correct.
merge_con = "tgt.result_id = src.result_id AND tgt.race_id = src.race_id"
merge_dalta_data(
    result_df,
    "f1_processed",
    "results",
    "s3://formula1-dl/processed",
    merge_con,
    "race_id",
)

In [None]:
%sql
-- Row count per race, newest race_id first, to verify the merge.
SELECT
    race_id,
    COUNT(1)
FROM f1_processed.results
GROUP BY race_id
ORDER BY race_id DESC;

### 6. ingest pit_stops file - JSON (multiline)

In [None]:
# Schema for pit_stops.json.
pit_stop_schema = StructType(
    [
        StructField("raceId", IntegerType(), False),
        StructField("driverId", IntegerType(), True),
        StructField("stop", StringType(), True),
        StructField("lap", IntegerType(), True),
        StructField("time", StringType(), True),
        StructField("duration", StringType(), True),
        StructField("milliseconds", IntegerType(), True),
    ]
)

In [None]:
# Load pit_stops.json from the selected file-date folder. The file is
# multi-line JSON rather than JSON Lines, hence the multiLine option.
# (Fixes the malformed f"f"..." literal, which was a SyntaxError.)
pit_stop_df = (
    spark.read
    .schema(pit_stop_schema)
    .option("multiLine", True)
    .json(f"{v_data_source}/{v_file_date}/pit_stops.json")
)

In [None]:
# snake_case ids, load-time stamp, and lineage columns for this batch.
# Parenthesised chain: the backslash-continued version was missing a "\"
# after current_timestamp(), which made the next line a syntax error.
pit_stop_df = (
    pit_stop_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

In [None]:
# Merge this batch into f1_processed.pit_stops. A pit stop is identified by
# race, driver and stop number; pit_stop_df has no result_id column, so the
# previous condition referencing it could not be resolved.
merge_con = (
    "tgt.race_id = src.race_id AND tgt.driver_id = src.driver_id AND "
    "tgt.stop = src.stop"
)
merge_dalta_data(pit_stop_df, "f1_processed", "pit_stops", "s3://<bucket>/processed",
                 merge_con, "race_id")

### 7. ingest lap_times file - FOLDER_CSV

In [None]:
# Schema for the lap_times CSV folder.
lap_times_schema = StructType(
    [
        StructField("raceId", IntegerType(), False),
        StructField("driverId", IntegerType(), True),
        StructField("lap", IntegerType(), True),
        StructField("position", IntegerType(), True),
        StructField("time", StringType(), True),
        StructField("milliseconds", IntegerType(), True),
    ]
)

In [None]:
# Read every CSV in the lap_times folder (no header option is set).
lap_times_df = (
    spark.read
    .schema(lap_times_schema)
    .csv(f"{raw_folder_path}/lap_times")
)

In [None]:
# snake_case ids plus a load-time stamp.
lap_times_df = (
    lap_times_df
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumn("ingestion_date", current_timestamp())
)

In [None]:
# Merge this batch into f1_processed.lap_times. A lap-time row is identified
# by race, driver and lap; lap_times_df has neither result_id nor stop, so
# the previous condition (copied from pit_stops) could not be resolved.
merge_con = (
    "tgt.race_id = src.race_id AND tgt.driver_id = src.driver_id AND "
    "tgt.lap = src.lap"
)
merge_dalta_data(lap_times_df, "f1_processed", "lap_times", "s3://<bucket>/processed",
                 merge_con, "race_id")

### 8. ingest qualifying file - FOLDER_JSON

In [None]:
# Schema for the qualifying JSON folder; q1-q3 lap times stay strings.
qualifying_schema = StructType(
    [
        StructField("qualifyingId", IntegerType(), False),
        StructField("raceId", IntegerType(), True),
        StructField("driverId", IntegerType(), True),
        StructField("constructorId", IntegerType(), True),
        StructField("number", IntegerType(), True),
        StructField("position", IntegerType(), True),
        StructField("q1", StringType(), True),
        StructField("q2", StringType(), True),
        StructField("q3", StringType(), True),
    ]
)

In [None]:
# Read every multi-line JSON file in the qualifying folder.
qualifying_df = (
    spark.read
    .schema(qualifying_schema)
    .option("multiLine", True)
    .json(f"{raw_folder_path}/qualifying")
)

In [None]:
# snake_case ids plus a load-time stamp.
qualifying_df = (
    qualifying_df
    .withColumnRenamed("qualifyingId", "qualifying_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumn("ingestion_date", current_timestamp())
)

In [None]:
# write to s3
# qualifying_df.write.mode("overwrite") \
#     .parquet(f"{processed_folder_path}/qualifying.parquet")

In [None]:
# Persist qualifying as a parquet-format table in f1_processed (overwrite).
(
    qualifying_df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("f1_processed.qualifyings")
)