### Define the dataset
This notebook uses the `Formula 1 datasets` dataset, which contains data from 1950 to 2022.
The dataset is available on Kaggle and on another website called Ergast:

https://ergast.com/mrd/

### Define the Catalog and Schema

In [0]:
# Unity Catalog coordinates shared by the cells below.
catalog, db_name, volume = (
    "douglas_s_m0pf_da",        # catalog
    "genie_teams_integration",  # database (schema) inside the catalog
    "f1_dataset_files",         # volume inside the database
)

In [0]:
# Provision the Unity Catalog objects used by this notebook, outermost first:
# the catalog, then the database (schema) inside it, then the volume inside
# that database. The names come from `catalog`, `db_name` and `volume`.
# Every statement uses IF NOT EXISTS.
provisioning_statements = (
    f"CREATE CATALOG IF NOT EXISTS {catalog};",
    f"CREATE DATABASE IF NOT EXISTS {catalog}.{db_name};",
    f"CREATE VOLUME IF NOT EXISTS {catalog}.{db_name}.{volume};",
)
for statement in provisioning_statements:
    spark.sql(statement)

In [0]:
import os
# Publish the catalog, database and volume names as environment variables.
# The `%sh` cell below builds its target directory from CATALOG, DB_NAME
# and VOLUME.
os.environ.update(
    {
        "CATALOG": catalog,
        "DB_NAME": db_name,
        "VOLUME": volume,
    }
)

In [0]:
%sh

# Download the F1 CSV archive into the volume directory and unpack it there.
# CATALOG, DB_NAME and VOLUME are read from the environment.

# Stop at the first failing command or unset variable, so a failed download is
# never followed by an unzip/rm of a missing or partial archive.
set -eu

# Define the download URL and target directory
URL="https://raw.githubusercontent.com/rubenv/ergast-mrd/master/f1db_csv.zip"
DEST_DIR="/Volumes/${CATALOG}/${DB_NAME}/${VOLUME}"
ZIP_FILE="${DEST_DIR}/f1db_csv.zip"

# Download the ZIP file
wget -O "$ZIP_FILE" "$URL"

# Unzip the file; -o overwrites files left by a previous run instead of
# prompting, so re-running the cell refreshes the extracted data
unzip -o "$ZIP_FILE" -d "$DEST_DIR"

# Clean up the ZIP file
rm "$ZIP_FILE"

# List the contents of the directory
echo "Files extracted to $DEST_DIR:"
ls "$DEST_DIR"

### Create Delta tables from the CSV files

In [0]:
from pyspark.sql import SparkSession
import os

# Load every CSV file found in the volume folder into its own Delta table.
# Each table is named after its file: `<name>.csv` becomes
# `<catalog>.<db_name>.<name>`.

# Folder containing the CSV files
input_folder = f"/Volumes/{catalog}/{db_name}/{volume}"

# Ensure the database exists in the specified catalog
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{db_name}")

csv_suffix = ".csv"

# Sort the listing so the tables are processed in a stable, predictable order
for file_name in sorted(os.listdir(input_folder)):
    if not file_name.endswith(csv_suffix):
        continue

    # Full path to the CSV file
    file_path = f"{input_folder}/{file_name}"

    # Load the CSV file into a DataFrame; the first line is the header and
    # the column types are inferred from the data
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(file_path)
    )

    # Table name = file name without its trailing extension. Only the suffix
    # is stripped, so a name that contains ".csv" elsewhere stays intact.
    table_name = file_name[: -len(csv_suffix)]

    # Fully qualified table name
    fully_qualified_table_name = f"{catalog}.{db_name}.{table_name}"

    # Register the DataFrame as a Delta table. Overwrite mode lets the cell be
    # re-run: the default mode raises an error when the table already exists.
    (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(fully_qualified_table_name)
    )

    print(f"Table '{fully_qualified_table_name}' created from file '{file_name}'")

In [0]:
# Collect the names returned by SHOW TABLES for the target database.
tables_df = spark.sql(f"SHOW TABLES IN {catalog}.{db_name}").select("tableName")
tables = [record["tableName"] for record in tables_df.collect()]

# Print a header and the DESC output for each of them in turn.
for table in tables:
    fully_qualified_table_name = ".".join((catalog, db_name, table))
    print(f"DESC for table '{fully_qualified_table_name}':")
    description_df = spark.sql(f"DESC {fully_qualified_table_name}")
    description_df.show()

