### Mounting the bronze container of the ADLS Gen2 storage account using OAuth

### Following the Medallion Architecture

In [0]:
# OAuth (service principal) settings for ABFS, shared by every mount in this notebook.
# All credentials come from the "f1-kv" secret scope instead of being hard-coded.
tenant_id = dbutils.secrets.get(scope="f1-kv", key="tenant-id")

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="f1-kv", key="client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="f1-kv", key="secret-id"),
    # The tenant id is fetched above: reusing double quotes inside a double-quoted
    # f-string is a SyntaxError on Python < 3.12.
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# dbutils.fs.mount raises if the mount point already exists, so skip when it is
# already mounted. This keeps "Run All" re-runnable.
# Note: this does not verify that an existing mount points at the same source.
bronze_mount_point = "/mnt/mybronze"
if not any(m.mountPoint == bronze_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://bronze@formula1strorage.dfs.core.windows.net/",
        mount_point=bronze_mount_point,
        extra_configs=configs,
    )


In [0]:
# Confirm the client-id secret resolves without echoing its value into the cell output.
client_id_ok = bool(dbutils.secrets.get(scope="f1-kv", key="client-id"))
print(f"client-id secret retrieved: {client_id_ok}")

### Mounting silver container

In [0]:
# Mount the silver container with the same OAuth `configs`. Skip if already mounted,
# because dbutils.fs.mount raises on an existing mount point.
silver_mount_point = "/mnt/mysilver"
if not any(m.mountPoint == silver_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://silver@formula1strorage.dfs.core.windows.net/",
        mount_point=silver_mount_point,
        extra_configs=configs,
    )

### Mounting gold container

In [0]:
# Mount the gold container with the same OAuth `configs`. Skip if already mounted,
# because dbutils.fs.mount raises on an existing mount point.
gold_mount_point = "/mnt/mygold"
if not any(m.mountPoint == gold_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://gold@formula1strorage.dfs.core.windows.net/",
        mount_point=gold_mount_point,
        extra_configs=configs,
    )

In [0]:
# Show every mount registered in the workspace, to confirm the bronze/silver/gold mounts.
current_mounts = dbutils.fs.mounts()
display(current_mounts)

### Reading my_master_data.csv from the mybronze mount point

In [0]:
# Load the raw master CSV from the bronze mount. The first row supplies the column names.
# inferSchema is not set, so Spark's default (string-typed columns) applies.
df = (
    spark.read
    .option("header", True)
    .csv("/mnt/mybronze/my_raw_data/my_master_data.csv")
)

# Preview the loaded rows, then the resulting schema.
df.show()
df.printSchema()


### Listing all files and folders in /mnt/mybronze/my_raw_data

In [0]:
# Browse the raw-data folder on the bronze mount.
raw_data_dir = "/mnt/mybronze/my_raw_data"
display(dbutils.fs.ls(raw_data_dir))



In [0]:
# List the top level of the bronze mount; the listing is the cell's output value.
bronze_entries = dbutils.fs.ls("/mnt/mybronze")
bronze_entries


In [0]:
# Discard the birth-date column and "_c0" (presumably an unnamed index column from the CSV).
# DataFrame.drop ignores names that are absent, so re-running this cell is harmless.
columns_to_discard = ["dateOfBirth", "_c0"]
df = df.drop(*columns_to_discard)
display(df)


### Counting the null values in each column of the dataset

In [0]:
# Alias Spark's `sum` so Python's builtin `sum` is not shadowed
# (the analytics cells later in this notebook use the same `spark_sum` alias).
from pyspark.sql.functions import col, sum as spark_sum, when

# Count nulls per column: each column becomes the sum of a 1/0 "is null" flag.
null_counts = df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
])

null_counts.show()


### Dropping rows that contain null values

In [0]:
# Keep only complete rows: any row with a null in any column is dropped.
df_cleaned = df.na.drop(how="any")

# Preview the cleaned frame.
df_cleaned.show()


### Counting null values in each column

In [0]:
# Alias Spark's `sum` so Python's builtin `sum` is not shadowed
# (consistent with the `spark_sum` alias used by the analytics cells).
from pyspark.sql.functions import col, sum as spark_sum, when

# Count nulls per column of `df`.
# NOTE(review): this tallies the original `df`, not `df_cleaned` from the cell above.
null_counts = df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
])

