In [0]:
dbutils.widgets.text("p_data_source","")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
%run "/Formula1/includes/configuration"

Out[2]: [FileInfo(path='dbfs:/mnt/formularacedata/presentation/', name='presentation/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/formularacedata/processed/', name='processed/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/formularacedata/raw/', name='raw/', size=0, modificationTime=0)]

In [0]:
%run "/Formula1/includes/common_functions"

In [0]:
file_path = f'{raw_folder_path}/results.json'
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, FloatType

schema = StructType( [
    StructField("resultId", IntegerType(),False),
    StructField("raceId", IntegerType(), False),
    StructField("driverId", IntegerType(), False),
    StructField ("constructorId", IntegerType(), False),
    StructField("number", IntegerType(),False),
    StructField ("grid",IntegerType(), False),
    StructField ("position", IntegerType(), False),
    StructField("positionText", StringType(),True),
    StructField ("positionOrder",IntegerType(),True),
    StructField("points", FloatType()),
    StructField("laps", IntegerType()),
    StructField ("time",StringType()),
    StructField ("milliseconds", IntegerType()),
    StructField("fastestLap", IntegerType()),
    StructField ("rank",IntegerType()),
    StructField("fastestLapTime", StringType()),
    StructField("fastestLapSpeed", DecimalType() ),
    StructField ("statusId", IntegerType())

])

In [0]:
df = spark.read.format("json").schema(schema).load(file_path)
df.show(2)

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|  10.0|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|            218|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|   8.0|  58|     +5.478|     5696094|        41|   3|      1:27.739|            218|       1|
+--------+------+--------+-------------+------+----+--------+------------+-------------+--

In [0]:
from pyspark.sql.functions import col, current_timestamp
new_df = df.select(   col("resultId").alias("result_id"),
                      col("raceId").alias("race_id"),
                      col("driverId").alias("driver_id"),
                      col("constructorId").alias("constructor_id"),
                      "number",
                      "grid",
                      "position",
                      col("positionText").alias("position_text"),
                      col("positionOrder").alias("position_order"),
                      "points",
                      "laps",
                      "time",
                      "milliseconds",
                      col("fastestLap").alias("fastest_lap"),
                      "rank",
                      col("fastestLapTime").alias("fastest_lap_time"),
                      col("fastestLapSpeed").alias("fastest_lap_speed"),
                      current_timestamp().alias("ingestion_date")


                   )

new_df.show(2)

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------------------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|       time|milliseconds|fastest_lap|rank|fastest_lap_time|fastest_lap_speed|      ingestion_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+-----------+----+----------------+-----------------+--------------------+
|        1|     18|        1|             1|    22|   1|       1|            1|             1|  10.0|  58|1:34:50.616|     5690616|         39|   2|        1:27.452|              218|2023-06-29 19:32:...|
|        2|     18|        2|             2|     3|   5|       2|            2|             2|   8.0|  58|     +5.478|     5696094|         41|   3|        1:27.739|              2

In [0]:
results_df = new_df.dropDuplicates(['race_id','driver_id'])

In [0]:
merge_condition = "tgt.result_id = src.result_id AND tgt.race_id = src.race_id"
merge_delta_data(results_df, 'f1_processed', 'results', processed_folder_path, merge_condition, 'race_id')

In [0]:
%sql
SELECT * FROM f1_processed.results LIMIT 5

result_id,race_id,driver_id,constructor_id,number,grid,position,position_text,position_order,points,laps,time,milliseconds,fastest_lap,rank,fastest_lap_time,fastest_lap_speed,ingestion_date
7573,1,1,1,1,18,,D,20,0.0,58,\N,,39,13,1:29.020,214,2023-06-29T19:38:17.157+0000
7563,1,2,2,6,9,10.0,10,10,0.0,58,+7.085,5662869.0,48,5,1:28.283,216,2023-06-29T19:38:17.157+0000
7559,1,3,3,16,5,6.0,6,6,3.0,58,+5.722,5661506.0,48,1,1:27.706,218,2023-06-29T19:38:17.157+0000
7558,1,4,4,7,10,5.0,5,5,4.0,58,+4.879,5660663.0,53,9,1:28.712,215,2023-06-29T19:38:17.157+0000
7561,1,7,5,11,17,8.0,8,8,1.0,58,+6.298,5662082.0,50,17,1:29.823,213,2023-06-29T19:38:17.157+0000


In [0]:
dbutils.notebook.exit("success")

success