In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Notebook parameter: value stamped into the data_source column of the output.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Notebook parameter: dated sub-folder under raw_folder_path to ingest
# (the widget defaults to 2021-03-21); also stamped into the file_date column.
dbutils.widgets.text("p_file_date", "2021-03-21")
v_file_date = dbutils.widgets.get("p_file_date")

#### Ingest the results.json file


In [0]:
# spark.read.json('/mnt/formuladl15/raw/2021-03-21/results.json').createOrReplaceTempView("results_cutover")

In [0]:
# spark.read.json('/mnt/formuladl15/raw/2021-03-28/results.json').createOrReplaceTempView("results_w1")

In [0]:
# %sql
# select raceId, count(1)
# from results_w1
# group by raceId
# order by raceId

In [0]:
# spark.read.json('/mnt/formuladl15/raw/2021-04-18/results.json').createOrReplaceTempView("results_w2")

###### Step 1 - Read the JSON file using the Spark DataFrame reader API

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType ,FloatType

In [0]:
# Explicit schema for results.json, declared as (name, type, nullable) rows.
# Only resultId is declared non-nullable.
results_schema = StructType(fields=[
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in [
        ("resultId", IntegerType(), False),
        ("raceId", IntegerType(), True),
        ("driverId", IntegerType(), True),
        ("constructorId", IntegerType(), True),
        ("number", IntegerType(), True),
        ("grid", IntegerType(), True),
        ("position", IntegerType(), True),
        ("positionText", StringType(), True),
        ("positionOrder", IntegerType(), True),
        ("points", FloatType(), True),
        ("laps", IntegerType(), True),
        ("time", StringType(), True),
        ("milliseconds", IntegerType(), True),
        ("fastestLap", IntegerType(), True),
        ("rank", IntegerType(), True),
        ("fastestLapTime", StringType(), True),
        ("fastestLapSpeed", FloatType(), True),
        ("statusId", IntegerType(), True),
    ]
])

In [0]:
# Read the results file for the requested file date, applying the explicit schema.
results_df = spark.read.json(
    f"{raw_folder_path}/{v_file_date}/results.json",
    schema=results_schema,
)

###### Step 2 - Rename columns and add new columns

In [0]:
from pyspark.sql.functions import current_timestamp,lit

In [0]:
# Rename the camelCase source columns to snake_case, then stamp every row with
# the data source and file date supplied through the notebook widgets.
results_with_columns_df = (
    results_df
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)


In [0]:
# add_ingestion_date is not defined in this notebook; presumably it comes from
# ../includes/common_functions and adds the ingestion_date column - TODO confirm.
results_with_ingestion_df = add_ingestion_date(results_with_columns_df)

###### Step 3 - Drop the unwanted column

In [0]:
from pyspark.sql.functions import col

In [0]:
# statusId is the only source column that was not renamed; remove it from the output.
results_final_df = results_with_ingestion_df.drop("statusId")

###### Step 4 - Write the output to the processed container in Delta format

###### Method 1

In [0]:
# # for race_id_list in results_final_df.select("race_id").distinct().collect():
# #     spark.sql(f"ALTER TABLE f1_processed.results DROP IF EXISTS PARTITION(race_id={race_id_list.race_id})")
# for race_id_list in results_final_df.select("race_id").distinct().collect():
#     race_id = race_id_list.race_id
#     if (spark._jsparkSession.catalog().tableExists("f1_processed.results")):
        
#         spark.sql(f"DELETE FROM f1_processed.results WHERE race_id = {race_id}")

###### Method 2

In [0]:
# %sql
# -- drop table f1_processed.results;

In [0]:
def re_arrange_partition_column(input_df, partition_column):
    """Return ``input_df`` with ``partition_column`` moved to the last position.

    Args:
        input_df: DataFrame whose ``schema.names`` gives the current column order.
        partition_column: Name of the column to place last.

    Returns:
        The result of ``input_df.select(...)`` over every other column in its
        existing order, followed by ``partition_column``.
    """
    leading_columns = [name for name in input_df.schema.names if name != partition_column]
    return input_df.select(leading_columns + [partition_column])

