In [None]:
# ===========================================
# AFL Heatmap Batch Exporter (dual-schema, Colab-ready)
# ===========================================
# Description:
#   This script generates AFL field heatmaps from tracking/event CSV data.
#   It supports two different CSV schemas (comma- or pipe-delimited) and
#   unifies them into one standard format.
#
# Outputs:
#   - Overall heatmap for each dataset
#   - Per-player (or per-track ID) heatmaps
#   - Zone-based heatmaps (Back 50, Midfield, Forward 50)
#
# Important details:
#   - AFL ground size: 159.5 m length × 128.8 m width
#   - Heatmap resolution: 200 (X) × 150 (Y) grid cells
#   - Gaussian blur applied with sigma = 2.0
#   - Zones split using circular arcs at 50 m from each goal
#
# Requirements:
#   pip install numpy pandas matplotlib scipy
# ===========================================

import os
import io
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib.colors import Normalize
from scipy.ndimage import gaussian_filter

# -------------------------
# Config
# -------------------------
# Ground dimensions in metres (length along x, width along y).
FIELD_LENGTH_M = 159.5
FIELD_WIDTH_M = 128.8

# Semi-axes of the oval: `a` along x (half the length), `b` along y (half the width).
a, b = FIELD_LENGTH_M / 2.0, FIELD_WIDTH_M / 2.0

# Heatmap grid resolution (cells along X, cells along Y).
NX = 200
NY = 150

# Sigma passed to the Gaussian blur that smooths the heatmap grid.
SIGMA = 2.0

# Datasets to process, as (csv path, output label) pairs.
INPUTS = [("Video_tracking.csv", "tracking2")]

# Root directory that receives every generated image.
OUT_ROOT = "outputs"

# Candidate weight (confidence) columns, tried in this order.
WEIGHT_COLS = ["confidence", "conf"]

# Columns guaranteed to exist in a unified dataframe, grouped as
# identity/time, box geometry, and detection metadata.
UNIFIED_COLS = (
    ["frame_id", "player_id", "timestamp_s"]
    + ["x1", "y1", "x2", "y2", "cx", "cy", "w", "h"]
    + ["confidence", "class_id", "visibility"]
)

# -------------------------
# Helpers: mapping & grid
# -------------------------
def raw_bbox(xs, ys, pad_ratio=0.02):
    """Return a padded bounding box ``(xmin, xmax, ymin, ymax)`` of raw points.

    Each axis is widened on both sides by ``pad_ratio`` times its extent.
    A zero extent is treated as 1e-9 so the padding never collapses to nothing.
    """
    def _padded_range(values):
        lo = float(np.min(values))
        hi = float(np.max(values))
        extent = max(hi - lo, 1e-9)
        return lo - extent*pad_ratio, hi + extent*pad_ratio

    x_lo, x_hi = _padded_range(xs)
    y_lo, y_hi = _padded_range(ys)
    return (x_lo, x_hi, y_lo, y_hi)

def raw_to_metres(x, y, bbox_raw, a, b):
    """Linearly rescale raw coordinates into the oval's metre frame.

    The raw box ``bbox_raw = (xmin, xmax, ymin, ymax)`` is mapped onto
    ``[-a, a]`` in x and ``[-b, b]`` in y. A zero-width box is guarded by a
    1e-9 floor on the denominator. Returns the pair ``(x_m, y_m)``.
    """
    xmin, xmax, ymin, ymax = bbox_raw

    def _rescale(values, lo, hi, half_extent):
        # normalise to [0, 1], then stretch to [-half_extent, +half_extent]
        return ((values - lo) / max(1e-9, (hi - lo))) * (2*half_extent) - half_extent

    return _rescale(x, xmin, xmax, a), _rescale(y, ymin, ymax, b)

