In [1]:
# Python way (do this early in the notebook).
# Sets PYDEVD_DISABLE_FILE_VALIDATION to "1" in this kernel process's environment.
# NOTE(review): presumably this silences a pydevd (debugger) start-up warning — confirm.
import os
os.environ["PYDEVD_DISABLE_FILE_VALIDATION"] = "1"


In [2]:
# Colab magic (affects the whole process).
# Sets the same variable to the same value as the os.environ assignment in the
# previous cell, so running either one of the two cells is enough.
%env PYDEVD_DISABLE_FILE_VALIDATION=1


env: PYDEVD_DISABLE_FILE_VALIDATION=1


In [4]:
# None of these packages are required for the Gradio + MediaPipe + TF app.
# They pull conflicting pins (NumPy 2.x, protobuf 5.x, headless OpenCV, etc.),
# so they are removed before the pinned install cell runs.
# `|| true` keeps the cell from failing when pip exits with a non-zero status.
# NOTE(review): the execution counters in this notebook are not sequential
# (this cell is In [4], the install cell after it is In [1]) — re-check the
# intended order with Restart & Run All.
!pip -q uninstall -y \
  albumentations albucore \
  tensorflow-hub tf-keras tensorflow-text \
  grpcio-status opentelemetry-proto \
  pytensor thinc spacy \
  flax jax jaxlib jax_cuda12_plugin \
  orbax-checkpoint tensorstore ml-dtypes \
  tensorflow-decision-forests ydf \
  dopamine-rl keras-hub \
  opencv-python-headless || true


