Imports

In [None]:
import os
from pathlib import Path
import numpy as np
import pandas as pd
from dash import Dash, dcc, html, Input, Output, State, dash_table
import plotly.express as px
import plotly.graph_objects as go
from flask_caching import Cache
from copy import deepcopy
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import silhouette_score

Konstanten

In [None]:
# --- Data source -----------------------------------------------------------
# NOTE(review): the fallback is an absolute path on one developer machine; set the
# FEATURES_PATH environment variable when running anywhere else.
DEFAULT_FEATURES_PATH = "/Users/tunahansari/football_ra/out_1hz_clean/master_1hz_4s_ready.parquet"
FEATURES_PATH = os.getenv("FEATURES_PATH", DEFAULT_FEATURES_PATH)

# --- Overview tab ----------------------------------------------------------
# Upper bound on the number of rows sampled for the density heatmap.
MAX_HEATMAP_POINTS = int(os.getenv("MAX_HEATMAP_POINTS", 60_000))
SPEED_HIST_BINS = 36

# --- RQA (per game) --------------------------------------------------------
RQA_SHOW_MAX_GAMES = 6
RQA_DEFAULT_RR = 0.10
RQA_DEFAULT_LMIN = 2
RQA_DEFAULT_VMIN = 2
# One weight per trajectory feature, applied in the order (x_norm, y, speed).
RQA_FEATURE_WEIGHTS = (1.0, 1.0, 0.6)
RQA_STANDARDIZE = True

# --- RQA (classic, full series) -------------------------------------------
RQA_CLASSIC_DEFAULT_MAXPTS = int(os.getenv("RQA_CLASSIC_MAXPTS", 3000))
RQA_CLASSIC_DEFAULT_DECIM = int(os.getenv("RQA_CLASSIC_DECIM", 1))

Laden der Daten

In [None]:
# ========================
# DATA LOADING
# ========================
# Columns kept from the feature file when present; everything else is dropped on load.
NEEDED_COLS = [
    # identifiers and time
    "play_uuid",
    "player_id",
    "t_sec",
    # position
    "x_norm",
    "y",
    # player / team / game metadata
    "player_name",
    "position_code",
    "team_id",
    "game_id",
    "home_abbr",
    "away_abbr",
    "offense_team_id",
    "defense_team_id",
    # play context
    "play_quarter",
    "play_down",
    "play_yards_to_go",
    "play_type",
    # kinematics (recomputed by load_data when missing or incomplete)
    "dx",
    "dy",
    "speed",
    "heading_deg",
]

def _detect_y_col(df): return "y" if "y" in df.columns else "step_y" if "step_y" in df.columns else None

def load_data(path):
    """Read the feature parquet file and return a cleaned, sorted tracking table.

    Steps:
      1. read the file and keep only ``NEEDED_COLS`` plus the core columns,
      2. cast identifier/label columns to ``category``,
      3. sort by game, play, player and time (stable sort),
      4. recompute ``dx``/``dy`` (per-track first differences), ``speed`` and
         ``heading_deg`` whenever the column is absent or contains any NaN —
         in that case the whole column is replaced, not just the gaps.

    Returns:
        tuple ``(df, y_col, game_labels)`` where ``y_col`` is the detected
        y-coordinate column name and ``game_labels`` maps ``game_id`` to a
        display string (empty when the team abbreviations are unavailable).

    Raises:
        FileNotFoundError: ``path`` does not exist.
        KeyError: neither ``y`` nor ``step_y`` is present.
        ValueError: one of the core columns is missing.
    """
    path_str = str(path)
    if not Path(path_str).exists():
        raise FileNotFoundError(f"Features file not found: {path_str}")
    raw = pd.read_parquet(path_str, columns=None)

    y_col = _detect_y_col(raw)
    if not y_col:
        raise KeyError("Neither 'y' nor 'step_y' found.")

    # Column selection: the wish list first, then any core column it did not cover.
    keep = [name for name in NEEDED_COLS if name in raw.columns]
    for name in (y_col, "x_norm", "t_sec", "game_id", "play_uuid", "player_id"):
        if name in raw.columns and name not in keep:
            keep.append(name)
    df = raw[keep].copy()
    del raw  # release the full-width frame early

    categorical = (
        "player_id", "player_name", "position_code", "team_id", "game_id",
        "home_abbr", "away_abbr", "play_type", "play_uuid",
    )
    for name in categorical:
        if name in df.columns:
            df[name] = df[name].astype("category")

    required = {"game_id", "play_uuid", "player_id", "t_sec", "x_norm", y_col}
    missing = [name for name in required if name not in df.columns]
    if missing:
        raise ValueError(f"Fehlende Kernspalten im FEATURES_PATH: {missing}")

    df = df.sort_values(["game_id", "play_uuid", "player_id", "t_sec"], kind="mergesort")
    track_keys = ["game_id", "play_uuid", "player_id"]

    def _absent_or_incomplete(column):
        # True when the column has to be (re)derived.
        return (column not in df.columns) or df[column].isna().any()

    redo_dx = _absent_or_incomplete("dx")
    redo_dy = _absent_or_incomplete("dy")
    if redo_dx:
        df["dx"] = df.groupby(track_keys, observed=True)["x_norm"].diff().fillna(0.0)
    if redo_dy:
        df["dy"] = df.groupby(track_keys, observed=True)[y_col].diff().fillna(0.0)
    # Speed depends on both deltas, so it follows any delta recomputation.
    if redo_dx or redo_dy or _absent_or_incomplete("speed"):
        df["speed"] = np.hypot(df["dx"], df["dy"]).astype(float)
    if _absent_or_incomplete("heading_deg"):
        df["heading_deg"] = np.degrees(np.arctan2(df["dy"], df["dx"])).fillna(0.0)

    game_labels = {}
    if {"game_id", "home_abbr", "away_abbr"}.issubset(df.columns):
        teams = df.groupby("game_id", observed=True)[["home_abbr", "away_abbr"]].first()
        for gid, row in teams.iterrows():
            game_labels[gid] = f"{row['home_abbr']} vs {row['away_abbr']}  •  {gid}"
    return df, y_col, game_labels

DF, Y_COL, GAME_LABELS = load_data(FEATURES_PATH)
DF["play_uuid_str"] = DF["play_uuid"].astype(str)


def opt(lst):
    """Turn a list of values into Dash dropdown options (label = ``str(value)``)."""
    return [{"label": str(v), "value": v} for v in lst]


# Distinct positions, sorted by their string form.
if "position_code" in DF:
    positions_all = sorted(str(v) for v in DF["position_code"].dropna().unique())
else:
    positions_all = []

# Distinct play types, sorted by their string form.
if "play_type" in DF:
    playtypes_all = sorted(str(v) for v in DF["play_type"].dropna().unique())