def make_oval_mask_metres(nx, ny, a, b):
    """Build grid edges and a boolean "inside the oval" mask.

    Returns ``(x_edges, y_edges, mask)``. The edges span ``[-a, a]`` and
    ``[-b, b]`` with ``nx + 1`` and ``ny + 1`` entries. ``mask`` has shape
    ``(ny, nx)`` and is True where a cell centre lies inside or on the
    ellipse with semi-axes ``a`` and ``b``.
    """
    x_edges = np.linspace(-a, a, nx + 1)
    y_edges = np.linspace(-b, b, ny + 1)

    # cell centres are the midpoints of consecutive edges
    x_mid = (x_edges[:-1] + x_edges[1:]) / 2
    y_mid = (y_edges[:-1] + y_edges[1:]) / 2

    # broadcast a row of x terms against a column of y terms -> (ny, nx)
    x_term = (x_mid[np.newaxis, :]**2)/(a**2)
    y_term = (y_mid[:, np.newaxis]**2)/(b**2)
    mask = x_term + y_term <= 1.0
    return x_edges, y_edges, mask

def heatmap_from_metres(x_m, y_m, a, b, nx=NX, ny=NY, sigma=SIGMA, weights=None):
    """Bin metre coordinates into a blurred grid limited to the oval.

    Points are histogrammed on an ``nx`` x ``ny`` grid spanning ``[-a, a]``
    by ``[-b, b]`` (optionally weighted), smoothed with a Gaussian filter
    when ``sigma`` is truthy and positive, and set to NaN outside the oval.

    Returns ``(H, x_edges, y_edges)`` with ``H`` laid out as (row=y, col=x).
    """
    x_edges, y_edges, inside_oval = make_oval_mask_metres(nx, ny, a, b)

    counts = np.histogram2d(x_m, y_m, bins=[x_edges, y_edges], weights=weights)[0]
    # histogram2d indexes as [x, y]; transpose so rows follow y like the mask
    grid = counts.T

    apply_blur = sigma and sigma > 0
    if apply_blur:
        grid = gaussian_filter(grid, sigma=sigma)

    # blank out every cell whose centre lies outside the oval
    grid = np.where(inside_oval, grid, np.nan)
    return grid, x_edges, y_edges

# -------------------------
# Field drawing
# -------------------------
def draw_afl_field(ax, a, b,
                   centre_square=50.0,
                   centre_inner_d=3.0,
                   centre_outer_d=10.0,
                   goal_square_depth=9.0,
                   goal_square_width=6.4,
                   arc_r=50.0,
                   line_color="white", lw=2.0, alpha=0.95):
    """Draw the oval boundary and the main field markings on ``ax``.

    Everything is drawn with ``ax.plot`` in a frame whose origin is the
    centre of the ground, with the goals at ``(-a, 0)`` and ``(+a, 0)``.

    Parameters
    ----------
    ax : object exposing a matplotlib-style ``plot(xs, ys, **kwargs)``.
    a, b : semi-axes of the oval along x and y.
    centre_square : side length of the centre square (0/None to skip).
    centre_inner_d, centre_outer_d : diameters of the two centre circles
        (0/None to skip either one).
    goal_square_depth, goal_square_width : size of the goal square drawn
        at each end, extending ``goal_square_depth`` infield from x = +/-a
        (skipped unless both are positive).
    arc_r : radius of the arc centred on each goal (0/None to skip).
        Only the part of each arc inside the oval is drawn.
    line_color, lw, alpha : line styling shared by all markings; the
        centre line uses ``alpha * 0.9``.
    """
    style = dict(color=line_color, lw=lw, alpha=alpha)

    t = np.linspace(0, 2*np.pi, 800)
    ax.plot(a*np.cos(t), b*np.sin(t), **style)                              # oval boundary
    ax.plot([0, 0], [-b, b], color=line_color, lw=lw, alpha=alpha*0.9)     # centre line

    # centre square, as a closed outline centred on the origin
    if centre_square and centre_square > 0:
        half = centre_square / 2.0
        ax.plot([-half, half, half, -half, -half],
                [-half, -half, half, half, -half], **style)

    # inner and outer centre circles
    for diameter in (centre_inner_d, centre_outer_d):
        if diameter and diameter > 0:
            radius = diameter / 2.0
            ax.plot(radius*np.cos(t), radius*np.sin(t), **style)

    # goal squares: open rectangles running infield from each end of the oval
    if (goal_square_depth and goal_square_depth > 0
            and goal_square_width and goal_square_width > 0):
        half_w = goal_square_width / 2.0
        for side in (-1.0, 1.0):
            x_goal = side * a
            x_front = side * (a - goal_square_depth)
            ax.plot([x_goal, x_front, x_front, x_goal],
                    [-half_w, -half_w, half_w, half_w], **style)

    # arcs centred on each goal, bulging towards the centre of the ground
    if arc_r and arc_r > 0:
        theta = np.linspace(-np.pi/2, np.pi/2, 400)
        arc_y = arc_r*np.sin(theta)
        for side in (-1.0, 1.0):
            arc_x = side * (a - arc_r*np.cos(theta))
            inside = (arc_x/a)**2 + (arc_y/b)**2 <= 1.0
            # NaN breaks the line, so the portion outside the oval is not drawn
            ax.plot(np.where(inside, arc_x, np.nan),
                    np.where(inside, arc_y, np.nan), **style)

