### Ingest circuits.csv file

#### Step 1 - Prep runtime + common parameters

In [0]:
# Runtime parameter: free-text label for where this data came from.
# The value is stamped into the data_source audit column in Step 5.
data_source_widget = "p_data_source"
dbutils.widgets.text(data_source_widget, "")
v_data_source = dbutils.widgets.get(data_source_widget)

In [0]:
# Set up runtime parameter to capture the file extract date.
# The value names the dated sub-folder of the raw folder that circuits.csv is read
# from (f"{raw_folder_path}/{v_file_extract_date}/circuits.csv"). It is also recorded
# in the file_extract_date audit column.

dbutils.widgets.text("p_file_extract_date", "")
v_file_extract_date = dbutils.widgets.get("p_file_extract_date")

# Fail fast when no extract date is supplied. An empty value would build the path
# "<raw_folder_path>//circuits.csv", which points at the raw root rather than a dated
# folder. It would also write an empty file_extract_date into the table.
if not v_file_extract_date.strip():
    raise ValueError("p_file_extract_date must be set (name of the dated raw folder to ingest)")

In [0]:
# Run configuration notebook to get parameterised paths
# print(raw_folder_path)
# print(processed_folder_path)

# To use these variables in a path, write the path as an f-string and wrap each variable in curly braces, e.g. f"{raw_folder_path}/circuits.csv"
# NOTE: MAKE SURE TO PLACE %run COMMANDS ON INDIVIDUAL CELLS

In [0]:
%run "../includes/configuration"

In [0]:
# Run common_functions notebook to load add_ingestion_date(), used in Step 5 to append the ingestion_date column

In [0]:
%run "../includes/common_functions"

In [0]:
# Sanity check: show the raw folder path defined by ../includes/configuration
print(str(raw_folder_path))

#### Step 2 - Read the CSV file using the Spark DataFrame reader and define the schema

In [0]:
# Scratchpad - Check the current mounted file system

display(dbutils.fs.mounts())

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/mnt/dlsdardatainfradev/processed,abfss://processed@dlsdardatainfradev.dfs.core.windows.net/,
/mnt/dlsdardatainfradev/demo,abfss://demo@dlsdardatainfradev.dfs.core.windows.net/,
/mnt/dlsdardatainfradev/raw,abfss://raw@dlsdardatainfradev.dfs.core.windows.net/,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/dlsdardatainfradev/presentation,abfss://presentation@dlsdardatainfradev.dfs.core.windows.net/,
/,DatabricksRoot,


In [0]:
# # Scratchpad - Check the contents of /raw
# %fs
# ls /mnt/dlsdardatainfradev/raw

In [0]:
# Exploratory read of circuits.csv, used only for profiling before the schema is defined.
# circuits_df is re-assigned by the schema-enforced read further down, so nothing
# downstream depends on this DataFrame.
#
# Ed - substituted the hard-coded path ("dbfs:/mnt/dlsdardatainfradev/raw/circuits.csv")
#      with the parameterised raw folder + file extract date.
#
# header=True makes Spark take the first line as column names. Without it, the header
# line shows up as a data row under auto-generated _c0.._c8 names, which skews
# profiling results such as describe().
circuits_df = spark.read \
                   .option("header", True) \
                   .csv(f"dbfs:{raw_folder_path}/{v_file_extract_date}/circuits.csv")

# Standard data profiling methods:
# ===================================================================
# 1) Display contents in human readable format          -> display(circuits_df)
# 2) Check variable type                                -> type(circuits_df)
# 3) Display first n records                            -> circuits_df.show(n)
# 4) Use the first line as the header                   -> .option("header", True) (applied above)
# 5) Display the dataframe schema (columns, datatypes)  -> circuits_df.printSchema()
# 6) Summary stats per column (count, mean, stddev, min, max) -> circuits_df.describe().show()
# 7) Infer column datatypes from the actual values (run printSchema() afterwards to view them):
#
#       spark.read \
#       .option("header", True) \
#       .option("inferSchema", True) \
#       .csv(f"dbfs:{raw_folder_path}/{v_file_extract_date}/circuits.csv")

In [0]:
# To specify a schema, we import the below types

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [0]:
# Explicit schema for circuits.csv, listed in file column order.
# Each entry is (column name, Spark type, nullable).
# NOTE(review): Spark's file-based readers generally treat a user-supplied schema as
# nullable, so circuitId's nullable=False is probably not enforced on read. Confirm
# via the printSchema() output below.
circuits_schema = StructType(fields=[
    StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in [
        ("circuitId",  IntegerType(), False),
        ("circuitRef", StringType(),  True),
        ("name",       StringType(),  True),
        ("location",   StringType(),  True),
        ("country",    StringType(),  True),
        ("lat",        DoubleType(),  True),
        ("lng",        DoubleType(),  True),
        ("alt",        IntegerType(), True),
        ("url",        StringType(),  True),
    ]
])

In [0]:
# With the columns profiled (describe() / inferSchema above), read the file again,
# this time enforcing circuits_schema. header=True consumes the first line as
# column names instead of data.
circuits_df = spark.read.csv(
    f"dbfs:{raw_folder_path}/{v_file_extract_date}/circuits.csv",
    schema=circuits_schema,
    header=True,
)

In [0]:
# Validating schema: check the column names and types match circuits_schema
circuits_df.printSchema()

#### Step 3 - Select required columns

In [0]:
from pyspark.sql.functions import col