else:
    playtypes_all = []

# Only the 200 players with the most rows are offered initially.
if "player_name" in DF:
    players_all = DF["player_name"].dropna().value_counts().head(200).index.tolist()
else:
    players_all = []

# Games keep their original value type and order of first appearance.
if "game_id" in DF:
    games_all = DF["game_id"].dropna().unique().tolist()
else:
    games_all = []

Play-Features und Cluster

In [None]:
# ========================
# PLAY-FEATURES & CLUSTER
# ========================
def make_features_from_timeseries(df, id_col="play_uuid", y_col="y"):
    """Summarise each ``id_col`` group of ``df`` as one row of robust statistics.

    For every available column among ``x_norm``, ``y_col`` and ``speed`` the
    following NaN-tolerant features are produced (``y_col`` is always reported
    under the prefix ``y``):

    * ``<col>_med``       median
    * ``<col>_mad``       median absolute deviation from the median
    * ``<col>_iqr``       inter-quartile range (75th minus 25th percentile)
    * ``<col>_trend_lr``  slope of a straight-line fit of the value against the
      sample index (rows ordered by ``t_sec``); ``0.0`` when fewer than three
      finite samples exist

    Args:
        df: long-format table containing ``id_col``, ``t_sec`` and the value columns.
        id_col: column identifying one series (default ``"play_uuid"``).
        y_col: name of the y-coordinate column in ``df``.

    Returns:
        DataFrame with one row per group: ``id_col``, ``n_samples`` and the features above.
    """
    feats = []
    for pid, g in df.groupby(id_col, observed=True):
        gg = g.sort_values("t_sec", kind="mergesort")
        row = {id_col: pid, "n_samples": len(gg)}
        for col in ["x_norm", y_col, "speed"]:
            if col not in gg.columns:
                continue
            vals = gg[col].to_numpy(float)
            med = float(np.nanmedian(vals))
            mad = float(np.nanmedian(np.abs(vals - med)))
            q75, q25 = np.nanpercentile(vals, 75), np.nanpercentile(vals, 25)
            iqr = float(q75 - q25)
            # np.polyfit is not NaN-aware (unlike the statistics above): fit only the
            # finite samples, keeping their original positions as the x values.
            idx = np.arange(len(vals))
            finite = np.isfinite(vals)
            if finite.sum() >= 3:
                slope = float(np.polyfit(idx[finite], vals[finite], 1)[0])
            else:
                slope = 0.0
            base = col if col != y_col else "y"
            row.update({f"{base}_med": med, f"{base}_mad": mad, f"{base}_iqr": iqr, f"{base}_trend_lr": slope})
        feats.append(row)
    return pd.DataFrame(feats)

# One feature row per play_uuid, computed over all rows of that play in DF.
FEATURES = make_features_from_timeseries(DF, id_col="play_uuid", y_col=Y_COL)

def cluster_fit_add_labels(FEATURES, id_col="play_uuid", use_pca=True, pca_var=0.90):
    """Cluster the feature rows and attach K-Means and agglomerative labels.

    Numeric columns (except ``id_col``, ``n_samples`` and any existing
    ``cl_kmeans``/``cl_agg`` labels) are median-imputed, standardised and,
    if ``use_pca`` is set, reduced by PCA keeping ``pca_var`` of the variance.
    K-Means is fitted for k = 2..8 (capped at ``n_rows - 1``) and the k with
    the highest silhouette score wins; Ward agglomerative clustering is then
    run with the same k.

    Args:
        FEATURES: one row per item, numeric feature columns.
        id_col: identifier column excluded from the features.
        use_pca: whether to apply PCA before clustering.
        pca_var: ``n_components`` passed to PCA (fraction of variance to keep).

    Returns:
        ``(OUT, meta)``: a copy of ``FEATURES`` with ``cl_kmeans`` and ``cl_agg``
        columns, and a dict with ``k_best``, ``silhouette``, ``n_features``,
        ``pca_used`` and ``pca_var``.

    Raises:
        ValueError: fewer than two numeric feature columns, fewer than three
            rows, or no k produced more than one cluster.
    """
    # Skip identifiers and previously attached labels so that calling this on an
    # already labelled table does not feed the old cluster ids back in as features.
    excluded = {id_col, "n_samples", "cl_kmeans", "cl_agg"}
    feature_cols = [c for c in FEATURES.select_dtypes(include=[np.number]).columns if c not in excluded]
    if len(feature_cols) < 2:
        raise ValueError("Zu wenig numerische Feature-Spalten für Clustering.")
    n_rows = len(FEATURES)
    if n_rows < 3:
        # silhouette_score needs 2 <= n_labels <= n_samples - 1.
        raise ValueError("Zu wenig Zeilen für Clustering (mindestens 3 erforderlich).")

    X = FEATURES[feature_cols].astype(float)
    # Median imputation; columns that are entirely NaN have no median and fall back to 0.
    X = X.fillna(X.median(numeric_only=True)).fillna(0.0)
    Xs = StandardScaler().fit_transform(X)
    Xc = PCA(n_components=pca_var, svd_solver="full", random_state=0).fit_transform(Xs) if use_pca else Xs

    best = (-np.inf, None, None)
    k_max = min(8, n_rows - 1)  # KMeans and silhouette_score both reject k >= n_rows
    for k in range(2, k_max + 1):
        km = KMeans(n_clusters=k, n_init=20, random_state=0)
        lab = km.fit_predict(Xc)
        sil = silhouette_score(Xc, lab) if len(set(lab)) > 1 else -np.inf
        if sil > best[0]:
            best = (sil, k, km)
    sil, k_best, km_best = best
    if km_best is None:
        raise ValueError("Kein gültiges Clustering gefunden (alle Punkte in einem Cluster).")

    labs_km = km_best.predict(Xc)
    labs_agg = AgglomerativeClustering(n_clusters=k_best, linkage="ward").fit_predict(Xc)
    OUT = FEATURES.copy()
    OUT["cl_kmeans"], OUT["cl_agg"] = labs_km, labs_agg
    meta = {"k_best": k_best, "silhouette": sil, "n_features": len(feature_cols), "pca_used": use_pca, "pca_var": pca_var}
    return OUT, meta

# Make this cell safe to re-run: labels left over from a previous execution are
# stripped first, so they are not clustered on and the merge does not produce
# suffixed duplicates (cl_kmeans_x / cl_kmeans_y) that would hide "cl_kmeans".
_CLUSTER_LABEL_COLS = ["cl_kmeans", "cl_agg"]
FEATURES, CLUSTER_META = cluster_fit_add_labels(FEATURES.drop(columns=_CLUSTER_LABEL_COLS, errors="ignore"))
DF = DF.drop(columns=_CLUSTER_LABEL_COLS, errors="ignore").merge(
    FEATURES[["play_uuid", "cl_kmeans", "cl_agg"]], on="play_uuid", how="left"
)
# Distinct K-Means labels as plain ints, used for the cluster dropdown.
clusters_all = sorted(DF["cl_kmeans"].dropna().unique().astype(int).tolist()) if "cl_kmeans" in DF else []