# -------------------------
# Plot helper
# -------------------------
def plot_heatmap(H, x_edges, y_edges, a, b, out_path, alpha_img=0.9):
    """Render a heatmap grid over a green oval and save it as a PNG.

    Parameters
    ----------
    H : 2-D array laid out as (row=y, col=x); non-finite cells are ignored
        when the colour scale is chosen.
    x_edges, y_edges : grid edges; their min/max give the image extent.
    a, b : semi-axes of the oval, used for the background and axis limits.
    out_path : destination file. Its stem becomes the plot title and its
        parent directory is created if it does not exist.
    alpha_img : opacity of the heatmap layer.

    The figure is always closed, even if drawing or saving fails, so a
    long batch run does not accumulate open figures.
    """
    title = os.path.splitext(os.path.basename(out_path))[0]

    fig, ax = plt.subplots(figsize=(11, 8))
    try:
        # draw green oval background
        t = np.linspace(0, 2*np.pi, 600)
        ax.fill(a*np.cos(t), b*np.sin(t), color=(0.05, 0.35, 0.05), alpha=1.0, zorder=0)
        ax.set_xlim([-a, a])
        ax.set_ylim([-b, b])
        ax.set_aspect("equal")
        ax.set_axis_off()

        # set heatmap scale: clip at the 99th percentile so a few hot cells
        # do not wash out the rest of the map
        finite_vals = H[np.isfinite(H)]
        vmin = 0.0
        vmax = (np.nanpercentile(finite_vals, 99) if finite_vals.size else 1.0)
        if not vmax > vmin:
            # Sparse grids can have a 99th percentile of 0, which would give
            # a zero-width colour range; fall back to the peak, then to 1.0.
            peak = float(finite_vals.max()) if finite_vals.size else 0.0
            vmax = peak if peak > vmin else 1.0

        extent = [x_edges.min(), x_edges.max(), y_edges.min(), y_edges.max()]
        im = ax.imshow(H, origin="lower", extent=extent, aspect="equal",
                       interpolation="bilinear", cmap="viridis",
                       norm=Normalize(vmin=vmin, vmax=vmax, clip=True),
                       alpha=alpha_img, zorder=2)

        # draw field lines
        draw_afl_field(ax, a, b)

        # add scale bar (20 m long, near the bottom-left corner of the axes)
        # NOTE(review): the bar, its label and the title are white; if the
        # figure background is also white they will not be visible - confirm.
        sb_y = -b + 8
        sb_x0, sb_x1 = -a + 12, -a + 32
        ax.plot([sb_x0, sb_x1], [sb_y, sb_y], color="white", lw=4, alpha=0.95)
        ax.text((sb_x0 + sb_x1)/2, sb_y - 4, "20 m", ha="center", va="top",
                color="white", fontsize=11)

        # title + colorbar
        ax.set_title(title, color="white")
        cbar = plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
        cbar.set_label("Intensity")

        # save figure
        plt.tight_layout()
        out_dir = os.path.dirname(out_path)
        if out_dir:
            # a bare filename has no directory part; makedirs("") would raise
            os.makedirs(out_dir, exist_ok=True)
        fig.savefig(out_path, dpi=220)
    finally:
        plt.close(fig)