### Creating the MVs


In [0]:
# Import the pyspark functions
from pyspark.sql.functions import col, concat, lit

# ==============================================================================
# Build the silver fact table `fct_race_results`
#
# Reads the bronze tables, stacks Grand Prix and Sprint results, joins them to
# the race, driver, constructor, circuit and status tables, and saves the
# selected columns as a Delta table.
# ==============================================================================

# ==============================================================================
# Defining Bronze and Silver Tables
# ==============================================================================

# Source tables (Bronze). They are addressed through the `catalog` and
# `db_name` variables defined at the top of the notebook.
circuits_table = f"{catalog}.{db_name}.circuits"
constructors_table = f"{catalog}.{db_name}.constructors"
drivers_table = f"{catalog}.{db_name}.drivers"
races_table = f"{catalog}.{db_name}.races"
results_table = f"{catalog}.{db_name}.results"
sprint_results_table = f"{catalog}.{db_name}.sprint_results"
status_table = f"{catalog}.{db_name}.status"

# Target Tables (Silver)
silver_table_name = f"{catalog}.{db_name}.fct_race_results"

# ==============================================================================
# Load bronze tables as dataframes
# ==============================================================================
print("Loading bronze tables...")
circuits_df = spark.table(circuits_table)
constructors_df = spark.table(constructors_table)
drivers_df = spark.table(drivers_table)
races_df = spark.table(races_table)
results_df = spark.table(results_table)
sprint_results_df = spark.table(sprint_results_table)
status_df = spark.table(status_table)

# ==============================================================================
# Combining Grand Prix and Sprint
# ==============================================================================
print("Combining Grand Prix and Sprint...")

# Tag the Grand Prix rows with their 'race_type' before the union
results_unioned_df = results_df.withColumn("race_type", lit("Grand Prix"))

# Tag the sprint rows and add `rank` and `fastestLapSpeed` as NULL string
# columns so both DataFrames expose these columns for unionByName
sprint_results_unioned_df = (
    sprint_results_df
    .withColumn("race_type", lit("Sprint Race"))
    .withColumn("rank", lit(None).cast("string"))
    .withColumn("fastestLapSpeed", lit(None).cast("string"))
)

combined_results_df = results_unioned_df.unionByName(sprint_results_unioned_df)

# ==============================================================================
# Rename columns for a better column identification
# ==============================================================================
races_renamed_df = races_df.withColumnRenamed("name", "race_name")
circuits_renamed_df = circuits_df.withColumnRenamed("name", "circuit_name")
constructors_renamed_df = constructors_df.withColumnRenamed("name", "constructor_name")

# ==============================================================================
# Join the results with the dimensions
# ==============================================================================
# The results, drivers and constructors DataFrames are aliased because the
# final select refers to some of their columns with a qualifier
# (`results.time`, `drivers.number`, `drivers.nationality`,
# `constructors.nationality`); without the aliases those names cannot resolve.
final_df = (
    combined_results_df.alias("results")
    .join(races_renamed_df, "raceId", "inner")
    .join(drivers_df.alias("drivers"), "driverId", "inner")
    .join(constructors_renamed_df.alias("constructors"), "constructorId", "inner")
    .join(circuits_renamed_df, "circuitId", "inner")
    .join(status_df, "statusId", "inner")
)

# ==============================================================================
# Select and rename the final columns
# (equivalent to the main SELECT of the SQL query)
# ==============================================================================
print("Selecionando e formatando colunas finais...")
silver_df = final_df.select(
    # Keys and IDs
    col("resultId"),
    col("raceId"),
    col("driverId"),
    col("constructorId"),
    col("circuitId"),
    col("statusId"),

    # Race and circuit information
    col("year").alias("race_year"),
    col("round"),
    col("race_name"),
    col("date").alias("race_date"),
    col("circuit_name"),
    col("location").alias("circuit_location"),
    col("country").alias("circuit_country"),
    col("race_type"),

    # Driver information
    concat(col("forename"), lit(" "), col("surname")).alias("driver_name"),
    col("drivers.number").alias("driver_number"),  # qualified to avoid ambiguity
    col("drivers.nationality").alias("driver_nationality"),
    col("dob").alias("driver_date_of_birth"),

    # Constructor information
    col("constructor_name"),
    col("constructors.nationality").alias("constructor_nationality"),

    # Race metrics and results
    col("grid").alias("starting_grid"),
    col("positionOrder").alias("finishing_position"),
    col("points"),
    col("laps"),
    col("results.time").alias("total_race_time"),  # qualified to avoid ambiguity
    col("fastestLapTime"),
    col("fastestLapSpeed"),
    col("status").alias("finishing_status"),
)