In [0]:
# The returned DataFrame is not assigned to anything; it is only the cell's output.
re_arrange_partition_column(input_df=results_final_df, partition_column="race_id")

In [0]:
# def overwrite_partition(input_df,db_name,table_name,partition_column):
#     output_df=re_arrange_partition_column(input_df,partition_column)
#     spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
#     if (spark._jsparkSession.catalog().tableExists(f"{db_name}.{table_name}")):
#         output_df.write.mode("overwrite").insertInto(f"{db_name}.{table_name}")
#     else:
#         output_df.write.mode("overwrite").partitionBy(partition_column).format("delta").saveAsTable(f"{db_name}.{table_name}")

In [0]:
# Session-level setting: use dynamic partition overwrite mode.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

In [0]:
# Explicit column order with the partition column (race_id) last.
# Bound to its own name rather than `results_df`: that name holds the raw frame
# read from JSON, and rebinding it to a downstream frame would make re-running
# the earlier rename cell depend on hidden state. No later cell in this notebook
# reads this projection.
results_reordered_df = results_final_df.select(
    "result_id",
    "driver_id",
    "constructor_id",
    "number",
    "grid",
    "position",
    "position_text",
    "position_order",
    "points",
    "laps",
    "time",
    "milliseconds",
    "fastest_lap",
    "rank",
    "fastest_lap_time",
    "fastest_lap_speed",
    "data_source",
    "file_date",
    "ingestion_date",
    "race_id",
)

In [0]:
from pyspark.sql.functions import col
# NOTE: race_id is read as an integer (see results_schema) but is cast to a
# string here before the write/merge below.
# NOTE(review): presumably this matches the race_id type of the existing
# f1_processed.results table - confirm.
results_final_df = results_final_df.withColumn(
    "race_id",
    col("race_id").cast("string"),
)

In [0]:
# results_final_df.printSchema()

In [0]:
# Step 1: Check whether the target table already exists.
table_exists = spark._jsparkSession.catalog().tableExists("f1_processed", "results")

# Step 2: Upsert into the existing table, or create it on the first load.
if table_exists:
    # Expose the DataFrame to SQL under a temporary view name.
    results_final_df.createOrReplaceTempView("results_final_df")

    # Match on result_id as well as race_id. A race has many result rows, so
    # matching on race_id alone pairs several source rows with the same target
    # row, which MERGE rejects.
    #
    # spark.sql() has no `mergeSchema` option, so none is passed. If schema
    # evolution is required for this merge, enable it through the Delta session
    # configuration instead (see the Delta Lake MERGE documentation).
    spark.sql("""
    MERGE INTO f1_processed.results AS target
    USING results_final_df AS source
    ON target.result_id = source.result_id
       AND target.race_id = source.race_id
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
    """)
else:
    # First load: create the table as Delta, partitioned by race_id.
    results_final_df.write \
        .mode("overwrite") \
        .partitionBy("race_id") \
        .format("delta") \
        .saveAsTable("f1_processed.results")

In [0]:
# results_final_df.write.mode("overwrite").partitionBy('race_id').format("delta").saveAsTable("f1_processed.results")

In [0]:
# results_final_df.write.mode("append").insertInto("f1_processed.results")

In [0]:
%sql
-- Row counts per race for the 2021-03-28 file date.
-- file_date is a grouping column, so filtering it in WHERE returns the same
-- groups as filtering it in HAVING.
SELECT race_id, file_date, count(1)
FROM f1_processed.results
WHERE file_date = '2021-03-28'
GROUP BY race_id, file_date;

In [0]:
# End the notebook run, passing "success" as the exit value.
dbutils.notebook.exit("success")