## Ingest CSV and perform transformation

In [0]:
from pyspark.sql.types import StructType, StructField,IntegerType, StringType, DoubleType

In [0]:
# Declare a free-text notebook widget called "user_input" (empty string by
# default) and read back whatever value it currently holds.
_widget_name = "user_input"
dbutils.widgets.text(_widget_name, "")
user_output_variable = dbutils.widgets.get(_widget_name)

In [0]:
%run 
"./include/path_folder"

In [0]:
%run 
"./include/path_functions"

In [0]:
# Explicit schema for circuits.csv: one StructField per CSV column, in file
# order. Only circuitId is declared non-nullable; every other field is nullable.
circuit_fields = [
    StructField("circuitId", IntegerType(), False),
    StructField("circuitRef", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("alt", DoubleType(), True),
    StructField("url", StringType(), True),
]
circuit_schema = StructType(fields=circuit_fields)

In [0]:
# Load circuits.csv from the raw folder, treating the first line as a header
# row and applying circuit_schema rather than letting Spark infer the types.
circuits_csv_path = f"{raw_path_folder}/circuits.csv"
circuit_df = (
    spark.read
    .option('header', True)
    .schema(circuit_schema)
    .csv(circuits_csv_path)
)

In [0]:
# Preview the DataFrame that was just loaded from circuits.csv.
display(circuit_df)

circuitId,circuitRef,name,location,country,lat,lng,alt,url
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,http://en.wikipedia.org/wiki/Melbourne_Grand_Prix_Circuit
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,http://en.wikipedia.org/wiki/Sepang_International_Circuit
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,http://en.wikipedia.org/wiki/Bahrain_International_Circuit
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,http://en.wikipedia.org/wiki/Circuit_de_Barcelona-Catalunya
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,http://en.wikipedia.org/wiki/Istanbul_Park
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,http://en.wikipedia.org/wiki/Circuit_de_Monaco
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,http://en.wikipedia.org/wiki/Circuit_Gilles_Villeneuve
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,http://en.wikipedia.org/wiki/Circuit_de_Nevers_Magny-Cours
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,http://en.wikipedia.org/wiki/Silverstone_Circuit
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,http://en.wikipedia.org/wiki/Hockenheimring


In [0]:
# Keep every column except url by naming the wanted columns explicitly.
kept_columns = [
    "circuitId", "circuitRef", "name", "location",
    "country", "lat", "lng", "alt",
]
circuit_selected_df = circuit_df.select(*kept_columns)

In [0]:
# Alternative to an explicit select: start from the full DataFrame and drop
# only the url column. This rebinds circuit_selected_df.
circuit_selected_df = circuit_df.drop("url")

In [0]:
# Preview the DataFrame after the url column has been removed.
display(circuit_selected_df)

circuitId,circuitRef,name,location,country,lat,lng,alt
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0


In [0]:
# Project the same eight columns again, this time referencing each column as
# an attribute of the DataFrame (df.column_name).
_src_df = circuit_selected_df
circuit_selected_df = _src_df.select(
    _src_df.circuitId, _src_df.circuitRef, _src_df.name, _src_df.location,
    _src_df.country, _src_df.lat, _src_df.lng, _src_df.alt,
)

In [0]:
# Project the same eight columns again, this time looking each column up by
# name through DataFrame item access (df["column_name"]).
circuit_selected_df = circuit_selected_df.select(
    *[
        circuit_selected_df[column_name]
        for column_name in (
            "circuitId", "circuitRef", "name", "location",
            "country", "lat", "lng", "alt",
        )
    ]
)

In [0]:
from pyspark.sql.functions import col, lit

In [0]:
# Project the same eight columns again, this time building each column
# reference with the col() function.
circuit_selected_df = circuit_selected_df.select(
    *map(
        col,
        (
            "circuitId", "circuitRef", "name", "location",
            "country", "lat", "lng", "alt",
        ),
    )
)

### Transformation Commands
* Rename the columns
* Add the columns

In [0]:
# Rename the camelCase / abbreviated source columns to snake_case names, then
# add an "Environment" column holding the widget value as a literal on every row.
circuits_renamed_df = (
    circuit_selected_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("Environment", lit(user_output_variable))
)

In [0]:
# Preview the renamed DataFrame, including the added Environment column.
display(circuits_renamed_df)

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,Environment
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,Dev Env
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,Dev Env
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,Dev Env
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,Dev Env
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,Dev Env
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,Dev Env
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,Dev Env
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,Dev Env
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,Dev Env
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,Dev Env


In [0]:
# Pass the renamed DataFrame through ingestion_date_column. The helper is not
# defined in this notebook -- presumably it comes from one of the %run
# includes (TODO confirm). The displayed result carries an extra
# ingestion_date column.
circuits_final_df = ingestion_date_column(circuits_renamed_df)

In [0]:
# Preview the final DataFrame returned by ingestion_date_column.
display(circuits_final_df)

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,Environment,ingestion_date
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,Dev Env,2021-10-17T06:28:18.015+0000
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,Dev Env,2021-10-17T06:28:18.015+0000
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,Dev Env,2021-10-17T06:28:18.015+0000
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,Dev Env,2021-10-17T06:28:18.015+0000
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,Dev Env,2021-10-17T06:28:18.015+0000
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,Dev Env,2021-10-17T06:28:18.015+0000
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,Dev Env,2021-10-17T06:28:18.015+0000
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,Dev Env,2021-10-17T06:28:18.015+0000
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,Dev Env,2021-10-17T06:28:18.015+0000
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,Dev Env,2021-10-17T06:28:18.015+0000


### Write the DataFrame to the data lake's processed folder in Parquet format

In [0]:
# List the contents of the mounted storage directory.
# NOTE(review): this path is hard-coded, unlike the read/write cells, which
# build their paths from raw_path_folder / processed_path_folder -- confirm
# whether it should be derived from the same configuration.
dbutils.fs.ls('/mnt/formula01dl/omkar')

In [0]:
# Persist the final DataFrame as Parquet under <processed folder>/circuits,
# replacing any data already stored at that location.
circuits_output_path = f"{processed_path_folder}/circuits"
circuits_writer = circuits_final_df.write.mode('overwrite')
circuits_writer.parquet(circuits_output_path)

In [0]:
# Read the Parquet output back from the processed folder and show it, as a
# check on what was written.
circuits_written_df = spark.read.parquet(f"{processed_path_folder}/circuits")
display(circuits_written_df)

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,Environment,ingestion_date
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10.0,Dev Env,2021-10-17T06:28:18.343+0000
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18.0,Dev Env,2021-10-17T06:28:18.343+0000
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7.0,Dev Env,2021-10-17T06:28:18.343+0000
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109.0,Dev Env,2021-10-17T06:28:18.343+0000
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130.0,Dev Env,2021-10-17T06:28:18.343+0000
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7.0,Dev Env,2021-10-17T06:28:18.343+0000
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13.0,Dev Env,2021-10-17T06:28:18.343+0000
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228.0,Dev Env,2021-10-17T06:28:18.343+0000
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153.0,Dev Env,2021-10-17T06:28:18.343+0000
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103.0,Dev Env,2021-10-17T06:28:18.343+0000
