# Ingest Results file
- 1. Define Schema
- 2. Read the File
- 3. Rename columns and add ingestion date
- 4. Drop the statusId column
- 5. Write the DataFrame in Parquet format to ADLS, partitioned by race_id

In [0]:
# Notebook parameter: free-text label for the data source (empty by default).
# Its value is later written into the `data_source` column.
source_widget_name = 'p_data_source'
dbutils.widgets.text(source_widget_name, '')
v_data_source = dbutils.widgets.get(source_widget_name)

In [0]:
# Notebook parameter: the dated sub-folder of the raw area to ingest.
# Defaults to "2021-03-21"; the value is also written into the `file_date` column.
file_date_widget_name = "p_file_date"
dbutils.widgets.text(file_date_widget_name, "2021-03-21")
v_file_date = dbutils.widgets.get(file_date_widget_name)

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

### Define Schema

In [0]:
# Exploratory check: show the storage mount points visible to this notebook.
display(dbutils.fs.mounts())

In [0]:
# Exploratory check: list the contents of the raw folder.
# `raw_folder_path` is not defined in this notebook; presumably it comes from
# the included configuration notebook.
raw_folder_listing = dbutils.fs.ls(f'{raw_folder_path}')
display(raw_folder_listing)

In [0]:
# Exploratory read without an explicit schema, so Spark infers column types.
# Used only by the inspection cells that follow; the typed read happens later.
raw_results_path = f'{raw_folder_path}/{v_file_date}/results.json'
results_raw_df = spark.read.json(raw_results_path)

In [0]:
# Exploratory check: preview the rows read with the inferred schema.
display(results_raw_df)

In [0]:
# Exploratory check: summary statistics for the schema-inferred data.
raw_summary_df = results_raw_df.describe()
display(raw_summary_df)

In [0]:
# Exploratory check: print the inferred schema as a reference for the explicit
# schema defined below.
results_raw_df.printSchema()

In [0]:
# Exploratory check: how many columns the raw file contains.
raw_column_count = len(results_raw_df.columns)
display(raw_column_count)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType


In [0]:
# Explicit schema for results.json, kept as (name, type, nullable) rows so the
# column list reads like a table. Field order matches the original definition.
# Only resultId is declared non-nullable; statusId is read as a string and is
# dropped further down.
result_field_specs = [
    ('resultId', IntegerType(), False),
    ('raceId', IntegerType(), True),
    ('driverId', IntegerType(), True),
    ('constructorId', IntegerType(), True),
    ('number', IntegerType(), True),
    ('grid', IntegerType(), True),
    ('position', IntegerType(), True),
    ('positionText', StringType(), True),
    ('positionOrder', IntegerType(), True),
    ('points', FloatType(), True),
    ('laps', IntegerType(), True),
    ('time', StringType(), True),
    ('milliseconds', IntegerType(), True),
    ('fastestLap', IntegerType(), True),
    ('rank', IntegerType(), True),
    ('fastestLapTime', StringType(), True),
    ('fastestLapSpeed', FloatType(), True),
    ('statusId', StringType(), True),
]
results_schema = StructType(
    [
        StructField(field_name, field_type, is_nullable)
        for field_name, field_type, is_nullable in result_field_specs
    ]
)

In [0]:
# Typed read: same file as the exploratory read, but with the explicit schema
# applied instead of inference.
results_json_path = f'{raw_folder_path}/{v_file_date}/results.json'
typed_reader = spark.read.schema(results_schema)
results_read_df = typed_reader.json(results_json_path)

In [0]:
# Preview the rows read with the explicit schema.
display(results_read_df)

In [0]:
# Confirm the DataFrame carries the column types declared in results_schema.
results_read_df.printSchema()

### Rename columns and add ingestion date

In [0]:
from pyspark.sql.functions import col, current_timestamp, lit

In [0]:
# add_ingestion_date is not defined in this notebook (presumably it comes from
# the included common_functions notebook). Judging by its name it appends an
# ingestion timestamp column -- TODO confirm against the helper.
results_ingestion_date_df = add_ingestion_date(results_read_df)