App Cache und Speicher

In [None]:
# Dash application object; callbacks below register against it.
app = Dash(__name__)
app.title = "Football RA • CRP • RQA Dashboard"

# Cache attached to the underlying Flask server; used to memoize filtered frames.
# CACHE_DEFAULT_TIMEOUT is given in seconds (see the Flask-Caching configuration docs).
cache = Cache(
    app.server,
    config={
        "CACHE_TYPE": "SimpleCache",
        "CACHE_DEFAULT_TIMEOUT": 300,
    },
)

def _key(x):
    if x is None: return "Ø"
    if isinstance(x, (list, tuple)): return tuple(x)
    return x

@cache.memoize()
def filtered_df_cache(positions, playtypes, players, games, clusters, t0, t1, cols_tuple):
    """Return the rows of ``DF`` matching all given filters, restricted to ``cols_tuple``.

    A falsy filter value means "no restriction" for that dimension; a filter
    whose column is absent from ``DF`` is skipped. ``t0``/``t1`` bound
    ``t_sec`` inclusively. Results are memoized on the argument values.
    """
    frame = DF
    membership_filters = (
        (positions, "position_code"),
        (playtypes, "play_type"),
        (players, "player_name"),
        (games, "game_id"),
        (clusters, "cl_kmeans"),
    )
    # Narrow the frame one dimension at a time, in the order listed above.
    for wanted, column in membership_filters:
        if wanted and column in frame:
            frame = frame[frame[column].isin(wanted)]
    if "t_sec" in frame:
        not_too_early = frame["t_sec"] >= t0
        not_too_late = frame["t_sec"] <= t1
        frame = frame[not_too_early & not_too_late]
    # Silently ignore requested columns that do not exist.
    available = [name for name in cols_tuple if name in frame.columns]
    return frame[available].copy()

def filtered_df(sel, cols):
    """Fetch the filtered slice of the data for a selection dict via the memoized helper.

    Args:
        sel: mapping with optional keys ``positions``, ``playtypes``, ``players``,
            ``games``, ``clusters`` (iterables of accepted values) and ``t_range``
            (``(t0, t1)``, inclusive; defaults to ``(0, 3)``).
        cols: columns to return, in this order.

    Returns:
        A DataFrame copy with the matching rows and the requested columns that exist.
    """
    def _filter_key(values):
        # A missing/None/empty filter means "no restriction" and is always passed
        # as ().  (Routing None through _key would yield the truthy sentinel "Ø",
        # which then reaches Series.isin and raises TypeError.)
        if not values:
            return ()
        # Membership tests are order-independent, so a canonical order lets
        # ["WR", "DB"] and ["DB", "WR"] share one cache entry.
        return tuple(sorted(set(values), key=str))

    t_range = sel.get("t_range") or (0, 3)
    return filtered_df_cache(
        _filter_key(sel.get("positions")), _filter_key(sel.get("playtypes")),
        _filter_key(sel.get("players")), _filter_key(sel.get("games")),
        _filter_key(sel.get("clusters")), t_range[0], t_range[1], tuple(cols)
    )

def valid_options_from(df):
    """Collect the distinct, non-null filter values present in ``df``.

    Returns:
        A 4-tuple ``(positions, play_types, players, games)``. The first three
        are sorted lists of strings; ``games`` keeps the original values in
        order of first appearance. A missing column yields an empty list.
    """
    # NOTE(review): the original return tuple was cut off after the game_id entry and
    # never closed (SyntaxError for the whole cell). It is closed here with the four
    # visible entries — confirm that no further element (e.g. clusters) was intended.
    return (
        sorted(df["position_code"].dropna().unique().astype(str).tolist()) if "position_code" in df else [],
        sorted(df["play_type"].dropna().unique().astype(str).tolist()) if "play_type" in df else [],
        sorted(df["player_name"].dropna().unique().astype(str).tolist()) if "player_name" in df else [],
        df["game_id"].dropna().unique().tolist() if "game_id" in df else [],
    )

Layout

In [None]:
def _filter_box(label, component, min_width, right_margin=True):
    """Wrap a captioned filter component in one flex item of the filter bar."""
    box_style = {"flex": 1, "minWidth": min_width}
    if right_margin:
        box_style["marginRight"] = 12
    return html.Div([html.Label(label), component], style=box_style)


# Global filter bar shared by all tabs; the component ids are referenced by the callbacks.
controls = html.Div(
    [
        _filter_box(
            "Position(en)",
            dcc.Dropdown(
                id="positions", options=opt(positions_all), multi=True,
                placeholder="z. B. WR, DB …", persistence=True,
            ),
            220,
        ),
        _filter_box(
            "Play-Typ(en)",
            dcc.Dropdown(
                id="play_types", options=opt(playtypes_all), multi=True,
                placeholder="z. B. Pass, Rush …", persistence=True,
            ),
            220,
        ),
        _filter_box(
            "Spieler",
            dcc.Dropdown(
                id="players", options=opt(players_all), multi=True,
                placeholder="Spieler wählen …", persistence=True,
            ),
            260,
        ),
        _filter_box(
            "Spiele",
            dcc.Dropdown(
                id="games",
                options=[{"label": GAME_LABELS.get(g, str(g)), "value": g} for g in games_all],
                multi=True, placeholder="Optional Spiele …", persistence=True,
            ),
            260,
        ),
        _filter_box(
            "Cluster (K-Means)",
            dcc.Dropdown(
                id="clusters",
                # "__ALL__" is a pseudo entry that the callbacks strip before filtering.
                options=[{"label": "alle", "value": "__ALL__"}]
                + [{"label": str(c), "value": int(c)} for c in clusters_all],
                multi=True, placeholder="Cluster wählen …", persistence=True,
            ),
            220,
        ),
        _filter_box(
            "t_sec",
            dcc.RangeSlider(
                id="t_range", min=0, max=3, step=1, value=[0, 3],
                marks={i: str(i) for i in range(4)}, updatemode="mouseup",
            ),
            220,
            right_margin=False,
        ),
    ],
    style={"display": "flex", "flexWrap": "wrap", "gap": 8, "alignItems": "flex-end", "marginBottom": 10},
)

