## ⚽ Football ETL – Transfermarkt ➜ Spark ➜ MongoDB + Neo4j
###  João Vitor Averaldo Antunes (813979) · Pedro Enrico Barchi Nogueira (813099) · Rafael Mori Pinheiro (813851)

UFSCar-CC-So-PMD2025-Grupo 10

### Profa. Dra. Sahudy Montenegro González


---
Carregamento de Paths e Dependências

In [0]:
import os

# Pipeline endpoints. Every value can be overridden through an environment
# variable so that real hosts and credentials do not have to be written into
# the notebook; the literals are the original placeholder defaults.

# Root folder that holds the raw Transfermarkt CSV files.
BASE_PATH = os.environ.get("FOOTBALL_BASE_PATH", "dbfs:/FileStore/transfermarkt")

# MongoDB connection string used by the Spark MongoDB connector.
MONGO_URI = os.environ.get("FOOTBALL_MONGO_URI", "mongodb://USER:PWD@host:27017/football")
# Address of the Neo4j instance.
NEO4J_URL = os.environ.get("FOOTBALL_NEO4J_URL", "neo4j+s://neo4j-host:7687")
# (username, password) pair used for Neo4j basic authentication.
NEO4J_AUTH = (
    os.environ.get("FOOTBALL_NEO4J_USER", "neo4j"),
    os.environ.get("FOOTBALL_NEO4J_PASSWORD", "your-password"),
)

from pyspark.sql import SparkSession, functions as F, types as T, Window, Row
from typing import List

# Create (or reuse) the Spark session, configuring the write side of the
# MongoDB connector with MONGO_URI and the "football" database.
_session_builder = SparkSession.builder.appName("FootballPipeline")
for _conf_key, _conf_value in (
    ("spark.mongodb.write.connection.uri", MONGO_URI),
    ("spark.mongodb.write.database", "football"),
):
    _session_builder = _session_builder.config(_conf_key, _conf_value)
spark = _session_builder.getOrCreate()

--- 
Estruturação de Esquemas Explícitos

In [0]:
# Explicit schema applied when reading appearances.csv (all fields nullable).
appearances_schema = (
    T.StructType()
    .add("appearance_id", T.StringType())
    .add("game_id", T.LongType())
    .add("player_id", T.LongType())
    .add("player_club_id", T.LongType())
    .add("player_current_club_id", T.LongType())
    .add("date", T.DateType())
    .add("player_name", T.StringType())
    .add("competition_id", T.StringType())
    .add("yellow_cards", T.IntegerType())
    .add("red_cards", T.IntegerType())
    .add("goals", T.IntegerType())
    .add("assists", T.IntegerType())
    .add("minutes_played", T.IntegerType())
)

In [0]:
# Explicit schema applied when reading club_games.csv (all fields nullable).
club_games_schema = (
    T.StructType()
    .add("game_id", T.LongType())
    .add("club_id", T.LongType())
    .add("own_goals", T.IntegerType())
    .add("own_position", T.IntegerType())
    .add("own_manager_name", T.StringType())
    .add("opponent_id", T.LongType())
    .add("opponent_goals", T.IntegerType())
    .add("opponent_position", T.IntegerType())
    .add("opponent_manager_name", T.StringType())
    .add("hosting", T.StringType())
    .add("is_win", T.IntegerType())
)

# Explicit schema applied when reading clubs.csv (all fields nullable).
# The two money columns are kept as strings here and parsed later.
clubs_schema = (
    T.StructType()
    .add("club_id", T.LongType())
    .add("club_code", T.StringType())
    .add("name", T.StringType())
    .add("domestic_competition_id", T.StringType())
    .add("total_market_value", T.StringType())
    .add("squad_size", T.IntegerType())
    .add("average_age", T.DoubleType())
    .add("foreigners_number", T.IntegerType())
    .add("foreigners_percentage", T.DoubleType())
    .add("national_team_players", T.IntegerType())
    .add("stadium_name", T.StringType())
    .add("stadium_seats", T.IntegerType())
    .add("net_transfer_record", T.StringType())
    .add("coach_name", T.StringType())
    .add("last_season", T.IntegerType())
    .add("filename", T.StringType())
    .add("url", T.StringType())
)


