### Ingest circuits.csv file

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Name of the upstream data source (e.g. "Ergast API"); stamped onto every row as the
# `data_source` column in Step 4. Defaults to an empty string when no value is supplied.
_data_source_widget = "p_data_source"
dbutils.widgets.text(_data_source_widget, "")
v_data_source = dbutils.widgets.get(_data_source_widget)

In [0]:
# Date of the raw drop to ingest: it selects the sub-folder read in Step 1 and is stored
# as the `file_date` column in Step 4. Format presumably YYYY-MM-DD like the default — TODO confirm.
_file_date_widget = "p_file_date"
dbutils.widgets.text(_file_date_widget, "2021-03-21")
v_file_date = dbutils.widgets.get(_file_date_widget)

In [0]:
# print(v_data_source)

Ergast API


##### Step 1 - Read the CSV file using the Spark DataFrame reader

In [0]:
# display(dbutils.fs.mounts())

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/Volumes,UnityCatalogVolumes,
/mnt/sinkstoragedltemp/presentation,abfss://presentation@sinkstoragedltemp.dfs.core.windows.net/,
/mnt/sinkstoragedltemp/processed,abfss://processed@sinkstoragedltemp.dfs.core.windows.net/,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/sinkstoragedltemp/demo,abfss://demo@sinkstoragedltemp.dfs.core.windows.net/,
/Volume,DbfsReserved,
/volumes,DbfsReserved,


In [0]:
# %fs
# ls /mnt/sinkstoragedltemp/raw

path,name,size,modificationTime
dbfs:/mnt/sinkstoragedltemp/raw/ecdc/,ecdc/,0,1693235406000
dbfs:/mnt/sinkstoragedltemp/raw/formula1/,formula1/,0,1696346051000
dbfs:/mnt/sinkstoragedltemp/raw/population/,population/,0,1692681489000


In [0]:
# circuits_df = spark.read.option("header", True) \
#     .option("inferSchema", True) \
#         .csv(f"{raw_folder_path}/{v_file_date}/circuits.csv")

In [0]:
# type(circuits_df)

pyspark.sql.dataframe.DataFrame

In [0]:
# circuits_df.describe().show()

+-------+------------------+----------+-------+---------+---------+------------------+-----------------+-----------------+--------------------+
|summary|         circuitId|circuitRef|   name| location|  country|               lat|              lng|              alt|                 url|
+-------+------------------+----------+-------+---------+---------+------------------+-----------------+-----------------+--------------------+
|  count|                77|        77|     77|       77|       77|                77|               77|               77|                  77|
|   mean|              39.0|      NULL|   NULL|     NULL|     NULL| 33.72035103896102|3.551302597402597|247.4935064935065|                NULL|
| stddev|22.371857321197094|      NULL|   NULL|     NULL|     NULL|22.885969000074535| 64.8766790440326|363.2672505910991|                NULL|
|    min|                 1|       BAK|A1-Ring|Abu Dhabi|Argentina|          -22.9756|        -0.331667|               -7|http://en.wiki

In [0]:
# circuits_df.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)



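- The problem with the code above is that it runs 2 Spark jobs: with `inferSchema`, Spark has to scan the data an extra time to work out the column types.
- To avoid that extra pass, and to control the column types ourselves, we define the schema manually.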

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [0]:
# Explicit schema for circuits.csv, so Spark does not need an extra pass over the file
# to infer column types (as `inferSchema` would).
# NOTE: when reading CSV, Spark reports every column as nullable regardless of the flag
# given here (see the printSchema output after the read), so circuitId's False is informational.
_circuit_field_specs = [
    # (column name, Spark type, nullable)
    ("circuitId", IntegerType(), False),
    ("circuitRef", StringType(), True),
    ("name", StringType(), True),
    ("location", StringType(), True),
    ("country", StringType(), True),
    ("lat", DoubleType(), True),
    ("lng", DoubleType(), True),
    ("alt", IntegerType(), True),
    ("url", StringType(), True),
]
circuits_schema = StructType(
    fields=[StructField(field_name, field_type, is_nullable)
            for field_name, field_type, is_nullable in _circuit_field_specs]
)

In [0]:
# Read the circuits file for the requested date using the explicit schema; the first
# line of the file is treated as the header row.
circuits_df = spark.read.csv(
    f"{raw_folder_path}/{v_file_date}/circuits.csv",
    schema=circuits_schema,
    header=True,
)

In [0]:
# circuits_df.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)



In [0]:
# display(circuits_df.limit(5))

circuitId,circuitRef,name,location,country,lat,lng,alt,url
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,http://en.wikipedia.org/wiki/Melbourne_Grand_Prix_Circuit
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,http://en.wikipedia.org/wiki/Sepang_International_Circuit
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,http://en.wikipedia.org/wiki/Bahrain_International_Circuit
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,http://en.wikipedia.org/wiki/Circuit_de_Barcelona-Catalunya
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,http://en.wikipedia.org/wiki/Istanbul_Park


##### Step 2 - Select only the required columns

In [0]:
# circuits_selected_df = circuits_df.select("circuitId", "circuitRef", "name", "location", "country", "lat", "lng", "alt")

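The 3 column-selection styles below (`df.colName`, `df["colName"]` and `col("colName")`) produce Column objects. Unlike plain string names, they let you apply column functions such as `alias()` directly.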

In [0]:
# circuits_selected_df = circuits_df.select(circuits_df.circuitId, circuits_df.circuitRef, circuits_df.name, circuits_df.location, 
#                                           circuits_df.country, circuits_df.lat, circuits_df.lng, circuits_df.alt)

In [0]:
# circuits_selected_df = circuits_df.select(circuits_df["circuitId"], circuits_df["circuitRef"], circuits_df["name"], circuits_df["location"], 
#                                           circuits_df["country"], circuits_df["lat"], circuits_df["lng"], circuits_df["alt"])

In [0]:
from pyspark.sql.functions import col, lit

In [0]:
# Keep only the columns needed downstream; `url` from the raw schema is not carried forward.
_required_circuit_columns = ["circuitId", "circuitRef", "name", "location",
                             "country", "lat", "lng", "alt"]
circuits_selected_df = circuits_df.select(*[col(column_name) for column_name in _required_circuit_columns])

##### Step 3 - Rename the columns as required

In [0]:
# Map raw camelCase / abbreviated column names to the snake_case names used in the
# processed layer. Renames are applied in insertion order.
_circuit_column_renames = {
    "circuitId": "circuit_id",
    "circuitRef": "circuit_ref",
    "lat": "latitude",
    "lng": "longitude",
    "alt": "altitude",
}
circuits_renamed_df = circuits_selected_df
for _raw_name, _clean_name in _circuit_column_renames.items():
    circuits_renamed_df = circuits_renamed_df.withColumnRenamed(_raw_name, _clean_name)

##### Step 4 - Add ingestion date, data source and file date to the DataFrame

In [0]:
# `add_ingestion_date` comes from the common_functions include; it presumably appends the
# `ingestion_date` timestamp column seen in the table schema — confirm in that notebook.
_circuits_with_ingestion_df = add_ingestion_date(circuits_renamed_df)

# Stamp lineage metadata from the notebook widgets; lit() wraps the plain strings as Columns.
circuits_final_df = _circuits_with_ingestion_df.withColumn("data_source", lit(v_data_source))
circuits_final_df = circuits_final_df.withColumn("file_date", lit(v_file_date))

- Extra:
- `withColumn()` adds a new column and returns a new DataFrame. Its second argument must be a Column object, not a plain Python value, so we use `lit()` to wrap a literal (here, a widget parameter value) as a Column.
- If you pass the string value or parameter directly without `lit()`, Spark raises: ``PySparkTypeError: [NOT_COLUMN] Argument `col` should be a Column, got str.``

##### Step 5 - Write data to Delta Lake

In [0]:
# circuits_final_df.write.mode("overwrite").parquet(f"{processed_folder_path}/circuits")

In [0]:
# Full reload: the managed Delta table f1_processed.circuits is replaced on every run,
# so it only ever holds the rows from the latest processed file_date.
(
    circuits_final_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_processed.circuits")
)

In [0]:
# %sql
# DESC EXTENDED f1_processed.circuits;

col_name,data_type,comment
circuit_id,int,
circuit_ref,string,
name,string,
location,string,
country,string,
latitude,double,
longitude,double,
altitude,int,
ingestion_date,timestamp,
data_source,string,


In [0]:
%sql
-- Spot-check a few rows of the freshly written processed table.
SELECT *
  FROM f1_processed.circuits
 LIMIT 5;

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,ingestion_date,data_source,file_date
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,2023-12-22T11:00:51.502Z,Ergast,2021-04-18
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,2023-12-22T11:00:51.502Z,Ergast,2021-04-18
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,2023-12-22T11:00:51.502Z,Ergast,2021-04-18
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,2023-12-22T11:00:51.502Z,Ergast,2021-04-18
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,2023-12-22T11:00:51.502Z,Ergast,2021-04-18


In [0]:
# Finish the notebook with a status string; when this notebook is launched through
# dbutils.notebook.run, this value is what the caller receives.
_notebook_exit_status = "Success"
dbutils.notebook.exit(_notebook_exit_status)