<h1 style="text-align:center;">Data collection, pre-processing and aggregation / Collecte,pré-traitement et aggrégation des données</h1>

### Importing librairies / Importer les librairies

In [1]:
# Importing librairies / Importer les librairies
import pandas as pd
import numpy as np
import pandas as pd
from typing import Optional
import json
import math
from typing import Optional, List, Dict
from urllib.request import urlopen
from hashlib import sha1
import os, re, glob

### Data recovery and pre-processing / Récupération et pré-traitement des données


#### Events

In [2]:
def build_events_all() -> pd.DataFrame:
    # Match IDs to fetch / Liste des match_id à récupérer
    match_ids = [1886347, 1899585, 1925299, 1953632, 1996435,2006229, 2011166, 2013725, 2015213, 2017461]

    # Columns to keep / Colonnes à conserver
    columns_to_keep_events = [
        "index", "match_id", "time_start", "time_end", "period", "event_type","event_subtype", "player_id", "player_name",
        "player_position","player_in_possession_id", "player_in_possession_name", "team_id", "team_shortname","channel_start",
        "third_start", "penalty_area_start", "channel_end", "third_end", "penalty_area_end","associated_player_possession_end_type",
        "associated_off_ball_run_subtype","game_state", "team_score", "opponent_team_score","team_in_possession_phase_type",
        "team_out_of_possession_phase_type","lead_to_shot", "lead_to_goal", "speed_avg", "speed_avg_band", "start_type", "end_type",
        "pass_outcome", "player_targeted_id","player_targeted_name", "player_targeted_channel_pass", "player_targeted_third_pass",
        "player_targeted_average_speed", "player_targeted_speed_avg_band","player_targeted_xthreat", "player_targeted_dangerous",
        "xthreat", "dangerous","affected_line_breaking_passing_option_xthreat","affected_line_breaking_passing_option_dangerous",
        "pressing_chain", "possession_danger", "beaten_by_possession","beaten_by_movement", "stop_possession_danger",
        "reduce_possession_danger"
    ]

    def fetch_and_filter_match(match_id: int) -> Optional[pd.DataFrame]:
        # Read the CSV / Lire le CSV
        url = f"https://raw.githubusercontent.com/SkillCorner/opendata/master/data/matches/{match_id}/{match_id}_dynamic_events.csv"
        try:
            df = pd.read_csv(url, low_memory=False)
        except Exception as e:
            print(f"[WARN] Could not read match_id={match_id} ({e}). / Lecture impossible pour match_id={match_id} ({e}).")
            return None

        # Keep only the desired columns / Ne garder que les colonnes souhaitées
        keep = [c for c in columns_to_keep_events if c in df.columns]
        df = df[keep]
        return df

    parts = []
    for mid in match_ids:
        part = fetch_and_filter_match(mid)
        if part is not None:
            parts.append(part)

    if not parts:
        raise RuntimeError("No file could be retrieved. / Aucun fichier n'a pu être récupéré.")

    events_all = pd.concat(parts, ignore_index=True)
    
    return events_all

# Data recovery and pre-processing / Récupération et pré-traitement des données
events_all = build_events_all()


#### Player

In [3]:

# Match IDs to fetch / Liste des match_id à récupérer
match_ids = [1886347, 1899585, 1925299, 1953632, 1996435, 2006229, 2011166, 2013725, 2015213, 2017461]

# Read json url / Lecture du fichier json
def _read_json_url(url: str) -> Optional[dict]:
    try:
        with urlopen(url) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except Exception as e:
        print(f"[WARN] Impossible de lire {url} ({e})")
        return None

# Function to retrieve player information for each match / Fonction pour récupérer les informations des joueurs sur chaque match
def _normalize_player_record(p: dict, match_id: int, team_short_map: Dict[int, str]) -> Optional[dict]:
    try:
        player_id = int(p.get("id"))
    except:
        return None

    # Retrieve of the number of minutes played / Récupération du nombre de minutes jouées
    total = (p.get("playing_time") or {}).get("total") or {}
    minutes_played = total.get("minutes_played")
    try:
        minutes_played = float(minutes_played) if minutes_played is not None else 0.0
    except:
        minutes_played = 0.0

    # Retrieve of the player position / Récupération de la position du joueur
    role = p.get("player_role") or {}
    position_group = role.get("position_group")

    # Retrieve of the team id / Récupération de l'identifiant de son équipe
    team_id = p.get("team_id")
    team_shortname = team_short_map.get(team_id)

    # Retrieve of the player name / Récupération du nom du joueurs
    player_name = p.get("short_name")

    return {
        "match_id": match_id,"player_id": player_id,"player_name_json": player_name,"player_position_json": position_group,
        "number": p.get("number"),"birthday": p.get("birthday"),"minutes_played": minutes_played,"team_id": team_id,
        "team_shortname_json": team_shortname}

# Retrieving all player information from the URL of the JSON file for each match
# Récupération de toutes les informations des joueurs à partir de l'url du fichier json de chaque match
def build_player_all(match_ids: List[int]) -> pd.DataFrame:
    # Link url / Lien url
    base_url_tpl = "https://raw.githubusercontent.com/SkillCorner/opendata/master/data/matches/{mid}/{mid}_match.json"

    # Loop to retrieve each match / Boucle pour récupérer chaque match
    rows = []
    for mid in match_ids:
        url = base_url_tpl.format(mid=mid)
        data = _read_json_url(url)
        if not data:
            continue

        team_short_map = {}
        ht = data.get("home_team") or {}
        at = data.get("away_team") or {}
        if "id" in ht:
            team_short_map[ht["id"]] = ht.get("short_name")
        if "id" in at:
            team_short_map[at["id"]] = at.get("short_name")

        players = data.get("players") or []
        for p in players:
            rec = _normalize_player_record(p, match_id=mid, team_short_map=team_short_map)
            if rec is not None:
                rows.append(rec)

    if not rows:
        raise RuntimeError("Aucun joueur récupéré depuis les JSONs.")

    player_all = pd.DataFrame(rows)

    player_all["minutes_played"] = pd.to_numeric(player_all["minutes_played"], errors="coerce").fillna(0.0)

    return player_all

