# Pipeline: Raw -> Silver

## Instru√ß√µes e informa√ß√µes

### Objetivo:  

Nesta fase, realizamos um processo de **extra√ß√£o, limpeza e transforma√ß√£o** dos dados:

- **Extra√ß√£o:** carregamento de arquivos CSV brutos provenientes de diferentes fontes.  
- **Limpeza:** tratamento de valores faltantes, padroniza√ß√£o de tipos e formatos, corre√ß√£o de inconsist√™ncias.  
- **Transforma√ß√£o:** aplica√ß√£o de regras de neg√≥cio, filtros, agrega√ß√µes e organiza√ß√£o dos dados de forma estruturada para an√°lise.  

### Configura√ß√£o Inicial

Esta se√ß√£o realiza a configura√ß√£o inicial de todo o ambiente que ser√° utilizado nas an√°lises subsequentes, incluindo:  

- Importa√ß√£o das bibliotecas necess√°rias (ex.: `pandas`, `numpy`, `matplotlib`, `seaborn`), explicando a fun√ß√£o de cada uma na sequ√™ncia de transforma√ß√µes.  
- Defini√ß√£o dos caminhos para os arquivos de dados brutos (CSV da camada Bronze) que ser√£o processados.  
- Identifica√ß√£o de metadados relevantes que podem ser √∫teis para a limpeza e an√°lise, como tipos de colunas, valores nulos, formatos de data, etc.  


In [51]:
import os
import re
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lower, trim, regexp_replace, coalesce, lit, floor, to_date, year, month, dayofmonth, concat_ws, date_format
from pyspark.sql.types import IntegerType, DoubleType, StringType, DateType
import pandas as pd

spark = (
    SparkSession.builder
    .appName("Formula1Analysis")
    .master("local[*]") 
    .config("spark.driver.memory", "4g") 
    .config("spark.executor.memory", "4g") 
    .getOrCreate()
)

if spark.sparkContext.appName == "Formula1Analysis":
    print("‚úÖ Configura√ß√£o do PySpark conclu√≠da com sucesso.")
    print(f"Vers√£o do Spark: {spark.version}")
else:
    print("‚ùå Erro na configura√ß√£o do PySpark.")

‚úÖ Configura√ß√£o do PySpark conclu√≠da com sucesso.
Vers√£o do Spark: 4.0.1


In [16]:
BASE_PATH = "../data_layer/raw/dados_brutos/"

files = {
    'constructor_results_df': 'constructor_results.csv',
    'constructor_standings_df': 'constructor_standings.csv',
    'constructors_df': 'constructors.csv',
    'driver_standings_df': 'driver_standings.csv',
    'drivers_df': 'drivers.csv',
    'lap_times_df': 'lap_times.csv',
    'pit_stops_df': 'pit_stops.csv',
    'qualifying_df': 'qualifying.csv',
    'races_df': 'races.csv',
    'results_df': 'results.csv',
    'seasons_df': 'seasons.csv',       
    'status_df': 'status.csv',         
    'circuits_df': 'circuits.csv',     
    'sprint_results_df': 'sprint_results.csv' 
}

read_options = {
    "header": True,          
    "inferSchema": True,     
    "nullValue": '\\N',      
    "sep": ","               
}

print("\nIniciando a leitura dos arquivos para a Camada Bronze:")

for df_name, file_name in files.items():
    full_path = BASE_PATH + file_name
    try:
        # A vari√°vel 'spark' √© usada aqui
        df = spark.read.csv(full_path, **read_options)
        globals()[f"{df_name.replace('_df', '')}_bronze"] = df
        print(f"‚úÖ DataFrame '{df_name.replace('_df', '')}_bronze' carregado.")
        
    except Exception as e:
        print(f"‚ùå Erro ao carregar o arquivo {full_path}: {e}")


Iniciando a leitura dos arquivos para a Camada Bronze:
‚úÖ DataFrame 'constructor_results_bronze' carregado.
‚úÖ DataFrame 'constructor_standings_bronze' carregado.
‚úÖ DataFrame 'constructors_bronze' carregado.
‚úÖ DataFrame 'driver_standings_bronze' carregado.
‚úÖ DataFrame 'drivers_bronze' carregado.
‚úÖ DataFrame 'lap_times_bronze' carregado.
‚úÖ DataFrame 'pit_stops_bronze' carregado.
‚úÖ DataFrame 'qualifying_bronze' carregado.
‚úÖ DataFrame 'races_bronze' carregado.
‚úÖ DataFrame 'results_bronze' carregado.
‚úÖ DataFrame 'seasons_bronze' carregado.
‚úÖ DataFrame 'status_bronze' carregado.
‚úÖ DataFrame 'circuits_bronze' carregado.
‚úÖ DataFrame 'sprint_results_bronze' carregado.