# Page layout: header, data-source line, the shared filter bar and four tabs.
# Every component id below is referenced by a callback, so ids must stay in sync with them.
app.layout = html.Div([
    html.H3("Football RA • CRP • RQA Dashboard"),
    # File name of the loaded feature table and its row count.
    html.Div([html.Span("Daten: "), html.Code(Path(FEATURES_PATH).name), html.Span(f"  | Zeilen: {len(DF):,}", style={"opacity": .7, "marginLeft": 10})], style={"marginBottom": 8}),
    controls,
    dcc.Tabs(id="tabs", value="tab-overview", children=[
        # Tab 1: KPI cards, position density heatmap, per-second means, speed histogram.
        dcc.Tab(label="Overview", value="tab-overview", children=[
            html.Div(id="kpi-row", style={"display":"flex","gap":12,"flexWrap":"wrap","marginBottom":8}),
            dcc.Graph(id="heatmap_xy", style={"height":"420px"}),
            dcc.Graph(id="profile_means", style={"height":"340px"}),
            dcc.Graph(id="speed_hist", style={"height":"300px"}),
        ]),
        # Tab 2: offense-vs-defense comparison chart plus the underlying table.
        dcc.Tab(label="CRP (Offense vs Defense)", value="tab-crp", children=[
            dcc.Graph(id="crp_chart", style={"height":"420px","marginTop":"10px"}),
            dash_table.DataTable(id="crp_table", page_size=10, 
                                 style_table={"overflowX":"auto"},
                                 style_cell={"padding":"6px","fontFamily":"monospace","fontSize":12}),
        ]),
        # Tab 3: per-game recurrence plots; computed only when the button is clicked.
        dcc.Tab(label="RQA (pro Spiel – illustrativ)", value="tab-rqa", children=[
            html.Div([
                html.Div([html.Label("Ziel-Recurrence Rate (RR)"), dcc.Slider(id="rqa_target_rr", min=0.02, max=0.15, step=0.005, value=RQA_DEFAULT_RR, marks={0.05:"5%",0.1:"10%",0.125:"12.5%",0.15:"15%"})], style={"minWidth":280,"marginRight":16}),
                html.Div([html.Label("l_min"), dcc.Dropdown(id="rqa_lmin", options=[{"label":i,"value":i} for i in [2,3,4]], value=RQA_DEFAULT_LMIN, clearable=False)], style={"width":220,"marginRight":16}),
                html.Div([html.Label("v_min"), dcc.Dropdown(id="rqa_vmin", options=[{"label":i,"value":i} for i in [2,3,4]], value=RQA_DEFAULT_VMIN, clearable=False)], style={"width":220,"marginRight":16}),
                html.Div([html.Label(" "), html.Button("RQA berechnen", id="rqa_compute", n_clicks=0, style={"width":"200px","height":"38px"})]),
            ], style={"display":"flex","flexWrap":"wrap","alignItems":"flex-end","gap":8,"marginBottom":8}),
            # Filled by the callback with one small recurrence plot per game.
            html.Div(id="rqa_plots_grid", style={"display":"grid","gridTemplateColumns":"repeat(auto-fit, minmax(260px, 1fr))","gap":"12px"}),
            dash_table.DataTable(id="rqa_table", page_size=10,
                                 style_table={"overflowX":"auto"},
                                 style_cell={"padding":"6px","fontFamily":"monospace","fontSize":12}),
            html.Div(id="rqa_note", style={"opacity":.7,"marginTop":6})
        ]),
        # Tab 4: one recurrence plot over the whole filtered series; button-triggered.
        dcc.Tab(label="RQA (klassisch • komplette Serie)", value="tab-rqa-classic", children=[
            html.Div([
                # RR is taken from the slider ("dynamic") or from the preset dropdown ("preset").
                html.Div([html.Label("RR-Modus"), dcc.RadioItems(id="rqac_rr_mode", options=[{"label":"Dynamisch (Slider)", "value":"dynamic"}, {"label":"Vorgefertigt (5% / 10% / 15%)", "value":"preset"}], value="dynamic", inline=True)], style={"minWidth":360,"marginRight":16}),
                html.Div([html.Label("Ziel-RR (dynamisch)"), dcc.Slider(id="rqac_target_rr", min=0.02, max=0.15, step=0.005, value=RQA_DEFAULT_RR, marks={0.05:"5%",0.10:"10%",0.125:"12.5%",0.15:"15%"})], style={"minWidth":280,"marginRight":16}),
                html.Div([html.Label("RR (vorgefertigt)"), dcc.Dropdown(id="rqac_rr_preset", options=[{"label":"5%","value":0.05}, {"label":"10%","value":0.10}, {"label":"15%","value":0.15}], value=RQA_DEFAULT_RR, clearable=False)], style={"width":180,"marginRight":16}),
                html.Div([html.Label("l_min"), dcc.Dropdown(id="rqac_lmin", options=[{"label":i,"value":i} for i in [2,3,4,5]], value=RQA_DEFAULT_LMIN, clearable=False)], style={"width":160,"marginRight":16}),
                html.Div([html.Label("v_min"), dcc.Dropdown(id="rqac_vmin", options=[{"label":i,"value":i} for i in [2,3,4,5]], value=RQA_DEFAULT_VMIN, clearable=False)], style={"width":160,"marginRight":16}),
                # Size controls: keep every k-th point and cap the total number of points.
                html.Div([html.Label("Decimation (jeder k-te Punkt)"), dcc.Input(id="rqac_decim", type="number", min=1, step=1, value=RQA_CLASSIC_DEFAULT_DECIM, style={"width":"120px"})], style={"marginRight":16}),
                html.Div([html.Label("Max. Punkte (Cap)"), dcc.Input(id="rqac_maxpts", type="number", min=200, step=100, value=RQA_CLASSIC_DEFAULT_MAXPTS, style={"width":"140px"})], style={"marginRight":16}),
                html.Div([html.Label(" "), html.Button("Klassische RQA berechnen", id="rqac_compute", n_clicks=0, style={"width":"240px","height":"38px"})]),
            ], style={"display":"flex","flexWrap":"wrap","alignItems":"flex-end","gap":8,"marginBottom":8}),
            dcc.Graph(id="rqac_plot", style={"height":"560px"}),
            dash_table.DataTable(id="rqac_table", page_size=5,
                                 style_table={"overflowX":"auto"},
                                 style_cell={"padding":"6px","fontFamily":"monospace","fontSize":12}),
            html.Div(id="rqac_note", style={"opacity":.7,"marginTop":6})
        ]),
    ]),
])


Filter-Synchronisation