# Function execution / Execution de la fonction
player_all = build_player_all(match_ids)

### Matches informations / Informations des matches

In [4]:
# Building a function to retrieve basic information about each match 
# Construction d'une fonction pour récupérer les informations de base de chaque match
def build_match_info_for_competition(url: str,out_dir: str = os.path.join("src", "data", "info_matches")) -> pd.DataFrame:
    data = _read_json_url(url)
    if not data:
        raise RuntimeError(f"Aucun match récupéré depuis {url}.")

    matches = pd.DataFrame(data)

    if matches.empty:
        raise RuntimeError(f"Aucun match valide dans {url}.")

    # Recovery of competition_edition_id / Récupération de competition_edition_id
    competition_edition_id = matches["competition_edition_id"].iloc[0]

    # Extracting team information / Extraction des infos d'équipe
    def get_team_id(team_dict):
        if isinstance(team_dict, dict):
            return team_dict.get("id")
        return None

    def get_team_name(team_dict):
        if isinstance(team_dict, dict):
            return team_dict.get("short_name")
        return None

    # Construction of the final dataframe / Construction du dataframe final
    out_df = pd.DataFrame({
        "match_id": matches["id"],"date_time": pd.to_datetime(matches["date_time"]).dt.strftime("%Y-%m-%d"),
        "home_team_id": matches["home_team"].apply(get_team_id),"home_team_name": matches["home_team"].apply(get_team_name),
        "away_team_id": matches["away_team"].apply(get_team_id),"away_team_name": matches["away_team"].apply(get_team_name),
    })

    # Creation of the discharge file / Creation du dossier de sortie
    os.makedirs(out_dir, exist_ok=True)

    out_name = f"informations_matchs_{competition_edition_id}.csv"
    out_path = os.path.join(out_dir, out_name)

    out_df.to_csv(out_path, index=False, encoding="utf-8")

    print("[INFO] File created / Fichier créé")

    return out_df

# Function execution / Execution de la fonction
url_competition = "https://raw.githubusercontent.com/SkillCorner/opendata/master/data/matches.json"
df_info = build_match_info_for_competition(url_competition)

[INFO] File created / Fichier créé


### Calculation of statistics / Calcul des statistiques

### Matches

In [5]:
# List of each subcategory / Liste de chaque sous-catégorie
PHASES = ["build_up", "create", "finish", "direct", "quick_break", "transition", "set_play", "chaotic"]
RUN_SUBTYPES   = ["behind","coming_short","cross_receiver","dropping_off","overlap","pulling_half_space","pulling_wide",
                  "run_ahead_of_the_ball","support","underlap"]
PRESS_SUBTYPES = ["pressing","pressure","counter_press","recovery_press","other"]
GAME_STATES    = ["winning","drawing","loosing"]
SPEED_BANDS_MOVEMENT = ["jogging","running","hsr","sprinting"]
SPEED_BANDS_RUNS     = ["running","hsr","sprinting"]
LATERAL = ["left","center","right"]
THIRDS  = ["middle_third","attacking_third"]
PENALTY = ["penalty"]