___
## Processo de Transforma√ß√£o para a Camada SILVER

Este notebook documenta a etapa de transforma√ß√£o de dados para o **Modelo Dimensional da Camada Silver**, sucedendo a ingest√£o de 14 arquivos CSV da base de dados de F√≥rmula 1 (Ergast) na Camada Bronze. Os dados brutos, preservados com granularidade transacional por corrida, piloto ou volta, s√£o lidos integralmente e reestruturados para an√°lise otimizada.

Esta primeira c√©lula define as fun√ß√µes que especificavam quais colunas ser√£o mantidas


In [23]:
def get_circuits_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'circuits_bronze'."""
    return [
        'circuitId', 'circuitRef', 'name', 'location', 'country', 'lat', 'lng', 'alt'
    ]

def get_races_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'races_bronze'."""
    return [
        'raceId', 'year', 'round', 'circuitId', 'name', 'date', 'time'
    ]

def get_drivers_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'drivers_bronze'."""
    return [
        'driverId', 'driverRef', 'number', 'code', 'forename', 'surname', 'dob', 'nationality'
    ]

def get_constructors_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'constructors_bronze'."""
    return [
        'constructorId', 'constructorRef', 'name', 'nationality'
    ]

def get_results_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'results_bronze'."""
    return [
        'resultId', 'raceId', 'driverId', 'constructorId', 'number', 'grid', 'position', 
        'positionText', 'positionOrder', 'points', 'laps', 'time', 'milliseconds', 
        'fastestLap', 'rank', 'fastestLapTime', 'fastestLapSpeed', 'statusId'
    ]

def get_constructor_results_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'constructor_results_bronze'."""
    return [
        'constructorResultsId', 'raceId', 'constructorId', 'points', 'status'
    ]

def get_driver_standings_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'driver_standings_bronze'."""
    return [
        'driverStandingsId', 'raceId', 'driverId', 'points', 'position', 'positionText', 'wins'
    ]

def get_constructor_standings_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'constructor_standings_bronze'."""
    return [
        'constructorStandingsId', 'raceId', 'constructorId', 'points', 'position', 'positionText', 'wins'
    ]

def get_qualifying_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'qualifying_bronze'."""
    return [
        'qualifyId', 'raceId', 'driverId', 'constructorId', 'number', 'position', 'q1', 'q2', 'q3'
    ]

def get_lap_times_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'lap_times_bronze'."""
    return [
        'raceId', 'driverId', 'lap', 'position', 'time', 'milliseconds'
    ]

def get_pit_stops_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'pit_stops_bronze'."""
    return [
        'raceId', 'driverId', 'stop', 'lap', 'time', 'duration', 'milliseconds'
    ]

def get_seasons_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'seasons_bronze'."""
    return [
        'year'
    ]