# -------------------------
# Zones (circular 50 m arcs)
# -------------------------
def split_zones_circular(df_m, half_length=None, arc_r=50.0):
    """Split a dataframe into Back 50, Midfield and Forward 50 zones.

    A row belongs to "Back 50" when it lies within ``arc_r`` of the goal at
    ``(-half_length, 0)``, to "Forward 50" when it lies within ``arc_r`` of
    the goal at ``(+half_length, 0)``, and to "Midfield" when it is farther
    than ``arc_r`` from both.

    Parameters
    ----------
    df_m : dataframe with ``x_m`` and ``y_m`` columns.
    half_length : distance from the centre to each goal along x. Defaults
        to the module-level semi-axis ``a``.
    arc_r : zone radius around each goal. The returned keys keep the
        "50" names whatever radius is used.

    Returns
    -------
    dict mapping zone name to an independent copy of the matching rows.
    If ``arc_r`` exceeds ``half_length`` the two end zones overlap and a row
    can appear in both.
    """
    if half_length is None:
        half_length = a

    dist_left = np.hypot(df_m["x_m"] + half_length, df_m["y_m"])
    dist_right = np.hypot(df_m["x_m"] - half_length, df_m["y_m"])

    near_left = dist_left <= arc_r
    near_right = dist_right <= arc_r
    return {
        "Back 50": df_m[near_left].copy(),
        "Midfield": df_m[(dist_left > arc_r) & (dist_right > arc_r)].copy(),
        "Forward 50": df_m[near_right].copy(),
    }

# -------------------------
# CSV loader (handles both schemas)
# -------------------------
# Column order of the comma-delimited schema (box corners, centre and size given directly).
XUAN_REQUIRED = [
    "frame_id", "player_id", "timestamp_s",
    "x1", "y1", "x2", "y2",
    "cx", "cy", "w", "h",
    "confidence",
]
# Column order of the pipe-delimited schema (x/y plus width/height; no centre columns).
PIPE_REQUIRED = [
    "frame_id", "track_id",
    "x", "y", "width", "height",
    "conf", "class_id", "visibility",
]

def _coerce_numeric(df, cols):
    """Coerce the listed columns to numeric dtype, in place.

    Names absent from ``df`` are skipped. Values that cannot be parsed
    become NaN. The same dataframe object is returned for chaining.
    """
    present = [name for name in cols if name in df.columns]
    for name in present:
        df[name] = pd.to_numeric(df[name], errors="coerce")
    return df

def _read_with_possible_pipe(path):
    """Detect the delimiter (comma or pipe) and read the CSV.

    The first 4096 characters are sniffed. The file is treated as
    pipe-delimited when that sample contains a ``|`` and its first
    non-blank line contains no comma; otherwise it is read as comma-delimited.

    Returns
    -------
    (df, sep) : the parsed dataframe and the delimiter used.

    Raises
    ------
    Whatever ``pandas.read_csv`` raises when the headerless retry also
    fails, e.g. ``pandas.errors.EmptyDataError`` for an empty file.
    """
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        head = f.read(4096)

    # Use the first non-blank line: an empty file has no lines at all, and
    # pandas itself skips leading blank lines when it looks for the header.
    first_line = next((ln for ln in head.splitlines() if ln.strip()), "")
    sep = "|" if ("|" in head and "," not in first_line) else ","

    try:
        df = pd.read_csv(path, sep=sep)
    except ValueError:
        # ParserError, EmptyDataError and UnicodeDecodeError are all
        # ValueError subclasses; retry once without assuming a header row.
        df = pd.read_csv(path, sep=sep, header=None)
    return df, sep

def _assign_if_headerless(df, expected_cols):
    """Name the leading columns after ``expected_cols`` when headers are missing.

    ``df`` is returned untouched when it already has every expected column,
    or when it has fewer columns than expected. Otherwise a copy limited to
    the first ``len(expected_cols)`` columns is returned with those names.
    """
    n_expected = len(expected_cols)
    already_named = set(expected_cols).issubset(df.columns)
    if already_named or df.shape[1] < n_expected:
        return df

    renamed = df.iloc[:, :n_expected].copy()
    renamed.columns = expected_cols
    return renamed

