In [0]:
# Notebook parameter: label for the originating data source (defaults to an
# empty string); it is later written to the `data_source` column.
dbutils.widgets.text(name="p_data_source", defaultValue="")
v_data_source = dbutils.widgets.get(name="p_data_source")

In [0]:
# Notebook parameter: name of the dated sub-folder under the raw folder that
# holds the file to ingest (defaults to "2021-03-21").
dbutils.widgets.text(name="p_file_date", defaultValue="2021-03-21")
v_file_date = dbutils.widgets.get(name="p_file_date")

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

In [0]:
# Explicit schema for races.csv, declared as (column name, Spark type, nullable)
# so the reader does not have to infer types.
races_schema = StructType(fields=[
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("raceId", IntegerType(), False),
        ("year", IntegerType(), True),
        ("round", IntegerType(), True),
        ("circuitId", IntegerType(), True),
        ("name", StringType(), True),
        ("date", DateType(), True),
        ("time", StringType(), True),
        ("url", StringType(), True),
    )
])

In [0]:
# Read the races CSV for the requested file date, treating the first line as a
# header and applying the explicit schema.
races_df = (
    spark.read
    .option("header", True)
    .schema(races_schema)
    .csv(f"{raw_folder_path}/{v_file_date}/races.csv")
)

In [0]:
from pyspark.sql.functions import col

# Keep every column except `url`, which is not carried forward.
races_selected_df = races_df.select(
    "raceId",
    "year",
    "round",
    "circuitId",
    "name",
    "date",
    "time",
)

In [0]:
# Rename the camelCase / generic source columns to snake_case names.
races_renamed_df = (
    races_selected_df
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("year", "race_year")
    .withColumnRenamed("circuitId", "circuit_id")
)

In [0]:
from pyspark.sql.functions import col, concat, current_timestamp, lit, to_timestamp

# Build the final frame:
#   1. add_ingestion_date comes from the included common_functions notebook; it is
#      expected to supply the `ingestion_date` column selected below (verify there).
#   2. `race_timestamp` is parsed from "<date> <time>" with pattern yyyy-MM-dd HH:mm:ss.
#      NOTE(review): confirm how rows whose `time` does not match HH:mm:ss should be handled.
#   3. `data_source` is stamped with the p_data_source widget value.
#   4. The final select drops the now-redundant `date` and `time` columns.
races_final_df = (
    add_ingestion_date(races_renamed_df)
    .withColumn(
        "race_timestamp",
        to_timestamp(
            concat(col("date"), lit(" "), col("time")),
            "yyyy-MM-dd HH:mm:ss",
        ),
    )
    .withColumn("data_source", lit(v_data_source))
    .select(
        "race_id",
        "race_year",
        "round",
        "circuit_id",
        "name",
        "race_timestamp",
        "ingestion_date",
        "data_source",
    )
)

In [0]:
# Render the final DataFrame in the notebook output for a visual check before writing.
display(races_final_df)

In [0]:
# Persist the result as the Delta table f1_processed.races, replacing any
# existing contents on each run.
(
    races_final_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_processed.races")
)

In [0]:
# End the notebook run and hand the string "success" back to whatever invoked it.
dbutils.notebook.exit("success")