In [0]:
# Explicit schema applied when reading competitions.csv (all fields nullable).
competitions_schema = (
    T.StructType()
    .add("competition_id", T.StringType())
    .add("competition_code", T.StringType())
    .add("name", T.StringType())
    .add("sub_type", T.StringType())
    .add("type", T.StringType())
    .add("country_id", T.IntegerType())
    .add("country_name", T.StringType())
    .add("domestic_league_code", T.StringType())
    .add("confederation", T.StringType())
    .add("url", T.StringType())
    .add("is_major_national_league", T.BooleanType())
)

In [0]:
# Explicit schema applied when reading game_events.csv (all fields nullable).
game_events_schema = (
    T.StructType()
    .add("game_event_id", T.StringType())
    .add("date", T.DateType())
    .add("game_id", T.LongType())
    .add("minute", T.IntegerType())
    .add("type", T.StringType())
    .add("club_id", T.LongType())
    .add("player_id", T.LongType())
    .add("description", T.StringType())
    .add("player_in_id", T.LongType())
    .add("player_assist_id", T.LongType())
)

# Explicit schema applied when reading game_lineups.csv (all fields nullable).
game_lineups_schema = (
    T.StructType()
    .add("game_lineups_id", T.StringType())
    .add("date", T.DateType())
    .add("game_id", T.LongType())
    .add("player_id", T.LongType())
    .add("club_id", T.LongType())
    .add("player_name", T.StringType())
    .add("type", T.StringType())
    .add("position", T.StringType())
    .add("number", T.IntegerType())
    .add("team_captain", T.IntegerType())
)

# Explicit schema applied when reading games.csv (all fields nullable).
games_schema = (
    T.StructType()
    .add("game_id", T.LongType())
    .add("competition_id", T.StringType())
    .add("season", T.IntegerType())
    .add("round", T.StringType())
    .add("date", T.DateType())
    .add("home_club_id", T.LongType())
    .add("away_club_id", T.LongType())
    .add("home_club_goals", T.IntegerType())
    .add("away_club_goals", T.IntegerType())
    .add("home_club_position", T.IntegerType())
    .add("away_club_position", T.IntegerType())
    .add("home_club_manager_name", T.StringType())
    .add("away_club_manager_name", T.StringType())
    .add("stadium", T.StringType())
    .add("attendance", T.IntegerType())
    .add("referee", T.StringType())
    .add("url", T.StringType())
    .add("home_club_formation", T.StringType())
    .add("away_club_formation", T.StringType())
    .add("home_club_name", T.StringType())
    .add("away_club_name", T.StringType())
    .add("aggregate", T.StringType())
    .add("competition_type", T.StringType())
)

In [0]:
# Explicit schema applied when reading player_valuations.csv (all fields nullable).
valuations_schema = (
    T.StructType()
    .add("player_id", T.LongType())
    .add("date", T.DateType())
    .add("market_value_in_eur", T.DoubleType())
    .add("current_club_id", T.LongType())
    .add("player_club_domestic_competition_id", T.StringType())
)

In [0]:
# Explicit schema applied when reading players.csv (all fields nullable).
# date_of_birth is read as a timestamp; contract_expiration_date stays a string.
players_schema = (
    T.StructType()
    .add("player_id", T.LongType())
    .add("first_name", T.StringType())
    .add("last_name", T.StringType())
    .add("name", T.StringType())
    .add("last_season", T.IntegerType())
    .add("current_club_id", T.LongType())
    .add("player_code", T.StringType())
    .add("country_of_birth", T.StringType())
    .add("city_of_birth", T.StringType())
    .add("country_of_citizenship", T.StringType())
    .add("date_of_birth", T.TimestampType())
    .add("sub_position", T.StringType())
    .add("position", T.StringType())
    .add("foot", T.StringType())
    .add("height_in_cm", T.IntegerType())
    .add("contract_expiration_date", T.StringType())
    .add("agent_name", T.StringType())
    .add("image_url", T.StringType())
    .add("url", T.StringType())
    .add("current_club_domestic_competition_id", T.StringType())
    .add("current_club_name", T.StringType())
    .add("market_value_in_eur", T.DoubleType())
    .add("highest_market_value_in_eur", T.DoubleType())
)

# Explicit schema applied when reading transfers.csv (all fields nullable).
transfers_schema = (
    T.StructType()
    .add("player_id", T.LongType())
    .add("transfer_date", T.DateType())
    .add("transfer_season", T.StringType())
    .add("from_club_id", T.LongType())
    .add("to_club_id", T.LongType())
    .add("from_club_name", T.StringType())
    .add("to_club_name", T.StringType())
    .add("transfer_fee", T.DoubleType())
    .add("market_value_in_eur", T.DoubleType())
    .add("player_name", T.StringType())
)