# ==============================================================================
# Save the resulting DataFrame as a Delta table (silver layer)
# ==============================================================================
print(f"Salvando tabela silver em: {silver_table_name}")
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")  # good practice during development
    .saveAsTable(silver_table_name)
)

print("Tabela silver criada com sucesso! ✅")

# ==============================================================================
# (Optional) Display the data of the new table
# ==============================================================================
display(spark.table(silver_table_name))

In [0]:
# Create (or replace) the view `vw_race_results_enriched`.
#
# The view unions the rows of `results` (tagged 'Grand Prix') with the rows of
# `sprint_results` (tagged 'Sprint Race', with `rank` and `fastestLapSpeed`
# filled with NULL strings), then joins the combined rows to `races`,
# `drivers`, `constructors`, `circuits` and `status`.
#
# The statement is kept in `create_view_sql` rather than `final_df`, so the
# SQL text does not overwrite the DataFrame of that name built in the
# previous cell.
create_view_sql = f"""
    CREATE OR REPLACE VIEW {catalog}.{db_name}.vw_race_results_enriched
    COMMENT 'View that consolidates Grand Prix and Sprint results with details on drivers, constructors, and circuits.'
    AS
    WITH combined_results AS (
      SELECT
        resultId, raceId, driverId, constructorId, number, grid, position,
        positionText, positionOrder, points, laps, time, milliseconds,
        fastestLap, rank, fastestLapTime, fastestLapSpeed, statusId,
        'Grand Prix' AS race_type
      FROM
        {catalog}.{db_name}.results
      UNION ALL
      SELECT
        resultId, raceId, driverId, constructorId, number, grid, position,
        positionText, positionOrder, points, laps, time, milliseconds,
        fastestLap,
        CAST(NULL AS STRING) as rank,
        fastestLapTime,
        CAST(NULL AS STRING) as fastestLapSpeed,
        statusId,
        'Sprint Race' AS race_type
      FROM
        {catalog}.{db_name}.sprint_results
    )
    SELECT
      res.resultId, res.raceId, res.driverId, res.constructorId, r.circuitId, res.statusId,
      r.year AS race_year, r.round, r.name AS race_name, r.date AS race_date,
      circ.name AS circuit_name, circ.location AS circuit_location, circ.country AS circuit_country,
      res.race_type,
      CONCAT(d.forename, ' ', d.surname) AS driver_name,
      d.number AS driver_number, d.nationality AS driver_nationality, d.dob AS driver_date_of_birth,
      c.name AS constructor_name, c.nationality AS constructor_nationality,
      res.grid AS starting_grid, res.positionOrder AS finishing_position, res.points, res.laps,
      res.time AS total_race_time, res.fastestLapTime, res.fastestLapSpeed,
      st.status AS finishing_status
    FROM
      combined_results res
      JOIN {catalog}.{db_name}.races r ON res.raceId = r.raceId
      JOIN {catalog}.{db_name}.drivers d ON res.driverId = d.driverId
      JOIN {catalog}.{db_name}.constructors c ON res.constructorId = c.constructorId
      JOIN {catalog}.{db_name}.circuits circ ON r.circuitId = circ.circuitId
      JOIN {catalog}.{db_name}.status st ON res.statusId = st.statusId;
    """

spark.sql(create_view_sql)

In [0]:
# Show the first ten rows of the enriched view as a quick sanity check.
preview_query = f"SELECT * FROM {catalog}.{db_name}.vw_race_results_enriched LIMIT 10"
display(spark.sql(preview_query))

In [0]:
# List the distinct race years present in the enriched view.
race_years_query = f"SELECT DISTINCT race_year FROM {catalog}.{db_name}.vw_race_results_enriched"
display(spark.sql(race_years_query))