**Verificação Caminho**

In [0]:
dbutils.fs.ls('/Workspace/csv/')

Nesta etapa, foram carregados os arquivos CSV originais e gravados em tabelas Delta sem qualquer modificação no conteúdo original (somente metadados adicionados). Cada tabela Bronze contém:

- Dados brutos, como estão no CSV
- Data de ingestão (ingestao_dt)
- Fonte de origem (source)
- Colunas com nomes padronizados em snake_case
- Armazenamento em formato Delta Lake
    - Melhor performance
    - Suporte a versionamento
    - Leitura e escrita otimizadas

Essas tabelas Bronze servem como única fonte de verdade para a transformação Silver.

In [0]:
from pyspark.sql import functions as F

# ======================================================================
# 1. Definir caminhos dos CSVs dentro do Workspace
# ======================================================================

path_full = '/Workspace/csv/campeonato-brasileiro-full.csv'
path_cartoes = '/Workspace/csv/campeonato-brasileiro-cartoes.csv'
path_gols = '/Workspace/csv/campeonato-brasileiro-gols.csv'
path_estatisticas = '/Workspace/csv/campeonato-brasileiro-estatisticas-full.csv'


# ======================================================================
# 2. Função utilitária para criar a tabela Bronze corretamente
# ======================================================================

def create_bronze_table(df, table_name, source_path):
    """
    - Adiciona colunas de metadados (ingestao_dt e source)
    - Padroniza nome das colunas
    - Salva como tabela Delta (bronze)
    """
    
    # 2.1 Data de ingestão
    df_bronze = df.withColumn("ingestao_dt", F.current_timestamp())
    
    # 2.2 Nome da origem (para documentação/linhagem)
    df_bronze = df_bronze.withColumn("source", F.lit(source_path))
    
    # 2.3 Padronizar nome das colunas para snake_case
    df_bronze = df_bronze.toDF(*[
        c.lower()
         .replace(" ", "_")
         .replace("-", "_")
         .replace("ç", "c")
         .replace("ã", "a")
         .replace("á", "a")
        for c in df_bronze.columns
    ])
    
    # 2.4 Salvar como Delta (tabela Bronze)
    df_bronze.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable(table_name)
    
    print(f"Tabela criada: {table_name}")


# ======================================================================
# 3. Ler arquivos CSV brutos
# ======================================================================

df_full = spark.read.csv(path_full, header=True, inferSchema=True)
df_cartoes = spark.read.csv(path_cartoes, header=True, inferSchema=True)
df_gols = spark.read.csv(path_gols, header=True, inferSchema=True)
df_estatisticas = spark.read.csv(path_estatisticas, header=True, inferSchema=True)


# ======================================================================
# 4. Criar tabelas Bronze
# ======================================================================

create_bronze_table(df_full, "bronze_full", path_full)
create_bronze_table(df_cartoes, "bronze_cartoes", path_cartoes)
create_bronze_table(df_gols, "bronze_gols", path_gols)
create_bronze_table(df_estatisticas, "bronze_estatisticas", path_estatisticas)


# Mostrar as primeiras linhas de cada DataFrame
df_full.show(5)
df_cartoes.show(5)
df_gols.show(5)
df_estatisticas.show(5)

A Camada Silver entrega dados limpos, padronizados e tipados corretamente, permitindo integração confiável entre tabelas e servindo como base sólida para a construção das tabelas Fato e Dimensão na Camada Gold.

In [0]:
from pyspark.sql import functions as F

# ======================================================================
# ETAPA 1 — Ler as tabelas BRONZE
# ======================================================================

bronze_full         = spark.table("bronze_full")
bronze_cartoes      = spark.table("bronze_cartoes")
bronze_gols         = spark.table("bronze_gols")
bronze_estatisticas = spark.table("bronze_estatisticas")


# ======================================================================
# ETAPA 2 — SILVER FULL (PARTIDAS)
# ======================================================================
# Ajustes:
# - try_to_timestamp para evitar erros
# - padronização de nomes
# - conversão de placares e chaves
# - tratamento do campo vencedor