# Function to retrieve statistics by game / Fonction pour récupérer les statistiques par match
def build_match_stats(events_all: pd.DataFrame, out_dir: str = os.path.join("src","data","matches")) -> pd.DataFrame:
    # Time sorting / Tri temporel
    base = events_all.copy()
    time_cols = [c for c in ["time_end","time_start"] if c in base.columns]
    if time_cols:
        base = base.sort_values(time_cols, ascending=True)
    elif "index" in base.columns:
        base = base.sort_values(["index"], ascending=True)

    # Standardizations / Normalisations
    for col in ["event_subtype","team_in_possession_phase_type","game_state","channel_end","third_end","speed_avg_band"]:
        if col in base.columns:
            base[col] = base[col].astype(str).str.lower()
    if "game_state" in base.columns:
        base["game_state"] = base["game_state"].replace({"losing":"loosing"})

    # Mapping zones and speed / Mapping zones et vitesse
    def map_lateral(v):
        if v == "center": return "center"
        if v in ("half_space_left","wide_left"): return "left"
        if v in ("half_space_right","wide_right"): return "right"
        return None

    def map_third(v):
        if v in THIRDS: return v
        return None

    def map_penalty(flag):
        try:
            return "penalty" if bool(flag) else None
        except:
            return None

    def map_speed_move(v):
        return v if v in SPEED_BANDS_MOVEMENT else None

    def map_speed_run(v):
        return v if v in SPEED_BANDS_RUNS else None

    # Player informations / Infos joueur
    info = (
        base[["match_id","player_id","player_name","player_position","team_shortname"]]
        .dropna(subset=["match_id","player_id"])
        .drop_duplicates(subset=["match_id","player_id"], keep="last")
    )

    # Helpers pivot
    def _pivot_fixed(df_metric: pd.DataFrame, value_col: str, prefix: str, colname: str, buckets: list) -> pd.DataFrame:
        if df_metric.empty:
            return pd.DataFrame(columns=["match_id","player_id"] + [f"{prefix}_{b}" for b in buckets])
        wide = (
            df_metric.pivot_table(index=["match_id","player_id"], columns=colname, values=value_col,
                                  aggfunc="sum", fill_value=0.0).rename_axis(None, axis=1)
        )
        for b in buckets:
            if b not in wide.columns:
                wide[b] = 0.0
        wide = wide[buckets]
        wide.columns = [f"{prefix}_{b}" for b in buckets]
        return wide.reset_index()

    def _pivot_phase(df_metric: pd.DataFrame, value_col: str, prefix: str) -> pd.DataFrame:
        return _pivot_fixed(df_metric, value_col, prefix, "team_in_possession_phase_type", PHASES)

    def _pivot_subtype(df_metric: pd.DataFrame, value_col: str, prefix: str, subtype_list: list, colname: str) -> pd.DataFrame:
        return _pivot_fixed(df_metric, value_col, prefix, colname, subtype_list)

    def _pivot_state(df_metric: pd.DataFrame, value_col: str, prefix: str) -> pd.DataFrame:
        return _pivot_fixed(df_metric, value_col, prefix, "game_state", GAME_STATES)

    def _pivot_speed_move(df_metric: pd.DataFrame, value_col: str, prefix: str) -> pd.DataFrame:
        return _pivot_fixed(df_metric, value_col, prefix, "speed_avg_band", SPEED_BANDS_MOVEMENT)

    def _pivot_speed_runs(df_metric: pd.DataFrame, value_col: str, prefix: str) -> pd.DataFrame:
        return _pivot_fixed(df_metric, value_col, prefix, "speed_avg_band", SPEED_BANDS_RUNS)
    
    # total movement, by phase, by state, by zones, by speed / top_movement global, par phase, par state, par zones, par vitesse
    
    # Retrieves the value of xthreat and the associated information during an event involving a pass option
    # Récupère la valeur de xthreat et les informations associées lors d'un évenement impliquant une option de passe
    agg_move = (
        base.loc[base["event_type"] == "passing_option"]
            .groupby(["match_id","player_id"])["xthreat"]
            .sum().reset_index().rename(columns={"xthreat":"top_movement"})
    )
    move_phase = base.loc[base["event_type"] == "passing_option",
                          ["match_id","player_id","team_in_possession_phase_type","xthreat"]] \
                   .dropna(subset=["match_id","player_id","team_in_possession_phase_type"])
    agg_move_phase  = move_phase.groupby(["match_id","player_id","team_in_possession_phase_type"], as_index=False)["xthreat"].sum() \
                                .rename(columns={"xthreat":"top_movement"})
    wide_move_phase = _pivot_phase(agg_move_phase, "top_movement", "top_movement")

    move_state = base.loc[base["event_type"] == "passing_option",
                          ["match_id","player_id","game_state","xthreat"]] \
                   .dropna(subset=["match_id","player_id","game_state"])
    agg_move_state  = move_state.groupby(["match_id","player_id","game_state"], as_index=False)["xthreat"].sum() \
                                .rename(columns={"xthreat":"top_movement"})
    wide_move_state = _pivot_state(agg_move_state, "top_movement", "top_movement")

    move_zone = base.loc[base["event_type"] == "passing_option",
                         ["match_id","player_id","channel_end","third_end","penalty_area_end","xthreat"]].copy()
    move_zone["lateral_zone"] = move_zone["channel_end"].map(map_lateral)
    move_zone["third_zone"]   = move_zone["third_end"].map(map_third)
    move_zone["pen_zone"]     = move_zone["penalty_area_end"].map(map_penalty)
    wide_move_lat   = _pivot_subtype(move_zone.dropna(subset=["lateral_zone"]), "xthreat", "top_movement", LATERAL, "lateral_zone")
    wide_move_third = _pivot_subtype(move_zone.dropna(subset=["third_zone"]),   "xthreat", "top_movement", THIRDS,  "third_zone")
    wide_move_pen   = _pivot_subtype(move_zone.dropna(subset=["pen_zone"]),     "xthreat", "top_movement", PENALTY, "pen_zone")

    move_speed = base.loc[base["event_type"] == "passing_option",
                          ["match_id","player_id","speed_avg_band","xthreat"]].copy()
    move_speed["speed_avg_band"] = move_speed["speed_avg_band"].map(map_speed_move)
    wide_move_speed = _pivot_speed_move(move_speed.dropna(subset=["speed_avg_band"]), "xthreat", "top_movement")

    # top_off_ball_runs overall, by phase, by subtype, by state, by zone, by speed
    # top_off_ball_runs global, par phase, par subtype, par state, par zones, par vitesse

    # Retrieves the value of xthreat and the associated information during an event involving a off ball run
    # Récupère la valeur de xthreat et les informations associées lors d'un évenement impliquant une course sans ballon
    agg_run = (
        base.loc[base["event_type"] == "off_ball_run"]
            .groupby(["match_id","player_id"])["xthreat"]
            .sum().reset_index().rename(columns={"xthreat":"top_off_ball_runs"})
    )
    run_phase = base.loc[base["event_type"] == "off_ball_run",
                         ["match_id","player_id","team_in_possession_phase_type","xthreat"]] \
                .dropna(subset=["match_id","player_id","team_in_possession_phase_type"])
    agg_run_phase  = run_phase.groupby(["match_id","player_id","team_in_possession_phase_type"], as_index=False)["xthreat"].sum() \
                              .rename(columns={"xthreat":"top_off_ball_runs"})
    wide_run_phase = _pivot_phase(agg_run_phase, "top_off_ball_runs", "top_off_ball_runs")

    run_subtype = base.loc[(base["event_type"] == "off_ball_run") & (base["event_subtype"].isin(RUN_SUBTYPES)),
                           ["match_id","player_id","event_subtype","xthreat"]] \
                 .dropna(subset=["match_id","player_id","event_subtype"])
    agg_run_subtype  = run_subtype.groupby(["match_id","player_id","event_subtype"], as_index=False)["xthreat"].sum() \
                                   .rename(columns={"xthreat":"top_off_ball_runs"})
    wide_run_subtype = _pivot_subtype(agg_run_subtype, "top_off_ball_runs", "top_off_ball_runs", RUN_SUBTYPES, "event_subtype")

    run_state = base.loc[base["event_type"] == "off_ball_run",
                         ["match_id","player_id","game_state","xthreat"]] \
               .dropna(subset=["match_id","player_id","game_state"])
    agg_run_state  = run_state.groupby(["match_id","player_id","game_state"], as_index=False)["xthreat"].sum() \
                              .rename(columns={"xthreat":"top_off_ball_runs"})
    wide_run_state = _pivot_state(agg_run_state, "top_off_ball_runs", "top_off_ball_runs")

    run_zone = base.loc[base["event_type"] == "off_ball_run",
                        ["match_id","player_id","channel_end","third_end","penalty_area_end","xthreat"]].copy()
    run_zone["lateral_zone"] = run_zone["channel_end"].map(map_lateral)
    run_zone["third_zone"]   = run_zone["third_end"].map(map_third)
    run_zone["pen_zone"]     = run_zone["penalty_area_end"].map(map_penalty)
    wide_run_lat   = _pivot_subtype(run_zone.dropna(subset=["lateral_zone"]), "xthreat", "top_off_ball_runs", LATERAL, "lateral_zone")
    wide_run_third = _pivot_subtype(run_zone.dropna(subset=["third_zone"]),   "xthreat", "top_off_ball_runs", THIRDS,  "third_zone")
    wide_run_pen   = _pivot_subtype(run_zone.dropna(subset=["pen_zone"]),     "xthreat", "top_off_ball_runs", PENALTY, "pen_zone")

    run_speed = base.loc[base["event_type"] == "off_ball_run",
                         ["match_id","player_id","speed_avg_band","xthreat"]].copy()
    run_speed["speed_avg_band"] = run_speed["speed_avg_band"].map(map_speed_run)
    wide_run_speed = _pivot_speed_runs(run_speed.dropna(subset=["speed_avg_band"]), "xthreat", "top_off_ball_runs")

    # top_choice global, by state, by zones / top_choice global, par state, par zones
    # top_choice en pourcentage: global, par state, par zones, par phase

    # We calculate the percentage of correct choices made by each player in relation to the possible pass options (more than two)
    # and the xthreat value. We also collect information associated with this choice.
    # On calcule le pourcentage de bon choix effectué par joueur au regard des options de passes possible (au delà de 2),
    # et de la valeur du xthreat. On récupère également les informations associées à ce choix.
    def is_pivot(row) -> bool:
        return (
            row.get("event_type") == "player_possession"
            and row.get("end_type") == "pass"
            and float(row.get("player_targeted_xthreat", 0.0)) > 0.0
        )
    
    rows_global = []
    rows_state  = []
    rows_lat    = []
    rows_third  = []
    rows_pen    = []
    rows_phase  = []
    
    EPS = 1e-12
    
    for mid, dfm in base.groupby("match_id", sort=False):
        dfm = dfm.reset_index(drop=True)
    
        current_actor_id = None
        chosen_target_id = None
        chosen_target_name = None
        chosen_x = None
        current_state = None
        current_phase = None
    
        chosen_lat = None
        chosen_thd = None
        chosen_pen = None
    
        options_any = [] 
    
        def push_decision():
            # Validates a decision if at least 2 options in total / Valide une décision si au moins 2 options au total
            if (current_actor_id is None) or (chosen_x is None):
                return
            total_opts = 1 + len(options_any)
            if total_opts < 2:
                # no attempt if only one option / pas de tentative si une seule option
                return
            best_other = float(np.max(options_any)) if len(options_any) > 0 else -np.inf
            success = 1 if (chosen_x + EPS >= best_other) else 0
            attempt = 1
    
            rows_global.append((mid, current_actor_id, success, attempt))
            if current_state is not None:
                rows_state.append((mid, current_actor_id, current_state, success, attempt))
            if chosen_lat is not None:
                rows_lat.append((mid, current_actor_id, chosen_lat, success, attempt))
            if chosen_thd is not None:
                rows_third.append((mid, current_actor_id, chosen_thd, success, attempt))
            if chosen_pen is not None:
                rows_pen.append((mid, current_actor_id, chosen_pen, success, attempt))
            if current_phase is not None:
                rows_phase.append((mid, current_actor_id, current_phase, success, attempt))
                
        # We compare the possible pass options, memorising the passer's choice.
        # On compare les options de passes possibles, en mémorisant le choix du passeur
        for _, row in dfm.iterrows():
            if is_pivot(row):
                push_decision()
                current_actor_id   = row["player_id"]
                chosen_x           = float(row["player_targeted_xthreat"])
                chosen_target_id   = row.get("player_targeted_id")
                chosen_target_name = row.get("player_targeted_name")
                current_state      = row.get("game_state")
                current_phase      = row.get("team_in_possession_phase_type")  # nouveau
                chosen_lat = None; chosen_thd = None; chosen_pen = None
                options_any = []
                continue
    
            if (current_actor_id is not None) and (chosen_x is not None):
                if row.get("event_type") == "passing_option":
                    opt_tid   = row.get("player_targeted_id")
                    opt_tname = row.get("player_targeted_name")
                    xv        = float(row.get("xthreat", 0.0))
    
                    is_chosen = False
                    if pd.notna(opt_tid) and pd.notna(chosen_target_id):
                        is_chosen = (opt_tid == chosen_target_id)
                    if (not is_chosen) and isinstance(opt_tname, str) and isinstance(chosen_target_name, str):
                        is_chosen = (opt_tname == chosen_target_name)
                    if (not is_chosen) and abs(xv - chosen_x) <= EPS:
                        is_chosen = True
    
                    if is_chosen:
                        # the selected pass area is memorised / on mémorise la zone de la passe choisie
                        chosen_lat = map_lateral(row.get("channel_end"))
                        chosen_thd = map_third(row.get("third_end"))
                        chosen_pen = map_penalty(row.get("penalty_area_end"))
                    else:
                        options_any.append(xv)
        push_decision()
        
    # We calculate the percentage of correct choices made on the match, as well as the total number of choices.
    # On calcule le pourcentage de bon choix effectué sur le match, ainsi que le nombre de choix au total.
    def _to_pct_global(rows):
        if not rows:
            return pd.DataFrame(columns=["match_id","player_id","top_choice","top_choice_attempts"])
        df = pd.DataFrame(rows, columns=["match_id","player_id","success","attempt"])
        agg = df.groupby(["match_id","player_id"], as_index=False)[["success","attempt"]].sum()
        agg["top_choice"] = np.where(agg["attempt"] > 0, agg["success"] / agg["attempt"], 0.0)
        agg["top_choice_attempts"] = agg["attempt"]
        return agg[["match_id","player_id","top_choice","top_choice_attempts"]]

    
    def _to_pct_bucket(rows, bucket_col_name, prefix):
        if not rows:
            return pd.DataFrame(columns=["match_id","player_id"])
        df = pd.DataFrame(rows, columns=["match_id","player_id",bucket_col_name,"success","attempt"])
        agg = df.groupby(["match_id","player_id",bucket_col_name], as_index=False)[["success","attempt"]].sum()
        agg["top_choice"] = np.where(agg["attempt"] > 0, agg["success"] / agg["attempt"], 0.0)

        pct_df = agg.rename(columns={"top_choice": "value"})
        att_df = agg.rename(columns={"attempt": "value"})

        if bucket_col_name == "game_state":
            wide_pct = _pivot_state(pct_df, "value", "top_choice")
            wide_att = _pivot_state(att_df, "value", "top_choice_attempts")
        elif bucket_col_name == "lateral":
            wide_pct = _pivot_subtype(pct_df, "value", "top_choice", LATERAL, "lateral")
            wide_att = _pivot_subtype(att_df, "value", "top_choice_attempts", LATERAL, "lateral")
        elif bucket_col_name == "third":
            wide_pct = _pivot_subtype(pct_df, "value", "top_choice", THIRDS, "third")
            wide_att = _pivot_subtype(att_df, "value", "top_choice_attempts", THIRDS, "third")
        elif bucket_col_name == "pen":
            wide_pct = _pivot_subtype(pct_df, "value", "top_choice", PENALTY, "pen")
            wide_att = _pivot_subtype(att_df, "value", "top_choice_attempts", PENALTY, "pen")
        elif bucket_col_name == "team_in_possession_phase_type":
            wide_pct = _pivot_phase(pct_df, "value", "top_choice")
            wide_att = _pivot_phase(att_df, "value", "top_choice_attempts")
        else:
            return pd.DataFrame(columns=["match_id","player_id"])

        wide = wide_pct.merge(wide_att, on=["match_id","player_id"], how="left")
        return wide

    # Finals tables / Tables finales
    tc_df            = _to_pct_global(rows_global)
    wide_choice_state= _to_pct_bucket(rows_state,  "game_state", "top_choice")
    wide_choice_lat  = _to_pct_bucket(rows_lat,    "lateral","top_choice")
    wide_choice_third= _to_pct_bucket(rows_third,  "third","top_choice")
    wide_choice_pen  = _to_pct_bucket(rows_pen,    "pen","top_choice")
    wide_choice_phase= _to_pct_bucket(rows_phase,  "team_in_possession_phase_type", "top_choice")

    # top_press global, by subtype, and by phase of play / top_press global, par subtype, et par phase de jeu

    # We recover the influence of the pressure exerted by the player on the danger of the opposing action and related information
    # On récupère l'influence de la pression émise par le joueur sur le danger de l'action adverse et les informations associées
    press = base.loc[base.get("possession_danger", False) == True].copy()
    if not press.empty:
        b_bp  = press["beaten_by_possession"].fillna(False)
        b_bm  = press["beaten_by_movement"].fillna(False)
        b_red = press["reduce_possession_danger"].fillna(False)
        b_aff = press["affected_line_breaking_passing_option_dangerous"].fillna(False)
        b_stp = press["stop_possession_danger"].fillna(False)

        Penaltie      = np.where((b_bp | b_bm), -0.5, 0.0)
        little_bonus  = np.where((b_red | b_aff),  0.5, 0.0)
        bonus         = np.where(b_stp,            1, 0.0)
        is_neutral    = (~(b_bp | b_bm | b_red | b_aff | b_stp)).astype(bool)
        neutral_bonus = np.where(is_neutral,       0.25, 0.0)

        press["press_score"] = Penaltie + little_bonus + bonus + neutral_bonus

        agg_press = press.groupby(["match_id","player_id"])["press_score"].sum().reset_index().rename(columns={"press_score":"top_press"})

        press_subtype = press.loc[press["event_subtype"].isin(PRESS_SUBTYPES),
                                  ["match_id","player_id","event_subtype","press_score"]].dropna(subset=["match_id","player_id","event_subtype"])
        agg_press_subtype  = press_subtype.groupby(["match_id","player_id","event_subtype"], as_index=False)["press_score"].sum() \
                                         .rename(columns={"press_score":"top_press"})
        wide_press_subtype = _pivot_subtype(agg_press_subtype, "top_press", "top_press", PRESS_SUBTYPES, "event_subtype")

        press_state = press.loc[press["game_state"].isin(GAME_STATES),
                                ["match_id","player_id","game_state","press_score"]].dropna(subset=["match_id","player_id","game_state"])
        agg_press_state  = press_state.groupby(["match_id","player_id","game_state"], as_index=False)["press_score"].sum() \
                                       .rename(columns={"press_score":"top_press"})
        wide_press_state = _pivot_state(agg_press_state, "top_press", "top_press")
    else:
        agg_press = pd.DataFrame(columns=["match_id","player_id","top_press"])
        wide_press_subtype = pd.DataFrame(columns=["match_id","player_id"] + [f"top_press_{s}" for s in PRESS_SUBTYPES])
        wide_press_state   = pd.DataFrame(columns=["match_id","player_id"] + [f"top_press_{s}" for s in GAME_STATES])

    # Merge / Fusion
    result_global = (
        agg_move.merge(agg_run,  on=["match_id","player_id"], how="outer")
                .merge(tc_df,    on=["match_id","player_id"], how="outer")
                .merge(agg_press,on=["match_id","player_id"], how="outer")
    )

    result = (
        info.merge(result_global,       on=["match_id","player_id"], how="left")
            # movement
            .merge(wide_move_phase,     on=["match_id","player_id"], how="left")
            .merge(wide_move_state,     on=["match_id","player_id"], how="left")
            .merge(wide_move_lat,       on=["match_id","player_id"], how="left")
            .merge(wide_move_third,     on=["match_id","player_id"], how="left")
            .merge(wide_move_pen,       on=["match_id","player_id"], how="left")
            .merge(wide_move_speed,     on=["match_id","player_id"], how="left")
            # runs
            .merge(wide_run_phase,      on=["match_id","player_id"], how="left")
            .merge(wide_run_subtype,    on=["match_id","player_id"], how="left")
            .merge(wide_run_state,      on=["match_id","player_id"], how="left")
            .merge(wide_run_lat,        on=["match_id","player_id"], how="left")
            .merge(wide_run_third,      on=["match_id","player_id"], how="left")
            .merge(wide_run_pen,        on=["match_id","player_id"], how="left")
            .merge(wide_run_speed,      on=["match_id","player_id"], how="left")
            # choice
            .merge(wide_choice_phase,  on=["match_id","player_id"], how="left")
            .merge(wide_choice_state,   on=["match_id","player_id"], how="left")
            .merge(wide_choice_lat,     on=["match_id","player_id"], how="left")
            .merge(wide_choice_third,   on=["match_id","player_id"], how="left")
            .merge(wide_choice_pen,     on=["match_id","player_id"], how="left")

            # press
            .merge(wide_press_subtype,  on=["match_id","player_id"], how="left")
            .merge(wide_press_state,    on=["match_id","player_id"], how="left")
    )

    # Filling and rounding / Remplissage et arrondis
    metric_cols = [col for col in result.columns if col not in ["match_id","player_id","player_name","player_position","team_shortname"]]
    for c in metric_cols:
        result[c] = pd.to_numeric(result[c], errors="coerce").fillna(0.0).astype(float).round(2)

    # Final columns / Colonnes finales
    cols_m_phase  = [f"top_movement_{p}" for p in PHASES if f"top_movement_{p}" in result.columns]
    cols_c_phase = [f"top_choice_{p}" for p in PHASES if f"top_choice_{p}" in result.columns]
    cols_c_attempts_phase = [f"top_choice_attempts_{p}" for p in PHASES      if f"top_choice_attempts_{p}" in result.columns]
    cols_r_phase  = [f"top_off_ball_runs_{p}" for p in PHASES if f"top_off_ball_runs_{p}" in result.columns]
    cols_r_sub    = [f"top_off_ball_runs_{s}" for s in RUN_SUBTYPES if f"top_off_ball_runs_{s}" in result.columns]
    cols_p_sub    = [f"top_press_{s}" for s in PRESS_SUBTYPES if f"top_press_{s}" in result.columns]
    cols_m_state  = [f"top_movement_{s}" for s in GAME_STATES if f"top_movement_{s}" in result.columns]
    cols_r_state  = [f"top_off_ball_runs_{s}" for s in GAME_STATES if f"top_off_ball_runs_{s}" in result.columns]
    cols_c_state  = [f"top_choice_{s}" for s in GAME_STATES if f"top_choice_{s}" in result.columns]
    cols_c_attempts_state = [f"top_choice_attempts_{s}" for s in GAME_STATES if f"top_choice_attempts_{s}" in result.columns]
    cols_p_state  = [f"top_press_{s}" for s in GAME_STATES if f"top_press_{s}" in result.columns]
    cols_m_lat    = [f"top_movement_{b}" for b in LATERAL if f"top_movement_{b}" in result.columns]
    cols_m_thd    = [f"top_movement_{b}" for b in THIRDS  if f"top_movement_{b}" in result.columns]
    cols_m_pen    = ["top_movement_penalty"] if "top_movement_penalty" in result.columns else []
    cols_r_lat    = [f"top_off_ball_runs_{b}" for b in LATERAL if f"top_off_ball_runs_{b}" in result.columns]
    cols_r_thd    = [f"top_off_ball_runs_{b}" for b in THIRDS  if f"top_off_ball_runs_{b}" in result.columns]
    cols_r_pen    = ["top_off_ball_runs_penalty"] if "top_off_ball_runs_penalty" in result.columns else []
    cols_c_lat    = [f"top_choice_{b}" for b in LATERAL if f"top_choice_{b}" in result.columns]
    cols_c_attempts_lat   = [f"top_choice_attempts_{b}" for b in LATERAL     if f"top_choice_attempts_{b}" in result.columns]
    cols_c_thd    = [f"top_choice_{b}" for b in THIRDS  if f"top_choice_{b}" in result.columns]
    cols_c_attempts_thd   = [f"top_choice_attempts_{b}" for b in THIRDS      if f"top_choice_attempts_{b}" in result.columns]
    cols_c_pen    = ["top_choice_penalty"] if "top_choice_penalty" in result.columns else []
    cols_c_attempts_pen   = ["top_choice_attempts_penalty"] if "top_choice_attempts_penalty" in result.columns else []
    cols_m_spd    = [f"top_movement_{b}" for b in SPEED_BANDS_MOVEMENT if f"top_movement_{b}" in result.columns]
    cols_r_spd    = [f"top_off_ball_runs_{b}" for b in SPEED_BANDS_RUNS if f"top_off_ball_runs_{b}" in result.columns]

    # We sort the list of columns / On ordonne la liste de colonnes
    ordered_cols = [
        "player_id","player_name","player_position","team_shortname",
        "top_movement","top_off_ball_runs","top_choice","top_choice_attempts","top_press",
        *cols_m_phase, *cols_r_phase, *cols_r_sub, *cols_p_sub,
        *cols_m_state, *cols_r_state, *cols_c_state, *cols_c_attempts_state, *cols_p_state,
        *cols_m_lat, *cols_m_thd, *cols_m_pen,
        *cols_r_lat, *cols_r_thd, *cols_r_pen,
        *cols_c_lat, *cols_c_attempts_lat,
        *cols_c_thd, *cols_c_attempts_thd,
        *cols_c_pen, *cols_c_attempts_pen,
        *cols_c_phase, *cols_c_attempts_phase,
        *cols_m_spd, *cols_r_spd,
        "match_id",
    ]
    ordered_cols = [c for c in ordered_cols if c in result.columns]
    result = result[ordered_cols]

    # Save per match / Sauvegarde par match
    os.makedirs(out_dir, exist_ok=True)
    for mid, sub in result.groupby("match_id"):
        out_path = os.path.join(out_dir, f"{int(mid)}_stats.csv")
        sub_out = sub.drop(columns=["match_id"], errors="ignore").sort_values(
            [c for c in ["top_choice","top_press","top_movement","top_off_ball_runs"] if c in sub.columns] or ["player_name"],
            ascending=False
        )
        sub_out.to_csv(out_path, index=False)

    print("Update of statistics per match / Mise à jour des statistiques par match")
    return result

