### 🔍 Step 1: Define the schema for the circuits.csv file

In [78]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Explicit schema for circuits.csv, so column types are declared up front
# instead of inferred. Each entry is (column name, Spark type, nullable);
# the order presumably mirrors the column order of the CSV file.
circuits_schema = StructType([
    StructField(column_name, spark_type, is_nullable)
    for column_name, spark_type, is_nullable in [
        ("circuitId", IntegerType(), False),
        ("circuitRef", StringType(), True),
        ("name", StringType(), True),
        ("location", StringType(), True),
        ("country", StringType(), True),
        ("lat", DoubleType(), True),
        ("lng", DoubleType(), True),
        ("alt", IntegerType(), True),
        ("url", StringType(), True),
    ]
])


StatementMeta(, c738c759-daae-4574-b6af-27cea67ab8d4, 80, Finished, Available, Finished)

### 📥 Step 2: Load the data from the raw container

In [79]:
# Read the raw circuits CSV with the explicit schema defined above; the first
# line of the file is treated as a header row rather than data.
circuits_df = (
    spark.read
    .schema(circuits_schema)
    .option("header", "true")
    .format("csv")
    .load("abfss://projects@onelake.dfs.fabric.microsoft.com/LakehouseTraining.Lakehouse/Files/raw/circuits.csv")
)

# Preview the loaded rows.
display(circuits_df)




StatementMeta(, c738c759-daae-4574-b6af-27cea67ab8d4, 81, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 52cf1992-118f-410d-a278-5153a0741d55)

### 🎯 Step 3: Select relevant columns

In [80]:
from pyspark.sql.functions import col

# Keep every column declared in the schema except "url", which is not
# carried forward into the later steps.
circuits_selected_df = circuits_df.select(
    "circuitId",
    "circuitRef",
    "name",
    "location",
    "country",
    "lat",
    "lng",
    "alt",
)

# Preview the reduced column set.
display(circuits_selected_df)


StatementMeta(, c738c759-daae-4574-b6af-27cea67ab8d4, 82, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c7cffeed-fc57-4963-9d5d-17e1e2fd1f01)

### 🛠️ Step 4: Rename columns to follow snake_case naming convention

In [81]:
# Rename the camelCase / abbreviated source columns to descriptive snake_case
# names. "name", "location" and "country" are left unchanged.
circuits_renamed_df = (
    circuits_selected_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
)

# Preview the renamed columns.
display(circuits_renamed_df)


StatementMeta(, c738c759-daae-4574-b6af-27cea67ab8d4, 83, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a83555e4-048a-43d7-9095-b28b8c5444ef)

### 🕒 Step 5: Add an ingestion date column

In [82]:
from pyspark.sql.functions import current_timestamp

# Append an audit column holding the timestamp at which this ingestion ran.
circuits_final_df = circuits_renamed_df.withColumn(
    colName="ingestion_date",
    col=current_timestamp(),
)

# Preview the final DataFrame, including the new ingestion_date column.
display(circuits_final_df)


StatementMeta(, c738c759-daae-4574-b6af-27cea67ab8d4, 84, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4652c285-db39-4111-be42-6af0761b4191)

### 💾 Step 6: Write the transformed data to the processed container in Parquet format

In [84]:
# Persist the transformed circuits data as Parquet.
#
# The output goes to a dataset-specific "circuits" subfolder rather than the
# root of Files/processed: mode("overwrite") replaces everything at the target
# path, so writing to the shared root would wipe any other dataset stored
# under Files/processed each time this notebook runs.
# NOTE(review): downstream readers must load from ".../Files/processed/circuits".
circuits_final_df.write.mode("overwrite").parquet(
    "abfss://projects@onelake.dfs.fabric.microsoft.com/LakehouseTraining.Lakehouse/Files/processed/circuits"
)


StatementMeta(, c738c759-daae-4574-b6af-27cea67ab8d4, 86, Finished, Available, Finished)