null_counts.show()


In [0]:
from pyspark.sql.functions import col, sum, when

# One-shot null report: for each column, add 1 per null row and show the totals.
df.select(*[
    sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
    for column_name in df.columns
]).show()


### Dropping rows with null values and verifying that none remain

In [0]:
# Remove every row that has at least one null value.
df_cleaned = df.na.drop(how="any")

# Re-run the null tally on the cleaned frame; every count should now be 0.
# Uses col/sum/when imported by the preceding cell.
df_cleaned.select(*[
    sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
    for column_name in df_cleaned.columns
]).show()


In [0]:
# Preview the first 20 rows of the null-free frame (show()'s default row count).
df_cleaned.show(n=20)

### Renaming columns

In [0]:
# Source column -> analytics-friendly name, applied in this order.
# withColumnRenamed is a no-op for columns that are not present.
rename_map = {
    "name": "driver_name",
    "circuitId": "circuit_ID",
    "driverId": "driver_ID",
    "circuitName": "circuit_name",
    "type": "track_type",
    "q_worst": "qualify_Worst_Time",
    "q_best": "qualify_Best_Time",
    "q_mean": "qualify_Average_Time",
    "qual_position": "qualify_Position",
}
for old_name, new_name in rename_map.items():
    df_cleaned = df_cleaned.withColumnRenamed(old_name, new_name)

# Later cells read the renamed frame as `df`.
df = df_cleaned
df.show()



In [0]:
# Persist the cleaned, renamed frame `df` to the silver layer as a Delta table,
# replacing whatever was previously written at this path.
silver_path = "/mnt/mysilver/my_cleaned_data"

(
    df.write
    .format("delta")
    .mode("overwrite")
    .save(silver_path)
)


### Reading data from the silver layer in Delta format

In [0]:
# Round-trip check: load the Delta table just written to `silver_path` and preview it.
df_read = spark.read.format("delta").load(path=silver_path)
df_read.show()


### Reading cleaned data from silver layer

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Reload the cleaned silver dataset and print its column layout.
# NOTE(review): row_number and Window are not used in this cell.
df = spark.read.load("/mnt/mysilver/my_cleaned_data", format="delta")
df.printSchema()



### Converting the cleaned data into 3NF tables stored as Delta tables in the silver layer

In [0]:
# monotonically_increasing_id is imported here so this cell does not depend on an
# earlier cell having been run.
from pyspark.sql.functions import col, monotonically_increasing_id
from pyspark.sql.window import Window

# Split the wide cleaned dataset into entity tables, each written as a Delta table
# under the silver layer in overwrite mode.
silver_base_path = "/mnt/mysilver"

# Load cleansed data from the silver layer.
df = spark.read.format("delta").load(f"{silver_base_path}/my_cleaned_data")


def _save_silver_table(frame, table_name):
    """Overwrite the Delta table at `<silver_base_path>/<table_name>` with `frame`."""
    frame.write.format("delta").mode("overwrite").save(f"{silver_base_path}/{table_name}")


# ----------------------------
# DRIVERS TABLE: distinct driver attribute combinations.
df_drivers = df.select("driver_ID", "driver_name", "nationality", "code").dropDuplicates()
_save_silver_table(df_drivers, "drivers")

# ----------------------------
# CONSTRUCTORS TABLE: distinct constructor names plus a surrogate id.
# NOTE: monotonically_increasing_id values are unique but not consecutive and depend on
# partitioning, so ids can change between runs. The results table below still references
# constructors by name, not by this id.
df_constructors = (
    df.select("constructor").dropDuplicates()
    .withColumnRenamed("constructor", "constructor_name")
    .withColumn("constructor_id", monotonically_increasing_id())
)
_save_silver_table(df_constructors, "constructors")

# ----------------------------
# CIRCUITS TABLE
df_circuits = df.select(
    "circuit_ID", "circuit_name", "lat", "long", "locality", "country", "track_type", "direction", "length"
).dropDuplicates()
_save_silver_table(df_circuits, "circuits")

# ----------------------------
# RACES TABLE
df_races = df.select(
    "season", "round", "race_name", "circuit_ID", "date", "distance", "weather"
).dropDuplicates()
_save_silver_table(df_races, "races")

