### Ingest circuits.csv file

In [0]:
# Notebook parameter: declare the "p_data_source" text widget (second argument
# "" is passed as its default) and read its current value. The value is later
# stamped onto every row as the data_source column.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Notebook parameter: declare the "p_file_date" text widget (second argument
# "2021-03-21" is passed as its default) and read its current value. The value
# selects the raw sub-folder to read from and is stamped onto every row as the
# file_date column.
dbutils.widgets.text("p_file_date", "2021-03-21")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

##### Read the CSV file using the Spark DataFrame reader

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType


In [0]:
# Explicit schema for circuits.csv: one StructField per column, in file order,
# each declared with nullable=False.
# NOTE(review): Spark's file-based readers may relax user-supplied schemas to
# nullable, so these False flags are possibly not enforced at read time — confirm.
circuits_schema = StructType(
    fields=[
        StructField(column_name, column_type, False)
        for column_name, column_type in (
            ("circuitId", IntegerType()),
            ("circuitRef", StringType()),
            ("name", StringType()),
            ("location", StringType()),
            ("country", StringType()),
            ("lat", DoubleType()),
            ("lng", DoubleType()),
            ("alt", IntegerType()),
        )
    ]
)

In [0]:
# Load circuits.csv from the folder for the requested file date, treating the
# first row as a header and applying the explicit schema instead of inferring
# one. raw_folder_path is not defined in this notebook — presumably supplied by
# the %run of ../includes/configuration; confirm.
circuits_df = (
    spark.read
    .option("header", True)
    .schema(circuits_schema)
    .csv(f"{raw_folder_path}/{v_file_date}/circuits.csv")
)

##### Select only the required columns

In [0]:
from pyspark.sql.functions import col

In [0]:
# Keep all eight schema columns; "country" is exposed as "race_country" here,
# while the remaining columns keep their source names until the rename step.
circuits_selected_df = circuits_df.select(
    col("circuitId"),
    col("circuitRef"),
    col("name"),
    col("location"),
    col("country").alias("race_country"),
    col("lat"),
    col("lng"),
    col("alt"),
)

##### Rename the columns as required and add the data_source and file_date columns

In [0]:
from pyspark.sql.functions import lit

In [0]:
# Convert the camelCase / abbreviated source column names to snake_case names,
# then stamp each row with the two notebook parameters as literal columns.
circuits_renamed_df = (
    circuits_selected_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

##### Add ingestion date to the dataframe

In [0]:
# add_ingestion_date is not defined in this notebook — presumably it comes from
# the %run of ../includes/common_functions and returns the dataframe with an
# ingestion-date column appended; confirm against that notebook.
circuits_final_df = add_ingestion_date(circuits_renamed_df)

##### Write data to the f1_processed.circuits table in Delta format


In [0]:
# Persist the final dataframe as the Delta table f1_processed.circuits.
# "overwrite" mode replaces the table's existing contents on every run.
(
    circuits_final_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("f1_processed.circuits")
)

In [0]:
# End the notebook run and hand the string "Success" back as its exit value,
# so a calling notebook or job can check the outcome.
dbutils.notebook.exit("Success")