## Ingest the circuits CSV file

### 1. Read the CSV file using the Spark DataFrame reader

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DoubleType

In [0]:
# Explicit schema for the circuits data: one StructField per column, giving
# the column name, its Spark type, and whether nulls are allowed.
circuits_schema = StructType(
    fields=[
        StructField("circuitId", IntegerType(), nullable=False),
        StructField("circuitRef", StringType(), nullable=True),
        StructField("name", StringType(), nullable=True),
        StructField("location", StringType(), nullable=True),
        StructField("country", StringType(), nullable=True),
        StructField("lat", DoubleType(), nullable=True),
        StructField("lng", DoubleType(), nullable=True),
        StructField("alt", IntegerType(), nullable=True),
        StructField("url", StringType(), nullable=True),
    ]
)

In [0]:
# Load circuits.csv from the raw container. The first row is treated as a
# header and the explicit schema is applied instead of inferring types.
circuits_df = spark.read.csv(
    "abfss://raw@formula1dlbtg.dfs.core.windows.net/circuits.csv",
    header=True,
    schema=circuits_schema,
)

In [0]:
# Render the freshly loaded DataFrame to eyeball the rows.
display(circuits_df)

In [0]:
# Print column names and types to check them against the explicit schema.
circuits_df.printSchema()

In [0]:
# Show summary statistics for the DataFrame's columns as a quick sanity check.
circuits_df.describe().show()

### 2. Select only the required columns


In [0]:
# Keep every column of the schema except "url", selecting by plain column
# name. The variable name was previously misspelled ("cirtuits_..."), which
# left a stray, inconsistently named DataFrame in the notebook namespace.
circuits_selected_df = circuits_df.select(
    "circuitId",
    "circuitRef",
    "name",
    "location",
    "country",
    "lat",
    "lng",
    "alt",
)
display(circuits_selected_df)

In [0]:
from pyspark.sql.functions import col

In [0]:
# Projection built from Column objects rather than plain names, which allows
# per-column operations such as the alias applied to "country" here.
circuits_selected_df = circuits_df.select(
    col("circuitId"),
    col("circuitRef"),
    col("name"),
    col("location"),
    col("country").alias("race_country"),
    col("lat"),
    col("lng"),
    col("alt"),
)

### 3. Rename the columns as required

In [0]:
# Rename the source columns to snake_case / unabbreviated names.
# The result is bound ONLY to circuits_renamed_df: the previous chained
# assignment also rebound circuits_selected_df, silently replacing this
# cell's own input with its output.
circuits_renamed_df = (
    circuits_selected_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
)
display(circuits_renamed_df)

### 4. Add the ingestion date to the DataFrame

In [0]:
from pyspark.sql.functions import current_timestamp,lit

In [0]:
# Append an "ingestion_date" column populated by current_timestamp().
circuits_final_df = circuits_renamed_df.withColumn(
    "ingestion_date",
    current_timestamp(),
)
display(circuits_final_df)

### 5. Write to the data lake as Parquet

In [0]:
# Write the final DataFrame to the processed container as Parquet,
# overwriting whatever already exists at the target path.
circuits_final_df.write.mode("overwrite").parquet(
    "abfss://processed@formula1dlbtg.dfs.core.windows.net/circuits"
)

In [0]:
# Read the Parquet output back from the processed container and render it,
# as a check on what was just written.
df = spark.read.parquet(
    "abfss://processed@formula1dlbtg.dfs.core.windows.net/circuits"
)
display(df)