# ----------------------------
# RESULTS TABLE: per-driver race outcome.
df_results = df.select(
    "season", "round", "race_name", "driver_ID", "constructor", "grid", "finish_position",
    "time", "status", "points"
).dropDuplicates()
_save_silver_table(df_results, "results")

# ----------------------------
# QUALIFYING TABLE: per-driver qualifying outcome.
df_qualifying = df.select(
    "season", "round", "race_name", "driver_ID", "qualify_Position",
    "qualify_Best_Time", "qualify_Worst_Time", "qualify_Average_Time"
).dropDuplicates()
_save_silver_table(df_qualifying, "qualifying")



### Creating ⭐ Star Schema with fact_table and dim_tables

In [0]:
from pyspark.sql.functions import col


# Gold-layer star schema: dimension tables plus one fact table, all derived from
# the cleaned silver dataset and written as Delta tables in overwrite mode.
silver_path = "/mnt/mysilver/my_cleaned_data"
df = spark.read.format("delta").load(silver_path)

gold_path = "/mnt/mygold/f1_star_schema"


def _write_gold_table(frame, table_name):
    """Overwrite the Delta table at `<gold_path>/<table_name>` with `frame`."""
    frame.write.format("delta").mode("overwrite").save(f"{gold_path}/{table_name}")


# Dimensions: each one is a de-duplicated projection of silver columns.
dim_driver = df.select("driver_ID", "driver_name", "nationality", "code").dropDuplicates()
_write_gold_table(dim_driver, "dim_driver")

dim_constructor = (
    df.select("constructor")
      .dropDuplicates()
      .withColumnRenamed("constructor", "constructor_name")
)
_write_gold_table(dim_constructor, "dim_constructor")

dim_circuit = df.select(
    "circuit_ID", "circuit_name", "lat", "long", "locality",
    "country", "track_type", "direction", "length",
).dropDuplicates()
_write_gold_table(dim_circuit, "dim_circuit")

dim_date = df.select("date", "season", "round").dropDuplicates()
_write_gold_table(dim_date, "dim_date")

# NOTE(review): race_name presumably repeats across seasons, so dim_qualifying and dim_race
# are not keyed to a single race instance. Consider adding season/round to their keys.
dim_qualifying = df.select(
    "driver_ID", "race_name", "qualify_Position",
    "qualify_Best_Time", "qualify_Worst_Time", "qualify_Average_Time",
).dropDuplicates()
_write_gold_table(dim_qualifying, "dim_qualifying")

dim_race = df.select("race_name", "distance", "weather").dropDuplicates()
_write_gold_table(dim_race, "dim_race")

# Fact: one row per silver row (no de-duplication); finish_position is exposed as `position`.
fact_race_results = df.select(
    "season", "round", "race_name", "circuit_ID", "date", "driver_ID", "constructor", "grid",
    col("finish_position").alias("position"), "time", "status", "points",
)
_write_gold_table(fact_race_results, "fact_race_results")


### 1. Aggregations 
### 1.1 Average Points per Driver

In [0]:
from pyspark.sql.functions import col, avg as spark_avg, sum as spark_sum, count, min as spark_min

# Average points per driver, enriched with driver attributes and saved to the analytics area.
base = "/mnt/mygold/f1_star_schema_analytics"

fact_results = spark.read.format("delta").load(f"{gold_path}/fact_race_results")
drivers_dim = spark.read.format("delta").load(f"{gold_path}/dim_driver")

points_by_driver = fact_results.groupBy("driver_ID").agg(spark_avg("points").alias("avg_points"))
agg_avg_points_per_driver = points_by_driver.join(drivers_dim, on="driver_ID")

agg_avg_points_per_driver.write.format("delta").mode("overwrite").save(f"{base}/agg_avg_points_per_driver")
agg_avg_points_per_driver.show()


In [0]:
# Inspect the schema of the 3NF results table built in the silver-layer cell.
results_table = df_results
results_table.printSchema()


In [0]:
# Render the per-driver averages with Databricks' interactive table/chart display.
avg_points_view = agg_avg_points_per_driver
display(avg_points_view)

Databricks visualization. Run in Databricks to view.

### 1.2 Total Races per Circuit

In [0]:
from pyspark.sql.functions import col, avg as spark_avg, sum as spark_sum, count, countDistinct, min as spark_min