# Function execution / Execution de la fonction
result_df = build_match_stats(events_all)

Update of statistics per match / Mise à jour des statistiques par match


### Équipes / Teams

In [6]:
# Aggregate statistics by team / Aggrége les statistiques par équipe
def build_team_stats_table(match_dir: str = os.path.join("src", "data", "matches"),out_dir: str = os.path.join("src", "data", "teams"),
    out_name: str = "team.csv") -> pd.DataFrame:
    os.makedirs(out_dir, exist_ok=True)

    pattern = re.compile(r"(\d+)_stats\.csv$")
    files = sorted(glob.glob(os.path.join(match_dir, "*_stats.csv")))
    if not files:
        print("No _stats.csv file found / Aucun fichier _stats.csv trouvé.")
        return pd.DataFrame()

    frames = []
    for fp in files:
        try:
            df = pd.read_csv(fp)
            # Add match_id from the file name if not available / Ajouter match_id depuis le nom de fichier si absent
            if "match_id" not in df.columns:
                m = pattern.search(os.path.basename(fp))
                if m:
                    df = df.copy()
                    df["match_id"] = int(m.group(1))
                else:
                    print(f"Unrecognised file name, ignored / Nom de fichier non reconnu, ignoré: {fp}")
                    continue
            if {"team_shortname", "match_id"}.issubset(df.columns):
                frames.append(df)
            else:
                print(f"Missing columns in / Colonnes manquantes dans {fp}")
        except Exception as e:
            print(f"Reading error / Erreur lecture {fp}: {e}")

    if not frames:
        print("No usable data / Aucune donnée exploitable.")
        return pd.DataFrame()

    data = pd.concat(frames, ignore_index=True)
    data = data.dropna(subset=["team_shortname", "match_id"])
    for c in data.columns:
        if c not in {"team_shortname"}:
            data[c] = pd.to_numeric(data[c], errors="ignore")

    numeric_cols = data.select_dtypes(include=[np.number]).columns.tolist()
    exclude_cols = {"match_id", "player_id"}
    metric_cols = [c for c in numeric_cols if c not in exclude_cols]
    if not metric_cols:
        print("No digital metric column found / Aucune colonne de métrique numérique trouvée.")
        return pd.DataFrame()

    # Team total in each match / Somme par équipe dans chaque match
    per_match_team = (
        data.groupby(["team_shortname", "match_id"], as_index=False)[metric_cols]
            .sum()
    )

    top_choice_cols = [
        c for c in metric_cols
        if c.startswith("top_choice") and not c.startswith("top_choice_attempts")
    ]

    # We construct the pairs (top_choice_xxx, top_choice_attempts_xxx) / On construit les paires (top_choice_xxx, top_choice_attempts_xxx) 
    pairs = []
    for col in top_choice_cols:
        suffix = col[len("top_choice"):]  # "", "_winning", "_left", etc.
        att_col = "top_choice_attempts" + suffix
        if att_col in metric_cols:
            pairs.append((col, att_col))

    if pairs:
        weighted_df = None

        for col, att_col in pairs:
            # For each match and team: attempts_tot and a weighted rate are calculated.
            # Pour chaque match et équipe: on calcule attempts_tot et un taux pondéré
            tmp = (
                data.groupby(["team_shortname", "match_id"])[[col, att_col]]
                    .apply(
                        lambda g: pd.Series({
                            col: (g[col] * g[att_col]).sum() / g[att_col].sum()
                                 if g[att_col].sum() > 0 else 0.0,
                            att_col: g[att_col].sum()
                        })
                    )
                    .reset_index()
            )

            if weighted_df is None:
                weighted_df = tmp
            else:
                weighted_df = weighted_df.merge(tmp, on=["team_shortname", "match_id"], how="outer")

        # In per_match_team, the values of top_choice and top_choice_attempts are replaced by these weighted values.
        # On remplace dans per_match_team les valeurs de top_choice et top_choice_attempts par ces valeurs pondérées
        if weighted_df is not None:
            drop_cols = []
            for col, att_col in pairs:
                if col in per_match_team.columns:
                    drop_cols.append(col)
                if att_col in per_match_team.columns:
                    drop_cols.append(att_col)
            drop_cols = list(set(drop_cols))

            per_match_team = (
                per_match_team.drop(columns=drop_cols, errors="ignore")
                              .merge(weighted_df, on=["team_shortname", "match_id"], how="left")
            )

    # Number of matches played per team / Nombre de matchs joués par équipe
    match_played = (
        per_match_team.groupby("team_shortname", as_index=False)["match_id"]
            .nunique()
            .rename(columns={"match_id": "match_played"})
    )

    # Average per match team statistics / Moyenne par match des stats d'équipe
    team_means = (
        per_match_team.groupby("team_shortname", as_index=False)[metric_cols]
            .mean()
    )

    # Final mergers / Fusion finales
    team_table = team_means.merge(match_played, on="team_shortname", how="left")

    # Creating id_team from the name / Création id_team à partir du nom
    team_table.insert(
        0,
        "id_team",
        team_table["team_shortname"].apply(
            lambda x: int(sha1(str(x).encode()).hexdigest(), 16) % (10**8)
        )
    )

    # Types and rounding / Types et arrondis
    for c in team_table.columns:
        if c == "match_played":
            team_table[c] = team_table[c].astype(int)
        elif c not in {"id_team", "team_shortname"}:
            team_table[c] = pd.to_numeric(team_table[c], errors="coerce").fillna(0.0).round(2)

    out_path = os.path.join(out_dir, out_name)
    team_table.to_csv(out_path, index=False)
    print("Written team table / Table équipes écrite")
    return team_table

