## Ingest qualifying folder



## Step 1.- Read the JSON files using the Spark DataFrame reader API

In [0]:
%run ../includes/configuration 

In [0]:
%run ../includes/common_functions

In [0]:
# Notebook parameter 'p_data_source' (text widget, default 'testing').
# Its value is stamped onto every output row as the `data_source` column.
dbutils.widgets.text('p_data_source', 'testing')
v_data_source = dbutils.widgets.get('p_data_source')

In [0]:
# Notebook parameter 'p_file_date' (text widget, default '2021-03-28').
# The value selects the dated sub-folder under the raw path that is read below
# and is also written to the output as the `file_date` column.
dbutils.widgets.text('p_file_date', '2021-03-28')
v_file_date = dbutils.widgets.get('p_file_date')

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [0]:
# Explicit schema for the qualifying JSON input, so Spark does not infer types.
# Only the key column `qualifyId` is declared non-nullable; the three session
# columns (q1, q2, q3) are read as plain strings.
qualifying_schema = (
    StructType()
    .add('qualifyId', IntegerType(), False)
    .add('raceId', IntegerType(), True)
    .add('driverId', IntegerType(), True)
    .add('constructorId', IntegerType(), True)
    .add('number', IntegerType(), True)
    .add('position', IntegerType(), True)
    .add('q1', StringType(), True)
    .add('q2', StringType(), True)
    .add('q3', StringType(), True)
)

In [0]:
# Load the qualifying data for the requested file date from the raw layer.
# `multiLine` is enabled so JSON documents that span several lines are parsed;
# the path points at the `qualifying` entry under the dated folder.
qualifying_df = (
    spark.read
    .schema(qualifying_schema)
    .option('multiLine', True)
    .json(f'{raw_folder_path}/{v_file_date}/qualifying')
)

## Step 2.- Rename columns and add new columns 
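1. Rename qualifyId, raceId, driverId and constructorId to snake_case (qualify_id, race_id, driver_id, constructor_id)
2. Add ingestion_date with current timestamp
3. Add data_source and file_date from the notebook parameters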


In [0]:
from pyspark.sql.functions import lit 

In [0]:
# Build the output frame: start from the helper-augmented input, switch the
# camelCase id columns to snake_case, then tag every row with the run
# parameters (data source and file date) as literal columns.
final_df = (
    add_ingestion_date(qualifying_df)
    .withColumnRenamed('qualifyId', 'qualify_id')
    .withColumnRenamed('raceId', 'race_id')
    .withColumnRenamed('driverId', 'driver_id')
    .withColumnRenamed('constructorId', 'constructor_id')
    .withColumn('data_source', lit(v_data_source))
    .withColumn('file_date', lit(v_file_date))
)

## Step 3.- Write output to the processed container in parquet format

In [0]:
# NOTE(review): the return value of this call is not assigned, so `final_df`
# itself is passed unchanged to the merge below. Presumably the helper returns
# a new DataFrame with 'race_id' moved for partitioning — confirm whether this
# cell is only for inspection or the result should be captured.
re_arrange_partition_column(final_df,'race_id')

In [0]:
# overwrite_partition(final_df, 'f1_processed', 'qualifying', 'race_id')

In [0]:
# Match target (tgt) and source (src) rows on the qualifying id; race_id is
# also part of the condition and is the column passed as the last argument.
# NOTE(review): `merge_delta` is not defined in this notebook — presumably it
# comes from one of the %run includes and upserts `final_df` into
# f1_processed.qualifying under `processed_folder_path`; confirm its contract.
merge_condition = 'tgt.qualify_id = src.qualify_id AND tgt.race_id = src.race_id'
merge_delta(final_df, 'f1_processed', 'qualifying', processed_folder_path, merge_condition, 'race_id')

In [0]:
# Exit the notebook, returning 'Success' as its result value to any caller.
# NOTE(review): execution stops here on a full run, so the query cell below
# only runs when executed manually — confirm that is intended.
dbutils.notebook.exit('Success')

In [0]:
%sql select * from f1_processed.qualifying