def get_status_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'status_bronze'."""
    return [
        'statusId', 'status'
    ]

def get_sprint_results_columns():
    """Retorna a lista de colunas a MANTER para o DataFrame 'sprint_results_bronze'."""
    return [
        'resultId', 'raceId', 'driverId', 'constructorId', 'number', 'grid', 'position', 
        'positionText', 'positionOrder', 'points', 'laps', 'time', 'milliseconds', 
        'fastestLap', 'fastestLapTime', 'statusId' 
    ]

print("‚úÖ Fun√ß√µes de defini√ß√£o de colunas carregadas e atualizadas.")

‚úÖ Fun√ß√µes de defini√ß√£o de colunas carregadas e atualizadas.


___

### Mapeamento dos dataframes e fun√ß√µes

In [24]:
df_mapping = {
    'circuits_bronze': get_circuits_columns,
    'races_bronze': get_races_columns,
    'drivers_bronze': get_drivers_columns,
    'constructors_bronze': get_constructors_columns,
    'results_bronze': get_results_columns,
    'constructor_results_bronze': get_constructor_results_columns,
    'driver_standings_bronze': get_driver_standings_columns,
    'constructor_standings_bronze': get_constructor_standings_columns,
    'qualifying_bronze': get_qualifying_columns,
    'lap_times_bronze': get_lap_times_columns,
    'pit_stops_bronze': get_pit_stops_columns,
    'seasons_bronze': get_seasons_columns,
    'status_bronze': get_status_columns,
    'sprint_results_bronze': get_sprint_results_columns
}

print("‚úÖ Mapeamento realizado corretamente!")

‚úÖ Mapeamento realizado corretamente!


___

### Processamento e cria√ß√£o dos DataFrames silver

In [25]:

print("\nIniciando a limpeza inicial e cria√ß√£o dos DataFrames da Camada Silver:")

for bronze_df_name, column_func in df_mapping.items():
    silver_df_name = bronze_df_name.replace('bronze', 'silver')
    
    if bronze_df_name in globals():
        bronze_df = globals()[bronze_df_name]
        selected_cols = column_func()
        silver_df = bronze_df.select(*selected_cols)
        
        globals()[silver_df_name] = silver_df
        print(f"‚úÖ DataFrame '{silver_df_name}' criado com {len(selected_cols)} colunas selecionadas.")
    else:
        print(f"‚ùå DataFrame '{bronze_df_name}' n√£o encontrado. Pulando sele√ß√£o.")

print("\nPrimeiras 5 linhas e esquema do 'drivers_silver' para verifica√ß√£o:")
if 'drivers_silver' in globals():
    globals()['drivers_silver'].show(5, truncate=False)
    globals()['drivers_silver'].printSchema()
else:
    print("‚ùå drivers_silver n√£o encontrado.")


Iniciando a limpeza inicial e cria√ß√£o dos DataFrames da Camada Silver:
‚úÖ DataFrame 'circuits_silver' criado com 8 colunas selecionadas.
‚úÖ DataFrame 'races_silver' criado com 7 colunas selecionadas.
‚úÖ DataFrame 'drivers_silver' criado com 8 colunas selecionadas.
‚úÖ DataFrame 'constructors_silver' criado com 4 colunas selecionadas.
‚úÖ DataFrame 'results_silver' criado com 18 colunas selecionadas.
‚úÖ DataFrame 'constructor_results_silver' criado com 5 colunas selecionadas.
‚úÖ DataFrame 'driver_standings_silver' criado com 7 colunas selecionadas.
‚úÖ DataFrame 'constructor_standings_silver' criado com 7 colunas selecionadas.
‚úÖ DataFrame 'qualifying_silver' criado com 9 colunas selecionadas.
‚úÖ DataFrame 'lap_times_silver' criado com 6 colunas selecionadas.
‚úÖ DataFrame 'pit_stops_silver' criado com 7 colunas selecionadas.
‚úÖ DataFrame 'seasons_silver' criado com 1 colunas selecionadas.
‚úÖ DataFrame 'status_silver' criado com 2 colunas selecionadas.
‚úÖ DataFrame 'sprint_

___

### Padroniza√ß√£o e Formata√ß√£o de Dados

O foco principal √© uniformizar formatos e tipos de dados para evitar erros de join e garantir c√°lculos corretos, nos dataframes criados:

In [28]:
silver_dfs_names = [
    'circuits_silver', 'races_silver', 'drivers_silver', 'constructors_silver',
    'results_silver', 'constructor_results_silver', 'driver_standings_silver',
    'constructor_standings_silver', 'qualifying_silver', 'lap_times_silver',
    'pit_stops_silver', 'seasons_silver', 'status_silver', 'sprint_results_silver'
]

def to_snake_case(df):
    """Renomeia todas as colunas de um DataFrame para snake_case."""
    for column in df.columns:
        new_column = regexp_replace(col(column), r'([a-z])([A-Z])', r'$1_$2')
        new_column = lower(new_column)
        df = df.withColumnRenamed(column, new_column)
    return df

def trim_string_columns(df):
    """Aplica a fun√ß√£o trim em todas as colunas do tipo String."""
    for field in df.schema.fields:
        if field.dataType == StringType():
            df = df.withColumn(field.name, trim(col(field.name)))
    return df

print("‚úÖ Mapeamento realizado corretamente!")

‚úÖ Mapeamento realizado corretamente!


In [38]:
print("\nIniciando a PADRONIZA√á√ÉO DE FORMATOS e CONVERS√ÉO DE TIPOS:")

for df_name in silver_dfs_names:
    if df_name not in globals():
        print(f"‚ùå Erro: DataFrame '{df_name}' n√£o encontrado no ambiente global. Pulando.")
        continue

    df = globals()[df_name]
    df = rename_to_snake_case(df)
    df = trim_string_columns(df)

    for c in df.columns:
        if c.endswith('_id') or c.endswith('_year') or c.endswith('_round') or c.endswith('_lap') or c.endswith('_stop') or c.endswith('_number'):
            df = df.withColumn(c, col(c).cast(IntegerType()))

    if 'circuits' in df_name:
        df = df.withColumn("lat", col("lat").cast(DoubleType()))
        df = df.withColumn("lng", col("lng").cast(DoubleType()))
        df = df.withColumn("alt", col("alt").cast(IntegerType()))

    elif 'races' in df_name or 'drivers' in df_name:
        if 'date' in df.columns:
            df = df.withColumn("date", col("date").cast(DateType()))
        if 'dob' in df.columns:
            df = df.withColumn("dob", col("dob").cast(DateType()))
            
    elif 'results' in df_name or 'sprint_results' in df_name:
        df = df.withColumn("points", col("points").cast(DoubleType()))
        for c in ['milliseconds', 'grid', 'laps']:
            if c in df.columns:
                 df = df.withColumn(c, col(c).cast(IntegerType()))

    elif 'standings' in df_name:
        df = df.withColumn("points", col("points").cast(DoubleType()))
        df = df.withColumn("wins", col("wins").cast(IntegerType()))
        
    elif 'lap_times' in df_name or 'pit_stops' in df_name:
        df = df.withColumn("milliseconds", col("milliseconds").cast(IntegerType()))

    globals()[df_name] = df
    
    print(f"‚úÖ Padroniza√ß√£o de '{df_name}' conclu√≠da.")
    
print("\n‚úÖ Processo de Padroniza√ß√£o e Formata√ß√£o de Dados (Etapa 1 Silver) finalizado.")


Iniciando a PADRONIZA√á√ÉO DE FORMATOS e CONVERS√ÉO DE TIPOS:
‚úÖ Padroniza√ß√£o de 'circuits_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'races_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'drivers_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'constructors_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'results_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'constructor_results_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'driver_standings_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'constructor_standings_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'qualifying_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'lap_times_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'pit_stops_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'seasons_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'status_silver' conclu√≠da.
‚úÖ Padroniza√ß√£o de 'sprint_results_silver' conclu√≠da.

‚úÖ Processo de Padroniza√ß√£o e Formata√ß√£o de Dados (Etapa 1 Silver) finalizado.


In [40]:
print("--- ‚úÖ PROVA DE LIMPEZA E PADRONIZA√á√ÉO (DRIVERS_SILVER) ---")

print("\nEsquema do drivers_silver:")
drivers_silver.printSchema()

print("\nPrimeira linha para comprovar o 'snake_case' e o 'date_type':")
drivers_silver.select(
    "driver_id", 
    "driver_ref", 
    "nationality", 
    "dob" 
).limit(1).show(truncate=False)

print("\nExplica√ß√£o da Prova:")
print("1. Coluna 'driverId' (CamelCase) -> Agora 'driver_id' (snake_case).")
print("2. O 'driver_id' (Chave) est√° agora como tipo 'integer'.")
print("3. A 'dob' (Data de Nascimento), que era string no Bronze, √© agora do tipo 'date'.")

--- ‚úÖ PROVA DE LIMPEZA E PADRONIZA√á√ÉO (DRIVERS_SILVER) ---

Esquema do drivers_silver:
root
 |-- driver_id: integer (nullable = true)
 |-- driver_ref: string (nullable = true)
 |-- number: integer (nullable = true)
 |-- code: string (nullable = true)
 |-- forename: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- nationality: string (nullable = true)


Primeira linha para comprovar o 'snake_case' e o 'date_type':
+---------+----------+-----------+----------+
|driver_id|driver_ref|nationality|dob       |
+---------+----------+-----------+----------+
|1        |hamilton  |British    |1985-01-07|
+---------+----------+-----------+----------+


Explica√ß√£o da Prova:
1. Coluna 'driverId' (CamelCase) -> Agora 'driver_id' (snake_case).
2. O 'driver_id' (Chave) est√° agora como tipo 'integer'.
3. A 'dob' (Data de Nascimento), que era string no Bronze, √© agora do tipo 'date'.


___

### Tratamento de Valores Ausentes (Nulos)

O conjunto de dados de F1 usa o valor \N (barra N mai√∫scula) para representar dados ausentes ou n√£o aplic√°veis, o que n√£o √© reconhecido nativamente pelo Spark como NULL.

In [41]:
# Valor de refer√™ncia para imputa√ß√£o num√©rica
ZERO_VALUE = lit(0).cast(DoubleType())

for df_name in silver_dfs_names:
    if df_name not in globals():
        print(f"‚ùå Erro: DataFrame '{df_name}' n√£o encontrado no ambiente global. Pulando.")
        continue

    df = globals()[df_name]
    
    # Embora a leitura do CSV j√° substitua '\N' por null, esta √© uma etapa de garantia.
    # Esta linha n√£o √© estritamente necess√°ria se a leitura foi perfeita,
    # mas garante que strings vazias ou '\N' sejam tratadas.
    df = df.replace('\\N', None) 
        
    # Geral: Imputa√ß√£o de Pontos e Vit√≥rias
    # A aus√™ncia de valor em pontos/vit√≥rias significa logicamente 0.
    for c in ['points', 'wins']:
        if c in df.columns and (df.schema[c].dataType == DoubleType() or df.schema[c].dataType == IntegerType()):
            df = df.na.fill({c: 0})
            
    # Espec√≠fico para Circuitos: Altitude
    # Altitude nula pode ser imputada como 0 (n√≠vel do mar) para permitir c√°lculos
    if 'circuits' in df_name and 'alt' in df.columns:
        df = df.na.fill({'alt': 0})
        
    # Espec√≠fico para Qualifica√ß√£o: Tempos (q1, q2, q3)
    # Tempo ausente √© tratado como 0 (embora muitas an√°lises exijam a exclus√£o ou valores negativos/altos)
    for c in ['q1', 'q2', 'q3']:
        if c in df.columns:
            # Substitu√≠mos por uma string vazia para manter o tipo string 
            df = df.na.fill({c: '0:00.000'}) 
            
    # **Espec√≠fico para Pit Stops / Lap Times:**
    # Imputa√ß√£o de milliseconds nulos com 0.
    if ('lap_times' in df_name or 'pit_stops' in df_name) and 'milliseconds' in df.columns:
        df = df.na.fill({'milliseconds': 0})

    # Atualiza o DataFrame no ambiente global
    globals()[df_name] = df
    
    # Exibe o status da limpeza
    print(f"‚úÖ Tratamento de nulos em '{df_name}' conclu√≠do.")


print("\n‚úÖ Processo de Tratamento de Valores Ausentes (Etapa 2 Silver) finalizado.")

‚úÖ Tratamento de nulos em 'circuits_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'races_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'drivers_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'constructors_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'results_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'constructor_results_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'driver_standings_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'constructor_standings_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'qualifying_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'lap_times_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'pit_stops_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'seasons_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'status_silver' conclu√≠do.
‚úÖ Tratamento de nulos em 'sprint_results_silver' conclu√≠do.

‚úÖ Processo de Tratamento de Valores Ausentes (Etapa 2 Silver) finalizado.


In [48]:
print("---  VERIFICA√á√ÉO DE TRATAMENTO DE NULOS (NULL -> 0) ---")


results_check_df = results_silver.filter(
    (col("points") == 0.0) & (col("milliseconds") == 0)
).limit(1)

print("\n[1] Linha de 'results_silver' com valores imputados (DNF/DSQ):")
results_check_df.select(
    "result_id", "race_id", "points", "position_order", "milliseconds", "status_id"
).show(truncate=False)

circuits_check_df = circuits_silver.filter(
    col("alt") == 0
).limit(1)

print("\n[2] Linha de 'circuits_silver' com Altitude imputada (0):")
circuits_check_df.select(
    "circuit_id", "name", "country", "alt", "lat"
).show(truncate=False)

print("\n--- ‚úÖ Valida√ß√£o Conclu√≠da ---")

---  VERIFICA√á√ÉO DE TRATAMENTO DE NULOS (NULL -> 0) ---

[1] Linha de 'results_silver' com valores imputados (DNF/DSQ):
+---------+-------+------+--------------+------------+---------+
|result_id|race_id|points|position_order|milliseconds|status_id|
+---------+-------+------+--------------+------------+---------+
+---------+-------+------+--------------+------------+---------+


[2] Linha de 'circuits_silver' com Altitude imputada (0):
+----------+----------------------------+-------+---+-------+
|circuit_id|name                        |country|alt|lat    |
+----------+----------------------------+-------+---+-------+
|35        |Korean International Circuit|Korea  |0  |34.7333|
+----------+----------------------------+-------+---+-------+


--- ‚úÖ Valida√ß√£o Conclu√≠da ---


___

###  Tratamento de Qualidade de Dados (Valida√ß√£o)
Este passo garante a integridade e a validade l√≥gica dos dados.

In [44]:
PK_MAP = {
    'circuits_silver': ['circuit_id'],
    'races_silver': ['race_id'],
    'drivers_silver': ['driver_id'],
    'constructors_silver': ['constructor_id'],
    'status_silver': ['status_id'],
    'seasons_silver': ['year'],
    'results_silver': ['result_id'],
    'sprint_results_silver': ['result_id'],
    'constructor_results_silver': ['constructor_results_id'],
    'driver_standings_silver': ['driver_standings_id'],
    'constructor_standings_silver': ['constructor_standings_id'],
    'qualifying_silver': ['qualify_id'],
    'lap_times_silver': ['race_id', 'driver_id', 'lap'],
    'pit_stops_silver': ['race_id', 'driver_id', 'stop'],
}

print("‚úÖ Mapeamento realizado corretamente!")

‚úÖ Mapeamento realizado corretamente!


In [47]:
for df_name, pk_cols in PK_MAP.items():
    if df_name not in globals():
        print(f"‚ùå Erro: DataFrame '{df_name}' n√£o encontrado. Pulando.")
        continue

    df = globals()[df_name]
    initial_count = df.count()
    
    df = df.dropDuplicates(pk_cols)
    duplicates_removed = initial_count - df.count()
    
    df = df.na.drop(subset=[pk_cols[0]]) 
        
    if df_name == 'results_silver' or df_name == 'sprint_results_silver':
        df = df.withColumn("position_order", col("position_order").cast(IntegerType()))
        df = df.na.drop(subset=['race_id', 'driver_id', 'constructor_id', 'status_id'])

    elif 'circuits' in df_name:
        df = df.filter((col("lat") >= -90) & (col("lat") <= 90))
        df = df.filter((col("lng") >= -180) & (col("lng") <= 180))
        df = df.na.drop(subset=['name'])

    elif 'drivers' in df_name:
        df = df.na.drop(subset=['forename', 'surname'])
        
    elif 'standings' in df_name:
        df = df.filter(col("points") >= 0)
        df = df.filter(col("wins") >= 0)
        
    elif 'lap_times' in df_name or 'pit_stops' in df_name:
        df = df.filter(col("milliseconds") > 0)
        
    final_count = df.count()
    rows_filtered = initial_count - final_count
    
    globals()[df_name] = df
    
    print(f"‚úÖ Qualidade de '{df_name}' finalizada:")
    print(f"   - Duplicatas removidas (PK: {', '.join(pk_cols)}): {duplicates_removed}")
    print(f"   - Linhas inv√°lidas/Nulos cr√≠ticos removidos: {rows_filtered - duplicates_removed}")
    print(f"   - Contagem Final: {final_count}")

print("\n‚úÖ Processo de Tratamento de Qualidade de Dados (Etapa 3 Silver) finalizado, corrigindo o erro de escopo.")

‚úÖ Qualidade de 'circuits_silver' finalizada:
   - Duplicatas removidas (PK: circuit_id): 0
   - Linhas inv√°lidas/Nulos cr√≠ticos removidos: 0
   - Contagem Final: 77
‚úÖ Qualidade de 'races_silver' finalizada:
   - Duplicatas removidas (PK: race_id): 0
   - Linhas inv√°lidas/Nulos cr√≠ticos removidos: 0
   - Contagem Final: 1125
‚úÖ Qualidade de 'drivers_silver' finalizada:
   - Duplicatas removidas (PK: driver_id): 0
   - Linhas inv√°lidas/Nulos cr√≠ticos removidos: 0
   - Contagem Final: 861
‚úÖ Qualidade de 'constructors_silver' finalizada:
   - Duplicatas removidas (PK: constructor_id): 0
   - Linhas inv√°lidas/Nulos cr√≠ticos removidos: 0
   - Contagem Final: 212
‚úÖ Qualidade de 'status_silver' finalizada:
   - Duplicatas removidas (PK: status_id): 0
   - Linhas inv√°lidas/Nulos cr√≠ticos removidos: 0
   - Contagem Final: 139
‚úÖ Qualidade de 'seasons_silver' finalizada:
   - Duplicatas removidas (PK: year): 0
   - Linhas inv√°lidas/Nulos cr√≠ticos removidos: 0
   - Contagem F

In [49]:
INTEGRITY_CHECK_COLS = {
    'circuits_silver': ['circuit_id', 'name'],
    'races_silver': ['race_id', 'year', 'circuit_id'],
    'drivers_silver': ['driver_id', 'forename', 'surname'],
    'constructors_silver': ['constructor_id', 'name'],
    'status_silver': ['status_id', 'status'],
    'results_silver': ['result_id', 'race_id', 'driver_id', 'constructor_id', 'status_id', 'points'],
    'driver_standings_silver': ['driver_standings_id', 'race_id', 'driver_id', 'points'],
    'constructor_results_silver': ['constructor_results_id', 'race_id', 'constructor_id'],
    'constructor_standings_silver': ['constructor_standings_id', 'race_id', 'constructor_id'],
    'qualifying_silver': ['qualify_id', 'race_id', 'driver_id'],
    'lap_times_silver': ['race_id', 'driver_id', 'lap', 'milliseconds'],
    'pit_stops_silver': ['race_id', 'driver_id', 'stop', 'milliseconds'],
    'seasons_silver': ['year'],
    'sprint_results_silver': ['result_id', 'race_id', 'driver_id', 'points'],
}

print("--- üìä RELAT√ìRIO FINAL DE INTEGRIDADE DA CAMADA SILVER ---")
print("Contagem de valores N√ÉO-NULOS em Chaves Prim√°rias (PK) e Chaves Estrangeiras (FK).")
print("Se o valor 'N√£o-Nulos' for igual ao 'Contagem Total', a coluna est√° √≠ntegra para o join.")
print("="*85)


for df_name in silver_dfs_names:
    if df_name not in globals():
        print(f"‚ùå ERRO: DataFrame '{df_name}' n√£o encontrado.")
        continue
    
    df = globals()[df_name]
    total_count = df.count()
    
    print(f"\n[DF: {df_name.upper()}] - Contagem Total: {total_count}")
    agg_expressions = []
    
    for col_name in INTEGRITY_CHECK_COLS.get(df_name, []):
        # Cria uma express√£o para contar onde a coluna N√ÉO √© nula
        agg_expressions.append(count(when(col(col_name).isNotNull(), 1)).alias(f"N√£o-Nulos ({col_name})"))
        
    if agg_expressions:
        integrity_check = df.agg(*agg_expressions).collect()[0].asDict()
        
        print("-" * 50)
        for k, v in integrity_check.items():
            status = "‚úÖ OK" if v == total_count else "‚ö†Ô∏è FALHA"
            print(f"| {k:<30}: {v:<15} ({status})")
        print("-" * 50)
    else:
        print("Nenhuma coluna cr√≠tica definida para verifica√ß√£o.")

print("\n--- ‚úÖ VERIFICA√á√ÉO FINAL CONCLU√çDA ---")

--- üìä RELAT√ìRIO FINAL DE INTEGRIDADE DA CAMADA SILVER ---
Contagem de valores N√ÉO-NULOS em Chaves Prim√°rias (PK) e Chaves Estrangeiras (FK).
Se o valor 'N√£o-Nulos' for igual ao 'Contagem Total', a coluna est√° √≠ntegra para o join.

[DF: CIRCUITS_SILVER] - Contagem Total: 77
--------------------------------------------------
| N√£o-Nulos (circuit_id)        : 77              (‚úÖ OK)
| N√£o-Nulos (name)              : 77              (‚úÖ OK)
--------------------------------------------------

[DF: RACES_SILVER] - Contagem Total: 1125
--------------------------------------------------
| N√£o-Nulos (race_id)           : 1125            (‚úÖ OK)
| N√£o-Nulos (year)              : 1125            (‚úÖ OK)
| N√£o-Nulos (circuit_id)        : 1125            (‚úÖ OK)
--------------------------------------------------

[DF: DRIVERS_SILVER] - Contagem Total: 861
--------------------------------------------------
| N√£o-Nulos (driver_id)         : 861             (‚úÖ OK)
| N√£o-Nulos (f

___

### Popular os dados no Banco de dados - Schema Silver

Os pr√≥ximos trechos s√£o referentes √† popular o banco de dados

In [53]:
spark = SparkSession.builder \
    .appName("WriteSilverToPostgres") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

url = "jdbc:postgresql://postgres:5432/f1_dw"   # host=postgres, db=f1_dw

properties = {
    "user": "user_f1",
    "password": "password_f1",
    "driver": "org.postgresql.Driver"
}

tabelas_silver = {
    "circuits_silver": circuits_silver,
    "races_silver": races_silver,
    "drivers_silver": drivers_silver,
    "constructors_silver": constructors_silver,
    "results_silver": results_silver,
    "constructor_results_silver": constructor_results_silver,
    "driver_standings_silver": driver_standings_silver,
    "constructor_standings_silver": constructor_standings_silver,
    "qualifying_silver": qualifying_silver,
    "lap_times_silver": lap_times_silver,
    "pit_stops_silver": pit_stops_silver,
    "seasons_silver": seasons_silver,
    "status_silver": status_silver,
    "sprint_results_silver": sprint_results_silver
}

25/11/25 00:50:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [55]:

engine = create_engine(
    "postgresql://user_f1:password_f1@localhost:5432/f1_dw"
)

for tabela, df in tabelas_silver.items():
    print(f"‚û°Ô∏è Convertendo {tabela} para pandas...")

    pdf = df.toPandas()

    print(f"‚û°Ô∏è Gravando {tabela} no PostgreSQL...")

    pdf.to_sql(
        tabela,
        engine,
        schema="silver",
        if_exists="append",  # ou "replace"
        index=False
    )

    print(f"‚úîÔ∏è {tabela} salva com {len(pdf)} linhas.")


‚û°Ô∏è Convertendo circuits_silver para pandas...
‚û°Ô∏è Gravando circuits_silver no PostgreSQL...
‚úîÔ∏è circuits_silver salva com 77 linhas.
‚û°Ô∏è Convertendo races_silver para pandas...
‚û°Ô∏è Gravando races_silver no PostgreSQL...
‚úîÔ∏è races_silver salva com 1125 linhas.
‚û°Ô∏è Convertendo drivers_silver para pandas...
‚û°Ô∏è Gravando drivers_silver no PostgreSQL...
‚úîÔ∏è drivers_silver salva com 861 linhas.
‚û°Ô∏è Convertendo constructors_silver para pandas...
‚û°Ô∏è Gravando constructors_silver no PostgreSQL...
‚úîÔ∏è constructors_silver salva com 212 linhas.
‚û°Ô∏è Convertendo results_silver para pandas...


25/11/25 00:56:48 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


‚û°Ô∏è Gravando results_silver no PostgreSQL...
‚úîÔ∏è results_silver salva com 26759 linhas.
‚û°Ô∏è Convertendo constructor_results_silver para pandas...
‚û°Ô∏è Gravando constructor_results_silver no PostgreSQL...
‚úîÔ∏è constructor_results_silver salva com 12625 linhas.
‚û°Ô∏è Convertendo driver_standings_silver para pandas...
‚û°Ô∏è Gravando driver_standings_silver no PostgreSQL...
‚úîÔ∏è driver_standings_silver salva com 34863 linhas.
‚û°Ô∏è Convertendo constructor_standings_silver para pandas...
‚û°Ô∏è Gravando constructor_standings_silver no PostgreSQL...
‚úîÔ∏è constructor_standings_silver salva com 13391 linhas.
‚û°Ô∏è Convertendo qualifying_silver para pandas...
‚û°Ô∏è Gravando qualifying_silver no PostgreSQL...
‚úîÔ∏è qualifying_silver salva com 10494 linhas.
‚û°Ô∏è Convertendo lap_times_silver para pandas...


                                                                                

‚û°Ô∏è Gravando lap_times_silver no PostgreSQL...
‚úîÔ∏è lap_times_silver salva com 589081 linhas.
‚û°Ô∏è Convertendo pit_stops_silver para pandas...
‚û°Ô∏è Gravando pit_stops_silver no PostgreSQL...
‚úîÔ∏è pit_stops_silver salva com 11371 linhas.
‚û°Ô∏è Convertendo seasons_silver para pandas...
‚û°Ô∏è Gravando seasons_silver no PostgreSQL...
‚úîÔ∏è seasons_silver salva com 75 linhas.
‚û°Ô∏è Convertendo status_silver para pandas...
‚û°Ô∏è Gravando status_silver no PostgreSQL...
‚úîÔ∏è status_silver salva com 139 linhas.
‚û°Ô∏è Convertendo sprint_results_silver para pandas...
‚û°Ô∏è Gravando sprint_results_silver no PostgreSQL...
‚úîÔ∏è sprint_results_silver salva com 360 linhas.
