# ETL Job: Raw -> Silver

This notebook extracts data from the raw layer, cleans it, and delivers it to the silver layer. Run the cells in the order shown, and change only the parameters that are explicitly marked as adjustable.


## Overall flow

1. Sets up Python, Spark, and the repository paths.
2. Defines schemas and metadata (primary keys, write strategy) for each raw table.
3. Applies standardized utility functions: trimming strings, converting empty strings to null, and deduplicating rows.
4. Persists the final datasets as Parquet (`Data Layer/silver/dados_limpos/parquet/<table>`) and writes single-file `*_clean.csv` CSVs for compatibility.
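
The cleaning rules in step 3 can be illustrated without Spark. The sketch below is a plain-Python analogy, not the notebook's implementation: the helper names `clean_value` and `clean_rows` and the sample rows are made up for illustration. It keeps the first row seen for each primary key, whereas Spark's `dropDuplicates` does not guarantee which duplicate survives.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def clean_value(value: Any) -> Any:
    """Trim spaces from strings and map empty strings to None."""
    if isinstance(value, str):
        stripped = value.strip(" ")
        return stripped if stripped else None
    return value  # non-string values pass through unchanged


def clean_rows(rows: List[Row], primary_keys: Sequence[str]) -> List[Row]:
    """Clean every value, then keep the first row seen per primary key."""
    seen = set()
    result: List[Row] = []
    for row in rows:
        cleaned = {name: clean_value(value) for name, value in row.items()}
        key = tuple(cleaned[name] for name in primary_keys)
        if key in seen:
            continue  # duplicate primary key: drop the later row
        seen.add(key)
        result.append(cleaned)
    return result


# Hypothetical sample rows, only to exercise the helpers.
sample = [
    {"circuitId": 1, "name": "  Circuit A ", "location": ""},
    {"circuitId": 1, "name": "Circuit A", "location": "City A"},
    {"circuitId": 2, "name": "Circuit B", "location": " City B"},
]
cleaned = clean_rows(sample, primary_keys=["circuitId"])
for row in cleaned:
    print(row)
```

Two rows remain: the first `circuitId` 1 row with its empty `location` turned into `None`, and the `circuitId` 2 row with the leading space removed.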


In [10]:
from pathlib import Path

try:
    import pandas as pd
except ModuleNotFoundError as exc:
    raise ModuleNotFoundError(
        "Pandas not found. Install it with `pip install pandas` in the same environment as the notebook."
    ) from exc

try:
    from pyspark.sql import functions as F, types as T
except ModuleNotFoundError as exc:
    raise ModuleNotFoundError(
        "PySpark not found. Install it with `pip install pyspark`."
    ) from exc

try:
    from spark_bootstrap import get_spark
except ModuleNotFoundError as exc:
    raise ModuleNotFoundError(
        "Could not import `spark_bootstrap`. Run this notebook from inside the `Notebooks/` folder or adjust `sys.path`."
    ) from exc


In [11]:
def find_repo_root(marker: str = "Data Layer", max_levels: int = 6) -> Path:
    current = Path.cwd().resolve()
    for _ in range(max_levels):
        if (current / marker).exists():
            return current
        current = current.parent
    raise FileNotFoundError(
        f"Could not locate the repository root: no `{marker}` directory found within {max_levels} levels starting at the current directory."
    )

REPO_ROOT = find_repo_root()
RAW_DIR = REPO_ROOT / "Data Layer" / "raw" / "dados_originais"
SILVER_DIR = REPO_ROOT / "Data Layer" / "silver" / "dados_limpos"
SILVER_PARQUET_DIR = SILVER_DIR / "parquet"
SILVER_PARQUET_DIR.mkdir(parents=True, exist_ok=True)

if not RAW_DIR.exists():
    raise FileNotFoundError(f"Raw directory not found: {RAW_DIR}")

spark = get_spark(app_name="F1 Raw to Silver ETL", base=REPO_ROOT / "Notebooks")
spark.conf.set("spark.sql.session.timeZone", "UTC")

print(f"Repository root: {REPO_ROOT}")
print(f"Raw: {RAW_DIR}")
print(f"Silver (parquet): {SILVER_PARQUET_DIR}")


Repository root: /Users/kalebmacedo/formula1-analytics-2
Raw: /Users/kalebmacedo/formula1-analytics-2/Data Layer/raw/dados_originais
Silver (parquet): /Users/kalebmacedo/formula1-analytics-2/Data Layer/silver/dados_limpos/parquet


25/10/06 11:04:04 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Schema definitions

The dictionary below centralizes the metadata for each raw source. Update it whenever a column is added or changed, or when the write configuration needs to change.
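
Because the dictionary is edited by hand, a quick consistency check can catch a primary key that is not in the schema or a column listed twice. The following is a minimal sketch under assumptions: `find_config_problems` is a hypothetical helper, and it works on plain lists of column names rather than on Spark `StructType` objects so that it runs without Spark.

```python
from typing import List, Mapping, Sequence


def find_config_problems(
    tables: Mapping[str, Mapping[str, Sequence[str]]]
) -> List[str]:
    """Return human-readable problems found in a table-metadata mapping.

    Each entry is expected to have a "fields" list (column names) and,
    optionally, a "primary_keys" list.
    """
    problems: List[str] = []
    for table, meta in tables.items():
        fields = list(meta["fields"])
        duplicated = sorted({name for name in fields if fields.count(name) > 1})
        if duplicated:
            problems.append(f"{table}: duplicated columns {duplicated}")
        missing = [key for key in meta.get("primary_keys", []) if key not in fields]
        if missing:
            problems.append(f"{table}: primary keys not in schema {missing}")
    return problems


# Hypothetical metadata: one valid entry and one deliberately broken entry.
example = {
    "status": {"fields": ["statusId", "status"], "primary_keys": ["statusId"]},
    "broken": {"fields": ["id", "id", "name"], "primary_keys": ["code"]},
}
problems = find_config_problems(example)
for line in problems:
    print(line)
```

In the notebook, the input could be built from the real configuration with `cfg["schema"].fieldNames()` and `cfg["primary_keys"]` for each entry of `TABLE_CONFIG`.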


In [12]:
TABLE_CONFIG = {
    "circuits": {
        "source": RAW_DIR / "circuits.csv",
        "destination": SILVER_PARQUET_DIR / "circuits",
        "schema": T.StructType([
            T.StructField("circuitId", T.IntegerType(), True),
            T.StructField("circuitRef", T.StringType(), True),
            T.StructField("name", T.StringType(), True),
            T.StructField("location", T.StringType(), True),
            T.StructField("country", T.StringType(), True),
            T.StructField("lat", T.DoubleType(), True),
            T.StructField("lng", T.DoubleType(), True),
            T.StructField("alt", T.IntegerType(), True),
            T.StructField("url", T.StringType(), True),
        ]),
        "primary_keys": ["circuitId"],
        "coalesce": 1,
    },
    "constructor_results": {
        "source": RAW_DIR / "constructor_results.csv",
        "destination": SILVER_PARQUET_DIR / "constructor_results",
        "schema": T.StructType([
            T.StructField("constructorResultsId", T.IntegerType(), True),
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("constructorId", T.IntegerType(), True),
            T.StructField("points", T.DoubleType(), True),
            T.StructField("status", T.StringType(), True),
        ]),
        "primary_keys": ["constructorResultsId"],
        "coalesce": 1,
    },
    "constructor_standings": {
        "source": RAW_DIR / "constructor_standings.csv",
        "destination": SILVER_PARQUET_DIR / "constructor_standings",
        "schema": T.StructType([
            T.StructField("constructorStandingsId", T.IntegerType(), True),
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("constructorId", T.IntegerType(), True),
            T.StructField("points", T.DoubleType(), True),
            T.StructField("position", T.IntegerType(), True),
            T.StructField("positionText", T.StringType(), True),
            T.StructField("wins", T.IntegerType(), True),
        ]),
        "primary_keys": ["constructorStandingsId"],
        "coalesce": 1,
    },
    "constructors": {
        "source": RAW_DIR / "constructors.csv",
        "destination": SILVER_PARQUET_DIR / "constructors",
        "schema": T.StructType([
            T.StructField("constructorId", T.IntegerType(), True),
            T.StructField("constructorRef", T.StringType(), True),
            T.StructField("name", T.StringType(), True),
            T.StructField("nationality", T.StringType(), True),
            T.StructField("url", T.StringType(), True),
        ]),
        "primary_keys": ["constructorId"],
        "coalesce": 1,
    },
    "driver_standings": {
        "source": RAW_DIR / "driver_standings.csv",
        "destination": SILVER_PARQUET_DIR / "driver_standings",
        "schema": T.StructType([
            T.StructField("driverStandingsId", T.IntegerType(), True),
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("driverId", T.IntegerType(), True),
            T.StructField("points", T.DoubleType(), True),
            T.StructField("position", T.IntegerType(), True),
            T.StructField("positionText", T.StringType(), True),
            T.StructField("wins", T.IntegerType(), True),
        ]),
        "primary_keys": ["driverStandingsId"],
        "coalesce": 1,
    },
    "drivers": {
        "source": RAW_DIR / "drivers.csv",
        "destination": SILVER_PARQUET_DIR / "drivers",
        "schema": T.StructType([
            T.StructField("driverId", T.IntegerType(), True),
            T.StructField("driverRef", T.StringType(), True),
            T.StructField("number", T.IntegerType(), True),
            T.StructField("code", T.StringType(), True),
            T.StructField("forename", T.StringType(), True),
            T.StructField("surname", T.StringType(), True),
            T.StructField("dob", T.DateType(), True),
            T.StructField("nationality", T.StringType(), True),
            T.StructField("url", T.StringType(), True),
        ]),
        "primary_keys": ["driverId"],
        "coalesce": 1,
    },
    "lap_times": {
        "source": RAW_DIR / "lap_times.csv",
        "destination": SILVER_PARQUET_DIR / "lap_times",
        "schema": T.StructType([
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("driverId", T.IntegerType(), True),
            T.StructField("lap", T.IntegerType(), True),
            T.StructField("position", T.IntegerType(), True),
            T.StructField("time", T.StringType(), True),
            T.StructField("milliseconds", T.IntegerType(), True),
        ]),
        "primary_keys": ["raceId", "driverId", "lap"],
        "coalesce": 1,
    },
    "pit_stops": {
        "source": RAW_DIR / "pit_stops.csv",
        "destination": SILVER_PARQUET_DIR / "pit_stops",
        "schema": T.StructType([
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("driverId", T.IntegerType(), True),
            T.StructField("stop", T.IntegerType(), True),
            T.StructField("lap", T.IntegerType(), True),
            T.StructField("time", T.StringType(), True),
            T.StructField("duration", T.DoubleType(), True),
            T.StructField("milliseconds", T.IntegerType(), True),
        ]),
        "primary_keys": ["raceId", "driverId", "stop"],
        "coalesce": 1,
    },
    "qualifying": {
        "source": RAW_DIR / "qualifying.csv",
        "destination": SILVER_PARQUET_DIR / "qualifying",
        "schema": T.StructType([
            T.StructField("qualifyId", T.IntegerType(), True),
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("driverId", T.IntegerType(), True),
            T.StructField("constructorId", T.IntegerType(), True),
            T.StructField("number", T.IntegerType(), True),
            T.StructField("position", T.IntegerType(), True),
            T.StructField("q1", T.StringType(), True),
            T.StructField("q2", T.StringType(), True),
            T.StructField("q3", T.StringType(), True),
        ]),
        "primary_keys": ["qualifyId"],
        "coalesce": 1,
    },
    "races": {
        "source": RAW_DIR / "races.csv",
        "destination": SILVER_PARQUET_DIR / "races",
        "schema": T.StructType([
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("year", T.IntegerType(), True),
            T.StructField("round", T.IntegerType(), True),
            T.StructField("circuitId", T.IntegerType(), True),
            T.StructField("name", T.StringType(), True),
            T.StructField("date", T.DateType(), True),
            T.StructField("time", T.StringType(), True),
            T.StructField("url", T.StringType(), True),
            T.StructField("fp1_date", T.DateType(), True),
            T.StructField("fp1_time", T.StringType(), True),
            T.StructField("fp2_date", T.DateType(), True),
            T.StructField("fp2_time", T.StringType(), True),
            T.StructField("fp3_date", T.DateType(), True),
            T.StructField("fp3_time", T.StringType(), True),
            T.StructField("quali_date", T.DateType(), True),
            T.StructField("quali_time", T.StringType(), True),
            T.StructField("sprint_date", T.DateType(), True),
            T.StructField("sprint_time", T.StringType(), True),
        ]),
        "primary_keys": ["raceId"],
        "coalesce": 1,
    },
    "results": {
        "source": RAW_DIR / "results.csv",
        "destination": SILVER_PARQUET_DIR / "results",
        "schema": T.StructType([
            T.StructField("resultId", T.IntegerType(), True),
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("driverId", T.IntegerType(), True),
            T.StructField("constructorId", T.IntegerType(), True),
            T.StructField("number", T.IntegerType(), True),
            T.StructField("grid", T.IntegerType(), True),
            T.StructField("position", T.IntegerType(), True),
            T.StructField("positionText", T.StringType(), True),
            T.StructField("positionOrder", T.IntegerType(), True),
            T.StructField("points", T.DoubleType(), True),
            T.StructField("laps", T.IntegerType(), True),
            T.StructField("time", T.StringType(), True),
            T.StructField("milliseconds", T.IntegerType(), True),
            T.StructField("fastestLap", T.IntegerType(), True),
            T.StructField("rank", T.IntegerType(), True),
            T.StructField("fastestLapTime", T.StringType(), True),
            T.StructField("fastestLapSpeed", T.DoubleType(), True),
            T.StructField("statusId", T.IntegerType(), True),
        ]),
        "primary_keys": ["resultId"],
        "coalesce": 1,
    },
    "seasons": {
        "source": RAW_DIR / "seasons.csv",
        "destination": SILVER_PARQUET_DIR / "seasons",
        "schema": T.StructType([
            T.StructField("year", T.IntegerType(), True),
            T.StructField("url", T.StringType(), True),
        ]),
        "primary_keys": ["year"],
        "coalesce": 1,
    },
    "sprint_results": {
        "source": RAW_DIR / "sprint_results.csv",
        "destination": SILVER_PARQUET_DIR / "sprint_results",
        "schema": T.StructType([
            T.StructField("resultId", T.IntegerType(), True),
            T.StructField("raceId", T.IntegerType(), True),
            T.StructField("driverId", T.IntegerType(), True),
            T.StructField("constructorId", T.IntegerType(), True),
            T.StructField("number", T.IntegerType(), True),
            T.StructField("grid", T.IntegerType(), True),
            T.StructField("position", T.IntegerType(), True),
            T.StructField("positionText", T.StringType(), True),
            T.StructField("positionOrder", T.IntegerType(), True),
            T.StructField("points", T.DoubleType(), True),
            T.StructField("laps", T.IntegerType(), True),
            T.StructField("time", T.StringType(), True),
            T.StructField("milliseconds", T.IntegerType(), True),
            T.StructField("fastestLap", T.IntegerType(), True),
            T.StructField("fastestLapTime", T.StringType(), True),
            T.StructField("statusId", T.IntegerType(), True),
        ]),
        "primary_keys": ["resultId"],
        "coalesce": 1,
    },
    "status": {
        "source": RAW_DIR / "status.csv",
        "destination": SILVER_PARQUET_DIR / "status",
        "schema": T.StructType([
            T.StructField("statusId", T.IntegerType(), True),
            T.StructField("status", T.StringType(), True),
        ]),
        "primary_keys": ["statusId"],
        "coalesce": 1,
    },
}


In [13]:
from typing import Dict, List
NULL_TOKEN = r"\N"  # literal backslash + N, passed to the CSV reader as `nullValue`

def _trim_and_clean_strings(df):
    string_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, T.StringType)]
    for col_name in string_cols:
        df = df.withColumn(col_name, F.trim(F.col(col_name)))
        df = df.withColumn(col_name, F.when(F.col(col_name) == "", None).otherwise(F.col(col_name)))
    return df

def _read_raw(table_name: str):
    cfg = TABLE_CONFIG[table_name]
    path = cfg["source"]
    if not path.exists():
        raise FileNotFoundError(f"Raw file not found: {path}")
    df = (
        spark.read.format("csv")
        .option("header", True)
        .option("encoding", "UTF-8")
        .option("nullValue", NULL_TOKEN)
        .option("mode", "PERMISSIVE")
        .schema(cfg["schema"])
        .load(str(path))
    )
    return df

def run_table(table_name: str, write_parquet: bool = True, write_csv: bool = True) -> Dict[str, object]:
    cfg = TABLE_CONFIG[table_name]

    df_raw = _read_raw(table_name)
    df_raw = df_raw.cache()
    raw_count = df_raw.count()

    df_clean = _trim_and_clean_strings(df_raw)
    df_clean = df_clean.select(*cfg["schema"].fieldNames())
    df_clean = df_clean.cache()
    before_dedup = df_clean.count()

    pk = cfg.get("primary_keys")
    if pk:
        df_dedup = df_clean.dropDuplicates(pk)
    else:
        df_dedup = df_clean.dropDuplicates()

    # Sort before caching so that the cached DataFrame is the one that gets
    # written and is the one released by the final unpersist().
    order_cols = cfg.get("order_by") or pk
    if order_cols:
        df_dedup = df_dedup.orderBy(*order_cols)
    df_dedup = df_dedup.cache()
    after_dedup = df_dedup.count()
    df_clean.unpersist()
    df_raw.unpersist()

    summary = {
        "table": table_name,
        "raw_rows": raw_count,
        "rows": after_dedup,
        "duplicates_removed": raw_count - after_dedup,
    }

    if write_parquet:
        destination = cfg["destination"]
        destination.parent.mkdir(parents=True, exist_ok=True)
        df_dedup.coalesce(cfg.get("coalesce", 1)).write.mode("overwrite").format("parquet").option(
            "compression", "snappy"
        ).save(str(destination))
        summary["parquet_path"] = str(destination)

    if write_csv:
        csv_path = SILVER_DIR / f"{table_name}_clean.csv"
        pdf = df_dedup.toPandas()
        pdf.to_csv(csv_path, index=False)
        summary["csv_path"] = str(csv_path)

    df_dedup.unpersist()
    return summary


## Batch execution

Adjust the `TABLES_TO_PROCESS` list to process only a subset. The final summary shows the number of rows read, the number removed by deduplication, and the output paths.
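
When building a subset by hand, a typo in a table name would only surface as a `KeyError` inside the processing loop. One way to fail earlier is sketched below; `select_tables` is a hypothetical helper and the `configured` list is a shortened stand-in for `TABLE_CONFIG.keys()`.

```python
from typing import Iterable, List


def select_tables(requested: Iterable[str], available: Iterable[str]) -> List[str]:
    """Return the requested names in configuration order, rejecting unknown ones."""
    available_list = list(available)
    requested_set = set(requested)
    unknown = sorted(requested_set - set(available_list))
    if unknown:
        raise KeyError(f"Unknown tables: {unknown}")
    return [name for name in available_list if name in requested_set]


# Stand-in for list(TABLE_CONFIG.keys()); shortened for the example.
configured = ["circuits", "drivers", "races", "results"]
subset = select_tables(["results", "drivers"], configured)
print(subset)
```

The result follows the order of the configuration rather than the order of the request, so repeated runs process tables in a stable sequence.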


In [14]:
TABLES_TO_PROCESS: List[str] = list(TABLE_CONFIG.keys())

run_summary = []
for table in TABLES_TO_PROCESS:
    stats = run_table(table_name=table, write_parquet=True, write_csv=True)
    run_summary.append(stats)
    print(f"Table {table}: {stats['rows']} rows written")

summary_df = pd.DataFrame(run_summary)
summary_df


Table circuits: 77 rows written
Table constructor_results: 12625 rows written
Table constructor_standings: 13391 rows written
Table constructors: 212 rows written
Table driver_standings: 34863 rows written
Table drivers: 861 rows written
Table lap_times: 589081 rows written
Table pit_stops: 11371 rows written
Table qualifying: 10494 rows written


25/10/06 11:04:37 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Table races: 1125 rows written
Table results: 26759 rows written
Table seasons: 75 rows written
Table sprint_results: 360 rows written
Table status: 139 rows written


|   | table | raw_rows | rows | duplicates_removed | parquet_path | csv_path |
|---|-------|----------|------|--------------------|--------------|----------|
| 0 | circuits | 77 | 77 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 1 | constructor_results | 12625 | 12625 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 2 | constructor_standings | 13391 | 13391 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 3 | constructors | 212 | 212 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 4 | driver_standings | 34863 | 34863 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 5 | drivers | 861 | 861 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 6 | lap_times | 589081 | 589081 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 7 | pit_stops | 11371 | 11371 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 8 | qualifying | 10494 | 10494 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |
| 9 | races | 1125 | 1125 | 0 | /Users/kalebmacedo/formula1-analytics-2/Data L... | /Users/kalebmacedo/formula1-analytics-2/Data L... |


## Additional options

- To reprocess a specific table, run `run_table("table_name")`, for example `run_table("circuits")`.
- Run `spark.stop()` at the end if you no longer need the session.
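
If a reprocessing run is the last thing done with the session, the two options above can be combined so that the session is stopped even when a table fails. This is only a sketch of the `try`/`finally` pattern: `reprocess` and `FakeSession` are hypothetical names, and `FakeSession` stands in for the real Spark session so the example runs without Spark.

```python
from typing import Any, Callable, Dict, Iterable, List


class FakeSession:
    """Stand-in for a Spark session; it only records whether stop() was called."""

    def __init__(self) -> None:
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True


def reprocess(
    tables: Iterable[str],
    run_one: Callable[[str], Dict[str, Any]],
    session: Any,
) -> List[Dict[str, Any]]:
    """Run `run_one` for each table and always stop the session afterwards."""
    results: List[Dict[str, Any]] = []
    try:
        for name in tables:
            results.append(run_one(name))
    finally:
        session.stop()  # runs on success and on failure
    return results


session = FakeSession()
results = reprocess(["circuits", "status"], lambda name: {"table": name}, session)
print(results, session.stopped)
```

In the notebook, `run_one` would be `run_table` and `session` would be `spark`; the session cannot be reused after `stop()`, so this only fits a final run.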


In [15]:
# spark.stop()