silver_full = (
    bronze_full
        .withColumn("data_dt", F.to_date("data", "dd/MM/yyyy"))

        .withColumn(
            "data_hora_ts",
            F.expr("try_to_timestamp(concat(data, ' ', hora), 'dd/MM/yyyy HH:mm')")
        )

        .withColumn("partida_id", F.col("id").cast("int"))
        .withColumn("mandante_placar", F.col("mandante_placar").cast("int"))
        .withColumn("visitante_placar", F.col("visitante_placar").cast("int"))

        .withColumn(
            "mandante",
            F.initcap(F.trim(F.regexp_replace("mandante", "[\\u00A0\\u2007\\u202F]", "")))
        )
        .withColumn(
            "visitante",
            F.initcap(F.trim(F.regexp_replace("visitante", "[\\u00A0\\u2007\\u202F]", "")))
        )
        .withColumn(
            "arena",
            F.initcap(F.trim(F.regexp_replace("arena", "[\\u00A0\\u2007\\u202F]", "")))
        )

        .withColumn(
            "vencedor",
            F.when(F.col("vencedor") == "-", "Empate").otherwise(F.col("vencedor"))
        )
)

silver_full.write.format("delta").mode("overwrite").saveAsTable("silver_full")
print("Tabela criada: silver_full")



# ======================================================================
# ETAPA 3 — SILVER GOLS
# ======================================================================
# Ajustes:
# - padronizar clubes
# - converter chaves
# - criar minuto_int para análises

silver_gols = (
    bronze_gols
        .withColumn(
            "clube",
            F.initcap(F.trim(F.regexp_replace("clube", "[\\u00A0\\u2007\\u202F]", "")))
        )
        .withColumn("partida_id", F.col("partida_id").cast("int"))
        .withColumn("rodata", F.col("rodata").cast("int"))

        .withColumn(
            "minuto_int",
            F.when(
                F.col("minuto").contains("+"),
                F.split("minuto", "\\+").getItem(0).cast("int") +
                F.split("minuto", "\\+").getItem(1).cast("int")
            ).otherwise(F.col("minuto").cast("int"))
        )
)

silver_gols.write.format("delta").mode("overwrite").saveAsTable("silver_gols")
print("Tabela criada: silver_gols")



# ======================================================================
# ETAPA 4 — SILVER CARTÕES
# ======================================================================
# Ajustes:
# - normalização do clube
# - conversão numérica de camisa e minuto
# - padronização de tipos

silver_cartoes = (
    bronze_cartoes
        .withColumn(
            "clube",
            F.initcap(F.trim(F.regexp_replace("clube", "[\\u00A0\\u2007\\u202F]", "")))
        )
        .withColumn("partida_id", F.col("partida_id").cast("int"))
        .withColumn("rodata", F.col("rodata").cast("int"))
        .withColumn("num_camisa", F.col("num_camisa").cast("int"))

        .withColumn(
            "minuto_int",
            F.when(
                F.col("minuto").contains("+"),
                F.split("minuto", "\\+").getItem(0).cast("int") +
                F.split("minuto", "\\+").getItem(1).cast("int")
            ).otherwise(F.col("minuto").cast("int"))
        )
)

silver_cartoes.write.format("delta").mode("overwrite").saveAsTable("silver_cartoes")
print("Tabela criada: silver_cartoes")



# ======================================================================
# ETAPA 5 — SILVER ESTATÍSTICAS
# ======================================================================
# Ajustes:
# - normalização de clube
# - conversão das métricas numéricas
# - conversão das porcentagens para números
# - tratamento de strings "None"

silver_estatisticas = (
    bronze_estatisticas
        .withColumn(
            "clube",
            F.initcap(F.trim(F.regexp_replace("clube", "[\\u00A0\\u2007\\u202F]", "")))
        )

        .withColumn("partida_id", F.col("partida_id").cast("int"))
        .withColumn("rodata", F.col("rodata").cast("int"))

        .withColumn("chutes", F.col("chutes").cast("int"))
        .withColumn("chutes_no_alvo", F.col("chutes_no_alvo").cast("int"))
        .withColumn("passes", F.col("passes").cast("int"))
        .withColumn("faltas", F.col("faltas").cast("int"))
        .withColumn("cartao_amarelo", F.col("cartao_amarelo").cast("int"))
        .withColumn("cartao_vermelho", F.col("cartao_vermelho").cast("int"))
        .withColumn("impedimentos", F.col("impedimentos").cast("int"))
        .withColumn("escanteios", F.col("escanteios").cast("int"))

        .withColumn(
            "posse_de_bola_num",
            F.when(
                (F.col("posse_de_bola").isNull()) | (F.col("posse_de_bola") == "None"),
                None
            ).otherwise(
                F.regexp_replace("posse_de_bola", "%", "").cast("double")
            )
        )

        .withColumn(
            "precisao_passes_num",
            F.when(
                (F.col("precisao_passes").isNull()) | (F.col("precisao_passes") == "None"),
                None
            ).otherwise(
                F.regexp_replace("precisao_passes", "%", "").cast("double")
            )
        )
)

