In [12]:
import io
import os
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from PIL import Image
from IPython.display import display, clear_output

plt.rcParams["figure.dpi"] = 120



In [2]:
def unwrap_obj(x):
    if isinstance(x, np.ndarray) and x.dtype == object:
        try:
            return x.item()
        except Exception:

            return x.tolist()
    return x

def frame_to_jpeg_bytes(frame):
    frame = unwrap_obj(frame)
    if isinstance(frame, dict) and "data" in frame:
        buf = frame["data"]
        if isinstance(buf, np.ndarray):
            return buf.tobytes()
        if isinstance(buf, (bytes, bytearray)):
            return bytes(buf)
        return bytes(np.array(buf, dtype=np.uint8))
    if isinstance(frame, (bytes, bytearray)):
        return bytes(frame)
    if isinstance(frame, np.ndarray) and frame.dtype == np.uint8:
        return frame.tobytes()
    raise TypeError(f"Unsupported image frame type: {type(frame)}")

def get_width_1d(arr):
    arr = np.asarray(arr)
    if arr.ndim == 2 and arr.shape[1] == 1:
        return arr[:, 0].astype(float)
    if arr.ndim == 1:
        return arr.astype(float)
    raise ValueError(f"Unsupported fingertip_width shape: {arr.shape}")

def put_width_back(original_arr, width_1d):
    width_1d = np.asarray(width_1d, dtype=float)
    if original_arr.ndim == 2 and original_arr.shape[1] == 1:
        return width_1d.reshape(-1, 1)
    if original_arr.ndim == 1:
        return width_1d
    raise ValueError(f"Unsupported fingertip_width shape: {original_arr.shape}")

def plot_width_to_png(width, i, y_min=0.0, y_max=0.03, title="width"):
    """
    Render a small plot to PNG bytes, so we can display it via widgets.Image (stable across machines).
    """
    width = np.asarray(width, dtype=float)
    fig = plt.figure(figsize=(6.0, 1.8))
    ax = fig.add_subplot(111)
    ax.plot(width)
    ax.axvline(i, linewidth=1.5)
    ax.set_ylim(y_min, y_max)
    ax.set_title(title)
    ax.set_xlabel("frame")
    ax.set_ylabel("m")
    ax.grid(True, alpha=0.25)
    buf = io.BytesIO()
    fig.tight_layout()
    fig.savefig(buf, format="png")
    plt.close(fig)
    return buf.getvalue()



In [3]:
def detect_anomaly_segments(width, value_min=0.0, value_max=0.03, jump_thresh=0.02, merge_gap=2):
    """
    Return:
      bad_mask: (N,) bool
      segments: list of (start, end) inclusive, merged by small gaps
    """
    w = np.asarray(width, dtype=float)
    n = len(w)
    bad = np.zeros(n, dtype=bool)

    # out of range
    bad |= (w < value_min) | (w > value_max)

    # jump
    dw = np.abs(np.diff(w))
    jump_idx = np.where(dw >= jump_thresh)[0] + 1  # mark current frame i
    bad[jump_idx] = True

    # mask -> segments
    idx = np.where(bad)[0]
    if len(idx) == 0:
        return bad, []

    segs = []
    s = idx[0]
    prev = idx[0]
    for k in idx[1:]:
        if k <= prev + 1:
            prev = k
        else:
            segs.append((s, prev))
            s = k
            prev = k
    segs.append((s, prev))

    # merge small gaps (optional)
    merged = []
    for s, e in segs:
        if not merged:
            merged.append([s, e])
        else:
            ps, pe = merged[-1]
            if s - pe <= merge_gap:
                merged[-1][1] = e
            else:
                merged.append([s, e])
    merged = [(a, b) for a, b in merged]
    return bad, merged