def _to_unified_columns(df):
    """Return a copy of ``df`` reshaped to the unified column schema.

    Steps, in order:
      1. ``player_id`` is taken from ``track_id`` when absent (NaN if neither exists).
      2. ``cx``/``cy`` are derived from ``x``/``y`` plus half of
         ``width``/``height`` when ``cx`` is absent and those four exist.
      3. ``confidence`` is copied from ``conf`` when only the latter exists.
      4. Any unified column still missing is added as NaN.
      5. Unified columns are coerced to numeric, and rows whose centre is
         missing or non-finite are dropped.

    The input dataframe is not modified; the result has a fresh 0..n-1 index.
    """
    unified = df.copy()

    # 1) identity column
    if "player_id" not in unified.columns:
        unified["player_id"] = unified["track_id"] if "track_id" in unified.columns else np.nan

    # 2) box centre from position + half size
    has_box = all(name in unified.columns for name in ("x", "y", "width", "height"))
    if has_box and "cx" not in unified.columns:
        def _num(name):
            return pd.to_numeric(unified[name], errors="coerce")

        unified["cx"] = _num("x") + _num("width")/2.0
        unified["cy"] = _num("y") + _num("height")/2.0

    # 3) confidence alias
    if "conf" in unified.columns and "confidence" not in unified.columns:
        unified["confidence"] = unified["conf"]

    # 4) pad the schema so downstream code can rely on every column existing
    for name in UNIFIED_COLS:
        if name not in unified.columns:
            unified[name] = np.nan

    # 5) numeric coercion, then keep only rows with a usable centre
    unified = _coerce_numeric(unified, UNIFIED_COLS)
    unified = unified.dropna(subset=["cx", "cy"])
    finite_centre = np.isfinite(unified["cx"]) & np.isfinite(unified["cy"])
    return unified[finite_centre].reset_index(drop=True)

def load_events_csv_any(path: str) -> pd.DataFrame:
    """Load a tracking CSV in either supported schema into the unified format.

    The file is first parsed with its own header row. If that yields no
    usable rows, it is re-read once without a header and its leading
    columns are named positionally: the schema matching the detected
    delimiter is tried first (``XUAN_REQUIRED`` for comma,
    ``PIPE_REQUIRED`` for pipe), and for comma files ``PIPE_REQUIRED`` is
    tried as a last resort.

    Parameters
    ----------
    path : location of the CSV file.

    Returns
    -------
    A non-empty unified dataframe.

    Raises
    ------
    ValueError
        If no attempt produces any row with a usable centre.
    """
    df, sep = _read_with_possible_pipe(path)
    uni = _to_unified_columns(df)
    if not uni.empty:
        return uni

    # Headerless fallback: read the file once and reuse it for every
    # candidate schema (neither helper below modifies its input frame).
    df_hless = pd.read_csv(path, sep=sep, header=None)
    candidates = [XUAN_REQUIRED, PIPE_REQUIRED] if sep == "," else [PIPE_REQUIRED]
    for schema in candidates:
        # A frame with too few columns is returned unnamed by
        # _assign_if_headerless and therefore unifies to an empty result.
        uni = _to_unified_columns(_assign_if_headerless(df_hless, schema))
        if not uni.empty:
            return uni

    raise ValueError(f"{os.path.basename(path)}: could not parse CSV (sep='{sep}')")

# -------------------------
# Weight selection
# -------------------------
def choose_weights_unified(df):
    """Return per-row weights from the first usable confidence column.

    Columns named in ``WEIGHT_COLS`` are tried in order. A column with no
    numeric values at all is skipped: a unified dataframe always carries a
    ``confidence`` column, filled with NaN when the source had none, and
    using it would give every row weight 0 and produce a blank heatmap.

    Returns
    -------
    A float array with NaN entries replaced by 0.0, or ``None`` when no
    usable column exists (callers then build an unweighted heatmap).
    """
    for c in WEIGHT_COLS:
        if c not in df.columns:
            continue
        values = pd.to_numeric(df[c], errors="coerce")
        if values.notna().any():
            return values.fillna(0.0).to_numpy(dtype=float)
    return None