silver_estatisticas.write.format("delta").mode("overwrite").saveAsTable("silver_estatisticas")
print("Tabela criada: silver_estatisticas")


A Camada Gold representa a fase final do processo de tratamento de dados, na qual as informações já limpas e padronizadas da Camada Silver são organizadas em um modelo estrela (Star Schema). O objetivo desta etapa é estruturar os dados de forma otimizada para análises, consultas e visualizações, facilitando a exploração do desempenho dos clubes, estatísticas das partidas e eventos do jogo.

In [0]:
from pyspark.sql import functions as F

# ================================================================
# 1) CARREGAR AS TABELAS SILVER
# ================================================================
silver_full         = spark.table("silver_full")
silver_gols         = spark.table("silver_gols")
silver_cartoes      = spark.table("silver_cartoes")
silver_estatisticas = spark.table("silver_estatisticas")


# ================================================================
# 2) DIMENSÕES
# ================================================================

# ------------------------------
# DIM_TEMPO
# ------------------------------
dim_tempo = (
    silver_full
        .select("data_dt")
        .where("data_dt IS NOT NULL")
        .distinct()
        .withColumn("ano", F.year("data_dt"))
        .withColumn("mes", F.month("data_dt"))
        .withColumn("dia", F.dayofmonth("data_dt"))
        .withColumn("dia_semana", F.date_format("data_dt", "E"))
)
dim_tempo.write.format("delta").mode("overwrite").saveAsTable("dim_tempo")


# ------------------------------
# DIM_CLUBE
# ------------------------------
dim_clube = (
    silver_full.select(F.col("mandante").alias("clube"))
    .union(silver_full.select(F.col("visitante").alias("clube")))
    .union(silver_gols.select("clube"))
    .union(silver_cartoes.select("clube"))
    .union(silver_estatisticas.select("clube"))
    .where("clube IS NOT NULL")
    .distinct()
    .withColumn("clube_id", F.monotonically_increasing_id())
)
dim_clube.write.format("delta").mode("overwrite").saveAsTable("dim_clube")


# ------------------------------
# DIM_ARENA
# ------------------------------
dim_arena = (
    silver_full
        .select("arena")
        .where("arena IS NOT NULL")
        .distinct()
        .withColumn("arena_id", F.monotonically_increasing_id())
)
dim_arena.write.format("delta").mode("overwrite").saveAsTable("dim_arena")


# ================================================================
# 3) CARREGAR AS DIMENSÕES PARA OS JOINS
# ================================================================
df_tempo = spark.table("dim_tempo")
df_clube = spark.table("dim_clube")
df_arena = spark.table("dim_arena")


# ================================================================
# 4) FATO_PARTIDA (SEM AMBIGUIDADES)
# ================================================================
fato_partida = (
    silver_full.alias("p")

        # DIM TEMPO
        .join(df_tempo.alias("t"), F.col("p.data_dt") == F.col("t.data_dt"), "left")

        # DIM CLUBE (MANDANTE)
        .join(
            df_clube.alias("c_m"),
            F.col("p.mandante") == F.col("c_m.clube"),
            "left"
        )

        # DIM CLUBE (VISITANTE)
        .join(
            df_clube.alias("c_v"),
            F.col("p.visitante") == F.col("c_v.clube"),
            "left"
        )

        # DIM ARENA
        .join(df_arena.alias("a"), F.col("p.arena") == F.col("a.arena"), "left")

        # SELECT SEM AMBIGUIDADE
        .select(
            F.col("p.partida_id"),
            F.col("p.data_dt"),
            F.col("p.data_hora_ts"),

            F.col("p.mandante"),
            F.col("c_m.clube_id").alias("mandante_id"),

            F.col("p.visitante"),
            F.col("c_v.clube_id").alias("visitante_id"),

            F.col("p.arena"),
            F.col("a.arena_id"),

            F.col("p.mandante_placar"),
            F.col("p.visitante_placar"),
            F.col("p.vencedor"),
            F.col("p.rodata")
        )
)

fato_partida.write.format("delta").mode("overwrite").saveAsTable("fato_partida")