In [4]:
def make_episode_editor(
    npz_path: str,
    out_dir: str,
    image_key_left: str = "rgb2",
    image_key_right: str = "rgb3",
    width_key_left: str = "fingertip_width_left",
    width_key_right: str = "fingertip_width_right",
    value_min: float = 0.0,
    value_max: float = 0.03,
    jump_thresh: float = 0.02,
):
    os.makedirs(out_dir, exist_ok=True)
    data = np.load(npz_path, allow_pickle=True)

    if width_key_left not in data or width_key_right not in data:
        raise KeyError(f"Missing width keys in {npz_path}")

    width_left_raw = data[width_key_left]
    width_right_raw = data[width_key_right]
    width_left = get_width_1d(width_left_raw).copy()
    width_right = get_width_1d(width_right_raw).copy()

    n = len(width_left)
    if len(width_right) != n:
        raise ValueError(f"Length mismatch: left={len(width_left)} right={len(width_right)}")

    frames_left = data[image_key_left] if image_key_left in data else None
    frames_right = data[image_key_right] if image_key_right in data else None

    # ---- anomaly segments ----
    badL, segsL = detect_anomaly_segments(width_left, value_min, value_max, jump_thresh)
    badR, segsR = detect_anomaly_segments(width_right, value_min, value_max, jump_thresh)

    # union segments: easiest = union mask then re-segment
    bad_union = badL | badR
    _, segsU = detect_anomaly_segments(bad_union.astype(float), 0.5, 1.5, 0.5, merge_gap=2)  # trick: treat True as 1.0

    seg_state = {"k": 0}  # pointer into segsU

    # ---- UI ----
    info = widgets.HTML()
    left_title = widgets.HTML()
    right_title = widgets.HTML()

    left_img = widgets.Image(format="jpeg", width=520)
    right_img = widgets.Image(format="jpeg", width=520)

    # width plots as images
    plotL = widgets.Image(format="png", width=520)
    plotR = widgets.Image(format="png", width=520)

    frame_slider = widgets.IntSlider(value=0, min=0, max=n-1, step=1, description="Frame", continuous_update=False,
                                     layout=widgets.Layout(width="75%"))
    prev_btn = widgets.Button(description="‚óÄ Prev", layout=widgets.Layout(width="90px"))
    next_btn = widgets.Button(description="Next ‚ñ∂", layout=widgets.Layout(width="90px"))

    # anomaly navigation
    btn_prev_seg = widgets.Button(description="‚èÆ Prev anomaly", button_style="info")
    btn_next_seg = widgets.Button(description="‚è≠ Next anomaly", button_style="info")
    seg_label = widgets.HTML()

    # edit sliders (range based on current episode)
    lmin, lmax = float(np.min(width_left)), float(np.max(width_left))
    rmin, rmax = float(np.min(width_right)), float(np.max(width_right))

    edit_left = widgets.FloatSlider(value=float(width_left[0]), min=lmin, max=lmax,
                                    step=(lmax - lmin) / 500 if lmax > lmin else 1e-4,
                                    description="L width", readout_format=".5f",
                                    continuous_update=False, layout=widgets.Layout(width="75%"))
    edit_right = widgets.FloatSlider(value=float(width_right[0]), min=rmin, max=rmax,
                                     step=(rmax - rmin) / 500 if rmax > rmin else 1e-4,
                                     description="R width", readout_format=".5f",
                                     continuous_update=False, layout=widgets.Layout(width="75%"))

    apply_left_btn = widgets.Button(description="‚úÖ Apply Left (frame)", button_style="success")
    apply_right_btn = widgets.Button(description="‚úÖ Apply Right (frame)", button_style="success")
    save_btn = widgets.Button(description="üíæ Save NPZ", button_style="warning")

    # range controls
    range_start = widgets.IntSlider(value=0, min=0, max=n-1, step=1, description="Start", continuous_update=False)
    range_end   = widgets.IntSlider(value=0, min=0, max=n-1, step=1, description="End", continuous_update=False)
    btn_set_start = widgets.Button(description="Set Start = Frame")
    btn_set_end   = widgets.Button(description="Set End = Frame")

    btn_apply_left_range  = widgets.Button(description="Apply Left to Range", button_style="success")
    btn_apply_right_range = widgets.Button(description="Apply Right to Range", button_style="success")
    btn_interp_left  = widgets.Button(description="Interpolate Left (start‚Üîend)", button_style="info")
    btn_interp_right = widgets.Button(description="Interpolate Right (start‚Üîend)", button_style="info")

    def _sorted_range():
        a, b = int(range_start.value), int(range_end.value)
        if a > b:
            a, b = b, a
        return a, b

    def render(i: int):
        # labels
        is_bad = bool(bad_union[i])
        badge = '<span style="color:red"><b>ANOMALY</b></span>' if is_bad else '<span style="color:green"><b>OK</b></span>'

        left_title.value  = f"<b>LEFT</b> | frame={i} | width={width_left[i]:.5f} | {badge}"
        right_title.value = f"<b>RIGHT</b> | frame={i} | width={width_right[i]:.5f} | {badge}"

        edit_left.value = float(width_left[i])
        edit_right.value = float(width_right[i])

        # images
        if frames_left is not None:
            try:
                left_img.value = frame_to_jpeg_bytes(frames_left[i])
            except Exception as e:
                left_img.value = b""
                left_title.value += f' <span style="color:red">(decode failed: {e})</span>'
        else:
            left_img.value = b""
            left_title.value += ' <span style="color:gray">(no image key)</span>'

        if frames_right is not None:
            try:
                right_img.value = frame_to_jpeg_bytes(frames_right[i])
            except Exception as e:
                right_img.value = b""
                right_title.value += f' <span style="color:red">(decode failed: {e})</span>'
        else:
            right_img.value = b""
            right_title.value += ' <span style="color:gray">(no image key)</span>'

        # plots (always in [0,0.03] view for clarity)
        plotL.value = plot_width_to_png(width_left, i, y_min=value_min, y_max=value_max, title="Left grasping width")
        plotR.value = plot_width_to_png(width_right, i, y_min=value_min, y_max=value_max, title="Right grasping width")

        # segment label
        if len(segsU) == 0:
            seg_label.value = "<b>Anomaly segments:</b> 0"
        else:
            k = seg_state["k"]
            k = int(np.clip(k, 0, len(segsU)-1))
            seg_state["k"] = k
            s, e = segsU[k]
            seg_label.value = f"<b>Anomaly segments:</b> {len(segsU)} | current: {k+1}/{len(segsU)} = [{s}, {e}]"

        info.value = (
            f"<b>NPZ:</b> {os.path.basename(npz_path)} | <b>N:</b> {n} "
            f"| <b>rule:</b> width‚àà[{value_min:.2f},{value_max:.2f}]m, jump‚â•{jump_thresh:.2f}m "
            f"| <b>L range:</b> [{lmin:.5f},{lmax:.5f}] | <b>R range:</b> [{rmin:.5f},{rmax:.5f}]"
        )

    # frame change
    def on_frame_change(change):
        if change["name"] == "value":
            render(int(change["new"]))
    frame_slider.observe(on_frame_change, names="value")

    # prev/next frame
    prev_btn.on_click(lambda _: setattr(frame_slider, "value", max(frame_slider.min, frame_slider.value - 1)))
    next_btn.on_click(lambda _: setattr(frame_slider, "value", min(frame_slider.max, frame_slider.value + 1)))

    # apply at frame
    def on_apply_left(_):
        i = int(frame_slider.value)
        width_left[i] = float(edit_left.value)
        # refresh anomaly list after changes? (optional; keep fast by not recalculating every click)
        render(i)
    def on_apply_right(_):
        i = int(frame_slider.value)
        width_right[i] = float(edit_right.value)
        render(i)
    apply_left_btn.on_click(on_apply_left)
    apply_right_btn.on_click(on_apply_right)

    # range set
    btn_set_start.on_click(lambda _: setattr(range_start, "value", int(frame_slider.value)))
    btn_set_end.on_click(lambda _: setattr(range_end, "value", int(frame_slider.value)))

    # range apply
    def _apply_left_range(_):
        a, b = _sorted_range()
        width_left[a:b+1] = float(edit_left.value)
        render(int(frame_slider.value))
    def _apply_right_range(_):
        a, b = _sorted_range()
        width_right[a:b+1] = float(edit_right.value)
        render(int(frame_slider.value))
    btn_apply_left_range.on_click(_apply_left_range)
    btn_apply_right_range.on_click(_apply_right_range)

    # interpolate
    def _interp_left(_):
        a, b = _sorted_range()
        if a == b:
            return
        ya, yb = float(width_left[a]), float(width_left[b])
        width_left[a:b+1] = np.linspace(ya, yb, b-a+1)
        render(int(frame_slider.value))
    def _interp_right(_):
        a, b = _sorted_range()
        if a == b:
            return
        ya, yb = float(width_right[a]), float(width_right[b])
        width_right[a:b+1] = np.linspace(ya, yb, b-a+1)
        render(int(frame_slider.value))
    btn_interp_left.on_click(_interp_left)
    btn_interp_right.on_click(_interp_right)

    # anomaly jump
    def _jump_to_seg(k):
        if len(segsU) == 0:
            return
        k = int(np.clip(k, 0, len(segsU)-1))
        seg_state["k"] = k
        s, e = segsU[k]
        frame_slider.value = int(s)  # jump to segment start

    btn_next_seg.on_click(lambda _: _jump_to_seg(seg_state["k"] + 1))
    btn_prev_seg.on_click(lambda _: _jump_to_seg(seg_state["k"] - 1))

    # save
    def do_save():
        out_dict = {k: data[k] for k in data.files}
        out_dict[width_key_left] = put_width_back(width_left_raw, width_left)
        out_dict[width_key_right] = put_width_back(width_right_raw, width_right)
        out_path = os.path.join(out_dir, os.path.basename(npz_path))
        np.savez(out_path, **out_dict)
        info.value = info.value + f' | <span style="color:green"><b>Saved:</b> {out_path}</span>'
        return out_path
    save_btn.on_click(lambda _: do_save())

    # layout
    top = widgets.HBox([prev_btn, frame_slider, next_btn])
    anomaly_bar = widgets.HBox([btn_prev_seg, btn_next_seg, seg_label])

    imgs = widgets.HBox([
        widgets.VBox([left_title, left_img, plotL]),
        widgets.VBox([right_title, right_img, plotR]),
    ])

    edits = widgets.VBox([
        widgets.HBox([edit_left, apply_left_btn]),
        widgets.HBox([edit_right, apply_right_btn]),
        save_btn
    ])

    range_ui = widgets.VBox([
        widgets.HTML("<b>Range edit</b> (set start/end, then Apply/Interpolate)"),
        widgets.HBox([range_start, btn_set_start]),
        widgets.HBox([range_end, btn_set_end]),
        widgets.HBox([btn_apply_left_range, btn_interp_left]),
        widgets.HBox([btn_apply_right_range, btn_interp_right]),
    ])

    ui = widgets.VBox([info, top, anomaly_bar, imgs, edits, range_ui])

    render(0)
    return ui, do_save


