In [0]:
# List the mount points currently configured, with the source storage behind each one.
dbutils.fs.mounts()

Out[14]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/mnt/datalakegen212/formula1c', source='abfss://formula1c@datalakegen212.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/mnt/datalakegen212/raw', source='abfss://raw@datalakegen212.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/Volumes', source='UnityCatalogVolumes', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/Volume', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/volumes', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType=''),
 Mou

In [0]:
# List the files available under the mounted raw location.
display(dbutils.fs.ls("/mnt/datalakegen212/raw"))

path,name,size,modificationTime
dbfs:/mnt/datalakegen212/raw/AdventureWorks_Sales_2015.csv,AdventureWorks_Sales_2015.csv,118594,1691936099000
dbfs:/mnt/datalakegen212/raw/circuits.csv,circuits.csv,10044,1691946334000


#### Spark reader API. Check the documentation for the available parameters.

In [0]:
# Load circuits.csv, treating the first row as the header and letting Spark
# infer each column's data type from the file contents.
circuits_dataframe = spark.read.csv(
    "/mnt/datalakegen212/raw/circuits.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Render the loaded circuits data as a table to inspect the rows.
display(circuits_dataframe)

circuitId,circuitRef,name,location,country,lat,lng,alt,url
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,http://en.wikipedia.org/wiki/Melbourne_Grand_Prix_Circuit
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,http://en.wikipedia.org/wiki/Sepang_International_Circuit
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,http://en.wikipedia.org/wiki/Bahrain_International_Circuit
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,http://en.wikipedia.org/wiki/Circuit_de_Barcelona-Catalunya
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,http://en.wikipedia.org/wiki/Istanbul_Park
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7,http://en.wikipedia.org/wiki/Circuit_de_Monaco
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13,http://en.wikipedia.org/wiki/Circuit_Gilles_Villeneuve
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228,http://en.wikipedia.org/wiki/Circuit_de_Nevers_Magny-Cours
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153,http://en.wikipedia.org/wiki/Silverstone_Circuit
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103,http://en.wikipedia.org/wiki/Hockenheimring


In [0]:
# Print the column names and the data types Spark inferred for the CSV.
circuits_dataframe.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)



#### Now we apply Spark's DataFrame API to transform the data.
#### 1) Apply a proper schema. There are two ways to do this: (a) let Spark infer the schema (data types) in the read statement;
#### this slows down the read on very large data, because inference needs an extra pass over the file.
#### (b) use the pyspark.sql.types module to declare the data type of each field explicitly.

In [0]:
# NOTE(review): the saved output below shows every column as string, which does
# not match the inferSchema read earlier in the notebook. It was presumably
# captured after a read without inferSchema -- re-run from a fresh kernel to confirm.
circuits_dataframe.printSchema()