---
Carga dos Dados Brutos


In [0]:
def load_csv(name, schema):
    """Read ``{BASE_PATH}/{name}.csv`` into a DataFrame.

    The first line of the file is treated as a header, and column types are
    taken from ``schema`` instead of being inferred.
    """
    csv_path = f"{BASE_PATH}/{name}.csv"
    return spark.read.csv(csv_path, schema=schema, header=True)

# Bronze layer: each raw CSV read as-is with its explicit schema.
apps_bronze = load_csv(name="appearances", schema=appearances_schema)
club_games_bronze = load_csv(name="club_games", schema=club_games_schema)
clubs_bronze = load_csv(name="clubs", schema=clubs_schema)
competitions_bronze = load_csv(name="competitions", schema=competitions_schema)
events_bronze = load_csv(name="game_events", schema=game_events_schema)
lineups_bronze = load_csv(name="game_lineups", schema=game_lineups_schema)
games_bronze = load_csv(name="games", schema=games_schema)
valuations_bronze = load_csv(name="player_valuations", schema=valuations_schema)
players_bronze = load_csv(name="players", schema=players_schema)
transfers_bronze = load_csv(name="transfers", schema=transfers_schema)

--- 
Limpeza e Tipagem

In [0]:
def base_clean(df, pk: List[str]):
    """Deduplicate ``df`` on the key columns and stamp the ingestion time.

    Args:
        df: DataFrame to clean.
        pk: Column names that identify a row; only one row is kept per
            distinct combination of these columns.

    Returns:
        The deduplicated DataFrame with an extra ``_ingest_timestamp`` column
        holding the current timestamp.
    """
    deduplicated = df.dropDuplicates(pk)
    ingested_at = F.current_timestamp()
    return deduplicated.withColumn("_ingest_timestamp", ingested_at)

# Appearances deduplicated by appearance_id, keeping only rows whose
# minutes_played is present.
apps_silver = base_clean(apps_bronze, pk=["appearance_id"]).where(
    F.col("minutes_played").isNotNull()
)

# Example: market value "+€3.05m" -> 3.05 million euros; "€-500k" -> minus 500 thousand.
@F.udf("double")
def parse_euro(s):
    """Convert a money string such as "+€3.05m" into a signed amount in euros.

    A sign is accepted on either side of the euro symbol, and the number must
    be followed by a magnitude suffix: ``k`` (thousands), ``m`` (millions) or
    ``bn`` (billions).

    Args:
        s: The raw string value, or None.

    Returns:
        The amount in euros as a float (negative when a minus sign is
        present), or None when ``s`` is None/empty or contains no
        "<number><suffix>" amount.
    """
    import re  # imported locally, as in the original, so the UDF is self-contained

    if not s:
        return None
    match = re.search(
        r"([-+])?\s*€?\s*([-+])?\s*(\d+(?:\.\d*)?|\.\d+)\s*(bn|m|k)", s
    )
    if match is None:
        return None
    outer_sign, inner_sign, digits, suffix = match.groups()
    amount = float(digits) * {"k": 1e3, "m": 1e6, "bn": 1e9}[suffix]
    # The sign may be written before or after the euro symbol; either one counts.
    return -amount if "-" in (outer_sign, inner_sign) else amount

# Clubs deduplicated by club_id; the two money strings are parsed into numeric
# *_eur columns and the raw string columns are then removed.
clubs_silver = base_clean(clubs_bronze, pk=["club_id"])
clubs_silver = clubs_silver.withColumn(
    "total_market_value_eur", parse_euro(F.col("total_market_value"))
)
clubs_silver = clubs_silver.withColumn(
    "net_transfer_record_eur", parse_euro(F.col("net_transfer_record"))
)
clubs_silver = clubs_silver.drop("total_market_value", "net_transfer_record")

# Remaining silver tables: deduplicated on their key columns and stamped with
# the ingestion time by base_clean.
games_silver = base_clean(games_bronze, pk=["game_id"])
players_silver = base_clean(players_bronze, pk=["player_id"])
valuations_silver = base_clean(valuations_bronze, pk=["player_id", "date"])
transfers_silver = base_clean(transfers_bronze, pk=["player_id", "transfer_date"])
club_games_silver = base_clean(club_games_bronze, pk=["game_id", "club_id"])
competitions_silver = base_clean(competitions_bronze, pk=["competition_id"])
events_silver = base_clean(events_bronze, pk=["game_event_id"])
lineups_silver = base_clean(lineups_bronze, pk=["game_lineups_id"])