# Function execution / Execution de la fonction
teams_df = build_team_stats_table()


Written team table / Table équipes écrite


### Player

In [7]:
# Aggregation of player statistics / Agrégation des statistiques des joueurs
def build_players_table(result_df: pd.DataFrame,player_all: pd.DataFrame,out_dir: str = os.path.join("src", "data", "players"),
    out_name: str = "player.csv",per90: bool = True) -> pd.DataFrame:
    os.makedirs(out_dir, exist_ok=True)

    id_cols = ["player_id", "player_name", "player_position", "team_shortname", "match_id"]

    # We collect statistics per player / On récupère les statistiques par joueur
    left_keep  = [c for c in id_cols if c in result_df.columns]
    right_keep = ["match_id", "player_id", "minutes_played",
                  "team_shortname_json", "player_name_json", "player_position_json"]

    merged = result_df[left_keep].merge(
        player_all[right_keep],
        on=["match_id", "player_id"],
        how="left"
    )

    metric_cols = [
        c for c in result_df.columns
        if c not in {"match_id","player_id","player_name","player_position","team_shortname"}
    ]
    metric_cols = [c for c in metric_cols if pd.api.types.is_numeric_dtype(result_df[c])]

    # Data Merge / Fusion des données
    merged = merged.merge(
        result_df[["match_id","player_id"] + metric_cols],
        on=["match_id","player_id"],
        how="left"
    )

    # Replacing missing values / On remplace les valeurs manquantes
    merged["player_name"] = merged["player_name"].fillna(merged["player_name_json"])
    merged["player_position"] = merged["player_position"].fillna(merged["player_position_json"])
    merged["team_shortname"] = merged["team_shortname"].fillna(merged["team_shortname_json"])
    merged["minutes_played"] = pd.to_numeric(merged["minutes_played"], errors="coerce").fillna(0.0)

    # Keys for aggregation / Clés d'agrégation
    group_keys = ["player_id","player_name","player_position","team_shortname"]

    # Matches played / Matchs joués
    match_played = (
        merged.loc[merged["minutes_played"] > 0]
              .groupby(group_keys, as_index=False)["match_id"].nunique()
              .rename(columns={"match_id":"match_played"})
    )

    # Total minutes / Minutes totales
    minutes_total = (
        merged.groupby(group_keys, as_index=False)["minutes_played"]
              .sum()
              .rename(columns={"minutes_played":"minutes_total"})
    )

    # top_choice categories
    pct_cols      = [c for c in metric_cols if c.startswith("top_choice") and not c.startswith("top_choice_attempts")]
    attempts_cols = [c for c in metric_cols if c.startswith("top_choice_attempts")]
    other_cols    = [c for c in metric_cols if c not in pct_cols + attempts_cols]

    # Sum for non-choice metrics / Somme pour métriques hors top_choice
    agg_other = merged.groupby(group_keys, as_index=False)[other_cols].sum() if other_cols else merged[group_keys].drop_duplicates()

    # Weighted top_choice and average attempts / top_choice pondéré et attempts moyens
    from functools import reduce
    frames_choice = []

    # Pair (pct, attempts) / Paires (pct, attempts)
    pairs = []
    for pct in pct_cols:
        suffix = pct[len("top_choice"):]
        att = "top_choice_attempts" + suffix
        if att in attempts_cols:
            pairs.append((pct, att))

    for pct_col, att_col in pairs:
        tmp = (
            merged.groupby(group_keys)[[pct_col, att_col]]
                  .apply(
                      lambda g: pd.Series({
                          # Weighted average of top_choice / Moyenne pondérée du top_choice
                          pct_col: (g[pct_col] * g[att_col]).sum() / g[att_col].sum()
                                   if g[att_col].sum() > 0 else 0.0,

                          # Average number of decisions per match / Nombre moyen de décisions par match
                          att_col: g[att_col].mean()
                      })
                  )
                  .reset_index()
        )
        frames_choice.append(tmp)

    totals_choice = (
        reduce(lambda left, right: left.merge(right, on=group_keys, how="outer"), frames_choice)
        if frames_choice else None
    )

    # Merge all metrics / Fusion finale des métriques
    if totals_choice is not None:
        totals = agg_other.merge(totals_choice, on=group_keys, how="left")
    else:
        totals = agg_other.copy()

    # Add minutes and matches / Ajout minutes et matchs
    players_table = (
        totals.merge(minutes_total, on=group_keys, how="left")
              .merge(match_played,  on=group_keys, how="left")
    )

    players_table["match_played"] = players_table["match_played"].fillna(0).astype(int)

    # Remove players under 90 minutes played / Enlever les joueurs ayant moins de 90 minutes jouées
    players_table = players_table[players_table["minutes_total"] >= 90]

    # per90 only for non-top_choice metrics / per90 seulement pour les métriques non top_choice
    if per90:
        for c in other_cols:
            per90_col = f"{c}_per90"
            players_table[per90_col] = np.where(
                players_table["minutes_total"] > 0,
                90 * players_table[c] / players_table["minutes_total"],
                0.0
            )

    # Rounding / Arrondis
    for c in players_table.columns:
        if c not in group_keys:
            players_table[c] = pd.to_numeric(players_table[c], errors="coerce").fillna(0.0).round(2)

    # Save / Sauvegarde
    out_path = os.path.join(out_dir, out_name)
    players_table.to_csv(out_path, index=False)
    print("Player table written / Table joueurs écrite")

    return players_table

# Function execution / Exécution de la fonction
players_df = build_players_table(result_df, player_all)

Player table written / Table joueurs écrite