# ================================================================
# 5) FATO_GOL
# ================================================================
fato_gol = (
    silver_gols.alias("g")
        .join(df_clube.alias("c"), F.col("g.clube") == F.col("c.clube"), "left")
        .join(silver_full.select("partida_id", "data_dt").alias("p"), "partida_id", "left")
        .join(df_tempo.alias("t"), "data_dt", "left")
        .select(
            "partida_id",
            F.col("g.clube"),
            F.col("c.clube_id"),
            "minuto",
            "minuto_int",
            "tipo_de_gol",
            "rodata"
        )
)

fato_gol.write.format("delta").mode("overwrite").saveAsTable("fato_gol")



# ================================================================
# 6) FATO_CARTAO
# ================================================================
fato_cartao = (
    silver_cartoes.alias("ca")
        .join(df_clube.alias("c"), F.col("ca.clube") == F.col("c.clube"), "left")
        .join(silver_full.select("partida_id", "data_dt").alias("p"), "partida_id", "left")
        .join(df_tempo.alias("t"), "data_dt", "left")
        .select(
            "partida_id",
            F.col("ca.clube"),
            F.col("c.clube_id"),
            "cartao",
            "atleta",
            "num_camisa",
            "posicao",
            "minuto",
            "minuto_int",
            "rodata"
        )
)

fato_cartao.write.format("delta").mode("overwrite").saveAsTable("fato_cartao")



# ================================================================
# 7) FATO_ESTATISTICA
# ================================================================
fato_estatistica = (
    silver_estatisticas.alias("e")
        .join(df_clube.alias("c"), F.col("e.clube") == F.col("c.clube"), "left")
        .join(silver_full.select("partida_id", "data_dt").alias("p"), "partida_id", "left")
        .join(df_tempo.alias("t"), "data_dt", "left")
        .select(
            "partida_id",
            F.col("e.clube"),
            F.col("c.clube_id"),
            "chutes",
            "chutes_no_alvo",
            "passes",
            "posse_de_bola_num",
            "precisao_passes_num",
            "faltas",
            "cartao_amarelo",
            "cartao_vermelho",
            "impedimentos",
            "escanteios",
            "rodata"
        )
)

fato_estatistica.write.format("delta").mode("overwrite").saveAsTable("fato_estatistica_time_partida")

print("✔ GOLD criado sem ambiguidades!")


**Print do esquema**

In [0]:
html = """
<div class="mermaid">
erDiagram
    dim_tempo {
        date data_dt PK
        int ano
        int mes
        int dia
        string dia_semana
    }

    dim_clube {
        int clube_id PK
        string clube
    }

    dim_arena {
        int arena_id PK
        string arena
    }

    fato_partida {
        int partida_id PK
        date data_dt FK
        int mandante_id FK
        int visitante_id FK
        int arena_id FK
        int mandante_placar
        int visitante_placar
        string vencedor
    }

    fato_gol {
        int partida_id FK
        int clube_id FK
        int minuto_int
    }

    fato_cartao {
        int partida_id FK
        int clube_id FK
        string cartao
        int minuto_int
    }

    fato_estatistica {
        int partida_id FK
        int clube_id FK
        int chutes
        int passes
        int posse
    }

    dim_tempo ||--o{ fato_partida : data
    dim_clube ||--o{ fato_partida : mandante
    dim_clube ||--o{ fato_partida : visitante
    dim_arena ||--o{ fato_partida : arena

    dim_clube ||--o{ fato_gol : clube
    dim_tempo ||--o{ fato_gol : data

    dim_clube ||--o{ fato_cartao : clube
    dim_tempo ||--o{ fato_cartao : data

    dim_clube ||--o{ fato_estatistica : clube
    dim_tempo ||--o{ fato_estatistica : data
</div>

<script src="https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js"></script>
<script>
  mermaid.initialize({startOnLoad:true});
</script>
"""

displayHTML(html)


**Catalogo de Dados**

In [0]:
from pyspark.sql import functions as F

tables = [
    "silver_full",
    "silver_gols",
    "silver_cartoes",
    "silver_estatisticas",
    "dim_clube",
    "dim_tempo",
    "dim_arena",
    "fato_partida",
    "fato_gol",
    "fato_cartao",
    "fato_estatistica_time_partida"
]

catalog = []