# Persist every silver table as Delta for reuse.
# (The original empty-DataFrame placeholder was a no-op whose result was never
# used, so it has been removed.)
for name, df in [
    ("apps", apps_silver),
    ("clubs", clubs_silver),
    ("games", games_silver),
    ("players", players_silver),
    ("valuations", valuations_silver),
    ("transfers", transfers_silver),
    ("club_games", club_games_silver),
    ("competitions", competitions_silver),
    ("events", events_silver),
    ("lineups", lineups_silver),
]:
    # Each table overwrites its own folder under the silver root.
    df.write.mode("overwrite").format("delta").save(f"/mnt/football/silver/{name}")

---
Estruturação Eixo 1 – Valorização de jogadores e ROI de transferências

In [0]:
# Per-player window ordered by valuation date, used to look at the previous valuation.
w_date = Window.partitionBy("player_id").orderBy("date")

# Valuations joined with each player's birth date, adding:
#   age        - whole years between date_of_birth and the valuation date
#   prev_value - the player's previous valuation in date order
#   pct_growth - relative change from prev_value to the current valuation
valuations_enriched = (
    valuations_silver
    .join(players_silver.select("player_id", "date_of_birth"), on="player_id")
    .withColumn(
        "age",
        F.floor(F.months_between(F.col("date"), F.col("date_of_birth")) / F.lit(12)),
    )
    .withColumn("prev_value", F.lag(F.col("market_value_in_eur")).over(w_date))
    .withColumn("pct_growth", F.expr("(market_value_in_eur - prev_value) / prev_value"))
)

# Transfer ROI (simple version): highest valuation recorded within 12 months
# after the purchase, compared with the fee paid.
buy_side = (transfers_silver
            .select("player_id", "transfer_date", "to_club_id", "transfer_fee")
            .withColumnRenamed("transfer_date", "buy_date"))

roi = (buy_side
       # Keep the valuations dated from the purchase day up to 12 months later.
       .join(valuations_enriched,
             (valuations_enriched.player_id == buy_side.player_id) &
             (valuations_enriched.date >= buy_side.buy_date) &
             (valuations_enriched.date <= F.add_months(buy_side.buy_date, 12)))
       # buy_date is part of the key so that two moves of the same player to the
       # same club for the same fee are not merged into one group.
       .groupBy(buy_side.player_id, "buy_date", "to_club_id", "transfer_fee")
       .agg(F.max("market_value_in_eur").alias("value_after_12m"))
       # A zero fee has no meaningful ROI: leave it null instead of dividing by zero.
       .withColumn("roi_pct",
                   F.when(F.col("transfer_fee") != 0,
                          (F.col("value_after_12m") - F.col("transfer_fee"))
                          / F.col("transfer_fee"))))

---
Estruturação Eixo 2 – Impacto tático dos jogadores (gols, assistências e taxa de substituição)

In [0]:
# Appearances joined with the clubs and score of their game, adding:
#   is_home - 1 when the player's club is the home club, else 0
#   is_sub  - 1 when the player played more than 0 and fewer than 90 minutes, else 0
apps_enriched = (
    apps_silver
    .join(
        games_silver.select(
            "game_id",
            "home_club_id",
            "away_club_id",
            "home_club_goals",
            "away_club_goals",
        ),
        on="game_id",
    )
    .withColumn(
        "is_home",
        F.when(F.col("player_club_id") == F.col("home_club_id"), F.lit(1))
        .otherwise(F.lit(0)),
    )
    .withColumn(
        "is_sub",
        F.when(
            (F.col("minutes_played") > 0) & (F.col("minutes_played") < 90),
            F.lit(1),
        ).otherwise(F.lit(0)),
    )
)

# Per-player averages over all appearances: goals, assists and the share of
# appearances flagged by is_sub.
tactical_impact = apps_enriched.groupBy("player_id").agg(
    *[
        F.avg(source_column).alias(output_column)
        for source_column, output_column in (
            ("goals", "avg_goals"),
            ("assists", "avg_assists"),
            ("is_sub", "sub_rate"),
        )
    ]
)

---
Gravação no MongoDB