In [None]:
def _options_union_keep_selected(all_values, filtered_values, selected_values):
    """Build dropdown options from the filtered values without losing the current selection.

    All values are compared and emitted as strings. The result contains the
    filtered values plus whatever is selected; if that is empty, the complete
    value list (plus the selection) is used instead. Options are sorted.
    """
    chosen = {str(value) for value in (selected_values or [])}
    merged = {str(value) for value in (filtered_values or [])} | chosen
    if not merged:
        # Nothing reachable and nothing selected: offer everything.
        merged = {str(value) for value in (all_values or [])} | chosen
    return opt(sorted(merged))

@app.callback(
    Output("positions","options"), Output("play_types","options"),
    Output("players","options"), Output("games","options"),
    Input("positions","value"), Input("play_types","value"),
    Input("players","value"), Input("games","value"),
)
def sync_filters(pos_v, pt_v, pl_v, gm_v):
    """Recompute the options of the four filter dropdowns from the other selections.

    For each dropdown the data is filtered by the *other* three selections
    (clusters and time range are ignored) and the values still reachable become
    its options. Values that are currently selected always stay available, and
    an empty result falls back to the full list — this now also holds for the
    games dropdown, which previously dropped selected games.

    Returns:
        Option lists for positions, play types, players and games.
    """
    chosen = {"positions": pos_v or [], "playtypes": pt_v or [], "players": pl_v or [], "games": gm_v or []}

    def _reachable(own_filter, column, fallback):
        # Apply every selection except the dropdown's own one.
        sel = dict(chosen, clusters=[], t_range=(0, 3))
        sel[own_filter] = []
        q = filtered_df(sel, [column])
        return q[column].dropna().unique().tolist() if column in q else fallback

    pos_opts = _options_union_keep_selected(positions_all, _reachable("positions", "position_code", positions_all), pos_v)
    pt_opts = _options_union_keep_selected(playtypes_all, _reachable("playtypes", "play_type", playtypes_all), pt_v)
    pl_opts = _options_union_keep_selected(players_all, _reachable("players", "player_name", players_all), pl_v)

    # Games keep their original value type (no stringification) because the option
    # value is used directly as a filter value and as the key into GAME_LABELS.
    gm_vals = list(_reachable("games", "game_id", games_all))
    seen = set(gm_vals)
    gm_vals.extend(g for g in (gm_v or []) if g not in seen)
    if not gm_vals:
        gm_vals = list(games_all)
    gm_opts = [{"label": GAME_LABELS.get(g, str(g)), "value": g} for g in sorted(gm_vals, key=str)]
    return pos_opts, pt_opts, pl_opts, gm_opts

Overview Callback

In [None]:
@app.callback(
    Output("kpi-row","children"), Output("heatmap_xy","figure"),
    Output("profile_means","figure"), Output("speed_hist","figure"),
    Input("positions","value"), Input("play_types","value"),
    Input("players","value"), Input("games","value"), Input("clusters","value"), Input("t_range","value"),
)
def update_overview(pos_v, pt_v, pl_v, gm_v, cl_v, tr_v):
    """Rebuild the Overview tab for the current filter selection.

    Returns:
        KPI cards, the x/y density heatmap (on at most ``MAX_HEATMAP_POINTS``
        sampled rows), the per-``t_sec`` mean profile and the speed histogram.
    """
    # The "__ALL__" pseudo entry carries no restriction.
    cluster_ids = [c for c in cl_v if c != "__ALL__"] if cl_v else []
    selection = {
        "positions": pos_v or [],
        "playtypes": pt_v or [],
        "players": pl_v or [],
        "games": gm_v or [],
        "clusters": cluster_ids,
        "t_range": tuple(tr_v or (0, 3)),
    }
    q = filtered_df(selection, ["play_uuid", "player_id", "game_id", "t_sec", "x_norm", Y_COL, "speed", "cl_kmeans"])

    def kpi_card(label, val):
        # Small bordered card: muted caption above a bold value.
        caption = html.Div(label, style={"fontSize": 12, "opacity": .7})
        value = html.Div(f"{val}", style={"fontSize": 22, "fontWeight": 600})
        return html.Div(
            [caption, value],
            style={"padding": "8px 12px", "border": "1px solid #eee", "borderRadius": 8, "minWidth": 140},
        )

    kpis = [
        kpi_card("Zeilen", f"{len(q):,}"),
        kpi_card("Unique Plays", q["play_uuid"].nunique()),
        kpi_card("Unique Spieler", q["player_id"].nunique()),
        kpi_card("Unique Spiele", q["game_id"].nunique()),
    ]
    if CLUSTER_META:
        kpis.append(kpi_card("K-Means k", CLUSTER_META.get("k_best", "—")))
        kpis.append(kpi_card("Silhouette", f"{CLUSTER_META.get('silhouette', float('nan')):.3f}"))

    # Heatmap: down-sample reproducibly when the selection is large.
    if len(q) > MAX_HEATMAP_POINTS:
        heat_rows = q.sample(MAX_HEATMAP_POINTS, random_state=42)
    else:
        heat_rows = q
    hm = px.density_heatmap(heat_rows, x="x_norm", y=Y_COL, nbinsx=60, nbinsy=27, histnorm="")
    hm.update_layout(title="Dichte: x_norm vs. y (gesampelt)")
    hm.update_yaxes(scaleanchor="x", scaleratio=53.33/120)

    # Mean position and speed per time step.
    per_second = q.groupby("t_sec").agg(
        mean_x=("x_norm", "mean"), mean_y=(Y_COL, "mean"), mean_v=("speed", "mean")
    ).reset_index()
    prof = go.Figure()
    traces = (("mean_x", "mean x_norm"), ("mean_y", "mean y"), ("mean_v", "mean speed (yd/s)"))
    for column, trace_name in traces:
        prof.add_trace(go.Scatter(x=per_second["t_sec"], y=per_second[column], mode="lines+markers", name=trace_name))
    prof.update_layout(title="Mittelwerte je t_sec", xaxis_title="t_sec", yaxis_title="Wert")

    hist = px.histogram(q, x="speed", nbins=SPEED_HIST_BINS, title="Geschwindigkeit (yd/s)")
    return kpis, hm, prof, hist

CRP Callback