In [5]:
def interactive_process_batch(
    npz_paths,
    out_dir,
    image_key_left="rgb2",
    image_key_right="rgb3",
    width_key_left="fingertip_width_left",
    width_key_right="fingertip_width_right",
    auto_save_on_next=False,
):
    if len(npz_paths) == 0:
        print("No episodes found.")
        return

    os.makedirs(out_dir, exist_ok=True)

    state = {"idx": 0, "last_saved": None}
    header = widgets.HTML()

    btn_save = widgets.Button(description="üíæ Save current", button_style="warning")
    btn_next = widgets.Button(description="‚û° Next episode", button_style="info")
    btn_prev = widgets.Button(description="‚¨Ö Prev episode", button_style="info")

    toolbar = widgets.HBox([btn_prev, btn_save, btn_next])

    hooks = {"save": None}
    current_ui_box = widgets.VBox([])  # ÊîæÂΩìÂâç episode ÁöÑ UI

    def show_idx():
        i = state["idx"]
        last = state["last_saved"]
        last_html = ""
        if last is not None:
            last_html = f' | <span style="color:green"><b>last saved:</b> {os.path.basename(last)}</span>'
        header.value = f"<b>[{i+1}/{len(npz_paths)}]</b> {os.path.basename(npz_paths[i])}{last_html}"

    def load_episode():
        # Ê∏ÖÂ±èÂπ∂ÈáçÁîªÊï¥‰ΩìÔºàÊõ¥Á®≥Ôºâ
        clear_output(wait=True)

        npz_path = npz_paths[state["idx"]]
        show_idx()

        ui, do_save = make_episode_editor(
            npz_path=npz_path,
            out_dir=out_dir,
            image_key_left=image_key_left,
            image_key_right=image_key_right,
            width_key_left=width_key_left,
            width_key_right=width_key_right,
        )
        hooks["save"] = do_save

        display(widgets.VBox([header, toolbar, ui]))

    def on_save(_):
        if hooks["save"] is not None:
            out_path = hooks["save"]()
            state["last_saved"] = out_path
            show_idx()

    def step(delta):
        if auto_save_on_next and hooks["save"] is not None:
            out_path = hooks["save"]()
            state["last_saved"] = out_path

        state["idx"] = int(np.clip(state["idx"] + delta, 0, len(npz_paths) - 1))
        load_episode()

    def on_next(_):
        step(+1)

    def on_prev(_):
        step(-1)

    btn_save.on_click(on_save)
    btn_next.on_click(on_next)
    btn_prev.on_click(on_prev)

    load_episode()



