# connect

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
base_path = f"abfss://lakehouse@playersanalytics60300294.dfs.core.windows.net"
spark.conf.set(
"fs.azure.account.key.playersanalytics60300294.dfs.core.windows.net",
"bKSw/Bor6nERyhVn7ZidKwHfMf1jASd77r5MRsWT4t+b++uoSOEfT9tEh33BUFbaCk2rFCWR7Hi6+ASt05hgSA=="
)

In [0]:
appearances = spark.read.parquet(f"{base_path}/processed/appearances/")
games        = spark.read.parquet(f"{base_path}/processed/games/")
players      = spark.read.parquet(f"{base_path}/processed/players/")
player_vals  = spark.read.parquet(f"{base_path}/processed/player_valuations/")
clubs        = spark.read.parquet(f"{base_path}/processed/clubs/")
competitions = spark.read.parquet(f"{base_path}/processed/competitions/")
club_games   = spark.read.parquet(f"{base_path}/processed/club_games/")

In [0]:
print("appearances:")
appearances_copy.printSchema()
print("players:")
players_copy.printSchema()
print("player_valuations:")
player_vals_copy.printSchema()

appearances:
root
 |-- appearance_id: string (nullable = true)
 |-- game_id: string (nullable = true)
 |-- player_id: string (nullable = true)
 |-- player_club_id: string (nullable = true)
 |-- player_current_club_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- competition_id: string (nullable = true)
 |-- yellow_cards: string (nullable = true)
 |-- red_cards: string (nullable = true)
 |-- goals: string (nullable = true)
 |-- assists: string (nullable = true)
 |-- minutes_played: string (nullable = true)

players:
root
 |-- player_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- last_season: string (nullable = true)
 |-- current_club_id: string (nullable = true)
 |-- player_code: string (nullable = true)
 |-- country_of_birth: string (nullable = true)
 |-- city_of_birth: string (nullable = true)
 |-- country_of_citizens

In [0]:
players_clean = (
    players
    .filter(F.col("player_id").isNotNull())
    .withColumn("player_id", F.col("player_id").cast("int"))
    .withColumn("height_in_cm", F.col("height_in_cm").cast("int"))
    .withColumn("date_of_birth", F.to_date("date_of_birth"))  # assumes yyyy-MM-dd
    .withColumn("market_value_in_eur", F.col("market_value_in_eur").cast("double"))
    .withColumn("highest_market_value_in_eur", F.col("highest_market_value_in_eur").cast("double"))
)

In [0]:
# ---- Clean games ----
games_clean = (
    games
    .filter(F.col("game_id").isNotNull())
    .withColumn("game_id", F.col("game_id").cast("int"))
    .withColumn("season", F.col("season").cast("int"))
    .withColumn("date", F.to_date("date"))
)

In [0]:
appearances_clean = (
    appearances
    .filter(F.col("player_id").isNotNull() & F.col("game_id").isNotNull())
    .withColumn("appearance_id", F.col("appearance_id").cast("string"))
    .withColumn("game_id", F.col("game_id").cast("int"))
    .withColumn("player_id", F.col("player_id").cast("int"))
    .withColumn("player_club_id", F.col("player_club_id").cast("int"))
    .withColumn("player_current_club_id", F.col("player_current_club_id").cast("int"))
    .withColumn("date", F.to_date("date"))
    .withColumn("yellow_cards", F.col("yellow_cards").cast("int"))
    .withColumn("red_cards", F.col("red_cards").cast("int"))
    .withColumn("goals", F.col("goals").cast("int"))
    .withColumn("assists", F.col("assists").cast("int"))
    .withColumn("minutes_played", F.col("minutes_played").cast("int"))
)

In [0]:
player_vals_clean = (
    player_vals
    .filter(F.col("player_id").isNotNull())
    .withColumn("player_id", F.col("player_id").cast("int"))
    .withColumn("current_club_id", F.col("current_club_id").cast("int"))
    .withColumn("date", F.to_date("date"))
    .withColumn("market_value_in_eur", F.col("market_value_in_eur").cast("double"))
)

In [0]:
print("=== CLEANED SCHEMAS ===")
appearances_clean.printSchema()
games_clean.printSchema()
players_clean.printSchema()
player_vals_clean.printSchema()