In [None]:
def crp_off_vs_def(df):
    """Compare mean offense and defense position/speed per ``t_sec``.

    Rows are assigned to offense or defense by comparing ``team_id`` with
    ``offense_team_id`` / ``defense_team_id``. Gaps after the outer join on
    ``t_sec`` are forward- then backward-filled.

    If any required column is missing, the pooled means are reported for both
    sides and all differences are ``0.0``.

    Returns:
        DataFrame with ``t_sec``, ``off_x/off_y/off_v``, ``def_x/def_y/def_v``
        and the offense-minus-defense differences ``dx/dy/dv``.
    """
    mean_spec = dict(x=("x_norm", "mean"), y=(Y_COL, "mean"), v=("speed", "mean"))
    required = {"t_sec", "x_norm", Y_COL, "speed", "team_id", "offense_team_id", "defense_team_id"}

    if not required.issubset(df.columns):
        # Fallback: no way to split by side, so both sides share the pooled means.
        pooled = df.groupby("t_sec").agg(**mean_spec).reset_index()
        out = pooled.rename(columns={"x": "off_x", "y": "off_y", "v": "off_v"})
        for axis in ("x", "y", "v"):
            out[f"def_{axis}"] = out[f"off_{axis}"]
        for delta in ("dx", "dy", "dv"):
            out[delta] = 0.0
        return out

    team = df["team_id"].astype("Int64")
    on_offense = team == df["offense_team_id"].astype("Int64")
    on_defense = team == df["defense_team_id"].astype("Int64")

    def _side_means(mask, prefix):
        # Per-second means for one side, columns prefixed with "off_" / "def_".
        return df[mask].groupby("t_sec").agg(**mean_spec).add_prefix(prefix).reset_index()

    out = pd.merge(
        _side_means(on_offense, "off_"), _side_means(on_defense, "def_"), on="t_sec", how="outer"
    ).sort_values("t_sec")

    side_cols = ["off_x", "off_y", "off_v", "def_x", "def_y", "def_v"]
    out[side_cols] = out[side_cols].ffill().bfill()
    out["dx"] = out["off_x"] - out["def_x"]
    out["dy"] = out["off_y"] - out["def_y"]
    out["dv"] = out["off_v"] - out["def_v"]
    return out

@app.callback(
    Output("crp_chart","figure"), Output("crp_table","columns"), Output("crp_table","data"),
    Input("positions","value"), Input("play_types","value"),
    Input("players","value"), Input("games","value"), Input("clusters","value"), Input("t_range","value"),
)
def update_crp(pos_v, pt_v, pl_v, gm_v, cl_v, tr_v):
    """Rebuild the offense-vs-defense chart and table for the current filters.

    Returns:
        The line chart (x and speed means plus their differences), the table
        column definitions and the table rows rounded to three decimals.
    """
    # The "__ALL__" pseudo entry carries no restriction.
    cluster_ids = [c for c in cl_v if c != "__ALL__"] if cl_v else []
    selection = {
        "positions": pos_v or [],
        "playtypes": pt_v or [],
        "players": pl_v or [],
        "games": gm_v or [],
        "clusters": cluster_ids,
        "t_range": tuple(tr_v or (0, 3)),
    }
    q = filtered_df(selection, ["t_sec", "x_norm", Y_COL, "speed", "team_id", "offense_team_id", "defense_team_id"])
    comp = crp_off_vs_def(q)

    series = [
        ("off_x", "Offense mean x"), ("def_x", "Defense mean x"), ("dx", "Δx (Off-Def)"),
        ("off_v", "Offense mean v"), ("def_v", "Defense mean v"), ("dv", "Δv (Off-Def)"),
    ]
    fig = go.Figure()
    for column, trace_name in series:
        # NOTE(review): the prefix test matches "def_*" as well as "dx"/"dv", so the
        # defense lines are dashed too — confirm whether only the deltas were meant.
        line_style = dict(dash="dash") if column.startswith("d") else None
        fig.add_trace(go.Scatter(
            x=comp["t_sec"], y=comp[column], mode="lines+markers", name=trace_name, line=line_style
        ))
    fig.update_layout(title="CRP: Offense vs Defense (x & v)", xaxis_title="t_sec")

    table_columns = [{"name": name, "id": name} for name in comp.columns]
    table_rows = comp.round(3).to_dict("records")
    return fig, table_columns, table_rows

RQA-Helfer

In [None]:

def pairwise_dist(A, w):
    """Weighted Euclidean distance between every pair of rows of ``A``.

    Args:
        A: array-like of shape (n, d).
        w: d weights applied to the squared per-feature differences.

    Returns:
        (n, n) array with ``sqrt(sum_k w[k] * (A[i, k] - A[j, k])**2)``.
    """
    points = np.asarray(A, float)
    weights = np.asarray(w, float).reshape(1, -1)
    # Broadcasting builds the full (n, n, d) difference tensor.
    delta = points[:, None, :] - points[None, :, :]
    weighted_sq = delta**2 * weights
    return np.sqrt(weighted_sq.sum(axis=2))

def recurrence_matrix(arr, target_rr, w=(1.0,1.0,1.0), standardize=True):
    """Threshold the pairwise distances of a trajectory into a 0/1 recurrence matrix.

    The threshold ``eps`` is the ``target_rr`` quantile of the distances above
    the main diagonal, i.e. it is chosen so that roughly that share of distinct
    point pairs counts as recurrent.

    Args:
        arr: array-like of shape (n, d), one row per time step.
        target_rr: quantile in [0, 1] used to pick ``eps``.
        w: per-feature weights for the distance.
        standardize: z-score each feature first (a small constant guards
            against division by zero).

    Returns:
        ``(R, eps)`` with ``R`` an (n, n) int array and ``eps`` a float
        (``0.0`` when there are no point pairs).
    """
    points = np.asarray(arr, float)
    if standardize:
        center = points.mean(axis=0, keepdims=True)
        spread = points.std(axis=0, keepdims=True) + 1e-9
        points = (points - center) / spread
    dist = pairwise_dist(points, np.asarray(w, float))
    off_diagonal = dist[np.triu_indices_from(dist, k=1)]
    if len(off_diagonal) > 0:
        eps = float(np.quantile(off_diagonal, target_rr))
    else:
        eps = 0.0
    recurrent = (dist <= eps).astype(int)
    return recurrent, eps

def _run_lengths(line, min_len):
    """Lengths of the maximal runs of 1s in a 1-D array that are at least ``min_len`` long."""
    flags = np.asarray(line) == 1
    if flags.size == 0:
        return []
    # Pad with False so every run has both a rising and a falling edge.
    padded = np.concatenate(([False], flags, [False]))
    edges = np.flatnonzero(padded[1:] != padded[:-1])
    lengths = edges[1::2] - edges[::2]  # falling-edge index minus rising-edge index
    return lengths[lengths >= min_len].tolist()


def _length_entropy(lengths):
    """Shannon entropy (natural log) of the distribution of line lengths; 0.0 if empty."""
    if not lengths:
        return 0.0
    _, counts = np.unique(lengths, return_counts=True)
    probs = counts / counts.sum()
    return float(-(probs * np.log(probs + 1e-12)).sum())


