In [0]:
# List the DBFS mount points so the raw/processed mounts used below can be
# confirmed before anything is read from them.
dbutils.fs.mounts()

Out[3]: [MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/mnt/formula1hh/processed', source='abfss://processed@formula1hh.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/mnt/formula1hh/raw', source='abfss://raw@formula1hh.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/mnt/formula1hh/presentation', source='abfss://presentation@formula1hh.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/mnt/formula1hh/demo', source='abfss://demo@formula1hh.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType='')

In [0]:
# Show the dated sub-folders available under the raw mount.
display(dbutils.fs.ls("/mnt/formula1hh/raw"))

path,name,size,modificationTime
dbfs:/mnt/formula1hh/raw/2021-03-21/,2021-03-21/,0,1683160218000
dbfs:/mnt/formula1hh/raw/2021-03-28/,2021-03-28/,0,1683160240000
dbfs:/mnt/formula1hh/raw/2021-04-18/,2021-04-18/,0,1683160248000


In [0]:
# Notebook parameter `p_data_source`; its value is stamped onto every row as
# the `data_source` column further down.
# The default is an empty string rather than a single space, so a run that
# does not supply the parameter does not write whitespace-only values.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Notebook parameter `p_file_date` (default "2021-03-21"). It selects the dated
# sub-folder that races.csv is read from and is also stamped onto every row as
# the `file_date` column.
dbutils.widgets.text("p_file_date", "2021-03-21")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/configuration"

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType

# Explicit schema for races.csv, passed to the reader instead of relying on
# type inference. Only race_id is declared non-nullable; circuit_id and time
# are kept as strings.
race_schema = (
    StructType()
    .add("race_id", IntegerType(), False)
    .add("year", IntegerType(), True)
    .add("round", IntegerType(), True)
    .add("circuit_id", StringType(), True)
    .add("name", StringType(), True)
    .add("date", DateType(), True)
    .add("time", StringType(), True)
)

In [0]:
# Read the races extract for the requested file date, using the explicit schema
# and treating the first line as a header.
# The folder separator is normalised so the path is correct whether or not
# `raw_folder_path` (presumably defined by ../includes/configuration) ends in "/".
race_df = spark.read.csv(
    f"{raw_folder_path.rstrip('/')}/{v_file_date}/races.csv",
    header=True,
    schema=race_schema,
)

In [0]:
# Preview the rows as read from races.csv.
display(race_df)

race_id,year,round,circuit_id,name,date,time
1,2009,1,1,Australian Grand Prix,2009-03-29,06:00:00
2,2009,2,2,Malaysian Grand Prix,2009-04-05,09:00:00
3,2009,3,17,Chinese Grand Prix,2009-04-19,07:00:00
4,2009,4,3,Bahrain Grand Prix,2009-04-26,12:00:00
5,2009,5,4,Spanish Grand Prix,2009-05-10,12:00:00
6,2009,6,6,Monaco Grand Prix,2009-05-24,12:00:00
7,2009,7,5,Turkish Grand Prix,2009-06-07,12:00:00
8,2009,8,9,British Grand Prix,2009-06-21,12:00:00
9,2009,9,20,German Grand Prix,2009-07-12,12:00:00
10,2009,10,11,Hungarian Grand Prix,2009-07-26,12:00:00


In [0]:
# Print the column names, types and nullability Spark applied to race_df.
race_df.printSchema()

root
 |-- race_id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- round: integer (nullable = true)
 |-- circuit_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- date: date (nullable = true)
 |-- time: string (nullable = true)



In [0]:
from pyspark.sql.functions import current_timestamp, to_timestamp, lit, concat, col
# Add two derived columns:
#   ingestion_date - the current timestamp at evaluation time
#   race_timestamp - `date` and `time` joined with a space and parsed as
#                    'yyyy-MM-dd HH:mm:ss'
race_timestamp_df = (
    race_df
    .withColumn('ingestion_date', current_timestamp())
    .withColumn(
        'race_timestamp',
        to_timestamp(
            concat(col('date'), lit(' '), col('time')),
            'yyyy-MM-dd HH:mm:ss',
        ),
    )
)

In [0]:
# Preview the frame with the ingestion_date and race_timestamp columns added.
display(race_timestamp_df)

race_id,year,round,circuit_id,name,date,time,ingestion_date,race_timestamp
1,2009,1,1,Australian Grand Prix,2009-03-29,06:00:00,2023-05-04T00:41:14.668+0000,2009-03-29T06:00:00.000+0000
2,2009,2,2,Malaysian Grand Prix,2009-04-05,09:00:00,2023-05-04T00:41:14.668+0000,2009-04-05T09:00:00.000+0000
3,2009,3,17,Chinese Grand Prix,2009-04-19,07:00:00,2023-05-04T00:41:14.668+0000,2009-04-19T07:00:00.000+0000
4,2009,4,3,Bahrain Grand Prix,2009-04-26,12:00:00,2023-05-04T00:41:14.668+0000,2009-04-26T12:00:00.000+0000
5,2009,5,4,Spanish Grand Prix,2009-05-10,12:00:00,2023-05-04T00:41:14.668+0000,2009-05-10T12:00:00.000+0000
6,2009,6,6,Monaco Grand Prix,2009-05-24,12:00:00,2023-05-04T00:41:14.668+0000,2009-05-24T12:00:00.000+0000
7,2009,7,5,Turkish Grand Prix,2009-06-07,12:00:00,2023-05-04T00:41:14.668+0000,2009-06-07T12:00:00.000+0000
8,2009,8,9,British Grand Prix,2009-06-21,12:00:00,2023-05-04T00:41:14.668+0000,2009-06-21T12:00:00.000+0000
9,2009,9,20,German Grand Prix,2009-07-12,12:00:00,2023-05-04T00:41:14.668+0000,2009-07-12T12:00:00.000+0000
10,2009,10,11,Hungarian Grand Prix,2009-07-26,12:00:00,2023-05-04T00:41:14.668+0000,2009-07-26T12:00:00.000+0000


In [0]:
# Keep only the columns to be published; `year` is renamed to `race_year`, and
# the separate `date` / `time` columns are dropped in favour of race_timestamp.
selected_df = race_timestamp_df.select(
    col('race_id'),
    col('year').alias('race_year'),
    col('round'),
    col('circuit_id'),
    col('name'),
    col('ingestion_date'),
    col('race_timestamp'),
)

In [0]:
# Stamp each row with the two notebook parameters as literal columns.
selected_df = (
    selected_df
    .withColumn('data_source', lit(v_data_source))
    .withColumn('file_date', lit(v_file_date))
)

In [0]:
# Preview the final column set, including data_source and file_date.
display(selected_df)

race_id,race_year,round,circuit_id,name,ingestion_date,race_timestamp,data_source,file_date
1,2009,1,1,Australian Grand Prix,2023-05-04T00:42:26.006+0000,2009-03-29T06:00:00.000+0000,,2021-03-21
2,2009,2,2,Malaysian Grand Prix,2023-05-04T00:42:26.006+0000,2009-04-05T09:00:00.000+0000,,2021-03-21
3,2009,3,17,Chinese Grand Prix,2023-05-04T00:42:26.006+0000,2009-04-19T07:00:00.000+0000,,2021-03-21
4,2009,4,3,Bahrain Grand Prix,2023-05-04T00:42:26.006+0000,2009-04-26T12:00:00.000+0000,,2021-03-21
5,2009,5,4,Spanish Grand Prix,2023-05-04T00:42:26.006+0000,2009-05-10T12:00:00.000+0000,,2021-03-21
6,2009,6,6,Monaco Grand Prix,2023-05-04T00:42:26.006+0000,2009-05-24T12:00:00.000+0000,,2021-03-21
7,2009,7,5,Turkish Grand Prix,2023-05-04T00:42:26.006+0000,2009-06-07T12:00:00.000+0000,,2021-03-21
8,2009,8,9,British Grand Prix,2023-05-04T00:42:26.006+0000,2009-06-21T12:00:00.000+0000,,2021-03-21
9,2009,9,20,German Grand Prix,2023-05-04T00:42:26.006+0000,2009-07-12T12:00:00.000+0000,,2021-03-21
10,2009,10,11,Hungarian Grand Prix,2023-05-04T00:42:26.006+0000,2009-07-26T12:00:00.000+0000,,2021-03-21


In [0]:
# Save the result as table f1_processed.race in Parquet format, partitioned by
# race_year. 'overwrite' mode replaces whatever the table held before.
(
    selected_df.write
    .format('parquet')
    .mode('overwrite')
    .partitionBy('race_year')
    .saveAsTable('f1_processed.race')
)

In [0]:
# List what the write produced under the processed mount (one directory per
# race_year partition).
display(dbutils.fs.ls('/mnt/formula1hh/processed/race'))

path,name,size,modificationTime
dbfs:/mnt/formula1hh/processed/race/_SUCCESS,_SUCCESS,0,1683160971000
dbfs:/mnt/formula1hh/processed/race/race_year=1950/,race_year=1950/,0,1683160954000
dbfs:/mnt/formula1hh/processed/race/race_year=1951/,race_year=1951/,0,1683160955000
dbfs:/mnt/formula1hh/processed/race/race_year=1952/,race_year=1952/,0,1683160955000
dbfs:/mnt/formula1hh/processed/race/race_year=1953/,race_year=1953/,0,1683160955000
dbfs:/mnt/formula1hh/processed/race/race_year=1954/,race_year=1954/,0,1683160955000
dbfs:/mnt/formula1hh/processed/race/race_year=1955/,race_year=1955/,0,1683160955000
dbfs:/mnt/formula1hh/processed/race/race_year=1956/,race_year=1956/,0,1683160956000
dbfs:/mnt/formula1hh/processed/race/race_year=1957/,race_year=1957/,0,1683160956000
dbfs:/mnt/formula1hh/processed/race/race_year=1958/,race_year=1958/,0,1683160956000


In [0]:
# Read the Parquet files back from the processed mount to verify the write.
df = spark.read.format('parquet').load('/mnt/formula1hh/processed/race')

In [0]:
%sql
-- Spot-check the rows through the f1_processed.race table.
SELECT *
FROM f1_processed.race

race_id,round,circuit_id,name,ingestion_date,race_timestamp,data_source,file_date,race_year
1053,2,21,Emilia Romagna Grand Prix,2023-05-04T00:42:33.869+0000,2021-04-18T13:00:00.000+0000,,2021-03-21,2021
1052,1,3,Bahrain Grand Prix,2023-05-04T00:42:33.869+0000,2021-03-28T15:00:00.000+0000,,2021-03-21,2021
1051,21,1,Australian Grand Prix,2023-05-04T00:42:33.869+0000,2021-11-21T06:00:00.000+0000,,2021-03-21,2021
1054,3,20,TBC,2023-05-04T00:42:33.869+0000,,,2021-03-21,2021
1055,4,4,Spanish Grand Prix,2023-05-04T00:42:33.869+0000,2021-05-09T13:00:00.000+0000,,2021-03-21,2021
1056,5,6,Monaco Grand Prix,2023-05-04T00:42:33.869+0000,2021-05-23T13:00:00.000+0000,,2021-03-21,2021
1057,6,73,Azerbaijan Grand Prix,2023-05-04T00:42:33.869+0000,2021-06-06T12:00:00.000+0000,,2021-03-21,2021
1058,7,7,Canadian Grand Prix,2023-05-04T00:42:33.869+0000,2021-06-13T18:00:00.000+0000,,2021-03-21,2021
1059,8,34,French Grand Prix,2023-05-04T00:42:33.869+0000,2021-06-27T13:00:00.000+0000,,2021-03-21,2021
1060,9,70,Austrian Grand Prix,2023-05-04T00:42:33.869+0000,2021-07-04T13:00:00.000+0000,,2021-03-21,2021


In [0]:
# Show the rows read back from the Parquet files at the processed path.
display(df)

race_id,round,circuit_id,name,ingestion_date,race_timestamp,data_source,file_date,race_year
1053,2,21,Emilia Romagna Grand Prix,2023-05-04T00:42:33.869+0000,2021-04-18T13:00:00.000+0000,,2021-03-21,2021
1052,1,3,Bahrain Grand Prix,2023-05-04T00:42:33.869+0000,2021-03-28T15:00:00.000+0000,,2021-03-21,2021
1051,21,1,Australian Grand Prix,2023-05-04T00:42:33.869+0000,2021-11-21T06:00:00.000+0000,,2021-03-21,2021
1054,3,20,TBC,2023-05-04T00:42:33.869+0000,,,2021-03-21,2021
1055,4,4,Spanish Grand Prix,2023-05-04T00:42:33.869+0000,2021-05-09T13:00:00.000+0000,,2021-03-21,2021
1056,5,6,Monaco Grand Prix,2023-05-04T00:42:33.869+0000,2021-05-23T13:00:00.000+0000,,2021-03-21,2021
1057,6,73,Azerbaijan Grand Prix,2023-05-04T00:42:33.869+0000,2021-06-06T12:00:00.000+0000,,2021-03-21,2021
1058,7,7,Canadian Grand Prix,2023-05-04T00:42:33.869+0000,2021-06-13T18:00:00.000+0000,,2021-03-21,2021
1059,8,34,French Grand Prix,2023-05-04T00:42:33.869+0000,2021-06-27T13:00:00.000+0000,,2021-03-21,2021
1060,9,70,Austrian Grand Prix,2023-05-04T00:42:33.869+0000,2021-07-04T13:00:00.000+0000,,2021-03-21,2021


In [0]:
# Stop the notebook here and report the string 'Success!' as its exit value.
dbutils.notebook.exit('Success!')

Success!