# -------------------------
# Per-dataset pipeline
# -------------------------
def process_one_dataset(csv_path, label):
    """Run the full heatmap export for one CSV file.

    Creates ``<OUT_ROOT>/<label>/{overall,per_id,zones}``, loads the CSV,
    rescales raw centres to metres, then writes:
      - one overall heatmap,
      - one heatmap per non-null ``player_id``,
      - one heatmap per non-empty zone.
    Returns ``None``; progress and skipped items are reported with ``print``.
    """
    print(f"\n=== Processing: {label} ===")

    dataset_dir = os.path.join(OUT_ROOT, label)
    overall_dir = os.path.join(dataset_dir, "overall")
    perid_dir = os.path.join(dataset_dir, "per_id")
    zones_dir = os.path.join(dataset_dir, "zones")
    for folder in (overall_dir, perid_dir, zones_dir):
        os.makedirs(folder, exist_ok=True)

    # Load CSV into unified dataframe
    df = load_events_csv_any(csv_path)
    if df.empty:
        print(f"({label}) Empty after load; skipping.")
        return

    # Raw coords -> metres, scaled by the padded bounding box of the data
    raw_x, raw_y = df["cx"].values, df["cy"].values
    df["x_m"], df["y_m"] = raw_to_metres(raw_x, raw_y, raw_bbox(raw_x, raw_y), a, b)

    def _grid_for(frame):
        """Build the weighted heatmap grid for one subset of rows."""
        frame_weights = choose_weights_unified(frame)
        return heatmap_from_metres(frame["x_m"].values, frame["y_m"].values,
                                   a, b, NX, NY, SIGMA, frame_weights)

    # A) Overall heatmap
    grid, x_edges, y_edges = _grid_for(df)
    plot_heatmap(grid, x_edges, y_edges, a, b, out_path=os.path.join(overall_dir, "overall.png"))

    # B) Per-player heatmaps
    if not df["player_id"].notna().any():
        print(f"({label}) No usable ID column; skipping per-id maps.")
    else:
        for pid, rows in df.groupby("player_id"):
            if pd.isna(pid) or rows.empty:
                continue
            grid, x_edges, y_edges = _grid_for(rows)
            # whole-number ids are written without a trailing ".0"
            out_path = os.path.join(perid_dir, f"id_{int(pid) if float(pid).is_integer() else pid}.png")
            plot_heatmap(grid, x_edges, y_edges, a, b, out_path=out_path)

    # C) Zone heatmaps
    for zone_name, zone_rows in split_zones_circular(df).items():
        if zone_rows.empty:
            print(f"({label}) Zone empty: {zone_name}")
            continue
        grid, x_edges, y_edges = _grid_for(zone_rows)
        file_name = zone_name.lower().replace(" ", "_") + ".png"
        plot_heatmap(grid, x_edges, y_edges, a, b, out_path=os.path.join(zones_dir, file_name))

# -------------------------
# Run all datasets
# -------------------------
# Process every configured (path, label) pair. A failure in one dataset is
# reported and the loop moves on, so one bad CSV does not abort the batch.
for path, label in INPUTS:
    try:
        process_one_dataset(path, label)
    except Exception as e:
        # Only the exception message is printed; no traceback is shown.
        print(f"!! Failed {label}: {e}")

# Printed unconditionally, even when a dataset above failed.
print(f"\nDone. Outputs in: {os.path.abspath(OUT_ROOT)}")



=== Processing: tracking2 ===


In [None]:
# -------------------------
# (Optional) ZIP everything for download in Colab
# -------------------------
# This cell relies on OUT_ROOT from the cell above and on the google.colab
# package, so the import below fails outside a Colab runtime.
import shutil
from google.colab import files
# Archive the contents of OUT_ROOT into "all_heatmaps.zip" in the working directory.
shutil.make_archive("all_heatmaps", "zip", OUT_ROOT)
# Hand the archive to the Colab download helper
# (presumably triggers a browser download - confirm in a Colab session).
files.download("all_heatmaps.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>