## Ingest circuits.csv file

##### Execute notebooks with environment configs and common functions

In [0]:
%run "../includes/config"

In [0]:
%run "../includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col, current_timestamp, lit

##### Prepare schema for file reading

In [0]:
# Explicit schema for circuits.csv so Spark does not have to infer types.
# Field order must match the column order of the CSV file.
# NOTE: circuitRef is read as a string. It is a textual identifier
# (e.g. "albert_park" in the Ergast F1 dataset -- confirm against the raw
# file); declaring it as an integer makes Spark's default permissive CSV
# parsing turn every non-numeric value into NULL.
circuit_schema = StructType(fields=[
    StructField("circuitId", IntegerType(), False),
    StructField("circuitRef", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("alt", IntegerType(), True),
    StructField("url", StringType(), True),
])

##### Read and transform file
- Read CSV file
- Rename columns to snake_case (Python naming convention)
- Add a constant `env` column and a new column with the ingestion date

In [0]:
# Load circuits.csv from the raw folder using the explicit schema (the first
# line of the file is a header row) and keep every column except "url".
circuits_df = (
    spark.read
    .option("header", True)
    .schema(circuit_schema)
    .csv(f"{raw_folder}/circuits.csv")
    .select(
        "circuitId",
        "circuitRef",
        "name",
        "location",
        "country",
        "lat",
        "lng",
        "alt",
    )
)

In [0]:
# Switch the camelCase / abbreviated source column names to descriptive
# snake_case names, then tag every row with a constant "env" column.
circuit_renamed_df = (
    circuits_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("env", lit("prd"))  # literal value "prd" on every row
)

In [0]:
# add_ingestion_date is not defined in this notebook; presumably it comes from
# the %run of ../includes/common_functions and appends an ingestion
# date/timestamp column -- TODO confirm against that notebook.
circuit_final_df = add_ingestion_date(circuit_renamed_df)

##### Save as table `f1_processed.circuits` in parquet format

In [0]:
# Persist the final DataFrame as the parquet-format table
# f1_processed.circuits, replacing whatever the table held before.
circuit_final_df.write.saveAsTable(
    "f1_processed.circuits",
    format="parquet",
    mode="overwrite",
)