In [0]:
# One document per player: profile columns, per-player tactical averages, the
# valuation history as an array, and the current club's name.
players_doc = (players_silver
               .join(tactical_impact, "player_id", "left")
               .join(valuations_enriched.groupBy("player_id")
                     # collect_list has no guaranteed order; sorting the structs
                     # (date is the first field) yields a chronological history.
                     .agg(F.sort_array(
                              F.collect_list(F.struct("date", "market_value_in_eur")))
                          .alias("valuations")),
                     "player_id", "left")
               # players_silver already carries a current_club_name column, so the
               # club table's name is joined under a temporary alias to avoid
               # producing two columns with the same name.
               .join(clubs_silver.select(F.col("club_id").alias("current_club_id"),
                                         F.col("name").alias("_club_table_name")),
                     "current_club_id", "left")
               # Prefer the name from the clubs table; fall back to the player's own value.
               .withColumn("current_club_name",
                           F.coalesce(F.col("_club_table_name"),
                                      F.col("current_club_name")))
               .drop("_club_table_name", "_ingest_timestamp"))

# Write the player documents to the "players" MongoDB collection, replacing
# what is already there (overwrite mode).
(
    players_doc.write
    .mode("overwrite")
    .format("mongodb")
    .options(collection="players")
    .save()
)

---
Gravação Neo4j

In [0]:
# Nodes: Player (id, name, date_of_birth)
(players_silver
 .selectExpr("player_id as id", "name", "date_of_birth")
 .write
 .format("org.neo4j.spark.DataSource")
 .option("url", NEO4J_URL)
 .option("authentication.type", "basic")
 .option("authentication.basic.username", NEO4J_AUTH[0])
 .option("authentication.basic.password", NEO4J_AUTH[1])
 .option("labels", ":Player")
 # Overwrite mode merges nodes and needs to know which property identifies one.
 .option("node.keys", "id")
 .mode("overwrite")
 .save())

# Nodes: Club (id, name)
(clubs_silver
 .selectExpr("club_id as id", "name")
 .write
 .format("org.neo4j.spark.DataSource")
 .option("url", NEO4J_URL)
 .option("authentication.type", "basic")
 .option("authentication.basic.username", NEO4J_AUTH[0])
 .option("authentication.basic.password", NEO4J_AUTH[1])
 .option("labels", ":Club")
 # Overwrite mode merges nodes and needs to know which property identifies one.
 .option("node.keys", "id")
 .mode("overwrite")
 .save())

# Edges: TRANSFERRED_TO
# career_edges is not defined anywhere else in the visible notebook, so it is
# derived here from the transfers table: one (player, destination club, date)
# row per transfer. Rows missing either endpoint cannot form an edge.
career_edges = (transfers_silver
                .select("player_id",
                        "to_club_id",
                        F.col("transfer_date").alias("date"))
                .dropna(subset=["player_id", "to_club_id"]))

(career_edges
 .coalesce(1)                                     # avoids deadlocks
 .write
 .format("org.neo4j.spark.DataSource")
 .option("url", NEO4J_URL)
 .option("authentication.type", "basic")
 .option("authentication.basic.username", NEO4J_AUTH[0])
 .option("authentication.basic.password", NEO4J_AUTH[1])
 .option("relationship", "TRANSFERRED_TO")
 .option("relationship.save.strategy", "keys")
 .option("relationship.source.labels", ":Player")
 .option("relationship.target.labels", ":Club")
 # "<DataFrame column>:<node property>" - match endpoints on their id property.
 .option("relationship.source.node.keys", "player_id:id")
 .option("relationship.target.node.keys", "to_club_id:id")
 # Store the transfer date on the relationship itself.
 .option("relationship.properties", "date")
 .mode("append")
 .save())


---
Checagens e métricas de qualidade

In [0]:
# Data-quality snapshot: row count and per-column null counts for the main
# outputs, appended to a Delta table together with the time of the check.
quality_metrics = []

for name, df in [("players_doc", players_doc),
                 ("games_silver", games_silver),
                 ("valuations_enriched", valuations_enriched)]:
    # One aggregation pass gives the total row count and the number of nulls
    # in every column.
    stats = df.agg(
        F.count(F.lit(1)).alias("__row_count"),
        *[F.count(F.when(F.col(c).isNull(), F.lit(1))).alias(c) for c in df.columns]
    ).first().asDict()
    row_count = stats.pop("__row_count")
    quality_metrics.append(Row(table=name, rows=row_count, nulls=stats))

# The timestamp is added as a DataFrame column: a Column expression cannot be
# stored inside a Row that createDataFrame has to infer a type for.
(spark.createDataFrame(quality_metrics)
      .withColumn("timestamp", F.current_timestamp())
      .write.mode("append").format("delta")
      .save("/mnt/football/quality_metrics"))