### Ingest circuit.csv file

In [0]:
dbutils.widgets.help()

In [0]:
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
%run "../includes/configuration"

In [0]:
raw_folder_path

#### Step 1 - Read the CSV file using the spark dataframe reader

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [0]:
display(dbutils.fs.mounts())

In [0]:
circuits_schema = StructType(fields=[StructField("circuitId", IntegerType(), False),
                                    StructField("circuitRef", StringType(), True),
                                    StructField("name", StringType(), True),
                                    StructField("location", StringType()),
                                    StructField("country", StringType(), True),
                                    StructField("lat", DoubleType(), True),
                                    StructField("lng", DoubleType(), True),
                                    StructField("alt", IntegerType(), True),
                                    StructField("url", StringType(), True)
                                    ])
                                                 


In [0]:
circuits_df = spark.read \
.option("header", True) \
.schema(circuits_schema) \
.csv(f"{raw_folder_path}/circuits.csv")

In [0]:
display(circuits_df)

### Step 2 - Select only required columns
 Four ways!

In [0]:
# circuits_selected_df = circuits_df.select("circuitId", "circuitRef", "name", "location", "country", "lat", "lng", "alt") # select columns manually but cannot apply column based functions. Other three below you can!

In [0]:
# circuits_selected_df = circuits_df.select(circuits_df.circuitId, circuits_df.circuitRef, circuits_df.name, circuits_df.location, circuits_df.country, circuits_df.lat, circuits_df.alt)

In [0]:
# circuits_selected_df = circuits_df.select(circuits_df["circuitId"], circuits_df["circuitRef"], circuits_df["name"], circuits_df["location"], circuits_df["country"], circuits_df["lat"], circuits_df["alt"])

In [0]:
from pyspark.sql.functions import col

In [0]:
circuits_selected_df = circuits_df.select(col("circuitId"), col("circuitRef"), col("name"), col("location"), col("country"), col("lat"), col("alt"))

### Step 3 - Rename columns as needed

In [0]:
from pyspark.sql.functions import lit

In [0]:
circuits_renamed_df = circuits_selected_df.withColumnRenamed("circuitId", "circuit_id") \
.withColumnRenamed("circuitRef", "circuit_ref") \
.withColumnRenamed("lat", "latitude") \
.withColumnRenamed("lng", "longitude") \
.withColumnRenamed("alt", "altitude") \
.withColumn("data_source", lit(v_data_source))


###  Step 4 - Add/Replace new columns - Add ingestion data to dataframe

In [0]:
from pyspark.sql.functions import current_timestamp, lit

circuits_final_df = circuits_renamed_df.withColumn("ingestion_date", current_timestamp()) \
# .withColumn("env", lit("Production"))
display(circuits_final_df)

### Step 5 - Write data to datalake as parquet into file system

In [0]:
circuits_final_df.write.mode("overwrite").parquet(f"{processed_folder_path}/circuits")

In [0]:
# circuits_final_df.write.parquet("abfss://raw@formula1dl1216.dfs.core.windows.net/circuits")

In [0]:
%fs
ls abfss://raw@formula1dl1216.dfs.core.windows.net/circuits

In [0]:
# df = spark.read.parquet("abfss://raw@formula1dl1216.dfs.core.windows.net/circuits")
# display(df) 

In [0]:
display(spark.read.parquet("abfss://raw@formula1dl1216.dfs.core.windows.net/circuits"))

In [0]:
dbutils.notebook.exit("Success")