root
 |-- circuitId: string (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- alt: string (nullable = true)
 |-- url: string (nullable = true)



In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType


In [0]:
# Explicit schema for circuits.csv, declared up front instead of relying on inference.
# Each entry gives the column name, its data type, and whether nulls are allowed.
# Note: alt is declared as a double here, whereas schema inference reported it as an integer.
circuits_schema = StructType(
    [
        StructField("circuitId", IntegerType(), nullable=False),
        StructField("circuitRef", StringType(), nullable=True),
        StructField("name", StringType(), nullable=True),
        StructField("location", StringType(), nullable=True),
        StructField("country", StringType(), nullable=True),
        StructField("lat", DoubleType(), nullable=True),
        StructField("lng", DoubleType(), nullable=True),
        StructField("alt", DoubleType(), nullable=True),
        StructField("url", StringType(), nullable=True),
    ]
)

In [0]:
# Re-read circuits.csv, this time applying the explicit circuits_schema
# rather than asking Spark to infer the column types.
circuits_dataframe = spark.read.csv(
    "/mnt/datalakegen212/raw/circuits.csv",
    schema=circuits_schema,
    header=True,
)

In [0]:
# Verify that the explicit schema was applied.
# NOTE: circuitId prints as nullable = true even though circuits_schema declares
# nullable=False, so the flag is not reflected after this CSV read.
circuits_dataframe.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: double (nullable = true)
 |-- url: string (nullable = true)



#### Selecting specific columns of interest. Unwanted columns can also be removed with the DataFrame `drop` API.

In [0]:
# Select columns by plain string name (the simplest form of select).
# This result is replaced by the col()-based select in a later cell.
circuits_dataframe_selected = circuits_dataframe.select(
    [
        "circuitId",
        "circuitRef",
        "name",
        "location",
        "country",
        "lat",
        "lng",
        "alt",
        "url",
    ]
)

We can also reference fields as `df.column_name` (without a string), as `df["column name"]` just like pandas, or with the `col` function. These last three approaches have an advantage over plain string names: they return Column objects, so we can apply functions (such as `alias`) to the column.

In [0]:
from pyspark.sql.functions import col

In [0]:
# Select columns as Column objects so that column functions can be applied;
# here country is aliased to race_country, and url is left out.
circuits_dataframe_selected = circuits_dataframe.select(
    col("circuitId"),
    col("circuitRef"),
    col("name"),
    col("location"),
    col("country").alias("race_country"),
    col("lat"),
    col("lng"),
    col("alt"),
)

In [0]:
# Inspect the selected columns: country now appears as race_country and url is not included.
display(circuits_dataframe_selected)

circuitId,circuitRef,name,location,race_country,lat,lng,alt
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0


### Renaming columns

In [0]:
# Rename the camelCase / abbreviated columns to descriptive snake_case names.
# race_country is renamed back to country, undoing the alias applied in the select.
df_selected_renamed = (
    circuits_dataframe_selected
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("race_country", "country")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
)

In [0]:
# Confirm the renamed columns (circuit_id, circuit_ref, country, latitude, longitude).
display(df_selected_renamed)

circuit_id,circuit_ref,name,location,country,latitude,longitude,alt
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0


### Adding an audit column with ingestion time

In [0]:
from pyspark.sql.functions import current_timestamp, lit

"Production" is a plain literal value, not a Column object, so the `lit` function is used to wrap the literal in a Column object.

In [0]:
# Add two audit columns: the timestamp at which the data is processed, and a
# constant environment label (lit() wraps the string in a Column).
# NOTE(review): the column name is spelled "injestion_date"; it is kept as-is
# because it is the name written to the output data.
final_df = (
    df_selected_renamed
    .withColumn("injestion_date", current_timestamp())
    .withColumn("environment", lit("Production"))
)

In [0]:
# Inspect the final DataFrame, including the two added audit columns.
display(final_df)

circuit_id,circuit_ref,name,location,country,latitude,longitude,alt,injestion_date,environment
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,2023-08-13T18:26:56.780+0000,Production
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,2023-08-13T18:26:56.780+0000,Production
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,2023-08-13T18:26:56.780+0000,Production
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,2023-08-13T18:26:56.780+0000,Production
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,2023-08-13T18:26:56.780+0000,Production
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,2023-08-13T18:26:56.780+0000,Production
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,2023-08-13T18:26:56.780+0000,Production
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,2023-08-13T18:26:56.780+0000,Production
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,2023-08-13T18:26:56.780+0000,Production
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,2023-08-13T18:26:56.780+0000,Production


#### Write data to the data lake in Parquet format

In [0]:
# Write the final DataFrame to the processed location as Parquet.
# mode("overwrite") makes the cell re-runnable: without an explicit mode the
# writer raises an error when the target path already exists, so a second run
# (or Restart & Run All) would fail here.
final_df.write.mode("overwrite").parquet("/mnt/datalakegen212/processed")

We can see only one part file because we have a single node; with multiple nodes the output would be split into many part files (more precisely, Spark writes one part file per DataFrame partition). Now that the file is in the data lake, we can simply use the Spark read API to read the data back.

In [0]:
# List the files Spark wrote to the processed folder.
display(dbutils.fs.ls("/mnt/datalakegen212/processed"))

path,name,size,modificationTime
dbfs:/mnt/datalakegen212/processed/_SUCCESS,_SUCCESS,0,1691951514000
dbfs:/mnt/datalakegen212/processed/_committed_4030838509837295265,_committed_4030838509837295265,123,1691951514000
dbfs:/mnt/datalakegen212/processed/_started_4030838509837295265,_started_4030838509837295265,0,1691951512000
dbfs:/mnt/datalakegen212/processed/part-00000-tid-4030838509837295265-cf791cf4-1b58-4d27-919c-1e81b19a94f9-24-1-c000.snappy.parquet,part-00000-tid-4030838509837295265-cf791cf4-1b58-4d27-919c-1e81b19a94f9-24-1-c000.snappy.parquet,8322,1691951514000


In [0]:
# Read the Parquet output back from the data lake to verify the write.
df_parquet = spark.read.format("parquet").load("/mnt/datalakegen212/processed")

In [0]:
# Display the data that was read back from Parquet.
display(df_parquet)

circuit_id,circuit_ref,name,location,country,latitude,longitude,alt,injestion_date,environment
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,2023-08-13T18:31:51.747+0000,Production
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,2023-08-13T18:31:51.747+0000,Production
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,2023-08-13T18:31:51.747+0000,Production
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,2023-08-13T18:31:51.747+0000,Production
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,2023-08-13T18:31:51.747+0000,Production
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,2023-08-13T18:31:51.747+0000,Production
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,2023-08-13T18:31:51.747+0000,Production
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,2023-08-13T18:31:51.747+0000,Production
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,2023-08-13T18:31:51.747+0000,Production
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,2023-08-13T18:31:51.747+0000,Production
