In [0]:
# Fetch the three credential values from the "formula1-scope" secret scope,
# in the same order as the names on the left-hand side.
client_id, tenant_id, client_secret = (
    dbutils.secrets.get(scope="formula1-scope", key=secret_key)
    for secret_key in ("client-id", "tenant-id", "client-secret")
)

In [0]:
# Configure OAuth client-credentials access for one storage account.
# The account host is written once and appended to each setting prefix, so all
# five keys are guaranteed to target the same account.
storage_host = "1formula1dl11.dfs.core.windows.net"

# Setting prefix -> value; insertion order matches the order the settings are applied.
oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    # Token endpoint is built from the tenant id fetched from the secret scope.
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

for setting_prefix, setting_value in oauth_settings.items():
    spark.conf.set(f"{setting_prefix}.{storage_host}", setting_value)

In [0]:
# List the root of the `raw` container; the result is shown as the cell output.
dbutils.fs.ls(
    "abfss://raw@1formula1dl11.dfs.core.windows.net"
)

In [0]:
# First look at races.csv: treat the first row as the header and apply no
# explicit schema (a typed read of the same file follows later in the notebook).
races_df = (
    spark.read
    .option("header", "true")
    .csv("abfss://raw@1formula1dl11.dfs.core.windows.net/races.csv")
)

In [0]:
# Preview the first ten rows of the untyped read.
races_df.show(n=10)


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, TimestampType

# Explicit schema for races.csv, one .add() per column in file order.
# Only raceId is declared non-nullable.
# NOTE(review): circuitId and date are declared as strings even though the other
# id/number columns are integers and DateType is imported — confirm this is intended.
races_schema = (
    StructType()
    .add("raceId", IntegerType(), False)
    .add("year", IntegerType(), True)
    .add("round", IntegerType(), True)
    .add("circuitId", StringType(), True)
    .add("name", StringType(), True)
    .add("date", StringType(), True)
    .add("time", StringType(), True)
    .add("url", StringType(), True)
)

# Re-read the file, this time applying the schema above instead of the default typing.
races_df = (
    spark.read
    .schema(races_schema)
    .option("header", "true")
    .csv("abfss://raw@1formula1dl11.dfs.core.windows.net/races.csv")
)

# Show a sample of rows, then the resulting column types.
races_df.show(n=10)
races_df.printSchema()

In [0]:
from pyspark.sql.functions import col, current_timestamp, to_timestamp, concat, lit, concat_ws

# Select the seven race columns that are carried forward.
column_select_df = races_df.select(col("raceId"), col("year"), col("round"), col("circuitId"), col("name"), col("date"), col("time"))

# Rename circuitId to snake_case, then add two derived columns:
#   race_timestamp - `date` and `time` joined by a single space and parsed with
#                    the pattern "yyyy-MM-dd HH:mm:ss"
#   ingestion_date - the current timestamp for this run
# The former renames of circuitRef / lat / lng / alt were removed: none of those
# columns is selected above, and withColumnRenamed does nothing when the source
# column is missing, so the resulting frame is unchanged.
column_rename_df = (
    column_select_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumn(
        "race_timestamp",
        to_timestamp(concat_ws(" ", col("date"), col("time")), "yyyy-MM-dd HH:mm:ss"),
    )
    .withColumn("ingestion_date", current_timestamp())
)

column_rename_df.show(10)

In [0]:
# Write the transformed frame as Parquet to the `processed` container,
# overwriting whatever is already at that path.
(
    column_rename_df.write
    .mode("overwrite")
    .parquet("abfss://processed@1formula1dl11.dfs.core.windows.net/races")
)

# List the files now present in the output folder.
display(
    dbutils.fs.ls("abfss://processed@1formula1dl11.dfs.core.windows.net/races")
)