# Ingest circuits from bronze to silver as parquet file
###   The ingestion steps are walked through below

In [0]:
%run "../includes/storage_acc_config"

In [0]:
%run "../includes/folder_path"

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import current_timestamp ,lit


In [0]:
# Explicit schema for circuits.csv.
# Reading with inferSchema=True would also produce typed columns, but it makes
# Spark scan the data an extra time, so the column types are declared up front.
# Only circuitId is declared as non-nullable; every other column may be null.
circuit_schema = (
    StructType()
    .add("circuitId", IntegerType(), nullable=False)
    .add("circuitRef", StringType(), nullable=True)
    .add("name", StringType(), nullable=True)
    .add("location", StringType(), nullable=True)
    .add("country", StringType(), nullable=True)
    .add("lat", DoubleType(), nullable=True)
    .add("lng", DoubleType(), nullable=True)
    .add("alt", IntegerType(), nullable=True)
    .add("url", StringType(), nullable=True)
)



In [0]:
# Read the raw circuits CSV from the bronze layer.
# The first row is a header; circuit_schema is applied explicitly so that the
# numeric columns (circuitId, lat, lng, alt) are typed instead of every column
# being loaded as a string.
circuit_df = (
    spark.read
    .options(header=True)
    .schema(circuit_schema)
    .csv(f'{bronze_folder_path}/circuits.csv')
)


In [0]:
# display(circuit_df)

# from pyspark.sql.functions import lit, current_timestamp

# Shape the raw circuits data for the silver layer:
#   - rename the camelCase / abbreviated source columns to descriptive snake_case
#   - stamp every row with the time of ingestion
#   - drop the url column
circuit_modify_df = (
    circuit_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("ingestion_date", current_timestamp())
    # url is dropped directly; overwriting it with a null literal first had no effect.
    .drop("url")
)


## Write the DataFrame to the silver layer as a parquet file

In [0]:

# Persist the transformed circuits data under the silver folder as parquet,
# replacing whatever a previous run wrote to the same location.
circuit_modify_df.write.parquet(f"{silver_folder_path}/circuits", mode="overwrite")