for table in tables:
    df = spark.table(table)
    
    for field in df.schema.fields:
        col_name = field.name
        col_type = field.dataType.simpleString()

        # Domínio convertido para string
        dominio_values = (
            df.select(F.col(col_name).cast("string").alias("val"))
              .distinct()
              .limit(10)
              .collect()
        )
        dominio_str = ", ".join([row["val"] if row["val"] is not None else "NULL" 
                                 for row in dominio_values])

        # Para valores numéricos, pegar min/max, mas converter para string
        if col_type in ["int", "bigint", "double", "float", "long", "decimal"]:
            min_val = df.select(F.min(col_name)).first()[0]
            max_val = df.select(F.max(col_name)).first()[0]
            min_val = str(min_val) if min_val is not None else "NULL"
            max_val = str(max_val) if max_val is not None else "NULL"
        else:
            min_val = "N/A"
            max_val = "N/A"

        descricao = f"Coluna '{col_name}' pertencente à tabela '{table}'."

        catalog.append((table, col_name, col_type, dominio_str, min_val, max_val, descricao))

catalog_df = spark.createDataFrame(
    catalog,
    ["tabela", "coluna", "tipo", "dominio_exemplo", "minimo", "maximo", "descricao"]
)

display(catalog_df)


**Análise dos Dados**

In [0]:
import pyspark.sql.functions as F
import matplotlib.pyplot as plt

# ================================================
# Tabelas analisadas
# ================================================
tables = [
    "silver_full",
    "silver_gols",
    "silver_cartoes",
    "silver_estatisticas"
]

# ================================================
# 1. Valores Nulos por Coluna
# ================================================
print("===== VALORES NULOS POR TABELA =====")

for table in tables:
    print(f"\n### {table} ###")
    df = spark.table(table)
    nulls = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])
    display(nulls)

# ================================================
# 2. Tipos de Dados (Schema)
# ================================================
print("\n===== TIPOS DE DADOS =====")

for table in tables:
    print(f"\nSchema da tabela {table}:")
    spark.table(table).printSchema()

# ================================================
# 3. Domínio de Categorias (Top valores)
# ================================================
print("\n===== DOMÍNIO DAS CATEGORIAS =====")

dominio_cols = {
    "silver_cartoes": ["cartao", "posicao"],
    "silver_gols": ["tipo_de_gol"],
    "silver_full": ["mandante", "visitante", "arena"]
}

for table, cols in dominio_cols.items():
    df = spark.table(table)
    print(f"\n### Domínio – {table} ###")
    for col in cols:
        print(f"\nValores distintos em '{col}':")
        display(df.groupBy(col).count().orderBy(F.desc("count")))

# ================================================
# 4. Min/Max de Colunas Numéricas
# ================================================
print("\n===== MIN / MAX NUMÉRICOS =====")

for table in tables:
    df = spark.table(table)
    numeric_cols = [c for c, t in df.dtypes if t in ("int", "double", "float", "long", "bigint")]

    if not numeric_cols:
        continue

    print(f"\n### {table} ###")

    stats = df.select(
        *[F.min(c).alias(f"min_{c}") for c in numeric_cols],
        *[F.max(c).alias(f"max_{c}") for c in numeric_cols],
    )
    display(stats)

# ================================================
# 5. Histogramas (Distribuição)
# ================================================
print("\n===== HISTOGRAMAS =====")

# Gols por minuto
print("\nHistograma – Minuto dos Gols")
df_g = spark.table("silver_gols").select("minuto_int").dropna()
pdf_g = df_g.toPandas()

plt.hist(pdf_g["minuto_int"], bins=20)
plt.title("Distribuição – Minutos dos Gols")
plt.xlabel("Minuto")
plt.ylabel("Frequência")
plt.show()

# Posse de bola
print("\nHistograma – Posse de Bola")
df_p = spark.table("silver_estatisticas").select("posse_de_bola_num").dropna()
pdf_p = df_p.toPandas()

plt.hist(pdf_p["posse_de_bola_num"], bins=20)
plt.title("Distribuição – Posse de Bola (%)")
plt.xlabel("%")
plt.ylabel("Frequência")
plt.show()

# ================================================
# 6. Inconsistências Detectadas
# ================================================
print("\n===== INCONSISTÊNCIAS DETECTADAS =====")

# Datas inválidas (timestamp null)
print("\nRegistros com data/hora inválida:")
df_full = spark.table("silver_full")
display(df_full.filter("data_hora_ts IS NULL"))

# Minutos fora do intervalo
print("\nMinutos inválidos em gols:")
df_gols = spark.table("silver_gols")
display(df_gols.filter("minuto_int < 0 OR minuto_int > 130"))

# Posse de bola inválida
print("\nPosse de bola fora do intervalo 0–100%:")
df_est = spark.table("silver_estatisticas")
display(df_est.filter("posse_de_bola_num < 0 OR posse_de_bola_num > 100"))

print("\n=== FIM DA ANÁLISE DE QUALIDADE ===")