In [6]:
def replay_viewer(
    npz_path: str,
    image_key_left="rgb2",
    image_key_right="rgb3",
    width_key_left="fingertip_width_left",
    width_key_right="fingertip_width_right",
    value_min=0.0,
    value_max=0.03,
):
    data = np.load(npz_path, allow_pickle=True)

    wl = get_width_1d(data[width_key_left])
    wr = get_width_1d(data[width_key_right])
    n = len(wl)

    frames_left = data[image_key_left] if image_key_left in data else None
    frames_right = data[image_key_right] if image_key_right in data else None

    frame_slider = widgets.IntSlider(value=0, min=0, max=n-1, step=1, description="Frame",
                                     continuous_update=False, layout=widgets.Layout(width="80%"))

    left_title = widgets.HTML()
    right_title = widgets.HTML()

    left_img = widgets.Image(format="jpeg", width=520)
    right_img = widgets.Image(format="jpeg", width=520)

    plotL = widgets.Image(format="png", width=520)
    plotR = widgets.Image(format="png", width=520)

    info = widgets.HTML()

    def render(i):
        left_title.value  = f"<b>LEFT</b> | frame={i} | width={wl[i]:.5f} m"
        right_title.value = f"<b>RIGHT</b> | frame={i} | width={wr[i]:.5f} m"

        if frames_left is not None:
            left_img.value = frame_to_jpeg_bytes(frames_left[i])
        else:
            left_img.value = b""

        if frames_right is not None:
            right_img.value = frame_to_jpeg_bytes(frames_right[i])
        else:
            right_img.value = b""

        plotL.value = plot_width_to_png(wl, i, y_min=value_min, y_max=value_max, title="Left grasping width")
        plotR.value = plot_width_to_png(wr, i, y_min=value_min, y_max=value_max, title="Right grasping width")

        info.value = f"<b>Replay:</b> {os.path.basename(npz_path)} | <b>N:</b> {n}"

    def on_change(change):
        if change["name"] == "value":
            render(int(change["new"]))
    frame_slider.observe(on_change, names="value")

    ui = widgets.VBox([
        info,
        frame_slider,
        widgets.HBox([
            widgets.VBox([left_title, left_img, plotL]),
            widgets.VBox([right_title, right_img, plotR]),
        ])
    ])
    render(0)
    display(ui)



