# MVP – Engenharia de Dados | Premier League (Databricks)

_Objetivo_

Este projeto tem como objetivo desenvolver um pipeline de dados na nuvem, utilizando a plataforma Databricks Community Edition, para realizar o tratamento, modelagem, análise e visualização de dados de partidas da Premier League (campeonato inglês de futebol).

_Problema de Negócio_

O foco da análise é responder à seguinte pergunta central:

“É possível prever se um time vencerá, empatará ou perderá uma partida com base em estatísticas históricas?”

Além disso, o projeto explora outras perguntas auxiliares, como:

Quais times mandantes mais venceram nos últimos campeonatos?

Existe diferença significativa de posse de bola entre mandantes e visitantes?

Visitantes recebem mais cartões vermelhos que mandantes?

In [0]:
# Importando bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Criando a sessão do Spark (já é criada automaticamente no Databricks)
spark = SparkSession.builder.appName("PremierLeagueETL").getOrCreate()

# Caminho onde o arquivo CSV foi carregado no Databricks (Altere conforme o upload do seu arquivo)
file_path = "/FileStore/shared_uploads/vrodrigues@newfields.com.br/df_full_premierleague.csv"

# Carregando o dataset em um DataFrame do PySpark
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file_path)

# Exibindo as primeiras linhas do DataFrame
# df.show(5)


## Verificando o Schema

In [0]:
df.printSchema()

In [0]:
df.limit(10).toPandas()

## Limpar e Transformar os Dados

Certificando que a coluna de Data está no tipo certo.

In [0]:
from pyspark.sql.functions import to_date

df = df.withColumn("date", to_date(df["date"], "yyyy-MM-dd"))

Removendo valores nulos.

In [0]:
df = df.dropna()

Salvando no formato Parquet (melhor para análise e performance)

In [0]:
df.write.mode("overwrite").parquet("/FileStore/tables/premier_league_clean.parquet")

Como tabela SQL

In [0]:
df.write.mode("overwrite").saveAsTable("premier_league_clean")

Consultando a tabela SQL

In [0]:
%sql
SELECT * FROM premier_league_clean LIMIT 10;

##  Dimensão Tempo

In [0]:
from pyspark.sql.functions import year, month, dayofmonth, date_format, monotonically_increasing_id

# Carregar a base
df_clean = spark.table("premier_league_clean")

# Criar a dim_tempo
dim_tempo_df = (
    df_clean
    .select("date")
    .dropna()
    .dropDuplicates()
    .withColumn("ano", year("date"))
    .withColumn("mes", month("date"))
    .withColumn("dia", dayofmonth("date"))
    .withColumn("dia_semana", date_format("date", "EEEE"))
    .withColumn("id_tempo", monotonically_increasing_id())
)

# Reorganizar colunas
dim_tempo_df = dim_tempo_df.select("id_tempo", "date", "ano", "mes", "dia", "dia_semana")

# Salvar
dim_tempo_df.write \
    .format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("dim_tempo")

# Visualiza a dimensão
# display(dim_tempo_df)


In [0]:
display(df_clean)

## Dimensão Time

In [0]:
spark.sql("DROP TABLE IF EXISTS dim_time")

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Recriar a lista única de times
home_teams = df_clean.select(col("home_team").alias("nome_time"))
away_teams = df_clean.select(col("away_team").alias("nome_time"))
dim_time_df = home_teams.union(away_teams).distinct()

# Criar coluna de ID
dim_time_df = dim_time_df.withColumn("id_time", monotonically_increasing_id())

# Reorganizar colunas
dim_time_df = dim_time_df.select("id_time", "nome_time")

# Salvar como tabela
dim_time_df.write \
    .format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("dim_time")


Agregar estatísticas por time

In [0]:
import pyspark.sql.functions as F 

# Estatísticas selecionadas para cálculo
stats_home = [
    "performance_acum_home", "shots_avg_home", "passes_avg_home", 
    "possession_avg_home", "goals_scored_ft_avg_home", "goals_conced_ft_avg_home"
]

stats_away = [
    "performance_acum_away", "shots_avg_away", "passes_avg_away", 
    "possession_avg_away", "goals_scored_ft_avg_away", "goals_conced_ft_avg_away"
]

# Agregar estatísticas de mandante
agg_home = (
    df.groupBy("home_team")
    .agg(*[F.avg(col).alias(f"{col}_mean") for col in stats_home])
    .withColumnRenamed("home_team", "nome_time")
)

# Agregar estatísticas de visitante
agg_away = (
    df.groupBy("away_team")
    .agg(*[F.avg(col).alias(f"{col}_mean") for col in stats_away])
    .withColumnRenamed("away_team", "nome_time")
)

# Unir estatísticas
agg_all = agg_home.join(agg_away, on="nome_time", how="outer")

# Juntar com lista de times
dim_time_final = dim_time_df.join(agg_all, on="nome_time", how="left")



Salvando como tabela no Databricks

In [0]:
# Salvar como tabela Parquet
dim_time_final.write.mode("overwrite").format("parquet").save("/mnt/datalake/dim_time")

# Registrar como tabela SQL
spark.sql("DROP TABLE IF EXISTS dim_time")
spark.sql("CREATE TABLE dim_time USING PARQUET LOCATION '/mnt/datalake/dim_time'")

Verificar a dimensão no SQL


In [0]:
%sql
SELECT * FROM dim_time LIMIT 10;

## Dimensão Local

### Criar o DataFrame com os locais distintos

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Dados da dimensão local
local_data = [
    (1, "Home"),
    (2, "Away")
]

# Esquema da tabela
local_schema = StructType([
    StructField("id_local", IntegerType(), False),
    StructField("desc_local", StringType(), False)
])