[0m

In [1]:
# Install the runtime stack for the app. numpy, protobuf, tensorflow,
# opencv-python and mediapipe are pinned to exact versions; scikit-learn,
# matplotlib, tqdm and gradio are left unpinned, so their versions can drift
# between runs.
!pip -q install --upgrade \
  "numpy==1.26.4" \
  "protobuf==4.25.3" \
  "tensorflow==2.17.1" \
  "opencv-python==4.10.0.84" \
  "mediapipe==0.10.14" \
  "scikit-learn" \
  "matplotlib" \
  "tqdm" \
  "gradio"


In [2]:
# Sanity check: import the main libraries and print the versions that actually
# got loaded, to compare against the pins in the install cell.
# sklearn is imported here (import check only); its version is not printed.
import tensorflow as tf, mediapipe as mp, cv2, numpy as np, gradio as gr, sklearn
print("TF:", tf.__version__)          # expect 2.17.1 (or your chosen TF)
print("MP:", mp.__version__)          # 0.10.14 if you followed prior pins
print("NP:", np.__version__)          # 1.26.4
# NOTE(review): the pin is 4.10.0.84 but the recorded output below shows 4.11.0,
# so the loaded cv2 does not match the pin — check which OpenCV wheel is installed.
print("cv2:", cv2.__version__)        # pinned: 4.10.0.84
print("gradio:", gr.__version__)      # current (unpinned)


TF: 2.17.1
MP: 0.10.14
NP: 1.26.4
cv2: 4.11.0
gradio: 5.49.1


In [3]:
import os, cv2, math, json, shutil, glob, io, time
import numpy as np
import gradio as gr
import mediapipe as mp
import matplotlib.pyplot as plt
from pathlib import Path
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models

In [4]:
# Mount Google Drive at /content/drive. BASE_DIR in the config cells points
# below this mount point, so this must run before any data or model is read/written.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# NOTE(review): USE_DRIVE is assigned here but nothing in this cell reads it;
# BASE_DIR is a Google Drive path regardless of its value.
USE_DRIVE = False  # << set True if you mounted Drive
# Root folder for everything this notebook persists.
BASE_DIR = "/content/drive/MyDrive/Colab Notebooks/Action Recognition"
# Keypoint sequences are stored below DATA_PATH, trained models below MODELS_DIR.
DATA_PATH = os.path.join(BASE_DIR, "MP_Data")
MODELS_DIR = os.path.join(BASE_DIR, "models")
# Create both folders up front (no error if they already exist).
os.makedirs(DATA_PATH, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)

## First version

In [19]:
# @title
# === Gradio Action Detection App (fixed, image-safe) ===
# Requires (install first in Colab):
# !pip -q install gradio mediapipe opencv-python tensorflow scikit-learn matplotlib tqdm pillow

import os, io, glob, json, time, shutil
from pathlib import Path
import numpy as np
import cv2
import gradio as gr
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models

# ---- Config / Paths ----
# NOTE(review): USE_DRIVE is assigned here but nothing in this cell reads it;
# BASE_DIR is a Google Drive path regardless of its value.
USE_DRIVE = False
BASE_DIR = "/content/drive/MyDrive/Colab Notebooks/Action Recognition"  # your path
# Keypoint sequences are stored below DATA_PATH, trained models below MODELS_DIR.
DATA_PATH = os.path.join(BASE_DIR, "MP_Data")
MODELS_DIR = os.path.join(BASE_DIR, "models")
# Create both folders up front (no error if they already exist).
os.makedirs(DATA_PATH, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)

# ---- MediaPipe for keypoints ----
import mediapipe as mp
# Short alias for the Holistic solution module used by the keypoint helpers below.
mp_holistic = mp.solutions.holistic

def extract_keypoints(results):
    """Flatten one Holistic result into a single fixed-length feature vector.

    The vector is the concatenation of four groups, in this order:
    pose (x, y, z, visibility -> 132 values), face (x, y, z -> 1404 values),
    left hand (x, y, z -> 63 values) and right hand (x, y, z -> 63 values).
    A group whose landmarks are missing contributes zeros; a group that yields
    fewer values than its slot is zero-padded at the end.

    Returns:
        1-D NumPy array with 1662 values when every group fits its slot.
    """
    def flatten_group(group, fields, width):
        # A falsy group (e.g. None) means "nothing detected" for that body part.
        points = group.landmark if group else []
        values = np.array([[getattr(point, field) for field in fields] for point in points]).flatten()
        if values.size == 0:
            return np.zeros(width)
        if values.size < width:
            values = np.pad(values, (0, width - values.size))
        return values

    xyz = ("x", "y", "z")
    pose = flatten_group(results.pose_landmarks, xyz + ("visibility",), 132)
    face = flatten_group(results.face_landmarks, xyz, 1404)
    left_hand = flatten_group(results.left_hand_landmarks, xyz, 63)
    right_hand = flatten_group(results.right_hand_landmarks, xyz, 63)
    return np.concatenate([pose, face, left_hand, right_hand])  # (1662,)

def video_to_sequence_keypoints(video_path, sequence_length=30, stride=1):
    """Turn a video file into a fixed-length sequence of keypoint vectors.

    Frames are read in order; only every `stride`-th frame (stride is clamped to
    at least 1) is run through MediaPipe Holistic and converted with
    `extract_keypoints`. Reading stops once `sequence_length` vectors exist.

    Args:
        video_path: path handed to `cv2.VideoCapture`.
        sequence_length: number of keypoint vectors to return.
        stride: keep one frame out of every `stride` frames.

    Returns:
        Array of shape (sequence_length, 1662). If no frame could be read the
        result is all zeros; if the clip is too short the last vector is
        repeated to fill the sequence.
    """
    step = max(1, int(stride))
    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            i = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if i % step != 0:
                    i += 1
                    continue
                img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Mark the frame read-only while MediaPipe processes it.
                img.flags.writeable = False
                results = holistic.process(img)
                img.flags.writeable = True
                frames.append(extract_keypoints(results))
                if len(frames) >= sequence_length:
                    break
                i += 1
    finally:
        # Always give the capture handle back, even if decoding or MediaPipe raised.
        cap.release()

    if len(frames) == 0:
        frames = [np.zeros(1662) for _ in range(sequence_length)]
    elif len(frames) < sequence_length:
        # Short clip: hold the last observed pose for the remaining steps.
        frames += [frames[-1]] * (sequence_length - len(frames))
    else:
        frames = frames[:sequence_length]
    return np.stack(frames, axis=0)  # (T,1662)

# ---- Image helpers (return NumPy arrays for gr.Image) ----
def fig_to_array(fig):
    """Render a Matplotlib figure to an in-memory PNG and return it as an RGB NumPy array."""
    with io.BytesIO() as png_buffer:
        fig.savefig(png_buffer, format="png", bbox_inches="tight")
        png_buffer.seek(0)
        # convert() decodes the PNG while the buffer is still open.
        rgb_image = Image.open(png_buffer).convert("RGB")
        return np.array(rgb_image)

def plot_confmat(cm, labels):
    """Draw a labelled confusion-matrix heat map and return it as an image array.

    Args:
        cm: 2-D confusion matrix (rows = true class, columns = predicted class).
        labels: class names used for both axes, in matrix order.

    Returns:
        NumPy image array produced by `fig_to_array` (suitable for gr.Image).
    """
    n_labels = len(labels)
    side = 4 + 0.3 * n_labels  # grow the figure with the number of classes
    fig, ax = plt.subplots(figsize=(side, side))
    heatmap = ax.imshow(cm, interpolation='nearest')
    ax.set_title("Confusion Matrix")
    fig.colorbar(heatmap, ax=ax)

    positions = np.arange(n_labels)
    ax.set_xticks(positions)
    ax.set_xticklabels(labels, rotation=45, ha="right")
    ax.set_yticks(positions)
    ax.set_yticklabels(labels)

    # Cells darker than half of the maximum get white text for contrast.
    cutoff = cm.max() / 2
    for (row, col), count in np.ndenumerate(cm):
        text_color = "white" if count > cutoff else "black"
        ax.text(col, row, int(count), ha="center", va="center", color=text_color)

    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    fig.tight_layout()
    image = fig_to_array(fig)
    plt.close(fig)
    return image  # NumPy array for gr.Image

# ---- Dataset & Model helpers ----
def list_actions():
    """Return the sorted names of the sub-directories of DATA_PATH (one folder per action)."""
    names = []
    for entry in os.listdir(DATA_PATH):
        if os.path.isdir(os.path.join(DATA_PATH, entry)):
            names.append(entry)
    names.sort()
    return names

def count_sequences(action):
    """Count the sub-directories (one per saved sequence) inside DATA_PATH/<action>."""
    pattern = os.path.join(DATA_PATH, action, "*")
    return sum(1 for candidate in glob.glob(pattern) if os.path.isdir(candidate))

def save_sequence(action, sequence_array):
    """Store one keypoint sequence as DATA_PATH/<action>/<id>/<frame>.npy files.

    The new sequence id is one more than the highest numeric folder name that
    already exists. (Using "number of folders + 1" would reuse an id — and
    overwrite that sequence's frames — whenever the existing ids have a gap,
    e.g. after a folder was deleted by hand.)

    Args:
        action: action (class) name; used as the folder name.
        sequence_array: iterable of per-frame arrays; frame i is written to "<i>.npy".

    Returns:
        The integer id of the sequence folder that was written.
    """
    act_dir = os.path.join(DATA_PATH, action)
    os.makedirs(act_dir, exist_ok=True)

    existing_ids = [
        int(name)
        for name in os.listdir(act_dir)
        if name.isdecimal() and os.path.isdir(os.path.join(act_dir, name))
    ]
    next_id = max(existing_ids, default=0) + 1

    seq_dir = os.path.join(act_dir, str(next_id))
    os.makedirs(seq_dir, exist_ok=True)
    for i, f in enumerate(sequence_array):
        np.save(os.path.join(seq_dir, f"{i}.npy"), f)
    return next_id

def load_dataset(actions=None, sequence_length=30):
    """Load every saved sequence from DATA_PATH into arrays.

    Args:
        actions: class names to load; when empty/None, `list_actions()` is used.
        sequence_length: number of frames read per sequence ("0.npy" .. "<n-1>.npy").
            A missing frame file is replaced by a zero vector of length 1662.

    Returns:
        (X, y, actions) where X is float32 with one row per sequence, y holds the
        int64 index of each sequence's action within `actions`; (None, None,
        actions) when no sequence folder was found.
    """
    if not actions:
        actions = list_actions()
    label_of = {name: idx for idx, name in enumerate(actions)}

    def numeric_order(path):
        # Sort sequence folders by their numeric name; non-numeric names sort as 0.
        leaf = os.path.basename(path)
        return int(leaf) if leaf.isdigit() else 0

    def read_frame(seq_dir, frame_idx):
        frame_file = os.path.join(seq_dir, f"{frame_idx}.npy")
        if os.path.exists(frame_file):
            return np.load(frame_file)
        return np.zeros(1662)

    samples, targets = [], []
    for name in actions:
        candidates = glob.glob(os.path.join(DATA_PATH, name, "*"))
        for seq_dir in sorted(filter(os.path.isdir, candidates), key=numeric_order):
            samples.append(np.stack([read_frame(seq_dir, k) for k in range(int(sequence_length))]))
            targets.append(label_of[name])

    if not samples:
        return None, None, actions
    return np.array(samples, dtype=np.float32), np.array(targets, dtype=np.int64), actions

def build_lstm_model(num_classes, sequence_length=30, feature_dim=1662,
                     lstm_units=128, dense_units=64, dropout=0.3, force_cpu=False):
    """Build and compile the two-layer LSTM classifier.

    Architecture: LSTM (returns sequences) -> Dropout -> LSTM -> Dropout ->
    Dense(relu) -> Dropout -> Dense(softmax). There is no Masking layer; inputs
    are fixed-length sequences of shape (sequence_length, feature_dim).

    `force_cpu` is accepted for interface compatibility but is not used here.

    NOTE(review): the previous comment said the activation/implementation
    settings force a non-cuDNN LSTM; nothing in this function explicitly
    disables cuDNN — confirm against the Keras LSTM documentation.

    Returns:
        A compiled Keras model (adam, sparse categorical cross-entropy, accuracy).
    """
    # Settings shared by both recurrent layers.
    shared_lstm_args = dict(
        activation="tanh",
        recurrent_activation="sigmoid",
        implementation=2,
        recurrent_dropout=0.0,
        dropout=0.0,
    )

    inputs = layers.Input(shape=(sequence_length, feature_dim))
    hidden = layers.LSTM(lstm_units, return_sequences=True, **shared_lstm_args)(inputs)
    hidden = layers.Dropout(dropout)(hidden)
    hidden = layers.LSTM(lstm_units, **shared_lstm_args)(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    hidden = layers.Dense(dense_units, activation="relu")(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    outputs = layers.Dense(num_classes, activation="softmax")(hidden)

    model = models.Model(inputs, outputs)
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model



# ---- Tab 1 handlers (Collect) ----
def create_action_folder(action_name_text, target_count):
    """Create (or select) the data folder for an action and report progress.

    The action name comes straight from a text box of an app that is served on
    a public share link, so it is only accepted when it resolves to a direct
    child of DATA_PATH (no "..", ".", nested or absolute paths).

    Args:
        action_name_text: action name typed by the user.
        target_count: target number of sequences; may be None when the number
            field is empty, in which case 0 is shown.

    Returns:
        (status message, gr.update for the action text box, "current / total").
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Enter an action name.", gr.update(value=""), "0 / 0"

    p = os.path.join(DATA_PATH, action_name_text)
    if os.path.dirname(os.path.realpath(p)) != os.path.realpath(DATA_PATH):
        return ("Invalid action name: use a plain folder name without path separators.",
                gr.update(value=action_name_text), "0 / 0")

    os.makedirs(p, exist_ok=True)
    current = count_sequences(action_name_text)
    total = int(target_count) if target_count is not None else 0
    return f"Using folder: {p}", gr.update(value=action_name_text), f"{current} / {total}"

def record_and_save(action_name_text, video, sequence_length, stride, target_count):
    """Convert one recorded/uploaded clip to keypoints and save it as a new sequence.

    The action name is user input from a publicly shared app, so it must
    resolve to a direct child of DATA_PATH before anything is written.

    Args:
        action_name_text: action name typed by the user.
        video: path of the clip provided by the gr.Video component (None if absent).
        sequence_length: frames per sequence.
        stride: use every N-th frame (clamped to at least 1).
        target_count: target number of sequences; None is shown as 0.

    Returns:
        (status message, "current / total" progress string or None on failure).
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Pick/create an action first.", None
    if video is None:
        return "Record or upload a short clip.", None

    target_dir = os.path.realpath(os.path.join(DATA_PATH, action_name_text))
    if os.path.dirname(target_dir) != os.path.realpath(DATA_PATH):
        return "Invalid action name: use a plain folder name without path separators.", None

    seq = video_to_sequence_keypoints(video, sequence_length=int(sequence_length), stride=max(1, int(stride)))
    save_sequence(action_name_text, seq)
    total = int(target_count) if target_count is not None else 0
    progress = f"{count_sequences(action_name_text)} / {total}"
    return f"Saved 1 sequence in '{action_name_text}'.", progress

def reset_action(action_name_text, really=False):
    """Delete all saved sequences of one action and recreate its empty folder.

    This handler removes a directory tree, and the name comes from a text box
    of an app served on a public share link. The name is therefore only
    accepted when it resolves to a direct child of DATA_PATH; values such as
    ".", "..", nested paths or absolute paths are rejected instead of being
    passed to `shutil.rmtree`.

    Args:
        action_name_text: action name typed by the user.
        really: confirmation flag; nothing is deleted unless it is truthy.

    Returns:
        (status message, gr.update for the action text box).
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Enter an action name.", gr.update(value="")
    if not really:
        return "Tick the checkbox to confirm reset.", gr.update(value=action_name_text)

    p = os.path.join(DATA_PATH, action_name_text)
    if os.path.dirname(os.path.realpath(p)) != os.path.realpath(DATA_PATH):
        return ("Invalid action name: use a plain folder name without path separators.",
                gr.update(value=action_name_text))

    if os.path.exists(p):
        shutil.rmtree(p)
    os.makedirs(p, exist_ok=True)
    return f"Cleared data for '{action_name_text}'.", gr.update(value=action_name_text)

# ---- Tab 2 handler (Train/Evaluate) ----
def train_model(sequence_length, test_size, epochs, batch_size):
    """Train the LSTM on the saved sequences, save the artifacts and report results.

    Steps: load the dataset, replace non-finite values by 0, standardize each
    feature, optionally hold out a stratified validation split, fit the model,
    then save three files to MODELS_DIR with a shared timestamp:
    ``lstm_<ts>.keras`` (model), ``lstm_<ts>.labels.json`` (class names) and
    ``lstm_<ts>.norm.npz`` (normalization statistics and sequence length).

    The normalization file is what lets inference apply the training-time
    standardization; previously the code that wrote it sat after the
    ``return`` statement and never ran.

    Args:
        sequence_length: frames per sequence.
        test_size: requested validation fraction (capped at 0.2).
        epochs: number of training epochs.
        batch_size: requested batch size (clamped to 1..number of training samples).

    Returns:
        (summary text, loss-curve image array, confusion-matrix image array or
        None, classification report text). When the dataset is empty the three
        trailing values are None.
    """
    X, y, actions = load_dataset(sequence_length=int(sequence_length))
    if X is None:
        return "Dataset empty. Collect sequences first.", None, None, None

    # --- Sanitize data: dtype, NaNs/Infs ---
    X = np.asarray(X, dtype=np.float32)
    y = np.asarray(y, dtype=np.int32)
    X[~np.isfinite(X)] = 0.0

    # Per-feature standardization, computed over the full set before splitting
    # (kept as in the original small-demo design). The raw std is stored and the
    # epsilon is added at use time, matching (std + 1e-6) in _apply_norm.
    mean = X.mean(axis=(0, 1), keepdims=True)
    std = X.std(axis=(0, 1), keepdims=True)
    X = (X - mean) / (std + 1e-6)

    # --- Decide split strategy (avoid failing on tiny/imbalanced sets) ---
    n_classes = len(actions)
    counts = np.bincount(y, minlength=n_classes)
    class_counts = {a: int(c) for a, c in zip(actions, counts)}
    tsize = min(float(test_size), 0.2)
    n_test = int(np.ceil(tsize * len(X)))
    # A stratified split needs at least one sample per class on both sides;
    # otherwise train_test_split raises instead of returning a split.
    can_strat = (
        len(X) > n_classes
        and bool(np.all(counts >= 2))
        and n_test >= n_classes
        and len(X) - n_test >= n_classes
    )

    if can_strat:
        Xtr, Xte, ytr, yte = train_test_split(
            X, y, test_size=tsize, random_state=42, stratify=y
        )
        val_data = (Xte, yte)
        msg = f"Stratified split. Class counts: {class_counts}"
    else:
        Xtr, ytr = X, y
        val_data = None
        msg = f"No validation split (class counts: {class_counts})."

    bs = max(1, min(int(batch_size), len(Xtr)))

    # --- Build & train ---
    model = build_lstm_model(num_classes=n_classes, sequence_length=int(sequence_length))
    history = model.fit(
        Xtr, ytr, validation_data=val_data, epochs=int(epochs), batch_size=bs, verbose=0
    )

    # --- Save artifacts (model, labels, normalization stats) ---
    ts = int(time.time())
    model_path = os.path.join(MODELS_DIR, f"lstm_{ts}.keras")
    labels_path = os.path.join(MODELS_DIR, f"lstm_{ts}.labels.json")
    norm_path = os.path.join(MODELS_DIR, f"lstm_{ts}.norm.npz")
    model.save(model_path)
    with open(labels_path, "w") as f:
        json.dump(actions, f)
    # Stats are stored as flat per-feature vectors so they broadcast against a
    # single (T, F) sequence at inference time.
    np.savez_compressed(norm_path,
                        mean=mean.reshape(-1).astype(np.float32),
                        std=std.reshape(-1).astype(np.float32),
                        sequence_length=int(sequence_length))

    # --- Loss plot -> numpy array for gr.Image ---
    fig = plt.figure(figsize=(8, 3))
    plt.plot(history.history.get("loss", []), label="train")
    if "val_loss" in history.history:
        plt.plot(history.history["val_loss"], label="val")
    plt.title("Loss"); plt.xlabel("epoch"); plt.ylabel("loss"); plt.legend()
    loss_arr = fig_to_array(fig); plt.close(fig)

    # --- Validation metrics if we had a val set ---
    if val_data is not None and len(val_data[0]) > 0:
        Xte, yte = val_data
        ypred = np.argmax(model.predict(Xte, verbose=0), axis=1)
        # Pass the full label set so the matrix/report line up with `actions`
        # even if a class is absent from the validation predictions.
        label_ids = np.arange(n_classes)
        cm_arr = plot_confmat(confusion_matrix(yte, ypred, labels=label_ids), actions)
        report = classification_report(yte, ypred, labels=label_ids,
                                       target_names=actions, zero_division=0)
    else:
        cm_arr = None
        report = ("No validation metrics (insufficient sequences per class). "
                  "Collect ≥2 per action (10+ preferred).")

    summary = (f"{msg}\nTrained on {len(Xtr)} sequences (batch_size={bs}).\n"
               f"Saved model:\n{model_path}\n{labels_path}\n{norm_path}")
    return summary, loss_arr, cm_arr, report



# ---- Tab 3 handlers (Test) ----
def _load_model_bundle(model_path, labels_path):
    """Load a saved model, its label list and (if present) its normalization file.

    The normalization file is looked up next to the model as
    "<model file name without extension>.norm.npz".

    Returns:
        (model, actions, norm) where `norm` is the object returned by `np.load`
        or None when no matching file exists.
    """
    model = tf.keras.models.load_model(model_path, compile=False)
    with open(labels_path, "r") as labels_file:
        actions = json.load(labels_file)

    # e.g. ".../lstm_173023.keras" -> ".../lstm_173023.norm.npz"
    stem, _extension = os.path.splitext(os.path.basename(model_path))
    norm_file = os.path.join(os.path.dirname(model_path), f"{stem}.norm.npz")
    norm = np.load(norm_file) if os.path.exists(norm_file) else None
    return model, actions, norm

def _apply_norm(seq, norm):
    if norm is None:
        return seq
    mean = norm["mean"]  # shape (1,1,1662)
    std  = norm["std"]   # shape (1,1,1662)
    return (seq - mean) / (std + 1e-6)

def _nonzero_landmark_ratio(seq):
    # seq shape: (T, 1662). Count frames with any nonzero landmark.
    nonzero = (np.abs(seq).sum(axis=1) > 1e-8).astype(np.float32)
    return float(nonzero.mean()), int(nonzero.sum()), int(seq.shape[0])

def _predict_seq(model, actions, seq, conf_floor=0.40, topk=5):
    logits = model.predict(np.expand_dims(seq, 0), verbose=0)[0]
    idx = np.argsort(logits)[::-1]
    idx = idx[:min(topk, len(actions))]
    top = [(actions[i], float(logits[i])) for i in idx]
    max_lab, max_prob = top[0]
    status = "ok" if max_prob >= conf_floor else "low_confidence"
    return top, status

def predict_from_webcam(model_path, labels_path, video, sequence_length, stride):
    """Classify a recorded/uploaded clip with a saved model.

    Changes compared with the earlier version of this handler:
    * landmark diagnostics are computed on the raw keypoints, before
      normalization (after standardization an all-zero frame is no longer
      zero, so the "non-zero frame" counts were meaningless);
    * the sequence is aligned to the model's expected length even when no
      normalization file exists, using the model's input shape as fallback.

    Args:
        model_path: path to the .keras model.
        labels_path: path to the labels .json.
        video: clip path from the gr.Video component (None if absent).
        sequence_length: frames to extract from the clip.
        stride: use every N-th frame (clamped to at least 1).

    Returns:
        (JSON text with predictions and diagnostics, bar-chart image array);
        on a validation problem: (message, None).
    """
    if not model_path or not labels_path:
        return "Provide model (.keras) and labels (.json).", None
    if video is None:
        return "Record a short clip.", None

    model, actions, norm = _load_model_bundle(model_path, labels_path)

    # Basic sanity: model output vs labels
    out_dim = model.output_shape[-1]
    if out_dim != len(actions):
        return (f"Label/model mismatch: model has {out_dim} outputs but {len(actions)} labels. "
                "Make sure the .labels.json matches this .keras."), None

    # Build sequence
    seq = video_to_sequence_keypoints(video, sequence_length=int(sequence_length),
                                      stride=max(1, int(stride))).astype(np.float32)

    # Sequence length the model was trained with: prefer the saved value,
    # otherwise fall back to the model's own input shape when it is a
    # (batch, T, F) tuple with a fixed T.
    train_T = None
    if norm is not None and "sequence_length" in norm.files:
        train_T = int(norm["sequence_length"])
    else:
        in_shape = getattr(model, "input_shape", None)
        if isinstance(in_shape, tuple) and len(in_shape) == 3 and in_shape[1] is not None:
            train_T = int(in_shape[1])

    if train_T is not None and seq.shape[0] != train_T:
        if seq.shape[0] > train_T:
            # center-crop
            start = (seq.shape[0] - train_T) // 2
            seq = seq[start:start + train_T]
        else:
            # pad by repeating the last frame
            pad = np.repeat(seq[-1:], train_T - seq.shape[0], axis=0)
            seq = np.concatenate([seq, pad], axis=0)

    # Diagnostics: landmark presence, measured on the raw (un-normalized) keypoints.
    nz_ratio, nz_frames, total_frames = _nonzero_landmark_ratio(seq)
    diag = {"nonzero_ratio": round(nz_ratio, 3),
            "nonzero_frames": nz_frames,
            "total_frames": total_frames}

    # Apply the training normalization; keep the (T, F) layout even if the
    # saved statistics carry extra leading axes.
    raw_shape = seq.shape
    seq = np.reshape(_apply_norm(seq, norm), raw_shape)

    # Predict
    top, status = _predict_seq(model, actions, seq, conf_floor=0.40, topk=min(5, len(actions)))
    diag["status"] = status
    diag["top1"] = {"label": top[0][0], "prob": round(top[0][1], 3)}

    # Plot bars
    fig = plt.figure(figsize=(6, 3))
    labs = [t[0] for t in top]
    scores = [t[1] for t in top]
    plt.bar(labs, scores)
    plt.ylim(0, 1.0)
    plt.title("Top Predictions")
    pred_arr = fig_to_array(fig); plt.close(fig)

    return json.dumps({"preds": top, "diag": diag}, indent=2), pred_arr


def predict_from_dataset(model_path, labels_path, action_name_drop, seq_index, sequence_length):
    """Classify one already-saved dataset sequence with a saved model.

    Changes compared with the earlier version of this handler:
    * an empty "Sequence #" field (None) produces a message instead of a crash;
    * landmark diagnostics are computed on the raw keypoints, before
      normalization (standardized zero frames are no longer zero);
    * the sequence is aligned to the model's expected length even when no
      normalization file exists, using the model's input shape as fallback.

    Args:
        model_path: path to the .keras model.
        labels_path: path to the labels .json.
        action_name_drop: action selected in the dropdown.
        seq_index: numeric id of the sequence folder.
        sequence_length: number of frame files to read.

    Returns:
        (JSON text with predictions and diagnostics, bar-chart image array);
        on a validation problem: (message, None).
    """
    if not model_path or not labels_path:
        return "Provide model (.keras) and labels (.json).", None
    if not action_name_drop:
        return "Select an action.", None
    if seq_index is None:
        return "Enter a sequence number.", None

    seq_dir = os.path.join(DATA_PATH, action_name_drop, str(int(seq_index)))
    if not os.path.isdir(seq_dir):
        return f"No sequence #{int(seq_index)} for '{action_name_drop}'.", None

    model, actions, norm = _load_model_bundle(model_path, labels_path)

    out_dim = model.output_shape[-1]
    if out_dim != len(actions):
        return (f"Label/model mismatch: model has {out_dim} outputs but {len(actions)} labels. "
                "Make sure the .labels.json matches this .keras."), None

    # Load saved frames; a missing frame file becomes a zero vector.
    frames = []
    for i in range(int(sequence_length)):
        fpath = os.path.join(seq_dir, f"{i}.npy")
        frames.append(np.load(fpath) if os.path.exists(fpath) else np.zeros(1662))
    seq = np.stack(frames).astype(np.float32)

    # Sequence length the model was trained with: prefer the saved value,
    # otherwise fall back to the model's own input shape when it is a
    # (batch, T, F) tuple with a fixed T.
    train_T = None
    if norm is not None and "sequence_length" in norm.files:
        train_T = int(norm["sequence_length"])
    else:
        in_shape = getattr(model, "input_shape", None)
        if isinstance(in_shape, tuple) and len(in_shape) == 3 and in_shape[1] is not None:
            train_T = int(in_shape[1])

    if train_T is not None and seq.shape[0] != train_T:
        if seq.shape[0] > train_T:
            # center-crop
            start = (seq.shape[0] - train_T) // 2
            seq = seq[start:start + train_T]
        else:
            # pad by repeating the last frame
            pad = np.repeat(seq[-1:], train_T - seq.shape[0], axis=0)
            seq = np.concatenate([seq, pad], axis=0)

    # Diagnostics on the raw (un-normalized) keypoints.
    nz_ratio, nz_frames, total_frames = _nonzero_landmark_ratio(seq)
    diag = {"nonzero_ratio": round(nz_ratio, 3),
            "nonzero_frames": nz_frames,
            "total_frames": total_frames}

    # Apply the training normalization; keep the (T, F) layout even if the
    # saved statistics carry extra leading axes.
    raw_shape = seq.shape
    seq = np.reshape(_apply_norm(seq, norm), raw_shape)

    top, status = _predict_seq(model, actions, seq, conf_floor=0.40, topk=min(5, len(actions)))
    diag["status"] = status
    diag["top1"] = {"label": top[0][0], "prob": round(top[0][1], 3)}

    fig = plt.figure(figsize=(6, 3))
    labs = [t[0] for t in top]
    scores = [t[1] for t in top]
    plt.bar(labs, scores)
    plt.ylim(0, 1.0)
    plt.title("Top Predictions (Dataset)")
    pred_arr = fig_to_array(fig); plt.close(fig)
    return json.dumps({"preds": top, "diag": diag}, indent=2), pred_arr


# ==================== Gradio UI ====================
# Three-tab Gradio app: (1) collect keypoint sequences, (2) train/evaluate the
# LSTM, (3) test a saved model on a clip or on a stored sequence. Each button
# is wired to one of the handler functions defined above.
with gr.Blocks(title="Action Detection — Collect / Train / Test") as demo:
    gr.Markdown(f"**Base directory:** `{BASE_DIR}`  \nKeypoints → `{DATA_PATH}`  \nModels → `{MODELS_DIR}`")

    # Tab 1: Collect
    with gr.Tab("1) Collect"):
        with gr.Row():
            action_name_tb = gr.Textbox(label="Action name", placeholder="e.g., wave, hello, thanks")
            target_count = gr.Number(label="Target sequences", value=30, precision=0)
            create_btn = gr.Button("Create / Select Folder")
        status1 = gr.Markdown("")
        progress = gr.Textbox(label="Progress", value="0 / 30", interactive=False)

        gr.Markdown("**Record a short clip (5–10s) for ONE sequence**. Repeat until you reach your target.")
        with gr.Row():
            sequence_length = gr.Slider(10, 60, value=30, step=1, label="Frames per sequence")
            stride = gr.Slider(1, 5, value=1, step=1, label="Stride (use every Nth frame)")
        video_in = gr.Video(label="Video (webcam or upload)", sources=["webcam", "upload"], height=300)
        save_btn = gr.Button("Record & Save (one sequence)")

        # Deleting an action's data requires ticking this box first (checked in reset_action).
        really = gr.Checkbox(label="Really reset this action's data?")
        reset_btn = gr.Button("Reset Action Folder", variant="stop")

        create_btn.click(create_action_folder, [action_name_tb, target_count], [status1, action_name_tb, progress])
        save_btn.click(record_and_save, [action_name_tb, video_in, sequence_length, stride, target_count], [status1, progress])
        reset_btn.click(reset_action, [action_name_tb, really], [status1, action_name_tb])

    # Tab 2: Train / Evaluate
    with gr.Tab("2) Train / Evaluate"):
        with gr.Row():
            seq_len_train = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
            # NOTE: train_model caps the value used for the split at 0.2.
            test_size = gr.Slider(0.1, 0.5, value=0.2, step=0.05, label="Test size")
        with gr.Row():
            epochs = gr.Slider(1, 50, value=12, step=1, label="Epochs")
            batch = gr.Slider(4, 64, value=16, step=4, label="Batch size")
        train_btn = gr.Button("Train LSTM")
        train_msg = gr.Textbox(label="Summary", lines=4)
        loss_img = gr.Image(label="Training Loss")
        cm_img = gr.Image(label="Confusion Matrix")
        report_txt = gr.Textbox(label="Classification Report", lines=14)
        train_btn.click(train_model, [seq_len_train, test_size, epochs, batch], [train_msg, loss_img, cm_img, report_txt])

    # Tab 3: Test
    with gr.Tab("3) Test"):
        gr.Markdown("Test with **webcam** or an existing **dataset sequence**.")
        with gr.Row():
            with gr.Column():
                gr.Markdown("**Webcam Test**")
                # The model/labels paths entered here are also used by the dataset test column.
                model_path = gr.Textbox(label="Model .keras path")
                labels_path = gr.Textbox(label="Labels .json path")
                seq_len_inf = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
                stride_inf = gr.Slider(1, 5, value=1, step=1, label="Stride")
                vid_inf = gr.Video(label="Video (webcam or upload)", sources=["webcam", "upload"], height=300)
                infer_btn = gr.Button("Predict (Webcam/Upload)")
                preds_json = gr.Textbox(label="Top-k predictions (JSON)", lines=8)
                preds_plot = gr.Image(label="Scores")
                infer_btn.click(predict_from_webcam, [model_path, labels_path, vid_inf, seq_len_inf, stride_inf], [preds_json, preds_plot])

            with gr.Column():
                gr.Markdown("**Dataset Test**")
                # list_actions() is evaluated once, when the UI is built; actions
                # created later in Tab 1 do not appear until this cell is re-run.
                action_sel = gr.Dropdown(label="Action", choices=list_actions())
                seq_idx = gr.Number(label="Sequence #", value=1, precision=0)
                seq_len_ds = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
                infer_ds_btn = gr.Button("Predict (Dataset)")
                preds_json_ds = gr.Textbox(label="Top-k predictions (JSON)", lines=8)
                preds_plot_ds = gr.Image(label="Scores")
                infer_ds_btn.click(predict_from_dataset, [model_path, labels_path, action_sel, seq_idx, seq_len_ds], [preds_json_ds, preds_plot_ds])

# share=True publishes the app on a public gradio.live URL (see the cell output),
# so every handler above receives input from anyone who has the link.
# The cell output also notes that debug=True is needed to show errors in Colab.
demo.launch(share=True, server_name="0.0.0.0", show_error=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://fc7445638997fff39c.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




## Version Main

In [18]:
# @title
# === Gradio Action Detection App (real-time + landmark preview) ===
# !pip -q install gradio mediapipe opencv-python tensorflow scikit-learn matplotlib tqdm pillow

import os, io, glob, json, time, shutil, collections, tempfile
from pathlib import Path
import numpy as np
import cv2
import gradio as gr
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models

# ---- Config / Paths ----
# NOTE(review): USE_DRIVE is assigned here but nothing in this cell reads it;
# BASE_DIR is a Google Drive path regardless of its value.
USE_DRIVE = False
BASE_DIR = "/content/drive/MyDrive/Colab Notebooks/Action Recognition"  # your path
# Keypoint sequences are stored below DATA_PATH, trained models below MODELS_DIR.
DATA_PATH = os.path.join(BASE_DIR, "MP_Data")
MODELS_DIR = os.path.join(BASE_DIR, "models")
# Create both folders up front (no error if they already exist).
os.makedirs(DATA_PATH, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)

# ---- MediaPipe for keypoints & drawing ----
import mediapipe as mp
# Short aliases: Holistic solution, landmark drawing helpers, default drawing styles.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_styles = mp.solutions.drawing_styles

# ----------------- Landmark / Keypoint helpers -----------------
def extract_keypoints(results):
    """Flatten one Holistic result into a single fixed-length feature vector.

    The vector is the concatenation of four groups, in this order:
    pose (x, y, z, visibility -> 132 values), face (x, y, z -> 1404 values),
    left hand (x, y, z -> 63 values) and right hand (x, y, z -> 63 values).
    A group whose landmarks are missing contributes zeros; a group that yields
    fewer values than its slot is zero-padded at the end.

    Returns:
        1-D NumPy array with 1662 values when every group fits its slot.
    """
    def flatten_group(group, fields, width):
        # A falsy group (e.g. None) means "nothing detected" for that body part.
        points = group.landmark if group else []
        values = np.array([[getattr(point, field) for field in fields] for point in points]).flatten()
        if values.size == 0:
            return np.zeros(width)
        if values.size < width:
            values = np.pad(values, (0, width - values.size))
        return values

    xyz = ("x", "y", "z")
    pose = flatten_group(results.pose_landmarks, xyz + ("visibility",), 132)
    face = flatten_group(results.face_landmarks, xyz, 1404)
    left_hand = flatten_group(results.left_hand_landmarks, xyz, 63)
    right_hand = flatten_group(results.right_hand_landmarks, xyz, 63)
    return np.concatenate([pose, face, left_hand, right_hand])  # (1662,)

def draw_landmarks_bgr(frame_bgr, results):
    """Draw face, pose and both hand landmark sets of a Holistic result onto a BGR frame (in-place)."""
    holistic = mp.solutions.holistic
    # (landmark list, connection set, drawing specs) — drawn in this order.
    overlays = (
        (
            results.face_landmarks,
            holistic.FACEMESH_TESSELATION,
            {
                "landmark_drawing_spec": None,
                "connection_drawing_spec": mp_styles.get_default_face_mesh_tesselation_style(),
            },
        ),
        (
            results.pose_landmarks,
            holistic.POSE_CONNECTIONS,
            {"landmark_drawing_spec": mp_styles.get_default_pose_landmarks_style()},
        ),
        (
            results.left_hand_landmarks,
            holistic.HAND_CONNECTIONS,
            {
                "landmark_drawing_spec": mp_styles.get_default_hand_landmarks_style(),
                "connection_drawing_spec": mp_styles.get_default_hand_connections_style(),
            },
        ),
        (
            results.right_hand_landmarks,
            holistic.HAND_CONNECTIONS,
            {
                "landmark_drawing_spec": mp_styles.get_default_hand_landmarks_style(),
                "connection_drawing_spec": mp_styles.get_default_hand_connections_style(),
            },
        ),
    )
    for landmark_list, connections, specs in overlays:
        mp_drawing.draw_landmarks(frame_bgr, landmark_list, connections, **specs)

# ----------------- Video -> sequence -----------------
def video_to_sequence_keypoints(video_path, sequence_length=30, stride=1):
    """Turn a video file into a fixed-length sequence of keypoint vectors.

    Frames are read in order; only every `stride`-th frame (stride is clamped to
    at least 1) is run through MediaPipe Holistic and converted with
    `extract_keypoints`. Reading stops once `sequence_length` vectors exist.

    Args:
        video_path: path handed to `cv2.VideoCapture`.
        sequence_length: number of keypoint vectors to return.
        stride: keep one frame out of every `stride` frames.

    Returns:
        Array of shape (sequence_length, 1662). If no frame could be read the
        result is all zeros; if the clip is too short the last vector is
        repeated to fill the sequence.
    """
    step = max(1, int(stride))
    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            i = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if i % step != 0:
                    i += 1
                    continue
                img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Mark the frame read-only while MediaPipe processes it.
                img.flags.writeable = False
                results = holistic.process(img)
                img.flags.writeable = True
                frames.append(extract_keypoints(results))
                if len(frames) >= sequence_length:
                    break
                i += 1
    finally:
        # Always give the capture handle back, even if decoding or MediaPipe raised.
        cap.release()

    if len(frames) == 0:
        frames = [np.zeros(1662) for _ in range(sequence_length)]
    elif len(frames) < sequence_length:
        # Short clip: hold the last observed pose for the remaining steps.
        frames += [frames[-1]] * (sequence_length - len(frames))
    else:
        frames = frames[:sequence_length]
    return np.stack(frames, axis=0)  # (T,1662)

# ----------------- Annotate a clip with landmarks (NEW) -----------------
def annotate_video_with_landmarks(in_path, out_w=640, out_h=360, fps=24):
    """Write a copy of a video with Holistic landmarks drawn on every frame.

    Each frame is resized to (out_w, out_h), processed by MediaPipe Holistic,
    annotated with `draw_landmarks_bgr` and appended to an .mp4 file in the
    system temp directory.

    Args:
        in_path: path of the input video.
        out_w: output frame width in pixels.
        out_h: output frame height in pixels.
        fps: fallback frame rate, used only when the input does not report a
            positive frame rate.

    Returns:
        (output path, first annotated frame as an RGB array). Returns
        (None, None) when the input cannot be opened or the output writer
        cannot be created. The preview is None when the input has no frames.
    """
    cap = cv2.VideoCapture(in_path)
    if not cap.isOpened():
        cap.release()
        return None, None

    writer = None
    preview = None
    try:
        # Keep the source frame rate when available (as a float, so values such
        # as 29.97 are not truncated and sub-1 fps cannot collapse to 0).
        in_fps = cap.get(cv2.CAP_PROP_FPS)
        if in_fps and in_fps > 0:
            fps = float(in_fps)

        # mkstemp gives a unique name; a time-based name can collide when two
        # clips are annotated within the same second.
        fd, out_path = tempfile.mkstemp(prefix="annot_", suffix=".mp4")
        os.close(fd)

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(out_path, fourcc, fps, (out_w, out_h))
        if not writer.isOpened():
            # No usable encoder: clean up the placeholder file and report failure.
            os.remove(out_path)
            return None, None

        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(frame, (out_w, out_h))
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                rgb.flags.writeable = False
                results = holistic.process(rgb)
                rgb.flags.writeable = True
                # draw on BGR
                draw_landmarks_bgr(frame, results)
                if preview is None:
                    preview = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                writer.write(frame)
    finally:
        # Release both handles even if processing raised part-way through.
        cap.release()
        if writer is not None:
            writer.release()
    return out_path, preview

# ---- Image helpers ----
def fig_to_array(fig):
    """Render a Matplotlib figure to an in-memory PNG and return it as an RGB NumPy array."""
    with io.BytesIO() as png_buffer:
        fig.savefig(png_buffer, format="png", bbox_inches="tight")
        png_buffer.seek(0)
        # convert() decodes the PNG while the buffer is still open.
        rgb_image = Image.open(png_buffer).convert("RGB")
        return np.array(rgb_image)

def plot_confmat(cm, labels):
    """Draw a labelled confusion-matrix heat map and return it as an image array.

    Args:
        cm: 2-D confusion matrix (rows = true class, columns = predicted class).
        labels: class names used for both axes, in matrix order.

    Returns:
        NumPy image array produced by `fig_to_array`.
    """
    n_labels = len(labels)
    side = 4 + 0.3 * n_labels  # grow the figure with the number of classes
    fig, ax = plt.subplots(figsize=(side, side))
    heatmap = ax.imshow(cm, interpolation='nearest')
    ax.set_title("Confusion Matrix")
    fig.colorbar(heatmap, ax=ax)

    positions = np.arange(n_labels)
    ax.set_xticks(positions)
    ax.set_xticklabels(labels, rotation=45, ha="right")
    ax.set_yticks(positions)
    ax.set_yticklabels(labels)

    # Cells darker than half of the maximum get white text for contrast.
    cutoff = cm.max() / 2
    for (row, col), count in np.ndenumerate(cm):
        text_color = "white" if count > cutoff else "black"
        ax.text(col, row, int(count), ha="center", va="center", color=text_color)

    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    fig.tight_layout()
    image = fig_to_array(fig)
    plt.close(fig)
    return image

# ---- Dataset & Model helpers (unchanged) ----
def list_actions():
    """Return the sorted names of the sub-directories of DATA_PATH (one folder per action)."""
    names = []
    for entry in os.listdir(DATA_PATH):
        if os.path.isdir(os.path.join(DATA_PATH, entry)):
            names.append(entry)
    names.sort()
    return names

def count_sequences(action):
    """Count the sub-directories (one per saved sequence) inside DATA_PATH/<action>."""
    pattern = os.path.join(DATA_PATH, action, "*")
    return sum(1 for candidate in glob.glob(pattern) if os.path.isdir(candidate))

def save_sequence(action, sequence_array):
    """Store one keypoint sequence as DATA_PATH/<action>/<id>/<frame>.npy files.

    The new sequence id is one more than the highest numeric folder name that
    already exists. (Using "number of folders + 1" would reuse an id — and
    overwrite that sequence's frames — whenever the existing ids have a gap,
    e.g. after a folder was deleted by hand.)

    Args:
        action: action (class) name; used as the folder name.
        sequence_array: iterable of per-frame arrays; frame i is written to "<i>.npy".

    Returns:
        The integer id of the sequence folder that was written.
    """
    act_dir = os.path.join(DATA_PATH, action)
    os.makedirs(act_dir, exist_ok=True)

    existing_ids = [
        int(name)
        for name in os.listdir(act_dir)
        if name.isdecimal() and os.path.isdir(os.path.join(act_dir, name))
    ]
    next_id = max(existing_ids, default=0) + 1

    seq_dir = os.path.join(act_dir, str(next_id))
    os.makedirs(seq_dir, exist_ok=True)
    for i, f in enumerate(sequence_array):
        np.save(os.path.join(seq_dir, f"{i}.npy"), f)
    return next_id

def load_dataset(actions=None, sequence_length=30):
    """Load every stored sequence for the given actions into arrays.

    Args:
        actions: action names to load; when empty or ``None`` the result of
            ``list_actions()`` is used.
        sequence_length: number of frame files (``0.npy`` .. ``N-1.npy``) read
            per sequence; a missing frame file is replaced by 1662 zeros.

    Returns:
        ``(X, y, actions)`` with ``X`` as float32 and ``y`` as int64 class
        indices, or ``(None, None, actions)`` when no sequence folder exists.
    """
    def numeric_order(path):
        # Folders named with digits sort numerically; anything else sorts as 0.
        leaf = os.path.basename(path)
        return int(leaf) if leaf.isdigit() else 0

    if not actions:
        actions = list_actions()
    class_index = dict((name, pos) for pos, name in enumerate(actions))

    samples = []
    targets = []
    for action in actions:
        candidates = glob.glob(os.path.join(DATA_PATH, action, "*"))
        for seq_dir in sorted(filter(os.path.isdir, candidates), key=numeric_order):
            clip = []
            for frame_no in range(int(sequence_length)):
                frame_file = os.path.join(seq_dir, f"{frame_no}.npy")
                if os.path.exists(frame_file):
                    clip.append(np.load(frame_file))
                else:
                    clip.append(np.zeros(1662))
            samples.append(np.stack(clip))
            targets.append(class_index[action])

    if not samples:
        return None, None, actions
    return np.array(samples, dtype=np.float32), np.array(targets, dtype=np.int64), actions

def build_lstm_model(num_classes, sequence_length=30, feature_dim=1662,
                     lstm_units=128, dense_units=64, dropout=0.3, force_cpu=False):
    """Build and compile a two-layer LSTM classifier.

    Architecture: LSTM (returns sequences) -> Dropout -> LSTM -> Dropout ->
    Dense(relu) -> Dropout -> Dense(softmax). Compiled with Adam and sparse
    categorical cross-entropy, tracking accuracy.

    Args:
        num_classes: width of the softmax output layer.
        sequence_length: time steps per input sequence.
        feature_dim: features per time step.
        lstm_units: units in each LSTM layer.
        dense_units: units in the hidden dense layer.
        dropout: rate used by all three Dropout layers.
        force_cpu: accepted for interface compatibility; not used by this body.

    Returns:
        The compiled model.
    """
    # Both recurrent layers share every setting except return_sequences.
    shared_lstm_args = dict(
        activation="tanh",
        recurrent_activation="sigmoid",
        implementation=2,
        recurrent_dropout=0.0,
        dropout=0.0,
    )

    inputs = layers.Input(shape=(sequence_length, feature_dim))
    hidden = layers.LSTM(lstm_units, return_sequences=True, **shared_lstm_args)(inputs)
    hidden = layers.Dropout(dropout)(hidden)
    hidden = layers.LSTM(lstm_units, **shared_lstm_args)(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    hidden = layers.Dense(dense_units, activation="relu")(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    outputs = layers.Dense(num_classes, activation="softmax")(hidden)

    model = models.Model(inputs, outputs)
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model

# ==================== Tab 1 handlers (Collect) ====================
def create_action_folder(action_name_text, target_count):
    """Create (or select) the data folder for an action and report progress.

    The action name comes straight from a UI textbox, so it is validated to be
    a single folder name directly under ``DATA_PATH``. Absolute paths, ``..``
    and nested paths are rejected instead of creating directories elsewhere
    (``os.path.join`` discards ``DATA_PATH`` when given an absolute component).

    Args:
        action_name_text: action name typed by the user.
        target_count: desired number of sequences; ``None`` is treated as 0.

    Returns:
        ``(status message, textbox update, "current / total" progress text)``.
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Enter an action name.", gr.update(value=""), "0 / 0"

    # Containment check: the resolved target must be a direct child of the data root.
    data_root = os.path.realpath(DATA_PATH)
    resolved = os.path.realpath(os.path.join(data_root, action_name_text))
    if os.path.dirname(resolved) != data_root:
        return (
            f"Invalid action name '{action_name_text}': use a plain folder name.",
            gr.update(value=action_name_text),
            "0 / 0",
        )

    p = os.path.join(DATA_PATH, action_name_text)
    os.makedirs(p, exist_ok=True)
    current = count_sequences(action_name_text)
    total = int(target_count) if target_count is not None else 0
    return f"Using folder: {p}", gr.update(value=action_name_text), f"{current} / {total}"

# >>> CHANGED: now also returns an annotated preview video + preview frame
def record_and_save(action_name_text, video, sequence_length, stride, target_count):
    """Extract keypoints from a clip, store them as one sequence and build a preview.

    The action name is user input, so it must resolve to a direct child of
    ``DATA_PATH``; otherwise nothing is written. The status text reports
    whether the annotated preview could actually be produced.

    Args:
        action_name_text: action name typed by the user.
        video: path of the recorded/uploaded clip, or ``None``.
        sequence_length: frames per stored sequence.
        stride: use every Nth frame (values below 1 are treated as 1).
        target_count: desired number of sequences; ``None`` is treated as 0.

    Returns:
        ``(status, progress text, annotated video path, preview frame)``;
        the last three are ``None`` when nothing was saved.
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Pick/create an action first.", None, None, None
    if video is None:
        return "Record or upload a short clip.", None, None, None

    # Refuse names that would write outside the data root (absolute paths, "..", nesting).
    data_root = os.path.realpath(DATA_PATH)
    resolved = os.path.realpath(os.path.join(data_root, action_name_text))
    if os.path.dirname(resolved) != data_root:
        return f"Invalid action name '{action_name_text}': use a plain folder name.", None, None, None

    seq = video_to_sequence_keypoints(video, sequence_length=int(sequence_length), stride=max(1, int(stride)))
    save_sequence(action_name_text, seq)
    total = int(target_count) if target_count is not None else 0
    progress = f"{count_sequences(action_name_text)} / {total}"

    # Annotated preview for data collection quality check
    out_path, preview = annotate_video_with_landmarks(video)
    if out_path is None:
        status = f"Saved 1 sequence in '{action_name_text}'. Annotated preview unavailable."
    else:
        status = f"Saved 1 sequence in '{action_name_text}'. Annotated preview generated."
    return status, progress, out_path, preview

def reset_action(action_name_text, really=False):
    """Delete all stored sequences of one action and recreate its empty folder.

    The name is user input and the operation is destructive, so the target
    must resolve to a direct child of ``DATA_PATH``. Without this check a name
    such as ``.``, ``..`` or an absolute path would make ``shutil.rmtree``
    remove the whole data root or an unrelated directory.

    Args:
        action_name_text: action name typed by the user.
        really: confirmation flag; nothing is deleted unless it is truthy.

    Returns:
        ``(status message, textbox update)``.
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Enter an action name.", gr.update(value="")
    if not really:
        return "Tick the checkbox to confirm reset.", gr.update(value=action_name_text)

    # Containment check before any deletion.
    data_root = os.path.realpath(DATA_PATH)
    resolved = os.path.realpath(os.path.join(data_root, action_name_text))
    if os.path.dirname(resolved) != data_root:
        return (
            f"Invalid action name '{action_name_text}'. Nothing was deleted.",
            gr.update(value=action_name_text),
        )

    p = os.path.join(DATA_PATH, action_name_text)
    if os.path.exists(p):
        shutil.rmtree(p)
    os.makedirs(p, exist_ok=True)
    return f"Cleared data for '{action_name_text}'.", gr.update(value=action_name_text)

# ==================== Tab 2 handler (Train/Evaluate) ====================
def train_model(sequence_length, test_size, epochs, batch_size):
    """Train the LSTM on all stored sequences, save artifacts and report metrics.

    Steps: load the dataset, zero non-finite values, standardise per feature,
    optionally hold out a stratified validation split, fit, save the model,
    labels and normalisation statistics, then render the loss curve and
    (when a validation split exists) a confusion matrix and report.

    Differences from a naive implementation:
      * The validation fold is sized as an absolute count that is at least the
        number of classes; scikit-learn's stratified splitter raises when the
        test fold is smaller than the class count.
      * Metrics are computed over the full label set, so a class missing from
        the validation fold no longer breaks ``classification_report`` or
        yields a confusion matrix smaller than the tick labels.
      * The mean/std used for standardisation are saved next to the model as
        ``lstm_<ts>.norm.npz`` so inference can apply the same transform.

    Args:
        sequence_length: frames per sequence to load and model.
        test_size: requested validation fraction (capped at 0.2).
        epochs: number of training epochs.
        batch_size: requested batch size (clamped to the training-set size).

    Returns:
        ``(summary text, loss-curve image, confusion-matrix image or None,
        classification report text)``.
    """
    X, y, actions = load_dataset(sequence_length=int(sequence_length))
    if X is None:
        return "Dataset empty. Collect sequences first.", None, None, None

    X = np.asarray(X, dtype=np.float32)
    y = np.asarray(y, dtype=np.int32)
    X[~np.isfinite(X)] = 0.0

    # Per-feature statistics over all sequences and time steps.
    mean = X.mean(axis=(0, 1), keepdims=True)
    std = X.std(axis=(0, 1), keepdims=True) + 1e-6
    X = (X - mean) / std

    n_classes = len(actions)
    counts = np.bincount(y, minlength=n_classes)
    class_counts = {a: int(c) for a, c in zip(actions, counts)}
    can_strat = (len(X) > n_classes) and bool(np.all(counts >= 2))

    if can_strat:
        tsize = min(float(test_size), 0.2)
        # Same rounding as a float test_size, but never fewer samples than classes.
        n_test = max(int(np.ceil(tsize * len(X))), n_classes)
        Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=n_test, random_state=42, stratify=y)
        val_data = (Xte, yte)
        msg = f"Stratified split. Class counts: {class_counts}"
    else:
        Xtr, ytr = X, y
        val_data = None
        msg = f"No validation split (class counts: {class_counts})."

    bs = max(1, min(int(batch_size), len(Xtr)))
    model = build_lstm_model(num_classes=n_classes, sequence_length=int(sequence_length))
    history = model.fit(Xtr, ytr, validation_data=val_data, epochs=int(epochs), batch_size=bs, verbose=0)

    ts = int(time.time())
    model_path = os.path.join(MODELS_DIR, f"lstm_{ts}.keras")
    labels_path = os.path.join(MODELS_DIR, f"lstm_{ts}.labels.json")
    norm_path = os.path.join(MODELS_DIR, f"lstm_{ts}.norm.npz")
    model.save(model_path)
    with open(labels_path, "w") as f:
        json.dump(actions, f)
    np.savez_compressed(
        norm_path,
        mean=mean.astype(np.float32),
        std=std.astype(np.float32),
        sequence_length=int(sequence_length),
    )

    fig = plt.figure(figsize=(8, 3))
    plt.plot(history.history.get("loss", []), label="train")
    if "val_loss" in history.history:
        plt.plot(history.history["val_loss"], label="val")
    plt.title("Loss"); plt.xlabel("epoch"); plt.ylabel("loss"); plt.legend()
    loss_arr = fig_to_array(fig); plt.close(fig)

    if val_data is not None and len(val_data[0]) > 0:
        Xte, yte = val_data
        ypred = np.argmax(model.predict(Xte, verbose=0), axis=1)
        label_ids = np.arange(n_classes)
        cm_arr = plot_confmat(confusion_matrix(yte, ypred, labels=label_ids), actions)
        report = classification_report(
            yte, ypred, labels=label_ids, target_names=actions, zero_division=0
        )
    else:
        cm_arr = None
        report = "No validation metrics (insufficient sequences per class). Collect ≥2 per action (10+ preferred)."

    summary = (
        f"{msg}\nTrained on {len(Xtr)} sequences (batch_size={bs}).\n"
        f"Saved model:\n{model_path}\n{labels_path}\n{norm_path}"
    )
    return summary, loss_arr, cm_arr, report

# ==================== Tab 3a handlers (Batch Test; same as before) ====================
def predict_from_webcam(model_path, labels_path, video, sequence_length, stride):
    """Classify one recorded/uploaded clip and plot the top predictions.

    Training standardises the features, so if a ``<model>.norm.npz`` file
    (arrays ``mean`` and ``std``) sits next to the model, the same transform
    is applied here before predicting. Without such a file the raw keypoints
    are used, as before.

    Args:
        model_path: path to the saved ``.keras`` model.
        labels_path: path to the JSON list of class names.
        video: path of the clip, or ``None``.
        sequence_length: frames extracted from the clip.
        stride: use every Nth frame (values below 1 are treated as 1).

    Returns:
        ``(JSON text of up to five (label, score) pairs, bar-chart image)``,
        or ``(message, None)`` when inputs are missing.
    """
    if not model_path or not labels_path:
        return "Provide model (.keras) and labels (.json).", None
    if video is None:
        return "Record a short clip.", None

    model_path = str(model_path).strip()
    labels_path = str(labels_path).strip()
    if not (os.path.isfile(model_path) and os.path.isfile(labels_path)):
        return "Model or labels file not found. Check both paths.", None

    model = tf.keras.models.load_model(model_path, compile=False)
    with open(labels_path, "r") as f:
        actions = json.load(f)

    seq = video_to_sequence_keypoints(video, sequence_length=int(sequence_length), stride=max(1, int(stride)))
    seq = np.array(seq, dtype=np.float32)
    seq[~np.isfinite(seq)] = 0.0

    # Reproduce the training-time standardisation when its statistics are available.
    norm_path = os.path.splitext(model_path)[0] + ".norm.npz"
    if os.path.isfile(norm_path):
        with np.load(norm_path) as norm:
            seq = (seq - norm["mean"].reshape(1, -1)) / norm["std"].reshape(1, -1)

    probs = model.predict(np.expand_dims(seq, 0), verbose=0)[0]
    order = np.argsort(probs)[::-1]
    top = [(actions[i], float(probs[i])) for i in order[:min(5, len(actions))]]

    fig = plt.figure(figsize=(6, 3))
    labs = [t[0] for t in top]
    scores = [t[1] for t in top]
    plt.bar(labs, scores)
    plt.ylim(0, 1.0)
    plt.title("Top Predictions")
    pred_arr = fig_to_array(fig)
    plt.close(fig)
    return json.dumps(top, indent=2), pred_arr

def predict_from_dataset(model_path, labels_path, action_name_drop, seq_index, sequence_length):
    """Classify one stored dataset sequence and plot the top predictions.

    Training standardises the features, so if a ``<model>.norm.npz`` file
    (arrays ``mean`` and ``std``) sits next to the model, the same transform
    is applied here before predicting. Without such a file the raw keypoints
    are used, as before.

    Args:
        model_path: path to the saved ``.keras`` model.
        labels_path: path to the JSON list of class names.
        action_name_drop: action whose sequence should be tested.
        seq_index: numeric id of the sequence folder.
        sequence_length: number of frame files to read; missing frames are
            replaced by 1662 zeros.

    Returns:
        ``(JSON text of up to five (label, score) pairs, bar-chart image)``,
        or ``(message, None)`` when inputs are missing or invalid.
    """
    if not model_path or not labels_path:
        return "Provide model (.keras) and labels (.json).", None
    if not action_name_drop:
        return "Select an action.", None
    if seq_index is None:
        return "Enter a sequence number.", None

    seq_dir = os.path.join(DATA_PATH, action_name_drop, str(int(seq_index)))
    if not os.path.isdir(seq_dir):
        return f"No sequence #{int(seq_index)} for '{action_name_drop}'.", None

    model_path = str(model_path).strip()
    labels_path = str(labels_path).strip()
    if not (os.path.isfile(model_path) and os.path.isfile(labels_path)):
        return "Model or labels file not found. Check both paths.", None

    model = tf.keras.models.load_model(model_path, compile=False)
    with open(labels_path, "r") as f:
        actions = json.load(f)

    frames = []
    for i in range(int(sequence_length)):
        fpath = os.path.join(seq_dir, f"{i}.npy")
        frames.append(np.load(fpath) if os.path.exists(fpath) else np.zeros(1662))
    seq = np.stack(frames).astype(np.float32)
    seq[~np.isfinite(seq)] = 0.0

    # Reproduce the training-time standardisation when its statistics are available.
    norm_path = os.path.splitext(model_path)[0] + ".norm.npz"
    if os.path.isfile(norm_path):
        with np.load(norm_path) as norm:
            seq = (seq - norm["mean"].reshape(1, -1)) / norm["std"].reshape(1, -1)

    probs = model.predict(np.expand_dims(seq, 0), verbose=0)[0]
    order = np.argsort(probs)[::-1]
    top = [(actions[i], float(probs[i])) for i in order[:min(5, len(actions))]]

    fig = plt.figure(figsize=(6, 3))
    labs = [t[0] for t in top]
    scores = [t[1] for t in top]
    plt.bar(labs, scores)
    plt.ylim(0, 1.0)
    plt.title("Top Predictions (Dataset)")
    pred_arr = fig_to_array(fig)
    plt.close(fig)
    return json.dumps(top, indent=2), pred_arr

# ==================== Tab 3b handlers (NEW: Live Real-time) ====================
def load_model_and_labels(model_path, labels_path):
    """Load a saved Keras model (uncompiled) and its JSON label list.

    Returns:
        ``(model, labels)`` where ``labels`` is the parsed JSON content.
    """
    network = tf.keras.models.load_model(model_path, compile=False)
    with open(labels_path, "r") as label_file:
        label_names = json.load(label_file)
    return network, label_names

# Helper to draw prediction bars on frame
def draw_scores_bgr(frame, labels, probs, x=10, y=10, bar_w=180, bar_h=18, gap=6):
    """Draw one horizontal score bar per (label, probability) pair onto ``frame``.

    Each row consists of a grey outline of width ``bar_w``, a filled green bar
    whose width is the probability clamped to [0, 1], and a ``"label: 0.00"``
    caption to the right. Rows are stacked downwards from ``(x, y)``.

    Returns:
        ``frame`` itself (unchanged when ``labels`` or ``probs`` is ``None``).
    """
    if labels is None or probs is None:
        return frame

    for row, (name, score) in enumerate(zip(labels, probs)):
        top = y + row * (bar_h + gap)
        bottom = top + bar_h
        cv2.rectangle(frame, (x, top), (x + bar_w, bottom), (50, 50, 50), 1)
        filled = int(bar_w * float(max(0.0, min(1.0, score))))
        cv2.rectangle(frame, (x, top), (x + filled, bottom), (0, 200, 0), -1)
        caption = f"{name}: {score:.2f}"
        cv2.putText(frame, caption, (x + bar_w + 10, bottom - 4),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
    return frame

# >>> NEW: stateful stream function
def realtime_stream(video_input, state, seq_len, top_k):
    """Run sliding-window action prediction over a recorded clip.

    Every frame is passed through MediaPipe Holistic, annotated with
    landmarks, and its keypoints are pushed into a rolling window kept in
    ``state``. Once a model is loaded and the window is full, the model is run
    on the window and the top-K scores are drawn onto the frame.

    Args:
        video_input: path to the video file, or ``None``.
        state: dict with keys ``'deque'``, ``'model'``, ``'labels'`` and
            optionally ``'model_path'`` / ``'labels_path'``; created when
            missing or malformed.
        seq_len: window length in frames.
        top_k: number of predictions to report.

    Returns:
        ``(last annotated RGB frame or None, JSON text of the latest top-K
        predictions, updated state)``.
    """
    if video_input is None:
        return None, "{}", state

    window = int(seq_len)
    if state is None or not isinstance(state, dict) or 'deque' not in state:
        state = {'deque': collections.deque(maxlen=window), 'model': None, 'labels': None}
    elif getattr(state['deque'], 'maxlen', None) != window:
        # The window was created with a different length; with a smaller maxlen
        # the "window full" test below could never pass. Keep the newest frames.
        state['deque'] = collections.deque(state['deque'], maxlen=window)

    # Lazy-load the model (and optional normalisation stats) once paths are set.
    model_path = state.get("model_path")
    labels_path = state.get("labels_path")
    if (state.get("model") is None and model_path and labels_path
            and os.path.exists(model_path) and os.path.exists(labels_path)):
        state["model"], state["labels"] = load_model_and_labels(model_path, labels_path)
        state["norm"] = None
        norm_path = os.path.splitext(model_path)[0] + ".norm.npz"
        if os.path.isfile(norm_path):
            with np.load(norm_path) as norm:
                state["norm"] = (norm["mean"].reshape(1, -1), norm["std"].reshape(1, -1))

    annotated_frame = None
    top_json = "{}"
    cap = cv2.VideoCapture(video_input)
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while cap.isOpened():
                ret, bgr = cap.read()  # frames arrive in BGR order
                if not ret:
                    break

                rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
                rgb.flags.writeable = False
                results = holistic.process(rgb)
                rgb.flags.writeable = True
                draw_landmarks_bgr(bgr, results)
                state['deque'].append(extract_keypoints(results))

                if state.get("model") is not None and len(state['deque']) >= window:
                    seq = np.stack(list(state['deque'])).astype(np.float32)
                    seq[~np.isfinite(seq)] = 0.0
                    if state.get("norm") is not None:
                        mean, std = state["norm"]
                        seq = (seq - mean) / std
                    probs_all = state['model'].predict(np.expand_dims(seq, 0), verbose=0)[0]
                    idx = np.argsort(probs_all)[::-1][:int(top_k)]
                    labs = [state['labels'][i] for i in idx]
                    probs = [float(probs_all[i]) for i in idx]
                    # Score bars in the corner, top-1 label along the bottom edge.
                    draw_scores_bgr(bgr, labs, probs, x=10, y=10)
                    cv2.putText(bgr, labs[0], (10, bgr.shape[0]-20), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0,255,255), 2, cv2.LINE_AA)
                    top_json = json.dumps(list(zip(labs, probs)), indent=2)

                annotated_frame = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    finally:
        cap.release()

    # The whole clip is processed; only the last annotated frame is returned.
    return annotated_frame, top_json, state


# >>> NEW: helpers to set model paths into the live state
def set_live_model_paths(state, model_path, labels_path):
    """Store model/label paths in the live state and drop any loaded model.

    A state that is not a dict containing ``'deque'`` is replaced by a fresh
    one with an empty 30-frame window. Clearing ``'model'`` and ``'labels'``
    forces a reload from the new paths.

    Returns:
        ``(state, confirmation message)``.
    """
    has_window = isinstance(state, dict) and 'deque' in state
    if not has_window:
        state = {'deque': collections.deque(maxlen=30), 'model': None, 'labels': None}
    state.update({
        'model_path': model_path,
        'labels_path': labels_path,
        'model': None,
        'labels': None,
    })
    return state, f"Live model set:\n{model_path}\n{labels_path}"

# ==================== Gradio UI ====================
# Builds the four-tab Gradio app (collect, train/evaluate, clip test, live test)
# and wires each button to its handler, then launches it.
with gr.Blocks(title="Action Detection — Collect / Train / Test (Real-time)") as demo:
    gr.Markdown(f"**Base directory:** `{BASE_DIR}`  \nKeypoints → `{DATA_PATH}`  \nModels → `{MODELS_DIR}`")

    # Tab 1: Collect — record a clip, store its keypoints, show a landmark preview.
    with gr.Tab("1) Collect"):
        with gr.Row():
            action_name_tb = gr.Textbox(label="Action name", placeholder="e.g., wave, hello, thanks")
            target_count = gr.Number(label="Target sequences", value=30, precision=0)
            create_btn = gr.Button("Create / Select Folder")
        status1 = gr.Markdown("")
        progress = gr.Textbox(label="Progress", value="0 / 30", interactive=False)

        gr.Markdown("**Record a short clip (5–10s) for ONE sequence**. We will save keypoints and also render an annotated preview with landmarks.")
        with gr.Row():
            sequence_length = gr.Slider(10, 60, value=30, step=1, label="Frames per sequence")
            stride = gr.Slider(1, 5, value=1, step=1, label="Stride (use every Nth frame)")
        video_in = gr.Video(label="Video (webcam or upload)", sources=["webcam", "upload"], height=300)
        save_btn = gr.Button("Record & Save (one sequence)")

        # Outputs of record_and_save: path of the annotated clip and its first frame.
        anno_path = gr.Textbox(label="Annotated preview file (.mp4)", interactive=False)
        anno_frame = gr.Image(label="Preview (first annotated frame)", visible=True)

        # The checkbox value is passed to reset_action as its confirmation flag.
        really = gr.Checkbox(label="Really reset this action's data?")
        reset_btn = gr.Button("Reset Action Folder", variant="stop")

        create_btn.click(create_action_folder, [action_name_tb, target_count], [status1, action_name_tb, progress])
        save_btn.click(
            record_and_save,
            [action_name_tb, video_in, sequence_length, stride, target_count],
            [status1, progress, anno_path, anno_frame]
        )
        reset_btn.click(reset_action, [action_name_tb, really], [status1, action_name_tb])

    # Tab 2: Train / Evaluate — the four sliders feed train_model; its four
    # return values fill the summary, loss image, confusion matrix and report.
    with gr.Tab("2) Train / Evaluate"):
        with gr.Row():
            seq_len_train = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
            test_size = gr.Slider(0.1, 0.5, value=0.2, step=0.05, label="Test size")
        with gr.Row():
            epochs = gr.Slider(1, 50, value=12, step=1, label="Epochs")
            batch = gr.Slider(4, 64, value=16, step=4, label="Batch size")
        train_btn = gr.Button("Train LSTM")
        train_msg = gr.Textbox(label="Summary", lines=4)
        loss_img = gr.Image(label="Training Loss")
        cm_img = gr.Image(label="Confusion Matrix")
        report_txt = gr.Textbox(label="Classification Report", lines=14)
        train_btn.click(train_model, [seq_len_train, test_size, epochs, batch], [train_msg, loss_img, cm_img, report_txt])

    # Tab 3a: Clip-based test — left column predicts from a clip, right column
    # from a stored dataset sequence. Both columns share the model/labels textboxes.
    with gr.Tab("3a) Test (clip based)"):
        gr.Markdown("Test with a short **webcam/uploaded clip** or an existing **dataset sequence**.")
        with gr.Row():
            with gr.Column():
                gr.Markdown("**Webcam/Upload Clip Test**")
                model_path = gr.Textbox(label="Model .keras path")
                labels_path = gr.Textbox(label="Labels .json path")
                seq_len_inf = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
                stride_inf = gr.Slider(1, 5, value=1, step=1, label="Stride")
                vid_inf = gr.Video(label="Video (webcam or upload)", sources=["webcam", "upload"], height=300)
                infer_btn = gr.Button("Predict (Clip)")
                preds_json = gr.Textbox(label="Top-k predictions (JSON)", lines=8)
                preds_plot = gr.Image(label="Scores")
                infer_btn.click(predict_from_webcam, [model_path, labels_path, vid_inf, seq_len_inf, stride_inf], [preds_json, preds_plot])

            with gr.Column():
                gr.Markdown("**Dataset Test**")
                # NOTE(review): list_actions() is called once, while the UI is being
                # built, so actions created later presumably do not appear in this
                # dropdown until the cell is re-run — confirm and consider a refresh.
                action_sel = gr.Dropdown(label="Action", choices=list_actions())
                seq_idx = gr.Number(label="Sequence #", value=1, precision=0)
                seq_len_ds = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
                infer_ds_btn = gr.Button("Predict (Dataset)")
                preds_json_ds = gr.Textbox(label="Top-k predictions (JSON)", lines=8)
                preds_plot_ds = gr.Image(label="Scores")
                infer_ds_btn.click(predict_from_dataset, [model_path, labels_path, action_sel, seq_idx, seq_len_ds], [preds_json_ds, preds_plot_ds])

    # Tab 3b: Live test. Only the "Use these paths" button is wired up; the
    # webcam, annotated image and JSON box below have no event handler attached.
    with gr.Tab("3b) Live Test (real-time)"):
        gr.Markdown("Live, frame-by-frame predictions with a sliding window and on-frame landmarks & scores.")
        # Per-session state consumed by set_live_model_paths / realtime_stream.
        live_state = gr.State({'deque': collections.deque(maxlen=30), 'model': None, 'labels': None})

        with gr.Row():
            live_model_path = gr.Textbox(label="Model .keras path (for live)")
            live_labels_path = gr.Textbox(label="Labels .json path (for live)")
            seq_len_live = gr.Slider(10, 60, value=30, step=1, label="Sequence length (window)")
            topk_live = gr.Slider(1, 5, value=3, step=1, label="Top-K to display")

        set_live_btn = gr.Button("Use these paths for LIVE")
        set_msg = gr.Textbox(label="Status", interactive=False)
        set_live_btn.click(set_live_model_paths, [live_state, live_model_path, live_labels_path], [live_state, set_msg])

        # gr.Video with a webcam source is used here instead of gr.Image(live=True).
        cam_live = gr.Video(label="Webcam (live)", sources=["webcam"], height=360)
        out_live = gr.Image(label="Annotated (live)")
        json_live = gr.Textbox(label="Top-K (JSON)", lines=6)

        # Intended wiring, disabled because calling .stream() on gr.Video raised
        # AttributeError:
        # cam_live.stream(
        #     fn=realtime_stream,
        #     inputs=[cam_live, live_state, seq_len_live, topk_live],
        #     outputs=[out_live, json_live, live_state],
        #     time_limit=None
        # )

        # With stream() unavailable the tab only shows this notice.
        gr.Markdown("Real-time streaming is not supported with the current Gradio version/setup.")


# share=True requests a public URL; server_name="0.0.0.0" binds all interfaces.
# NOTE(review): the cell output recommends debug=True to surface errors in Colab.
demo.launch(share=True, server_name="0.0.0.0", show_error=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://515514c0bc916c7f04.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




# VERSION 3

In [22]:
# @title
# === Gradio Action Detection App (real-time + landmark preview) ===
# !pip -q install gradio mediapipe opencv-python tensorflow scikit-learn matplotlib tqdm pillow

import os, io, glob, json, time, shutil, collections, tempfile
from pathlib import Path
import numpy as np
import cv2
import gradio as gr
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models

# ---- Config / Paths ----
# NOTE(review): USE_DRIVE is assigned but nothing in this section reads it;
# BASE_DIR always points at the Drive mount — confirm whether a local fallback
# was intended when USE_DRIVE is False.
USE_DRIVE = False
BASE_DIR = "/content/drive/MyDrive/Colab Notebooks/Action Recognition"  # your path
DATA_PATH = os.path.join(BASE_DIR, "MP_Data")    # per-action keypoint sequences
MODELS_DIR = os.path.join(BASE_DIR, "models")    # saved models and label files
# Both directories are created on cell execution if they do not exist yet.
os.makedirs(DATA_PATH, exist_ok=True)
os.makedirs(MODELS_DIR, exist_ok=True)

# ---- MediaPipe for keypoints & drawing ----
import mediapipe as mp
# Short aliases used by the keypoint-extraction and drawing helpers below.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_styles = mp.solutions.drawing_styles

# ----------------- Landmark / Keypoint helpers -----------------
def extract_keypoints(results):
    """Flatten a Holistic result into one fixed-length feature vector.

    Layout: pose (132 values: x, y, z, visibility per landmark), face (1404),
    left hand (63) and right hand (63); face and hands use x, y, z per
    landmark. A missing group becomes zeros and a short group is zero-padded
    on the right, giving 1662 values in total.
    """
    def flatten_group(group, fields, width):
        points = group.landmark if group else []
        values = np.array([[getattr(pt, field) for field in fields] for pt in points]).flatten()
        if values.size == 0:
            return np.zeros(width)
        if values.size < width:
            values = np.pad(values, (0, width - values.size))
        return values

    xyz = ("x", "y", "z")
    segments = [
        flatten_group(results.pose_landmarks, xyz + ("visibility",), 132),
        flatten_group(results.face_landmarks, xyz, 1404),
        flatten_group(results.left_hand_landmarks, xyz, 63),
        flatten_group(results.right_hand_landmarks, xyz, 63),
    ]
    return np.concatenate(segments)  # (1662,)

def draw_landmarks_bgr(frame_bgr, results):
    """Overlay the face mesh, pose and both hands of a Holistic result on ``frame_bgr``.

    Drawing is delegated to ``mp_drawing.draw_landmarks`` with the default
    MediaPipe styles; nothing is returned.
    """
    topology = mp.solutions.holistic
    render = mp_drawing.draw_landmarks

    # Face: tesselation lines only, no individual landmark dots.
    render(
        frame_bgr,
        results.face_landmarks,
        topology.FACEMESH_TESSELATION,
        landmark_drawing_spec=None,
        connection_drawing_spec=mp_styles.get_default_face_mesh_tesselation_style(),
    )

    # Pose: styled landmarks; connections use the drawing utility's default.
    render(
        frame_bgr,
        results.pose_landmarks,
        topology.POSE_CONNECTIONS,
        landmark_drawing_spec=mp_styles.get_default_pose_landmarks_style(),
    )

    # Hands: left first, then right, with identical styling.
    for hand in (results.left_hand_landmarks, results.right_hand_landmarks):
        render(
            frame_bgr,
            hand,
            topology.HAND_CONNECTIONS,
            landmark_drawing_spec=mp_styles.get_default_hand_landmarks_style(),
            connection_drawing_spec=mp_styles.get_default_hand_connections_style(),
        )

# ----------------- Video -> sequence -----------------
def video_to_sequence_keypoints(video_path, sequence_length=30, stride=1):
    """Convert a video file into a fixed-length sequence of keypoint vectors.

    Every ``stride``-th frame is run through MediaPipe Holistic until
    ``sequence_length`` vectors are collected. A clip that yields no frames
    produces all-zero vectors; a short clip is padded by repeating its last
    vector. The capture is released even if processing raises.

    Args:
        video_path: path of the video to read.
        sequence_length: number of vectors in the result.
        stride: keep every Nth frame (values below 1 are treated as 1).

    Returns:
        Array of shape ``(sequence_length, 1662)``.
    """
    step = max(1, int(stride))
    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            i = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if i % step != 0:
                    i += 1
                    continue
                img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                img.flags.writeable = False
                results = holistic.process(img)
                img.flags.writeable = True
                frames.append(extract_keypoints(results))
                if len(frames) >= sequence_length:
                    break
                i += 1
    finally:
        cap.release()

    if len(frames) == 0:
        frames = [np.zeros(1662) for _ in range(sequence_length)]
    elif len(frames) < sequence_length:
        # Pad a short clip by repeating its final keypoint vector.
        frames += [frames[-1]] * (sequence_length - len(frames))
    else:
        frames = frames[:sequence_length]
    return np.stack(frames, axis=0)  # (T,1662)

# ----------------- Annotate a clip with landmarks (NEW) -----------------
def annotate_video_with_landmarks(in_path, out_w=640, out_h=360, fps=24):
    """Write a copy of a video with holistic landmarks drawn on every frame.

    Frames are resized to ``(out_w, out_h)``, annotated and encoded as
    ``mp4v`` into a uniquely named temp file. The name comes from
    ``tempfile.mkstemp`` because a second-resolution timestamp lets two calls
    in the same second write to the same file. The source frame rate is used
    when the container reports one (kept as a float so e.g. 29.97 is not
    truncated); otherwise ``fps`` applies.

    Returns:
        ``(output_path, preview)`` where ``preview`` is the first annotated
        frame as an RGB array (``None`` if no frame was read), or
        ``(None, None)`` when the input or the output writer cannot be opened.
    """
    cap = cv2.VideoCapture(in_path)
    if not cap.isOpened():
        cap.release()
        return None, None

    writer = None
    preview = None
    try:
        # Infer fps if available
        in_fps = cap.get(cv2.CAP_PROP_FPS)
        if in_fps and in_fps > 0:
            fps = float(in_fps)

        fd, out_path = tempfile.mkstemp(prefix="annot_", suffix=".mp4")
        os.close(fd)  # VideoWriter reopens the file by path
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(out_path, fourcc, fps, (out_w, out_h))
        if not writer.isOpened():
            os.remove(out_path)
            return None, None

        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.resize(frame, (out_w, out_h))
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                rgb.flags.writeable = False
                results = holistic.process(rgb)
                rgb.flags.writeable = True
                # draw on BGR
                draw_landmarks_bgr(frame, results)
                if preview is None:
                    preview = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                writer.write(frame)
    finally:
        cap.release()
        if writer is not None:
            writer.release()
    return out_path, preview

# ---- Image helpers ----
def fig_to_array(fig):
    """Rasterise a figure into an RGB NumPy array.

    The figure is saved as a tightly-cropped PNG into an in-memory buffer,
    re-opened with PIL, converted to RGB and wrapped in ``np.array``.
    """
    with io.BytesIO() as png_buffer:
        fig.savefig(png_buffer, format="png", bbox_inches="tight")
        png_buffer.seek(0)  # rewind so the reader starts at the PNG header
        return np.array(Image.open(png_buffer).convert("RGB"))

def plot_confmat(cm, labels):
    """Render a confusion matrix as an image array.

    Args:
        cm: 2-D array of counts; rows are labelled "True", columns "Predicted".
        labels: class names used as tick labels on both axes.

    Returns:
        Whatever ``fig_to_array`` returns for the rendered figure.
    """
    side = 4 + 0.3 * len(labels)  # grow the canvas with the number of classes
    fig, ax = plt.subplots(figsize=(side, side))
    heatmap = ax.imshow(cm, interpolation='nearest')
    ax.set_title("Confusion Matrix")
    fig.colorbar(heatmap, ax=ax)

    positions = np.arange(len(labels))
    ax.set_xticks(positions)
    ax.set_xticklabels(labels, rotation=45, ha="right")
    ax.set_yticks(positions)
    ax.set_yticklabels(labels)

    # Cells above half of the maximum count get white text for contrast.
    cutoff = cm.max() / 2
    for row, col in np.ndindex(cm.shape[0], cm.shape[1]):
        cell = cm[row, col]
        ax.text(col, row, int(cell), ha="center", va="center",
                color="white" if cell > cutoff else "black")

    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    fig.tight_layout()
    rendered = fig_to_array(fig)
    plt.close(fig)
    return rendered

# ---- Dataset & Model helpers (unchanged) ----
def list_actions():
    """Return the sorted names of the directories directly inside ``DATA_PATH``."""
    names = []
    for entry in os.listdir(DATA_PATH):
        if os.path.isdir(os.path.join(DATA_PATH, entry)):
            names.append(entry)
    names.sort()
    return names

def count_sequences(action):
    """Count the directories matched by ``DATA_PATH/<action>/*``."""
    pattern = os.path.join(DATA_PATH, action, "*")
    return sum(1 for candidate in glob.glob(pattern) if os.path.isdir(candidate))

def save_sequence(action, sequence_array):
    """Persist one keypoint sequence as ``DATA_PATH/<action>/<id>/<frame>.npy``.

    The new id is one more than the highest numeric folder name already
    present. Deriving it from the folder *count* would reuse an existing id
    whenever ids are non-contiguous (e.g. folders 1 and 3 exist) and silently
    overwrite that sequence's frames.

    Args:
        action: action (class) folder name.
        sequence_array: iterable of per-frame arrays; frame ``i`` is saved
            as ``<i>.npy``.

    Returns:
        The integer id of the sequence folder that was written.
    """
    act_dir = os.path.join(DATA_PATH, action)
    os.makedirs(act_dir, exist_ok=True)

    used_ids = [
        int(name)
        for name in os.listdir(act_dir)
        if name.isdecimal() and os.path.isdir(os.path.join(act_dir, name))
    ]
    next_id = max(used_ids, default=0) + 1

    seq_dir = os.path.join(act_dir, str(next_id))
    os.makedirs(seq_dir, exist_ok=True)
    for i, f in enumerate(sequence_array):
        np.save(os.path.join(seq_dir, f"{i}.npy"), f)
    return next_id

def load_dataset(actions=None, sequence_length=30):
    """Load every stored sequence for the given actions into arrays.

    Args:
        actions: action names to load; when empty or ``None`` the result of
            ``list_actions()`` is used.
        sequence_length: number of frame files (``0.npy`` .. ``N-1.npy``) read
            per sequence; a missing frame file is replaced by 1662 zeros.

    Returns:
        ``(X, y, actions)`` with ``X`` as float32 and ``y`` as int64 class
        indices, or ``(None, None, actions)`` when no sequence folder exists.
    """
    def numeric_order(path):
        # Folders named with digits sort numerically; anything else sorts as 0.
        leaf = os.path.basename(path)
        return int(leaf) if leaf.isdigit() else 0

    if not actions:
        actions = list_actions()
    class_index = dict((name, pos) for pos, name in enumerate(actions))

    samples = []
    targets = []
    for action in actions:
        candidates = glob.glob(os.path.join(DATA_PATH, action, "*"))
        for seq_dir in sorted(filter(os.path.isdir, candidates), key=numeric_order):
            clip = []
            for frame_no in range(int(sequence_length)):
                frame_file = os.path.join(seq_dir, f"{frame_no}.npy")
                if os.path.exists(frame_file):
                    clip.append(np.load(frame_file))
                else:
                    clip.append(np.zeros(1662))
            samples.append(np.stack(clip))
            targets.append(class_index[action])

    if not samples:
        return None, None, actions
    return np.array(samples, dtype=np.float32), np.array(targets, dtype=np.int64), actions

def build_lstm_model(num_classes, sequence_length=30, feature_dim=1662,
                     lstm_units=128, dense_units=64, dropout=0.3, force_cpu=False):
    """Build and compile a two-layer LSTM classifier.

    Architecture: LSTM (returns sequences) -> Dropout -> LSTM -> Dropout ->
    Dense(relu) -> Dropout -> Dense(softmax). Compiled with Adam and sparse
    categorical cross-entropy, tracking accuracy.

    Args:
        num_classes: width of the softmax output layer.
        sequence_length: time steps per input sequence.
        feature_dim: features per time step.
        lstm_units: units in each LSTM layer.
        dense_units: units in the hidden dense layer.
        dropout: rate used by all three Dropout layers.
        force_cpu: accepted for interface compatibility; not used by this body.

    Returns:
        The compiled model.
    """
    # Both recurrent layers share every setting except return_sequences.
    shared_lstm_args = dict(
        activation="tanh",
        recurrent_activation="sigmoid",
        implementation=2,
        recurrent_dropout=0.0,
        dropout=0.0,
    )

    inputs = layers.Input(shape=(sequence_length, feature_dim))
    hidden = layers.LSTM(lstm_units, return_sequences=True, **shared_lstm_args)(inputs)
    hidden = layers.Dropout(dropout)(hidden)
    hidden = layers.LSTM(lstm_units, **shared_lstm_args)(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    hidden = layers.Dense(dense_units, activation="relu")(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    outputs = layers.Dense(num_classes, activation="softmax")(hidden)

    model = models.Model(inputs, outputs)
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model

# ==================== Tab 1 handlers (Collect) ====================
def create_action_folder(action_name_text, target_count):
    """Create (or select) the data folder for an action and report progress.

    The action name comes straight from a UI textbox, so it is validated to be
    a single folder name directly under ``DATA_PATH``. Absolute paths, ``..``
    and nested paths are rejected instead of creating directories elsewhere
    (``os.path.join`` discards ``DATA_PATH`` when given an absolute component).

    Args:
        action_name_text: action name typed by the user.
        target_count: desired number of sequences; ``None`` is treated as 0.

    Returns:
        ``(status message, textbox update, "current / total" progress text)``.
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Enter an action name.", gr.update(value=""), "0 / 0"

    # Containment check: the resolved target must be a direct child of the data root.
    data_root = os.path.realpath(DATA_PATH)
    resolved = os.path.realpath(os.path.join(data_root, action_name_text))
    if os.path.dirname(resolved) != data_root:
        return (
            f"Invalid action name '{action_name_text}': use a plain folder name.",
            gr.update(value=action_name_text),
            "0 / 0",
        )

    p = os.path.join(DATA_PATH, action_name_text)
    os.makedirs(p, exist_ok=True)
    current = count_sequences(action_name_text)
    total = int(target_count) if target_count is not None else 0
    return f"Using folder: {p}", gr.update(value=action_name_text), f"{current} / {total}"

# >>> CHANGED: now also returns an annotated preview video + preview frame
def record_and_save(action_name_text, video, sequence_length, stride, target_count):
    """Extract keypoints from a clip, store them as one sequence and build a preview.

    The action name is user input, so it must resolve to a direct child of
    ``DATA_PATH``; otherwise nothing is written. The status text reports
    whether the annotated preview could actually be produced.

    Args:
        action_name_text: action name typed by the user.
        video: path of the recorded/uploaded clip, or ``None``.
        sequence_length: frames per stored sequence.
        stride: use every Nth frame (values below 1 are treated as 1).
        target_count: desired number of sequences; ``None`` is treated as 0.

    Returns:
        ``(status, progress text, annotated video path, preview frame)``;
        the last three are ``None`` when nothing was saved.
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Pick/create an action first.", None, None, None
    if video is None:
        return "Record or upload a short clip.", None, None, None

    # Refuse names that would write outside the data root (absolute paths, "..", nesting).
    data_root = os.path.realpath(DATA_PATH)
    resolved = os.path.realpath(os.path.join(data_root, action_name_text))
    if os.path.dirname(resolved) != data_root:
        return f"Invalid action name '{action_name_text}': use a plain folder name.", None, None, None

    seq = video_to_sequence_keypoints(video, sequence_length=int(sequence_length), stride=max(1, int(stride)))
    save_sequence(action_name_text, seq)
    total = int(target_count) if target_count is not None else 0
    progress = f"{count_sequences(action_name_text)} / {total}"

    # Annotated preview for data collection quality check
    out_path, preview = annotate_video_with_landmarks(video)
    if out_path is None:
        status = f"Saved 1 sequence in '{action_name_text}'. Annotated preview unavailable."
    else:
        status = f"Saved 1 sequence in '{action_name_text}'. Annotated preview generated."
    return status, progress, out_path, preview

def reset_action(action_name_text, really=False):
    """Delete all stored sequences of one action and recreate its empty folder.

    The name is user input and the operation is destructive, so the target
    must resolve to a direct child of ``DATA_PATH``. Without this check a name
    such as ``.``, ``..`` or an absolute path would make ``shutil.rmtree``
    remove the whole data root or an unrelated directory.

    Args:
        action_name_text: action name typed by the user.
        really: confirmation flag; nothing is deleted unless it is truthy.

    Returns:
        ``(status message, textbox update)``.
    """
    if not action_name_text or str(action_name_text).strip() == "":
        return "Enter an action name.", gr.update(value="")
    if not really:
        return "Tick the checkbox to confirm reset.", gr.update(value=action_name_text)

    # Containment check before any deletion.
    data_root = os.path.realpath(DATA_PATH)
    resolved = os.path.realpath(os.path.join(data_root, action_name_text))
    if os.path.dirname(resolved) != data_root:
        return (
            f"Invalid action name '{action_name_text}'. Nothing was deleted.",
            gr.update(value=action_name_text),
        )

    p = os.path.join(DATA_PATH, action_name_text)
    if os.path.exists(p):
        shutil.rmtree(p)
    os.makedirs(p, exist_ok=True)
    return f"Cleared data for '{action_name_text}'.", gr.update(value=action_name_text)

# ==================== Tab 2 handler (Train/Evaluate) ====================
def train_model(sequence_length, test_size, epochs, batch_size):
    """Train the LSTM classifier on the collected sequences and save its artifacts.

    Args:
        sequence_length: Frames per sequence; passed (as int) to
            ``load_dataset`` and ``build_lstm_model``.
        test_size: Requested validation fraction; values above 0.2 are capped
            at 0.2.
        epochs: Number of training epochs.
        batch_size: Requested batch size, clamped to ``[1, len(train set)]``.

    Returns:
        ``(summary, loss_plot, confusion_matrix_plot, report)``. The plots are
        whatever ``fig_to_array`` / ``plot_confmat`` return; the confusion
        matrix is ``None`` when no validation split could be made. If the
        dataset is empty only the first element (a message) is set.
    """
    X, y, actions = load_dataset(sequence_length=int(sequence_length))
    if X is None:
        return "Dataset empty. Collect sequences first.", None, None, None

    # --- sanitize & normalize ---
    X = np.asarray(X, dtype=np.float32)
    y = np.asarray(y, dtype=np.int32)
    X[~np.isfinite(X)] = 0.0

    # Per-feature statistics over all sequences and frames, kept broadcastable
    # as (1, 1, n_features). NOTE(review): statistics include the samples that
    # later form the validation split.
    mean = X.mean(axis=(0, 1), keepdims=True)
    std = X.std(axis=(0, 1), keepdims=True) + 1e-6
    X = (X - mean) / std

    counts = np.bincount(y, minlength=len(actions))
    class_counts = dict(zip(actions, counts))
    can_strat = (len(X) > len(actions)) and np.all(counts >= 2)

    # Default: train on everything, no validation data.
    Xtr, ytr = X, y
    val_data = None
    msg = f"No validation split (class counts: {class_counts})."

    if can_strat:
        tsize = min(float(test_size), 0.2)
        try:
            Xtr, Xte, ytr, yte = train_test_split(
                X, y, test_size=tsize, random_state=42, stratify=y
            )
        except ValueError as exc:
            # Stratified splitting refuses splits where the train or test part
            # would hold fewer samples than there are classes. Fall back to
            # training without validation instead of crashing the handler.
            Xtr, ytr = X, y
            msg = f"No validation split ({exc}). Class counts: {class_counts}"
        else:
            val_data = (Xte, yte)
            msg = f"Stratified split. Class counts: {class_counts}"

    bs = max(1, min(int(batch_size), len(Xtr)))

    model = build_lstm_model(num_classes=len(actions), sequence_length=int(sequence_length))
    history = model.fit(Xtr, ytr, validation_data=val_data, epochs=int(epochs), batch_size=bs, verbose=0)

    # --- Save artifacts (model, labels, normalization) under one timestamp ---
    os.makedirs(MODELS_DIR, exist_ok=True)
    ts = int(time.time())
    model_path = os.path.join(MODELS_DIR, f"lstm_{ts}.keras")
    labels_path = os.path.join(MODELS_DIR, f"lstm_{ts}.labels.json")
    norm_path = os.path.join(MODELS_DIR, f"lstm_{ts}.norm.npz")

    model.save(model_path)
    with open(labels_path, "w") as f:
        json.dump(actions, f)
    # The inference helpers look for "<model stem>.norm.npz" next to the model.
    np.savez_compressed(
        norm_path,
        mean=mean.astype(np.float32),
        std=std.astype(np.float32),
        sequence_length=int(sequence_length)
    )

    # --- Plots & metrics ---
    fig, ax = plt.subplots(figsize=(8, 3))
    ax.plot(history.history.get("loss", []), label="train")
    if "val_loss" in history.history:
        ax.plot(history.history["val_loss"], label="val")
    ax.set_title("Loss")
    ax.set_xlabel("epoch")
    ax.set_ylabel("loss")
    ax.legend()
    loss_arr = fig_to_array(fig)
    plt.close(fig)

    if val_data is not None and len(val_data[0]) > 0:
        Xte, yte = val_data
        ypred = np.argmax(model.predict(Xte, verbose=0), axis=1)
        # Pin the label set so the matrix/report always have one row per
        # action, even if a class is absent from both yte and ypred
        # (otherwise target_names no longer matches and sklearn raises).
        label_ids = list(range(len(actions)))
        cm_arr = plot_confmat(confusion_matrix(yte, ypred, labels=label_ids), actions)
        report = classification_report(
            yte, ypred, labels=label_ids, target_names=actions, zero_division=0
        )
    else:
        cm_arr = None
        report = "No validation metrics (insufficient sequences per class). Collect ≥2 per action (10+ preferred)."

    summary = (f"{msg}\nTrained on {len(Xtr)} sequences (batch_size={bs}).\n"
               f"Saved model:\n{model_path}\n{labels_path}\n{norm_path}")
    return summary, loss_arr, cm_arr, report




# ==================== Tab 3a handlers (Batch Test; same as before) ====================
def _load_model_bundle(model_path, labels_path):
    """Load a saved model, its label list and (if present) its normalization file.

    The normalization archive is looked up next to the model as
    ``<model file name without extension>.norm.npz``.

    Returns:
        ``(model, actions, norm)`` where ``norm`` is the object returned by
        ``np.load`` or ``None`` when no matching file exists. The archive is
        returned as loaded (not copied into a dict) because callers inspect
        ``norm.files``.
    """
    model = tf.keras.models.load_model(model_path, compile=False)

    with open(labels_path, "r") as handle:
        actions = json.load(handle)

    # e.g. ".../lstm_173023.keras" -> ".../lstm_173023.norm.npz"
    folder, file_name = os.path.split(model_path)
    stem = os.path.splitext(file_name)[0]
    candidate = os.path.join(folder, f"{stem}.norm.npz")

    norm = np.load(candidate) if os.path.exists(candidate) else None
    return model, actions, norm

def _apply_norm(seq, norm):
    if norm is None:
        return seq
    mean = norm["mean"]  # (1,1,1662)
    std  = norm["std"]   # (1,1,1662)
    # reshape to (1,1662) so (T,1662) - (1,1662) broadcasts cleanly
    mean2 = mean.reshape(1, -1).astype(np.float32)
    std2  = std.reshape(1, -1).astype(np.float32)
    return (seq - mean2) / (std2 + 1e-6)


def _nonzero_landmark_ratio(seq):
    # seq shape: (T, 1662). Count frames with any nonzero landmark.
    nonzero = (np.abs(seq).sum(axis=1) > 1e-8).astype(np.float32)
    return float(nonzero.mean()), int(nonzero.sum()), int(seq.shape[0])

def _predict_seq(model, actions, seq, conf_floor=0.40, topk=5):
    logits = model.predict(np.expand_dims(seq, 0), verbose=0)[0]
    idx = np.argsort(logits)[::-1]
    idx = idx[:min(topk, len(actions))]
    top = [(actions[i], float(logits[i])) for i in idx]
    max_lab, max_prob = top[0]
    status = "ok" if max_prob >= conf_floor else "low_confidence"
    return top, status

def predict_from_webcam(model_path, labels_path, video, sequence_length, stride):
    """Classify one recorded/uploaded clip and return predictions plus a bar plot.

    Args:
        model_path: Path to the saved ``.keras`` model.
        labels_path: Path to the matching labels ``.json``.
        video: Clip handed over by the Gradio video component.
        sequence_length: Frames to extract from the clip.
        stride: Use every Nth frame (values below 1 are treated as 1).

    Returns:
        ``(text, plot)``: ``text`` is a JSON string with ``preds`` and ``diag``
        (or a plain message on invalid input, in which case ``plot`` is None);
        ``plot`` is whatever ``fig_to_array`` returns.
    """
    if not model_path or not labels_path:
        return "Provide model (.keras) and labels (.json).", None
    if video is None:
        return "Record a short clip.", None

    model, actions, norm = _load_model_bundle(model_path, labels_path)

    # Basic sanity: model output vs labels
    out_dim = model.output_shape[-1]
    if out_dim != len(actions):
        return (f"Label/model mismatch: model has {out_dim} outputs but {len(actions)} labels. "
                "Make sure the .labels.json matches this .keras."), None

    # Build sequence
    seq = video_to_sequence_keypoints(video, sequence_length=int(sequence_length),
                                      stride=max(1, int(stride)))
    seq = np.asarray(seq, dtype=np.float32)
    if seq.shape[0] == 0:
        # Nothing to pad from and nothing to classify.
        return "No frames could be read from the clip.", None

    # If the model was trained with a different sequence length, adapt:
    # center-crop when too long, repeat the last frame when too short.
    if norm is not None and "sequence_length" in norm.files:
        train_T = int(norm["sequence_length"])
        if seq.shape[0] > train_T:
            start = max(0, (seq.shape[0] - train_T) // 2)
            seq = seq[start:start + train_T]
        elif seq.shape[0] < train_T:
            pad_n = train_T - seq.shape[0]
            seq = np.concatenate([seq, np.repeat(seq[-1][None, :], pad_n, axis=0)], axis=0)

    # Diagnostics: landmark presence. This must be measured on the RAW
    # keypoints: after normalization an all-zero frame becomes -mean/std and
    # would always be counted as "non-zero".
    nz_ratio, nz_frames, total_frames = _nonzero_landmark_ratio(seq)
    diag = {"nonzero_ratio": round(nz_ratio, 3),
            "nonzero_frames": nz_frames,
            "total_frames": total_frames}

    # Apply the same normalization as in training
    seq = _apply_norm(seq, norm)

    # Predict
    top, status = _predict_seq(model, actions, seq, conf_floor=0.40, topk=min(5, len(actions)))
    diag["status"] = status
    diag["top1"] = {"label": top[0][0], "prob": round(top[0][1], 3)}

    # Plot bars
    fig = plt.figure(figsize=(6, 3))
    labs = [t[0] for t in top]
    scores = [t[1] for t in top]
    plt.bar(labs, scores)
    plt.ylim(0, 1.0)
    plt.title("Top Predictions")
    pred_arr = fig_to_array(fig)
    plt.close(fig)

    return json.dumps({"preds": top, "diag": diag}, indent=2), pred_arr


def predict_from_dataset(model_path, labels_path, action_name_drop, seq_index, sequence_length):
    """Classify one stored dataset sequence and return predictions plus a bar plot.

    Args:
        model_path: Path to the saved ``.keras`` model.
        labels_path: Path to the matching labels ``.json``.
        action_name_drop: Action folder (under ``DATA_PATH``) to read from.
        seq_index: Sequence number inside that action folder.
        sequence_length: Number of ``<i>.npy`` frame files to load.

    Returns:
        ``(text, plot)``: ``text`` is a JSON string with ``preds`` and ``diag``
        (or a plain message on invalid input, in which case ``plot`` is None);
        ``plot`` is whatever ``fig_to_array`` returns.
    """
    if not model_path or not labels_path:
        return "Provide model (.keras) and labels (.json).", None
    if not action_name_drop:
        return "Select an action.", None
    if seq_index is None:
        # A cleared Number field arrives as None; int(None) would raise.
        return "Enter a sequence number.", None

    seq_no = int(seq_index)
    seq_dir = os.path.join(DATA_PATH, action_name_drop, str(seq_no))
    if not os.path.isdir(seq_dir):
        return f"No sequence #{seq_no} for '{action_name_drop}'.", None

    model, actions, norm = _load_model_bundle(model_path, labels_path)

    out_dim = model.output_shape[-1]
    if out_dim != len(actions):
        return (f"Label/model mismatch: model has {out_dim} outputs but {len(actions)} labels. "
                "Make sure the .labels.json matches this .keras."), None

    # Load saved frames; a missing frame file is replaced by a zero vector.
    frames = []
    for i in range(int(sequence_length)):
        fpath = os.path.join(seq_dir, f"{i}.npy")
        frames.append(np.load(fpath) if os.path.exists(fpath) else np.zeros(1662))
    seq = np.stack(frames).astype(np.float32)

    # Align to the training sequence length if needed: center-crop when too
    # long, repeat the last frame when too short.
    if norm is not None and "sequence_length" in norm.files:
        train_T = int(norm["sequence_length"])
        if seq.shape[0] > train_T:
            start = max(0, (seq.shape[0] - train_T) // 2)
            seq = seq[start:start + train_T]
        elif seq.shape[0] < train_T:
            pad_n = train_T - seq.shape[0]
            seq = np.concatenate([seq, np.repeat(seq[-1][None, :], pad_n, axis=0)], axis=0)

    # Diagnostics must look at the RAW keypoints: after normalization a
    # zero-filled frame becomes -mean/std and would be counted as "non-zero".
    nz_ratio, nz_frames, total_frames = _nonzero_landmark_ratio(seq)
    diag = {"nonzero_ratio": round(nz_ratio, 3),
            "nonzero_frames": nz_frames,
            "total_frames": total_frames}

    seq = _apply_norm(seq, norm)

    top, status = _predict_seq(model, actions, seq, conf_floor=0.40, topk=min(5, len(actions)))
    diag["status"] = status
    diag["top1"] = {"label": top[0][0], "prob": round(top[0][1], 3)}

    fig = plt.figure(figsize=(6, 3))
    labs = [t[0] for t in top]
    scores = [t[1] for t in top]
    plt.bar(labs, scores)
    plt.ylim(0, 1.0)
    plt.title("Top Predictions (Dataset)")
    pred_arr = fig_to_array(fig)
    plt.close(fig)
    return json.dumps({"preds": top, "diag": diag}, indent=2), pred_arr


# ==================== Tab 3b handlers (NEW: Live Real-time) ====================
def load_model_and_labels(model_path, labels_path):
    """Load a saved Keras model (uncompiled) and its JSON label list.

    Returns:
        ``(model, actions)``; ``actions`` is whatever the labels file decodes to.
    """
    net = tf.keras.models.load_model(model_path, compile=False)
    with open(labels_path, "r") as labels_file:
        label_names = json.load(labels_file)
    return net, label_names

# Helper to draw prediction bars on frame
def draw_scores_bgr(frame, labels, probs, x=10, y=10, bar_w=180, bar_h=18, gap=6):
    """Overlay one horizontal score bar per (label, prob) pair onto ``frame``.

    Each row shows a thin outline of width ``bar_w``, a filled bar whose width
    is ``bar_w`` times the score clamped to [0, 1], and a "label: score"
    caption to the right. The frame is drawn on in place and also returned.
    If ``labels`` or ``probs`` is None the frame is returned untouched.
    """
    if labels is None or probs is None:
        return frame

    row_pitch = bar_h + gap
    for row, (name, score) in enumerate(zip(labels, probs)):
        top_y = y + row * row_pitch
        bottom_y = top_y + bar_h

        # Outline of the full-width bar.
        cv2.rectangle(frame, (x, top_y), (x + bar_w, bottom_y), (50, 50, 50), 1)

        # Filled portion, proportional to the clamped score.
        clamped = float(max(0.0, min(1.0, score)))
        fill_w = int(bar_w * clamped)
        cv2.rectangle(frame, (x, top_y), (x + fill_w, bottom_y), (0, 200, 0), -1)

        caption = f"{name}: {score:.2f}"
        cv2.putText(frame, caption, (x + bar_w + 10, bottom_y - 4),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
    return frame

# >>> NEW: stateful stream function
def realtime_stream(video_input, state, seq_len, top_k):
    """Run sliding-window action prediction over every frame of a clip.

    Args:
        video_input: Path to a video file (as produced by ``gr.Video``).
        state: Dict carried between calls. Keys used here: ``'deque'`` (recent
            keypoint vectors), ``'model'``, ``'labels'``, ``'norm'`` and the
            optional ``'model_path'`` / ``'labels_path'`` used for lazy loading.
        seq_len: Sliding-window length in frames.
        top_k: Number of best classes to draw and report.

    Returns:
        ``(annotated_frame, json_topk, updated_state)``. ``annotated_frame`` is
        the last processed frame converted to RGB (None if no frame could be
        read); ``json_topk`` is the latest top-k list as JSON, or "{}" if no
        prediction was made.
    """
    if video_input is None:
        return None, "{}", state

    window = int(seq_len)
    keep_top = max(1, int(top_k))

    # Initialize state, or resize the window buffer when the requested length
    # differs from the buffer's maxlen. Without this, a window larger than the
    # buffer could never fill up and no prediction would ever be produced.
    if state is None or not isinstance(state, dict) or 'deque' not in state:
        state = {'deque': collections.deque(maxlen=window), 'model': None, 'labels': None}
    elif getattr(state['deque'], 'maxlen', None) != window:
        state['deque'] = collections.deque(state['deque'], maxlen=window)

    # Lazy-load the model bundle if paths were provided in state. The bundle
    # includes the normalization statistics saved next to the model, so live
    # inference scales its input the same way training did.
    model_path = state.get("model_path")
    labels_path = state.get("labels_path")
    if (state.get("model") is None and model_path and labels_path
            and os.path.exists(model_path) and os.path.exists(labels_path)):
        model, labels, norm = _load_model_bundle(model_path, labels_path)
        state["model"] = model
        state["labels"] = labels
        state["norm"] = norm

    cap = cv2.VideoCapture(video_input)
    annotated_frame = None
    top_json = "{}"

    try:
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while cap.isOpened():
                ret, bgr = cap.read()  # frames from cv2.VideoCapture are BGR
                if not ret:
                    break

                # MediaPipe pass (expects RGB)
                rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
                rgb.flags.writeable = False
                results = holistic.process(rgb)
                rgb.flags.writeable = True
                draw_landmarks_bgr(bgr, results)
                state['deque'].append(extract_keypoints(results))

                if state.get("model") is not None and len(state['deque']) >= window:
                    seq = np.stack(list(state['deque'])).astype(np.float32)
                    seq = _apply_norm(seq, state.get("norm"))
                    scores = state['model'].predict(np.expand_dims(seq, 0), verbose=0)[0]
                    idx = np.argsort(scores)[::-1][:keep_top]
                    labs = [state['labels'][i] for i in idx]
                    probs = [float(scores[i]) for i in idx]
                    # Overlay score bars and the top-1 label
                    draw_scores_bgr(bgr, labs, probs, x=10, y=10)
                    cv2.putText(bgr, labs[0], (10, bgr.shape[0]-20), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0,255,255), 2, cv2.LINE_AA)
                    top_json = json.dumps(list(zip(labs, probs)), indent=2)

                # Only the most recent annotated frame is kept and returned.
                annotated_frame = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    finally:
        # Release the capture even if landmark extraction or prediction fails.
        cap.release()

    return annotated_frame, top_json, state


# >>> NEW: helpers to set model paths into the live state
def set_live_model_paths(state, model_path, labels_path):
    """Store new model/label paths in the live state and drop any loaded model.

    A missing or malformed state (not a dict, or without a 'deque' entry) is
    replaced by a fresh one with a 30-frame buffer. Clearing 'model' and
    'labels' makes the next live call reload from the new paths.

    Returns:
        ``(state, status_message)``.
    """
    state_is_usable = isinstance(state, dict) and 'deque' in state
    if not state_is_usable:
        state = {'deque': collections.deque(maxlen=30), 'model': None, 'labels': None}

    state.update(
        model_path=model_path,
        labels_path=labels_path,
        model=None,
        labels=None,
    )
    message = f"Live model set:\n{model_path}\n{labels_path}"
    return state, message

# ==================== Gradio UI ====================
# Build the whole app as one Blocks layout. Components are created in display
# order; each event binding lists (handler, inputs, outputs) and the outputs
# must match the handler's return tuple position by position.
with gr.Blocks(title="Action Detection — Collect / Train / Test (Real-time)") as demo:
    gr.Markdown(f"**Base directory:** `{BASE_DIR}`  \nKeypoints → `{DATA_PATH}`  \nModels → `{MODELS_DIR}`")

    # Tab 1: Collect (with landmark preview)
    with gr.Tab("1) Collect"):
        with gr.Row():
            action_name_tb = gr.Textbox(label="Action name", placeholder="e.g., wave, hello, thanks")
            target_count = gr.Number(label="Target sequences", value=30, precision=0)
            create_btn = gr.Button("Create / Select Folder")
        status1 = gr.Markdown("")
        progress = gr.Textbox(label="Progress", value="0 / 30", interactive=False)

        gr.Markdown("**Record a short clip (5–10s) for ONE sequence**. We will save keypoints and also render an annotated preview with landmarks.")
        with gr.Row():
            sequence_length = gr.Slider(10, 60, value=30, step=1, label="Frames per sequence")
            stride = gr.Slider(1, 5, value=1, step=1, label="Stride (use every Nth frame)")
        video_in = gr.Video(label="Video (webcam or upload)", sources=["webcam", "upload"], height=300)
        save_btn = gr.Button("Record & Save (one sequence)")

        # Annotated preview outputs filled by record_and_save
        anno_path = gr.Textbox(label="Annotated preview file (.mp4)", interactive=False)
        anno_frame = gr.Image(label="Preview (first annotated frame)", visible=True)

        # Reset is destructive; the checkbox value is passed to reset_action as
        # its confirmation flag.
        really = gr.Checkbox(label="Really reset this action's data?")
        reset_btn = gr.Button("Reset Action Folder", variant="stop")

        create_btn.click(create_action_folder, [action_name_tb, target_count], [status1, action_name_tb, progress])
        save_btn.click(
            record_and_save,
            [action_name_tb, video_in, sequence_length, stride, target_count],
            [status1, progress, anno_path, anno_frame]
        )
        reset_btn.click(reset_action, [action_name_tb, really], [status1, action_name_tb])

    # Tab 2: Train / Evaluate
    with gr.Tab("2) Train / Evaluate"):
        with gr.Row():
            seq_len_train = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
            test_size = gr.Slider(0.1, 0.5, value=0.2, step=0.05, label="Test size")
        with gr.Row():
            epochs = gr.Slider(1, 50, value=12, step=1, label="Epochs")
            batch = gr.Slider(4, 64, value=16, step=4, label="Batch size")
        train_btn = gr.Button("Train LSTM")
        train_msg = gr.Textbox(label="Summary", lines=4)
        loss_img = gr.Image(label="Training Loss")
        cm_img = gr.Image(label="Confusion Matrix")
        report_txt = gr.Textbox(label="Classification Report", lines=14)
        train_btn.click(train_model, [seq_len_train, test_size, epochs, batch], [train_msg, loss_img, cm_img, report_txt])

    # Tab 3a: Clip-based test (recorded/uploaded clip, or a stored sequence)
    with gr.Tab("3a) Test (clip based)"):
        gr.Markdown("Test with a short **webcam/uploaded clip** or an existing **dataset sequence**.")
        with gr.Row():
            with gr.Column():
                gr.Markdown("**Webcam/Upload Clip Test**")
                # These two path boxes are shared with the Dataset Test column.
                model_path = gr.Textbox(label="Model .keras path")
                labels_path = gr.Textbox(label="Labels .json path")
                seq_len_inf = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
                stride_inf = gr.Slider(1, 5, value=1, step=1, label="Stride")
                vid_inf = gr.Video(label="Video (webcam or upload)", sources=["webcam", "upload"], height=300)
                infer_btn = gr.Button("Predict (Clip)")
                preds_json = gr.Textbox(label="Top-k predictions (JSON)", lines=8)
                preds_plot = gr.Image(label="Scores")
                infer_btn.click(predict_from_webcam, [model_path, labels_path, vid_inf, seq_len_inf, stride_inf], [preds_json, preds_plot])

            with gr.Column():
                gr.Markdown("**Dataset Test**")
                # list_actions() is evaluated once, when this layout is built;
                # nothing in this block refreshes the choices afterwards.
                action_sel = gr.Dropdown(label="Action", choices=list_actions())
                seq_idx = gr.Number(label="Sequence #", value=1, precision=0)
                seq_len_ds = gr.Slider(10, 60, value=30, step=1, label="Sequence length")
                infer_ds_btn = gr.Button("Predict (Dataset)")
                preds_json_ds = gr.Textbox(label="Top-k predictions (JSON)", lines=8)
                preds_plot_ds = gr.Image(label="Scores")
                infer_ds_btn.click(predict_from_dataset, [model_path, labels_path, action_sel, seq_idx, seq_len_ds], [preds_json_ds, preds_plot_ds])

    # Tab 3b: Live test. NOTE: in this block only the "set paths" button is
    # wired up; no event calls realtime_stream, so out_live / json_live are
    # never updated and the tab is effectively a placeholder.
    with gr.Tab("3b) Live Test (real-time)"):
        gr.Markdown("Live, frame-by-frame predictions with a sliding window and on-frame landmarks & scores.")
        # Initial per-session state: a 30-frame keypoint buffer, no model loaded.
        live_state = gr.State({'deque': collections.deque(maxlen=30), 'model': None, 'labels': None})

        with gr.Row():
            live_model_path = gr.Textbox(label="Model .keras path (for live)")
            live_labels_path = gr.Textbox(label="Labels .json path (for live)")
            seq_len_live = gr.Slider(10, 60, value=30, step=1, label="Sequence length (window)")
            topk_live = gr.Slider(1, 5, value=3, step=1, label="Top-K to display")

        set_live_btn = gr.Button("Use these paths for LIVE")
        set_msg = gr.Textbox(label="Status", interactive=False)
        set_live_btn.click(set_live_model_paths, [live_state, live_model_path, live_labels_path], [live_state, set_msg])

        # gr.Video with a webcam source is used here instead of gr.Image(live=True)
        cam_live = gr.Video(label="Webcam (live)", sources=["webcam"], height=360)
        out_live = gr.Image(label="Annotated (live)")
        json_live = gr.Textbox(label="Top-K (JSON)", lines=6)

        # Disabled wiring, kept for reference: calling .stream() on the
        # gr.Video component raised AttributeError.
        # cam_live.stream(
        #     fn=realtime_stream,
        #     inputs=[cam_live, live_state, seq_len_live, topk_live],
        #     outputs=[out_live, json_live, live_state],
        #     time_limit=None
        # )

        # TODO: wire realtime_stream to an available event or use a different
        # component; until then the tab only shows this notice.
        gr.Markdown("Real-time streaming is not supported with the current Gradio version/setup.")


# share=True requests a public gradio.live URL; server_name="0.0.0.0" listens
# on all interfaces. The cell output notes that in Colab debug=True is needed
# for errors to be shown in the notebook.
demo.launch(share=True, server_name="0.0.0.0", show_error=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://5265add320c3384bdb.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