# Number of distinct races held at each circuit, joined with circuit attributes.
# fact_race_results has one row per driver entry (it carries driver_ID, grid, position),
# so count("*") would count driver entries, not races. A race is identified by (season, round).
base = "/mnt/mygold/f1_star_schema_analytics"
agg_races_per_circuit = (
    spark.read.format("delta").load(f"{gold_path}/fact_race_results")
      .groupBy("circuit_ID")
      .agg(countDistinct("season", "round").alias("num_races"))
      .join(spark.read.format("delta").load(f"{gold_path}/dim_circuit"), "circuit_ID")
)
agg_races_per_circuit.write.format("delta").mode("overwrite").save(f"{base}/agg_races_per_circuit")
display(agg_races_per_circuit)


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

### 1.3 Avg Qualify Position per Driver

In [0]:
from pyspark.sql.functions import col, avg as spark_avg, sum as spark_sum, count, min as spark_min

# Average qualifying position per driver (from dim_qualifying), with driver attributes.
base = "/mnt/mygold/f1_star_schema_analytics"

qualifying_dim = spark.read.format("delta").load(f"{gold_path}/dim_qualifying")
drivers_dim = spark.read.format("delta").load(f"{gold_path}/dim_driver")

agg_avg_qualify = (
    qualifying_dim
      .groupBy("driver_ID")
      .agg(spark_avg("qualify_Position").alias("avg_qualify_pos"))
      .join(drivers_dim, on="driver_ID")
)

agg_avg_qualify.write.format("delta").mode("overwrite").save(f"{base}/agg_avg_qualify_per_driver")
display(agg_avg_qualify)


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

### 1.4 Races per Season

In [0]:
from pyspark.sql.functions import col, avg as spark_avg, sum as spark_sum, count, countDistinct, min as spark_min

# Number of races per season. fact_race_results has one row per driver entry,
# so count("*") would count entries, not races. Distinct rounds within a season
# identify the races.
base = "/mnt/mygold/f1_star_schema_analytics"
agg_races_per_season = (
    spark.read.format("delta").load(f"{gold_path}/fact_race_results")
      .groupBy("season")
      .agg(countDistinct("round").alias("races_count"))
)
agg_races_per_season.write.format("delta").mode("overwrite").save(f"{base}/agg_races_per_season")
display(agg_races_per_season)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

### 2. KPIs
### 2.1 Driver Win Rate

In [0]:
from pyspark.sql.functions import col, avg as spark_avg, sum as spark_sum, count, min as spark_min

# Win rate per driver = fraction of the driver's fact rows (race entries) finishing in position 1.
base = "/mnt/mygold/f1_star_schema_analytics"

is_win = (col("position") == 1).cast("int")
fact_results = spark.read.format("delta").load(f"{gold_path}/fact_race_results")
drivers_dim = spark.read.format("delta").load(f"{gold_path}/dim_driver")

kpi_win_rate = (
    fact_results
      .groupBy("driver_ID")
      .agg((spark_sum(is_win) / count("*")).alias("win_rate"))
      .join(drivers_dim, "driver_ID")
)
kpi_win_rate.write.format("delta").mode("overwrite").save(f"{base}/kpi_win_rate")
display(kpi_win_rate)

Databricks visualization. Run in Databricks to view.

### 2.2 Circuit Avg Finish Time

In [0]:
from pyspark.sql.functions import col, avg as spark_avg, sum as spark_sum, count, min as spark_min

# Average of the `time` column per circuit, joined with circuit attributes.
# NOTE(review): if `time` holds formatted strings (e.g. lap-clock or "+N s" gaps), avg casts
# them to double and non-numeric values may become null (or fail under ANSI mode). Confirm the format.
base = "/mnt/mygold/f1_star_schema_analytics"

fact_results = spark.read.format("delta").load(f"{gold_path}/fact_race_results")
circuits_dim = spark.read.format("delta").load(f"{gold_path}/dim_circuit")

kpi_circuit_time = (
    fact_results
      .groupBy("circuit_ID")
      .agg(spark_avg("time").alias("avg_finish_time"))
      .join(circuits_dim, "circuit_ID")
)
kpi_circuit_time.write.format("delta").mode("overwrite").save(f"{base}/kpi_circuit_avg_finish_time")
display(kpi_circuit_time)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

### 2.3 Average Points per Season