In [0]:
# Keep only the columns needed downstream. The source file's url column is not selected.
circuits_selected_df = circuits_df.select(
    *[col(column_name) for column_name in (
        "circuitId",
        "circuitRef",
        "name",
        "location",
        "country",
        "lat",
        "lng",
        "alt",
    )]
)

#### Step 4 - Rename the columns as required

In [0]:
# Rename source columns to snake_case / descriptive names.
# This applies DataFrame.withColumnRenamed(existing, new) once per pair, in the listed order.
circuits_column_renames = {
    "circuitId":  "circuit_id",
    "circuitRef": "circuit_ref",
    "lat":        "latitude",
    "lng":        "longitude",
    "alt":        "altitude",
}

circuits_renamed_df = circuits_selected_df
for existing_name, new_name in circuits_column_renames.items():
    circuits_renamed_df = circuits_renamed_df.withColumnRenamed(existing_name, new_name)

#### Step 5 - Add audit columns (data source, file extract date, ingestion date) to the dataframe

In [0]:
from pyspark.sql.functions import lit

# Audit columns appended to the renamed data:
#   data_source       - constant value of the p_data_source widget
#   file_extract_date - constant value of the p_file_extract_date widget
#   ingestion_date    - added by add_ingestion_date() from ../includes/common_functions
# lit() wraps a Python value as a constant column expression for withColumn().
# (Ed - add_ingestion_date() replaced an inline withColumn("ingestion_date", current_timestamp()).)
circuits_with_source_df = (
    circuits_renamed_df
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_extract_date", lit(v_file_extract_date))
)
circuits_final_df = add_ingestion_date(circuits_with_source_df)

In [0]:
# Preview the final DataFrame (renamed columns + audit columns) before writing it
display(circuits_final_df)

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,data_source,file_extract_date,ingestion_date
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,test,2021-04-18,2022-07-30T17:07:10.483+0000
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,test,2021-04-18,2022-07-30T17:07:10.483+0000
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,test,2021-04-18,2022-07-30T17:07:10.483+0000
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,test,2021-04-18,2022-07-30T17:07:10.483+0000
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,test,2021-04-18,2022-07-30T17:07:10.483+0000
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7,test,2021-04-18,2022-07-30T17:07:10.483+0000
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13,test,2021-04-18,2022-07-30T17:07:10.483+0000
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228,test,2021-04-18,2022-07-30T17:07:10.483+0000
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153,test,2021-04-18,2022-07-30T17:07:10.483+0000
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103,test,2021-04-18,2022-07-30T17:07:10.483+0000


#### Step 6 - Write data to the data lake as a managed Delta table (f1_processed.circuits)

In [0]:
# Persist the final DataFrame as a Delta table named f1_processed.circuits.
# No explicit path is given, so this is a managed table. Its files presumably live
# under the f1_processed database location (confirm against the database definition).
# mode="overwrite" replaces the table contents on every run.
# (Ed: this replaced the earlier parquet-file write and the parquet saveAsTable variant.)
circuits_final_df.write.saveAsTable("f1_processed.circuits", format="delta", mode="overwrite")

In [0]:
# Legacy check from when the output was plain parquet files; the table is now written as Delta, so verify it with the SQL cell below instead
#display(spark.read.parquet(f"{processed_folder_path}/circuits") )

In [0]:
%sql
--Verifying the managed Delta table via SQL
SELECT * FROM f1_processed.circuits;

circuit_id,circuit_ref,name,location,country,latitude,longitude,altitude,data_source,file_extract_date,ingestion_date
1,albert_park,Albert Park Grand Prix Circuit,Melbourne,Australia,-37.8497,144.968,10,test,2021-04-18,2022-07-30T17:07:10.737+0000
2,sepang,Sepang International Circuit,Kuala Lumpur,Malaysia,2.76083,101.738,18,test,2021-04-18,2022-07-30T17:07:10.737+0000
3,bahrain,Bahrain International Circuit,Sakhir,Bahrain,26.0325,50.5106,7,test,2021-04-18,2022-07-30T17:07:10.737+0000
4,catalunya,Circuit de Barcelona-Catalunya,Montmeló,Spain,41.57,2.26111,109,test,2021-04-18,2022-07-30T17:07:10.737+0000
5,istanbul,Istanbul Park,Istanbul,Turkey,40.9517,29.405,130,test,2021-04-18,2022-07-30T17:07:10.737+0000
6,monaco,Circuit de Monaco,Monte-Carlo,Monaco,43.7347,7.42056,7,test,2021-04-18,2022-07-30T17:07:10.737+0000
7,villeneuve,Circuit Gilles Villeneuve,Montreal,Canada,45.5,-73.5228,13,test,2021-04-18,2022-07-30T17:07:10.737+0000
8,magny_cours,Circuit de Nevers Magny-Cours,Magny Cours,France,46.8642,3.16361,228,test,2021-04-18,2022-07-30T17:07:10.737+0000
9,silverstone,Silverstone Circuit,Silverstone,UK,52.0786,-1.01694,153,test,2021-04-18,2022-07-30T17:07:10.737+0000
10,hockenheimring,Hockenheimring,Hockenheim,Germany,49.3278,8.56583,103,test,2021-04-18,2022-07-30T17:07:10.737+0000


#### Step 7 - Return exit code upon completion -> Success

In [0]:
# Return "Success" as this notebook's exit value (e.g. to a caller using dbutils.notebook.run)
dbutils.notebook.exit("Success")

Success