# Criando DataFrame
df_dim_local = spark.createDataFrame(local_data, schema=local_schema)

# Mostrando
df_dim_local.show()

Salvar como Parquet e como Tabela SQL



In [0]:
# Salvar como Parquet
df_dim_local.write.mode("overwrite").parquet("/mnt/projeto_mvp/dim_local")

# Criar tabela SQL
df_dim_local.write.mode("overwrite").saveAsTable("dim_local")

In [0]:
%sql
SELECT * FROM dim_local;

# Dimensão Partidas

In [0]:
%sql
SHOW TABLES;


In [0]:
df_clean = spark.table("premier_league_clean")
df_clean.printSchema()
df_clean.show(5)


In [0]:
spark.sql("DROP TABLE IF EXISTS dim_partida")


In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Recriar a dimensão de partidas com ID
dim_partida_df = df_clean.select(
    "link_match", "season", "date", "home_team", "away_team", "result_full"
).dropDuplicates()

dim_partida_df = dim_partida_df.withColumn("id_partida", monotonically_increasing_id())

# Salvar como tabela
dim_partida_df.write \
    .format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("dim_partida")


In [0]:
%sql
SELECT * FROM dim_partida LIMIT 10;

## Dimensão Fato

In [0]:
# Dataset principal
df_clean = spark.table("premier_league_clean")

# Dimensões
dim_time_df = spark.table("dim_time")
dim_local_df = spark.table("dim_local")
dim_tempo_df = spark.table("dim_tempo") 
dim_partida_df = spark.table("dim_partida")


### Criar a fato partida df

In [0]:
from pyspark.sql.functions import lit

# Adicionar coluna para identificar o local (fixo como "Home")
df_clean = df_clean.withColumn("local", lit("Home"))


In [0]:
from pyspark.sql.functions import when, col


fato_partida_df = (
    df_clean
    .join(
        dim_time_df.withColumnRenamed("nome_time", "home_team")
                   .withColumnRenamed("id_time", "id_time_mandante"),
        on="home_team", how="left"
    )
    .join(
        dim_time_df.withColumnRenamed("nome_time", "away_team")
                   .withColumnRenamed("id_time", "id_time_visitante"),
        on="away_team", how="left"
    )
    .join(dim_tempo_df.withColumnRenamed("date", "date_tempo"), df_clean["date"] == col("date_tempo"), "left")
    .join(dim_partida_df, on=["link_match", "season", "date", "home_team", "away_team", "result_full"], how="left")
    .join(dim_local_df.withColumnRenamed("desc_local", "local"), on="local", how="left")
)

fato_partida_df = fato_partida_df.withColumn(
    "resultado_mandante",
    when(col("goal_home_ft") > col("goal_away_ft"), "V")
    .when(col("goal_home_ft") == col("goal_away_ft"), "E")
    .otherwise("D")
)


In [0]:
fato_partida_df = fato_partida_df.select(
    "id_time_mandante",
    "id_time_visitante",
    "id_tempo",
    "id_partida",
    "id_local",
    "result_full",
    "resultado_mandante",
    "goal_home_ft",
    "goal_away_ft",
    "home_possession",
    "away_possession",
    "home_shots",
    "away_shots",
    "home_passes",
    "away_passes",
    "home_yellow_cards",
    "away_yellow_cards",
    "home_red_cards",
    "away_red_cards"
)

display(fato_partida_df)


In [0]:
# Dropar tabela se já existir (boa prática)
spark.sql("DROP TABLE IF EXISTS fato_partida")

# Salvar a tabela fato como Delta Table
fato_partida_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("fato_partida")


In [0]:
%sql
SELECT * FROM fato_partida LIMIT 10;


# Analises de negocios 

## Respondendo a perguntas de negocio:

### Quais times mandantes mais venceram partidas?

In [0]:
%sql
SELECT 
  t.nome_time AS time_mandante,
  COUNT(*) AS total_vitorias
FROM fato_partida f
JOIN dim_time t ON f.id_time_mandante = t.id_time
WHERE resultado_mandante = 'V'
GROUP BY t.nome_time
ORDER BY total_vitorias DESC
LIMIT 10




### Qual a média de posse de bola dos times mandantes e visitantes nas partidas?

In [0]:
%sql
SELECT 
  resultado_mandante,
  ROUND(AVG(home_possession), 2) AS media_posse_mandante
FROM fato_partida
GROUP BY resultado_mandante


In [0]:
# Média de posse de bola dos mandantes por resultado
grafico_posse = (
    spark.table("fato_partida")
    .groupBy("resultado_mandante")
    .agg({"home_possession": "avg"})
    .withColumnRenamed("avg(home_possession)", "media_posse_mandante")
    .orderBy("resultado_mandante")
)

# Exibir como gráfico no Databricks
display(grafico_posse)


### Quem leva mais cartões vermelhos: mandantes ou visitantes?

In [0]:
%sql
SELECT 
  ROUND(AVG(home_red_cards), 2) AS media_vermelhos_mandantes,
  ROUND(AVG(away_red_cards), 2) AS media_vermelhos_visitantes
FROM fato_partida


In [0]:
grafico_cartoes = (
    spark.table("fato_partida")
    .agg(
        {"home_red_cards": "avg", "away_red_cards": "avg"}
    )
    .withColumnRenamed("avg(home_red_cards)", "Mandante") \
    .withColumnRenamed("avg(away_red_cards)", "Visitante")
)

# Para transformar colunas em linhas e facilitar o gráfico
from pyspark.sql.functions import lit

grafico_final = grafico_cartoes.selectExpr("Mandante as media", "'Mandante' as tipo") \
    .union(
        grafico_cartoes.selectExpr("Visitante as media", "'Visitante' as tipo")
    )

# Exibe gráfico de barras
display(grafico_final)