In [0]:
# camelCase source column -> snake_case target column, applied in this order.
# statusId is deliberately absent: it is dropped in a later cell, not renamed.
column_renames = {
    'resultId': 'result_id',
    'raceId': 'race_id',
    'driverId': 'driver_id',
    'constructorId': 'constructor_id',
    'positionText': 'position_text',
    'positionOrder': 'position_order',
    'fastestLap': 'fastest_lap',
    'fastestLapTime': 'fastest_lap_time',
    'fastestLapSpeed': 'fastest_lap_speed',
}

results_renamed_df = results_ingestion_date_df
for source_name, target_name in column_renames.items():
    results_renamed_df = results_renamed_df.withColumnRenamed(source_name, target_name)

# Stamp every row with the notebook parameters so its origin can be traced.
results_renamed_df = (
    results_renamed_df
    .withColumn('data_source', lit(v_data_source))
    .withColumn('file_date', lit(v_file_date))
)


In [0]:
# Verify the renamed columns and the added data_source / file_date columns.
results_renamed_df.printSchema()

In [0]:
# Preview the data after renaming and adding the audit columns.
display(results_renamed_df)

### Drop the statusId column

In [0]:
# statusId was never renamed, so it is still referenced by its source name.
status_column = col('statusId')
results_df = results_renamed_df.drop(status_column)

In [0]:
# Final schema check before writing: statusId should no longer be listed.
results_df.printSchema()

### Write in Parquet format, partitioned by race_id

# Method 1: drop the existing race_id partitions, then append (disabled)

In [0]:
# for race_id_list in results_df.select("race_id").distinct().collect():
#     if (spark._jsparkSession.catalog().tableExists("f1_processed.results")):
#         spark.sql(f"ALTER TABLE f1_processed.results DROP IF EXISTS PARTITION (race_id = {race_id_list.race_id})")


In [0]:
# results_df.write.partitionBy('race_id').mode('append').format('parquet').saveAsTable('f1_processed.results')

# Method 2: overwrite only the affected partitions

In [0]:
%sql
-- DROP TABLE f1_processed.results

In [0]:
# spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

In [0]:
# results_df = results_df.select(
#     "result_id",
#     "driver_id",
#     "constructor_id",
#     "number",
#     "grid",
#     "position",
#     "position_text",
#     "position_order",
#     "points",
#     "laps",
#     "time",
#     "milliseconds",
#     "fastest_lap",
#     "rank",
#     "fastest_lap_time",
#     "fastest_lap_speed",
#     "data_source",
#     "file_date",
#     "ingestion_date",
#     "race_id"
# )

In [0]:
# if (spark._jsparkSession.catalog().tableExists("f1_processed.results")):
#     results_df.write.mode('overwrite').insertInto('f1_processed.results')
# else:
#     results_df.write.partitionBy('race_id').mode('overwrite').format('parquet').saveAsTable('f1_processed.results')

In [0]:
# Write results_df to f1_processed.results, keyed on race_id.
# overwrite_partition is not defined in this notebook (presumably it comes from
# the included common_functions notebook); going by the commented-out cells
# above, it likely overwrites only the partitions present in results_df --
# TODO confirm against the helper.
target_database = 'f1_processed'
target_table = 'results'
partition_column = 'race_id'
overwrite_partition(results_df, target_database, target_table, partition_column)

In [0]:
# Read the Parquet output back from the processed folder to check the write.
# `processed_folder_path` is not defined in this notebook; presumably it comes
# from the included configuration notebook.
processed_results_path = f'{processed_folder_path}/results'
written_results_df = spark.read.parquet(processed_results_path)
display(written_results_df)

In [0]:
# Report 'Success' to whatever invoked this notebook and stop the run here.
# NOTE(review): because the run stops at this call, the row-count query in the
# cell below presumably only runs when executed by hand -- confirm that is
# intended.
dbutils.notebook.exit('Success')

In [0]:
%sql 
-- Sanity check: number of rows per race_id in the processed table, newest
-- race_id first. Useful for spotting duplicated or missing partitions after a load.
SELECT race_id, COUNT(*)
FROM f1_processed.results
GROUP BY race_id
ORDER BY race_id DESC