def rqa_metrics(R, l_min=2, v_min=2):
    """Compute recurrence quantification measures from a square 0/1 matrix ``R``.

    Diagonal lines are collected from every diagonal of ``R`` — both triangles
    and the main diagonal — and vertical lines from every column.
    NOTE(review): because the main diagonal is included, RR and DET are higher
    than in RQA variants that exclude the line of identity; confirm this is intended.

    Args:
        R: (n, n) recurrence matrix containing 0 and 1.
        l_min: minimum length of a diagonal line.
        v_min: minimum length of a vertical line.

    Returns:
        dict with
          RR      share of recurrent cells,
          DET     share of recurrent cells lying on diagonal lines >= ``l_min``,
          L       mean diagonal line length,
          Lmax    longest diagonal line,
          ENTR    entropy of the diagonal line lengths,
          LAM     share of recurrent cells lying on vertical lines >= ``v_min``,
          TT      mean vertical line length,
          ENTR_V  entropy of the vertical line lengths.
        Every value is 0 when the corresponding set of lines is empty.
    """
    N = R.size
    total = R.sum()
    RR = total / N if N > 0 else 0.0

    diag_lengths = []
    for k in range(-(R.shape[0] - 1), R.shape[0]):
        diag_lengths.extend(_run_lengths(np.diag(R, k), l_min))
    vert_lengths = []
    for j in range(R.shape[1]):
        vert_lengths.extend(_run_lengths(R[:, j], v_min))

    DET = (sum(diag_lengths) / total) if total > 0 and diag_lengths else 0.0
    Lmax = max(diag_lengths) if diag_lengths else 0
    L = float(np.mean(diag_lengths)) if diag_lengths else 0.0
    ENTR = _length_entropy(diag_lengths)

    LAM = (sum(vert_lengths) / total) if total > 0 and vert_lengths else 0.0
    TT = float(np.mean(vert_lengths)) if vert_lengths else 0.0
    ENTR_V = _length_entropy(vert_lengths)
    return dict(RR=RR, DET=DET, L=L, Lmax=Lmax, ENTR=ENTR, LAM=LAM, TT=TT, ENTR_V=ENTR_V)

def game_traj(df_game, y_col, t_grid=(0, 1, 2, 3)):
    """Mean (x_norm, y, speed) trajectory of one game on a fixed time grid.

    Args:
        df_game: rows of a single game with ``t_sec``, ``x_norm``, ``y_col`` and ``speed``.
        y_col: name of the y-coordinate column.
        t_grid: ``t_sec`` values that make up the rows of the result. The
            default reproduces the previously hard-coded four time steps.

    Returns:
        Float array of shape ``(len(t_grid), 3)``. Time steps without data are
        filled from the previous available step, then from the next one, so
        filled rows duplicate a neighbouring state.
    """
    g = df_game.groupby("t_sec")[["x_norm", y_col, "speed"]].mean().reindex(list(t_grid))
    return g.ffill().bfill().to_numpy(float)

def build_full_series(df, y_col, decim, maxpts):
    """Build one long (x_norm, y, speed) series from all plays in ``df``.

    Values are averaged per (game_id, play_uuid, t_sec); the resulting rows are
    ordered by those keys, thinned to every ``decim``-th row and cut to at most
    ``maxpts`` rows. Remaining NaNs are forward- then backward-filled.
    NOTE(review): plays are concatenated in key order of ``play_uuid``, which is
    not necessarily chronological — confirm this is acceptable for the analysis.

    Args:
        df: rows with ``game_id``, ``play_uuid``, ``t_sec``, ``x_norm``, ``y_col``, ``speed``.
        y_col: name of the y-coordinate column.
        decim: keep every ``decim``-th point; ``None`` or values below 1 mean 1.
            Non-integer numbers are truncated.
        maxpts: maximum number of points; falsy disables the cap. Non-integer
            numbers are truncated.

    Returns:
        Float array of shape (n, 3); ``(0, 3)`` for empty input.
    """
    if df.empty:
        return np.empty((0, 3), float)
    value_cols = ["x_norm", y_col, "speed"]
    # groupby sorts by the keys itself, so no separate copy + sort is needed.
    q = df.groupby(["game_id", "play_uuid", "t_sec"], observed=True)[value_cols].mean().reset_index()
    series = q[value_cols].to_numpy(float)

    # Numeric inputs from the UI may arrive as floats; slicing needs ints.
    decim = 1 if decim is None or decim < 1 else int(decim)
    maxpts = int(maxpts) if maxpts else 0
    series = series[::decim]
    if maxpts and series.shape[0] > maxpts:
        series = series[:maxpts, :]
    series = pd.DataFrame(series, columns=["x", "y", "v"]).ffill().bfill().to_numpy(float)
    return series

RQA pro Spiel

In [None]:
@app.callback(
    Output("rqa_plots_grid","children"),
    Output("rqa_table","columns"),
    Output("rqa_table","data"),
    Output("rqa_note","children"),
    Input("rqa_compute","n_clicks"),
    State("positions","value"), State("play_types","value"),
    State("players","value"),  State("games","value"),
    State("t_range","value"),
    State("rqa_target_rr","value"), State("rqa_lmin","value"), State("rqa_vmin","value"),
    State("clusters","value"),
    prevent_initial_call=True
)
def compute_rqa(n_clicks, pos_v, pt_v, pl_v, gm_v, tr_v, target_rr, l_min, v_min, cl_v):
    """Compute per-game recurrence plots and RQA measures when the button is clicked.

    Processes the filtered games in group order and stops after
    ``RQA_SHOW_MAX_GAMES`` games; plots and table rows cover the same games.

    Returns:
        The plot grid children, the table columns, the table rows (rounded to
        three decimals) and an explanatory note.
    """
    # The "__ALL__" pseudo entry carries no restriction.
    cl_v_norm = [c for c in cl_v if c != "__ALL__"] if cl_v else []
    selection = {
        "positions": pos_v or [],
        "playtypes": pt_v or [],
        "players": pl_v or [],
        "games": gm_v or [],
        "clusters": cl_v_norm,
        "t_range": tuple(tr_v or (0, 3)),
    }
    q = filtered_df(selection, ["game_id", "t_sec", "x_norm", Y_COL, "speed", "home_abbr", "away_abbr", "cl_kmeans"])

    metric_names = ["RR", "DET", "L", "Lmax", "ENTR", "LAM", "TT", "ENTR_V"]
    plots, rows = [], []

    for game_id, game_df in q.groupby("game_id", observed=True):
        if len(plots) >= RQA_SHOW_MAX_GAMES:
            break

        trajectory = game_traj(game_df, Y_COL)
        R, eps = recurrence_matrix(
            trajectory, target_rr, w=RQA_FEATURE_WEIGHTS, standardize=RQA_STANDARDIZE
        )
        metrics = rqa_metrics(R, l_min=l_min, v_min=v_min)

        # Team abbreviations for the plot title, with generic fallbacks.
        if "home_abbr" in game_df.columns:
            home_abbr = game_df["home_abbr"].iloc[0]
        else:
            home_abbr = "Home"
        if "away_abbr" in game_df.columns:
            away_abbr = game_df["away_abbr"].iloc[0]
        else:
            away_abbr = "Away"

        fig = px.imshow(
            R,
            aspect="equal",
            title=f"{home_abbr} vs {away_abbr} (Game {game_id})<br>RR: {metrics['RR']:.3f}, ε: {eps:.4f}",
            color_continuous_scale=["#ffffff", "#000000"],
        )
        graph = dcc.Graph(figure=fig, style={"height": "240px"})
        plots.append(html.Div([graph], style={"marginBottom": "12px"}))

        # Keep raw numeric values here; rounding happens once for the whole table.
        table_row = {"game_id": game_id, "epsilon": float(eps)}
        for metric in metric_names:
            table_row[metric] = float(metrics[metric])
        rows.append(table_row)

    # Fixed column order; values rounded to three decimals.
    col_order = ["game_id", "epsilon"] + metric_names
    data = pd.DataFrame(rows)[col_order].round(3).to_dict("records") if rows else []
    cols = [{"name": name, "id": name} for name in col_order]

    note = (
        f"Es werden max. {RQA_SHOW_MAX_GAMES} Spiele visualisiert. "
        f"Cluster-Filter: {cl_v_norm if cl_v_norm else 'alle'}. "
        f"Button gedrückt: {n_clicks}. "
        "Hinweis: Die pro-Spiel-Ansicht ist explorativ und nutzt 4 Zeitpunkte @ 1 Hz."
    )
    return plots, cols, data, note