In [15]:
import os
import re
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, clear_output

EP_RE = re.compile(r"episode_(\d{8})_(\d{6})_.*\.npz$")

def parse_time_key(path: str):
    fn = os.path.basename(path)
    m = EP_RE.match(fn)
    if not m:
        return None
    ymd, hms = m.group(1), m.group(2)
    return int(ymd + hms)

def list_day_episodes(data_dir: str, date_yyyymmdd: str):
    files = []
    for fn in os.listdir(data_dir):
        if not fn.endswith(".npz"):
            continue
        m = EP_RE.match(fn)
        if not m:
            continue
        if m.group(1) != date_yyyymmdd:
            continue
        files.append(os.path.join(data_dir, fn))
    files.sort(key=lambda p: parse_time_key(p) or 0)
    return files


In [16]:
data_dir = "/mnt/WDC10T/tailai_ws/dataset/one_clip_mounting/data_processed_ft"
date = "20260227"
out_dir = "/mnt/WDC10T/tailai_ws/dataset/one_clip_mounting/data_processed"

npz_paths = list_day_episodes(data_dir, date)
print("found:", len(npz_paths))

interactive_process_batch(
    npz_paths,
    out_dir,
    image_key_left="rgb2",
    image_key_right="rgb3",
    auto_save_on_next=False,   # ‰Ω†ÊÉ≥ next Ëá™Âä®‰øùÂ≠òÂ∞±Êîπ True
)
"""
ÂÖàÁÇπ Next anomaly Ë∑≥Âà∞ÂºÇÂ∏∏ÊÆµÂºÄÂ§¥

Áî® Set Start = Frame

ÊãñÂà∞ÂºÇÂ∏∏ÊÆµÁªìÊùüÔºàÊàñËÄÖÁªßÁª≠ Next anomalyÔºåÁúãÊòØ‰∏çÊòØÂêå‰∏ÄÊÆµÔºâ

Áî® Set End = Frame

Â¶ÇÊûúÊï¥ÊÆµÊòØÂõ∫ÂÆöÈîôËØØÂÄºÔºöApply ... to Range

Â¶ÇÊûúÊòØÂπ≥ÊªëÂèòÂåñÔºöÂÖàÊää start/end ‰∏§Â∏ßË∞ÉÂØπÔºåÂÜç Interpolate ...
"""


VBox(children=(HTML(value='<b>[3/90]</b> episode_20260227_152627_165d55_processed.npz'), HBox(children=(Button‚Ä¶