In [0]:
from pyspark.sql.functions import col, avg as spark_avg, sum as spark_sum, count, min as spark_min

# Mean points per fact row (driver entry) for each season.
base = "/mnt/mygold/f1_star_schema_analytics"

fact_results = spark.read.format("delta").load(f"{gold_path}/fact_race_results")
kpi_season_pts = fact_results.groupBy("season").agg(spark_avg("points").alias("avg_points"))

kpi_season_pts.write.format("delta").mode("overwrite").save(f"{base}/kpi_season_avg_points")
display(kpi_season_pts)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

### 2.4 Number of Participations and Wins per Season

In [0]:
from pyspark.sql.functions import col, count, sum as spark_sum

gold_path = "/mnt/mygold/f1_star_schema"
analytics_path = "/mnt/mygold/f1_star_schema_analytics"

# Fact table: one row per driver entry (driver_ID, grid, position, ...).
fact_race_results = spark.read.format("delta").load(f"{gold_path}/fact_race_results")

# Per season: every fact row counts as one participation; rows with position 1 count as wins.
win_flag = (col("position") == 1).cast("int")
kpi_season_wins_participations = (
    fact_race_results
      .groupBy("season")
      .agg(
          count("*").alias("total_participations"),
          spark_sum(win_flag).alias("total_wins"),
      )
      .orderBy("season")
)

# Save to the gold analytics area, replacing the previous contents.
(
    kpi_season_wins_participations.write
    .format("delta")
    .mode("overwrite")
    .save(f"{analytics_path}/kpi_season_wins_participations")
)

display(kpi_season_wins_participations)


Databricks visualization. Run in Databricks to view.

### 2.5 Number of Wins per Constructor

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

gold_path = "/mnt/mygold/f1_star_schema"
analytics_path = "/mnt/mygold/f1_star_schema_analytics"

# Load the fact table and the constructor dimension.
fact_df = spark.read.format("delta").load(f"{gold_path}/fact_race_results")
dim_constructor = spark.read.format("delta").load(f"{gold_path}/dim_constructor")

# Resolve constructor_name from the dimension (left join keeps every fact row).
fact_with_constructor = fact_df.join(
    dim_constructor, fact_df["constructor"] == dim_constructor["constructor_name"], "left"
)

# KPI: number of wins (position == 1) per constructor, most wins first.
# Computed once; previously it was computed and written twice to the same path.
kpi_constructor_wins = (
    fact_with_constructor
      .groupBy("constructor_name")
      .agg(spark_sum((col("position") == 1).cast("int")).alias("total_wins"))
      .orderBy(col("total_wins").desc())
)

# overwriteSchema replaces the table schema with exactly (constructor_name, total_wins).
# The former mergeSchema write kept a stale, all-null `constructor` column from the
# earlier version of this KPI.
kpi_constructor_wins.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(f"{analytics_path}/kpi_constructor_total_wins")

display(kpi_constructor_wins)


Databricks visualization. Run in Databricks to view.

### 2.6 Number of races per country

In [0]:
# col is imported here because orderBy below uses it (previously it leaked in from an earlier cell).
from pyspark.sql.functions import col, count, countDistinct

# Set gold and analytics paths.
gold_path = "/mnt/mygold/f1_star_schema"
base = "/mnt/mygold/f1_star_schema_analytics"

# Read the necessary tables.
df_fact_race_results = spark.read.format("delta").load(f"{gold_path}/fact_race_results")
df_dim_circuit = spark.read.format("delta").load(f"{gold_path}/dim_circuit")

# Join fact and circuit to get the country, then count distinct races per country.
# fact rows are driver entries, so count("race_name") would count entries, not races.
# A race is identified by (season, round). Distinct counting also tolerates any duplicate
# dim_circuit rows produced by the join.
kpi_races_per_country = (
    df_fact_race_results
    .join(df_dim_circuit, on="circuit_ID", how="inner")
    .groupBy("country")
    .agg(countDistinct("season", "round").alias("number_of_races"))
    .orderBy(col("number_of_races").desc())
)

# Save in the gold analytics layer.
kpi_races_per_country.write.format("delta").mode("overwrite").save(f"{base}/kpi_races_per_country")

# Display for visualization.
display(kpi_races_per_country)


Databricks visualization. Run in Databricks to view.