In [None]:

import os
from typing import Tuple

import numpy as np

from pong_simulation.pong_simulation import PongH5Reader


In [None]:
def compute_cumulative_score_over_time(h5_path: str, start_frac: float = 0.0, until_frac: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
    """Compute cumulative max score up to each timestep, within a time slice.

    Scoring rule: each side-to-side-to-side alternation without a restart adds 1 point.
    Equivalently, within a round (between restarts), every two left/right transitions add 1.
    """
    if not os.path.exists(h5_path):
        raise FileNotFoundError(f"HDF5 file not found: {h5_path}")

    with PongH5Reader(h5_path) as data:
        ball = data.ball_pos[...]
        attrs = data.attrs
        dt = float(attrs.get("dt", 0.0))
        Tsteps = int(ball.shape[0])

        if not (0.0 <= start_frac <= 1.0) or not (0.0 <= until_frac <= 1.0):
            raise ValueError("--from and --until must be between 0 and 1 (inclusive)")
        if start_frac >= until_frac:
            raise ValueError("--from must be less than --until")

        start_idx = int(np.floor(start_frac * Tsteps))
        end_idx = int(np.ceil(until_frac * Tsteps))
        start_idx = max(0, min(start_idx, Tsteps))
        end_idx = max(0, min(end_idx, Tsteps))
        if end_idx - start_idx <= 0:
            raise ValueError("Slicing range is empty after applying --from/--until")

        x = ball[:, 0].astype(float)
        mid = 0.5 * float(np.nanmax(x)) if x.size else 0.0
        side = np.sign(x - mid).astype(int)  # -1: left, 0: mid, +1: right
        # Forward-fill zeros (midline) to avoid spurious transitions
        if side.size:
            # If initial is 0, find first non-zero and backfill
            if side[0] == 0:
                nz = np.flatnonzero(side)
                if nz.size:
                    side[:nz[0]] = side[nz[0]]
                else:
                    side[:] = -1  # default to left if entirely zero
            for i in range(1, side.size):
                if side[i] == 0:
                    side[i] = side[i - 1]

        trans = np.zeros_like(side, dtype=int)
        if side.size >= 2:
            trans[1:] = (side[1:] != side[:-1]).astype(int)
        cumsum_trans = np.cumsum(trans)

        # Restart indices (global), include 0 as implicit start
        rst = detect_round_starts(ball)
        rst = np.r_[0, rst]
        rst_ext = np.r_[rst, Tsteps]

        cumulative_score = np.zeros(Tsteps, dtype=int)
        base = 0
        for k in range(len(rst)):
            seg_start = int(rst[k])
            seg_end_excl = int(rst_ext[k + 1])  # exclusive
            if seg_end_excl <= seg_start:
                continue
            # Precompute base for this segment over time
            for t in range(seg_start, seg_end_excl):
                seg_trans = cumsum_trans[t] - cumsum_trans[seg_start]
                seg_points = seg_trans // 2
                cumulative_score[t] = base + int(seg_points)
            # Completed points in this segment
            seg_trans_total = cumsum_trans[seg_end_excl - 1] - cumsum_trans[seg_start]
            base += int(seg_trans_total // 2)

        # Slice to window
        time_s = np.arange(start_idx, end_idx, dtype=float) * dt
        score_slice = cumulative_score[start_idx:end_idx]

    return time_s, score_slice, side


In [None]:
time