RQA Serie

@app.callback(
    Output("rqac_plot","figure"),
    Output("rqac_table","columns"),
    Output("rqac_table","data"),
    Output("rqac_note","children"),
    Input("rqac_compute","n_clicks"),
    State("positions","value"), State("play_types","value"),
    State("players","value"),  State("games","value"),
    State("t_range","value"),
    State("rqac_rr_mode","value"),
    State("rqac_target_rr","value"), State("rqac_rr_preset","value"),
    State("rqac_lmin","value"), State("rqac_vmin","value"),
    State("rqac_decim","value"), State("rqac_maxpts","value"),
    State("clusters","value"),
    prevent_initial_call=True
)
def compute_rqa_classic(n_clicks, pos_v, pt_v, pl_v, gm_v, tr_v,
                        rr_mode, target_rr, rr_preset, l_min, v_min, decim, maxpts, cl_v):
    """Run RQA on the single series built from all filtered plays (button-triggered).

    The target recurrence rate comes from the slider (``rr_mode == "dynamic"``)
    or from the preset dropdown; if neither supplies a value the default
    ``RQA_DEFAULT_RR`` is used.

    Returns:
        The recurrence plot, the table columns, a one-row table with the
        measures (floats rounded to three decimals, ``N`` kept as an integer)
        and an explanatory note. For an empty selection a placeholder figure,
        empty table and empty note are returned.
    """
    # The "__ALL__" pseudo entry carries no restriction.
    cl_v_norm = [] if not cl_v else [c for c in cl_v if c != "__ALL__"]
    sel = dict(
        positions=pos_v or [], playtypes=pt_v or [], players=pl_v or [], games=gm_v or [],
        clusters=cl_v_norm, t_range=tuple(tr_v or (0,3))
    )
    q = filtered_df(sel, ["game_id","play_uuid","t_sec","x_norm",Y_COL,"speed","cl_kmeans"])
    if q.empty:
        empty_fig = go.Figure().update_layout(title="Keine Daten für die aktuelle Auswahl.")
        return empty_fig, [], [], ""

    # Diagnostic line on the server console: which clusters/rows went into the series.
    print("RQA classic — Cluster-Filter:", cl_v_norm if cl_v_norm else "alle",
          "| unique cl in q:", sorted(map(int, q["cl_kmeans"].dropna().unique())) if "cl_kmeans" in q else "—",
          "| N rows:", len(q))

    rr_value = target_rr if rr_mode == "dynamic" else rr_preset
    if rr_value is None:
        # A cleared/invalid control must not crash the quantile computation.
        rr_value = RQA_DEFAULT_RR

    arr = build_full_series(q, Y_COL,
                            decim=decim or RQA_CLASSIC_DEFAULT_DECIM,
                            maxpts=maxpts or RQA_CLASSIC_DEFAULT_MAXPTS)

    R, eps = recurrence_matrix(
        arr, rr_value,
        w=RQA_FEATURE_WEIGHTS,
        standardize=RQA_STANDARDIZE
    )
    metrics = rqa_metrics(R, l_min=l_min, v_min=v_min)

    # Show both the requested and the measured recurrence rate: they differ, and the
    # per-game plots label the measured one as "RR".
    fig = px.imshow(
        R, aspect="equal",
        title=(
            f"RQA (klassisch) - RR-Ziel: {rr_value:.3f}, RR: {metrics['RR']:.3f}, "
            f"ε: {eps:.4f}, N: {len(arr)}"
        ),
        color_continuous_scale=["#ffffff", "#000000"]
    )

    # One row with the measures as columns.
    row = {
        "N": int(len(arr)),
        "RR_target": float(rr_value),
        "epsilon": float(eps),
        "RR": float(metrics["RR"]),
        "DET": float(metrics["DET"]),
        "L": float(metrics["L"]),
        "Lmax": float(metrics["Lmax"]),
        "ENTR": float(metrics["ENTR"]),
        "LAM": float(metrics["LAM"]),
        "TT": float(metrics["TT"]),
        "ENTR_V": float(metrics["ENTR_V"]),
    }
    col_order = ["N","RR_target","epsilon","RR","DET","L","Lmax","ENTR","LAM","TT","ENTR_V"]
    # Round the floats only; going through pd.Series would turn the integer N into a float.
    data = [{name: (round(row[name], 3) if isinstance(row[name], float) else row[name]) for name in col_order}]
    cols = [{"name": c, "id": c} for c in col_order]

    note = (
        f"Serie basiert auf Filterauswahl. Cluster-Filter: {cl_v_norm if cl_v_norm else 'alle'}. "
        f"RR-Modus: {rr_mode}. Decimation: {int(decim or RQA_CLASSIC_DEFAULT_DECIM)}. "
        f"Max. Punkte: {int(maxpts or RQA_CLASSIC_DEFAULT_MAXPTS)}."
    )
    return fig, cols, data, note

Main

In [None]:
if __name__ == "__main__":
    # The port is read from the PORT environment variable and defaults to 8051.
    # The reloader is disabled explicitly.
    server_port = int(os.getenv("PORT", 8051))
    app.run(debug=False, port=server_port, use_reloader=False)