=== CLEANED SCHEMAS ===
root
 |-- appearance_id: string (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- player_id: integer (nullable = true)
 |-- player_club_id: integer (nullable = true)
 |-- player_current_club_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- player_name: string (nullable = true)
 |-- competition_id: string (nullable = true)
 |-- yellow_cards: integer (nullable = true)
 |-- red_cards: integer (nullable = true)
 |-- goals: integer (nullable = true)
 |-- assists: integer (nullable = true)
 |-- minutes_played: integer (nullable = true)

root
 |-- game_id: integer (nullable = true)
 |-- competition_id: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- round: string (nullable = true)
 |-- date: date (nullable = true)
 |-- home_club_id: string (nullable = true)
 |-- away_club_id: string (nullable = true)
 |-- home_club_goals: string (nullable = true)
 |-- away_club_goals: string (nullable = true)
 |-- home_club_position

In [0]:
apps_games = (
    appearances_clean.alias("a")
    .join(
        games_clean.select("game_id", "season").alias("g"),
        on="game_id",
        how="left"
    )
)

player_season_stats = (
    apps_games
    .groupBy("player_id", "season")
    .agg(
        F.first("player_name", ignorenulls=True).alias("player_name"),
        F.first("player_club_id", ignorenulls=True).alias("club_id"),
        F.countDistinct("game_id").alias("matches_played"),
        F.sum("minutes_played").alias("total_minutes"),
        F.sum("goals").alias("total_goals"),
        F.sum("assists").alias("total_assists"),
        F.sum("yellow_cards").alias("yellow_cards"),
        F.sum("red_cards").alias("red_cards")
    )
)

In [0]:
player_season_features = (
    player_season_stats
    .withColumn(
        "goals_per90",
        F.when(
            F.col("total_minutes") > 0,
            F.col("total_goals") * 90.0 / F.col("total_minutes")
        ).otherwise(F.lit(0.0))
    )
    .withColumn(
        "assists_per90",
        F.when(
            F.col("total_minutes") > 0,
            F.col("total_assists") * 90.0 / F.col("total_minutes")
        ).otherwise(F.lit(0.0))
    )
    .withColumn(
        "cards_per90",
        F.when(
            F.col("total_minutes") > 0,
            (F.col("yellow_cards") + F.col("red_cards")) * 90.0 / F.col("total_minutes")
        ).otherwise(F.lit(0.0))
    )
)

In [0]:
players_dim = (
    players_clean
    .select(
        "player_id",
        "name",
        "position",
        "sub_position",
        "date_of_birth",
        "current_club_id"
    )
)

player_season_enriched = (
    player_season_features.alias("ps")
    .join(players_dim.alias("p"), on="player_id", how="left")
    .withColumn(
        "age_at_season_start",
        F.when(
            F.col("date_of_birth").isNotNull() & F.col("season").isNotNull(),
            F.col("season") - F.year("date_of_birth")
        ).otherwise(F.lit(None).cast("int"))
    )
)


In [0]:
clubs_dim = (
    clubs
    .select(
        F.col("club_id").alias("club_id_club"),
        F.col("name").alias("club_name")
    )
)

# Add club name (avoid duplicate club_id by renaming)
player_season_enriched = (
    player_season_enriched.alias("ps")
    .join(
        clubs_dim.alias("c"),
        on=F.col("ps.club_id") == F.col("c.club_id_club"),
        how="left"
    )
    .drop("club_id_club")
)

In [0]:
player_vals_with_season = (
    player_vals_clean
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .withColumn(
        "season",
        F.when(F.col("month") >= 7, F.col("year"))
         .otherwise(F.col("year") - 1)
    )
)

player_season_values = (
    player_vals_with_season
    .groupBy("player_id", "season")
    .agg(
        F.max("market_value_in_eur").alias("season_market_value_eur")
    )
)

player_season_values.show(5, truncate=False)


+---------+------+-----------------------+
|player_id|season|season_market_value_eur|
+---------+------+-----------------------+
|2306     |2004  |800000.0               |
|3165     |2004  |6000000.0              |
|5672     |2004  |4000000.0              |
|6855     |2004  |2000000.0              |
|10504    |2004  |150000.0               |
+---------+------+-----------------------+
only showing top 5 rows


In [0]:
player_season_gold = (
    player_season_enriched.alias("f")
    .join(
        player_season_values.alias("v"),
        on=["player_id", "season"],
        how="inner"   # keep only seasons with a value label
    )
)

# Cast market value to long (non-scientific)
player_season_gold = player_season_gold.withColumn(
    "season_market_value_eur",
    F.col("season_market_value_eur").cast("long")
)

player_season_gold.printSchema()
player_season_gold.show(10, truncate=False)

root
 |-- player_id: integer (nullable = true)
 |-- season: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- club_id: integer (nullable = true)
 |-- matches_played: long (nullable = false)
 |-- total_minutes: long (nullable = true)
 |-- total_goals: long (nullable = true)
 |-- total_assists: long (nullable = true)
 |-- yellow_cards: long (nullable = true)
 |-- red_cards: long (nullable = true)
 |-- goals_per90: double (nullable = true)
 |-- assists_per90: double (nullable = true)
 |-- cards_per90: double (nullable = true)
 |-- name: string (nullable = true)
 |-- position: string (nullable = true)
 |-- sub_position: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- current_club_id: string (nullable = true)
 |-- age_at_season_start: integer (nullable = true)
 |-- club_name: string (nullable = true)
 |-- season_market_value_eur: long (nullable = true)

+---------+------+-------------------+-------+--------------+-------------+-----------+---

In [0]:
gold_path = f"{base_path}/gold/player_season_value_features/"

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

(
    player_season_gold
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(gold_path)
)

print("Gold written to:", gold_path)

Gold written to: abfss://lakehouse@playersanalytics60300294.dfs.core.windows.net/gold/player_season_value_features/


In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS football_lakehouse")

# Save as managed table directly from DataFrame
(
    player_season_gold
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("football_lakehouse.player_season_value_features")
)

In [0]:
spark.sql("""
SELECT player_id, player_name, date_of_birth ,season, total_goals, total_assists,
       goals_per90, assists_per90, season_market_value_eur
FROM football_lakehouse.player_season_value_features
ORDER BY season DESC, season_market_value_eur DESC
LIMIT 20
""").show(truncate=False)


+---------+-------------------+-------------+------+-----------+-------------+-------------------+-------------------+-----------------------+
|player_id|player_name        |date_of_birth|season|total_goals|total_assists|goals_per90        |assists_per90      |season_market_value_eur|
+---------+-------------------+-------------+------+-----------+-------------+-------------------+-------------------+-----------------------+
|371998   |Vinicius Junior    |2000-07-12   |2024  |19         |14           |0.4882924043403769 |0.35979440319817246|200000000              |
|418560   |Erling Haaland     |2000-07-21   |2024  |30         |4            |0.7853403141361257 |0.10471204188481675|200000000              |
|937958   |Lamine Yamal       |2007-07-13   |2024  |14         |21           |0.3565365025466893 |0.534804753820034  |180000000              |
|581678   |Jude Bellingham    |2003-06-29   |2024  |13         |13           |0.3300423131170663 |0.3300423131170663 |180000000              |