In [1]:
# sequence_labeling_official_majority.py
import os
import json
import re
import pickle
from collections import Counter
from typing import List, Tuple, Dict

# ==================================
# Config
# ==================================
FRAMES_ROOTS = [
    "./cam2-cleaned_frames",
    "./cam3-cleaned_frames",
]
LABELS_ROOT = "./micro_activity_output_cleaned"
SEQUENCE_LENGTH = 16
STRIDE = 8  # sliding window step

# ==================================
# Official micro -> macro mapping (same as udfall_microactivity_stats.py)
# Labels are normalized (lowercase, '_' -> ' ', collapse spaces) before lookup.
# ==================================
OFFICIAL_MAP = {
    "fall lateral": "Fall",
    "fall frontal": "Fall",
    "fall crouch": "Fall",
    "fall rolling": "Fall",
    "sit up from lying": "ADL",
    "stand still": "ADL",
    "lie down from sitting": "ADL",
    "sit down from standing": "ADL",
    "stand up from floor": "ADL",
    "rolling bed": "ADL",
    "sit still": "ADL",
    "stand up from sit": "ADL",
    "walking": "ADL",
    "pick up object": "ADL",
    "lie down on the floor": "ADL",
    "lie still": "Lying",
}

def _normalize_label(s: str) -> str:
    """Lowercase, replace underscores with spaces, and collapse multiple spaces."""
    s = s.lower().replace("_", " ").strip()
    return re.sub(r"\s+", " ", s)

def to_macro(label: str) -> str:
    """Map a micro-activity to its macro group using the official table (default 'ADL')."""
    return OFFICIAL_MAP.get(_normalize_label(label), "ADL")

# Macro -> numeric id
MACRO_TO_ID = {"ADL": 0, "Fall": 1, "Lying": 2}
ID_TO_MACRO = {v: k for k, v in MACRO_TO_ID.items()}

# ==================================
# Helpers
# ==================================
def safe_frame_index(filename: str) -> int:
    """Extract a numeric frame index from names like 'frame_00012.jpg' (returns -1 if not found)."""
    try:
        core = os.path.splitext(os.path.basename(filename))[0]
        tokens = core.split("_")
        # Try reversed tokens to catch suffix digits quickly
        for tok in reversed(tokens):
            if tok.isdigit():
                return int(tok)
        # Fallback: collect all digits
        digits = "".join(ch for ch in core if ch.isdigit())
        return int(digits) if digits else -1
    except Exception:
        return -1

def load_labels_dict(labels_root: str, activity_name: str) -> Dict[int, str]:
    """
    Load per-frame micro labels for a given activity as a dict[int -> label_str].
    Supports either {'root': {...}} or a flat dict.
    Keys are coerced to int; non-numeric keys are skipped.
    """
    label_json_path = os.path.join(labels_root, activity_name, "ann", "per_frame_micro_activities.json")
    if not os.path.exists(label_json_path):
        return {}
    with open(label_json_path, "r", encoding="utf-8") as f:
        loaded_json = json.load(f)
        labels_dict = loaded_json.get("root", loaded_json)

    out = {}
    for k, v in labels_dict.items():
        try:
            out[int(k)] = v
        except Exception:
            continue
    return out

def map_micro_to_class(lbl: str) -> int:
    """Convert a micro label to macro via official map, then to numeric class id."""
    macro = to_macro(lbl)
    return MACRO_TO_ID[macro]

def majority_vote_class(class_ids: List[int], seq_len: int) -> int:
    """
    Majority voting on macro class ids for the sequence.
    Returns: 0=ADL, 1=Fall, 2=Lying
    Rule: if Fall or Lying reaches >= seq_len//2 votes, pick it; otherwise ADL.
    Priority tie-breaker: Fall > Lying > ADL (only matters if both reach threshold simultaneously).
    """
    counts = Counter(class_ids)
    half = seq_len // 2

    fall_ok = counts.get(1, 0) >= half
    lying_ok = counts.get(2, 0) >= half

    if fall_ok and lying_ok:
        return 1  # prefer Fall on ties
    if fall_ok:
        return 1
    if lying_ok:
        return 2
    return 0

# ==================================
# Core
# ==================================
def generate_sequences_and_labels_official(
    frame_roots: List[str],
    labels_root: str,
    seq_len: int = 16,
    stride: int = 8,
) -> List[Tuple[List[str], int]]:
    """
    Build index-aware sequences using official micro->macro labeling.

    For each frames_dir:
      1) Sort frames by their numeric index (extracted from filename).
      2) For each frame, look up its micro label by the SAME numeric index in the JSON.
      3) Map micro -> macro -> class_id (0=ADL,1=Fall,2=Lying).
      4) Keep only frames that have a valid label (skip missing).
      5) Slide a window of length seq_len with step 'stride' over the LABELED list.
         Assign the sequence label by majority voting over its class_ids.

    Returns: list of (list_of_frame_paths, sequence_label_id)
    """
    assert seq_len > 0, "seq_len must be > 0"
    assert stride > 0, "stride must be > 0"

    data: List[Tuple[List[str], int]] = []

    for frames_root in frame_roots:
        if not os.path.isdir(frames_root):
            print(f"[WARN] Frames root not found: {frames_root}")
            continue

        cam_tag = os.path.basename(frames_root.rstrip("/"))

        for activity_name in sorted(os.listdir(frames_root)):
            activity_path = os.path.join(frames_root, activity_name)
            if not os.path.isdir(activity_path):
                continue

            labels_dict = load_labels_dict(labels_root, activity_name)
            if not labels_dict:
                print(f"[{cam_tag}] Missing or empty labels for activity: {activity_name}")
                continue

            for subfolder in sorted(os.listdir(activity_path)):
                frames_dir = os.path.join(activity_path, subfolder)
                if not os.path.isdir(frames_dir):
                    continue

                frame_files = [f for f in os.listdir(frames_dir) if f.lower().endswith(".jpg")]
                if not frame_files:
                    continue

                # Sort by numeric frame index
                frame_files.sort(key=safe_frame_index)

                labeled_frames: List[Tuple[str, int]] = []
                skipped_no_label = 0
                skipped_bad_index = 0

                for fname in frame_files:
                    idx = safe_frame_index(fname)
                    if idx < 0:
                        skipped_bad_index += 1
                        continue
                    micro = labels_dict.get(idx)
                    if micro is None:
                        skipped_no_label += 1
                        continue
                    cls_id = map_micro_to_class(micro)
                    labeled_frames.append((os.path.join(frames_dir, fname), cls_id))

                if skipped_no_label or skipped_bad_index:
                    print(
                        f"[{cam_tag}] {activity_name}/{os.path.basename(frames_dir)}: "
                        f"kept={len(labeled_frames)}, skipped_no_label={skipped_no_label}, skipped_bad_index={skipped_bad_index}"
                    )

                # Need at least seq_len labeled frames for a window
                n = len(labeled_frames)
                if n < seq_len:
                    continue

                # Sliding window over labeled frames only
                for i in range(0, n - seq_len + 1, stride):
                    window = labeled_frames[i:i + seq_len]
                    seq_paths = [p for p, _ in window]
                    seq_class_ids = [c for _, c in window]
                    seq_label = majority_vote_class(seq_class_ids, seq_len)
                    data.append((seq_paths, seq_label))

    return data

# ==================================
# Run (Jupyter / Script)
# ==================================
if __name__ == "__main__":
    dataset = generate_sequences_and_labels_official(
        frame_roots=FRAMES_ROOTS,
        labels_root=LABELS_ROOT,
        seq_len=SEQUENCE_LENGTH,
        stride=STRIDE,
    )

    print(f"Total sequences: {len(dataset)}")
    dist = Counter([y for _, y in dataset])
    print("Class distribution (0=ADL, 1=Fall, 2=Lying):", dict(dist))

    # Optional: save pickle with metadata in filename
    out_name = f"seq_official_len{SEQUENCE_LENGTH}_stride{STRIDE}_new.pkl"
    with open(out_name, "wb") as f:
        pickle.dump(dataset, f)
    print(f"Saved pickle to {out_name}")


Total sequences: 33144
Class distribution (0=ADL, 1=Fall, 2=Lying): {0: 27364, 1: 5420, 2: 360}
Saved pickle to seq_official_len16_stride8_new.pkl


In [None]:
0

In [1]:
#correct logic

# sequence_labeling_official_majority.py
import os
import json
import re
import pickle
from collections import Counter
from typing import List, Tuple, Dict

# ==================================
# Config
# ==================================
FRAMES_ROOTS = [
    "./cam2-cleaned_frames",
    "./cam3-cleaned_frames",
]
LABELS_ROOT = "./micro_activity_output_cleaned"
SEQUENCE_LENGTH = 16
STRIDE = 8  # sliding window step

# ==================================
# Official micro -> macro mapping (same as udfall_microactivity_stats.py)
# Labels are normalized (lowercase, '_' -> ' ', collapse spaces) before lookup.
# ==================================
OFFICIAL_MAP = {
    "fall lateral": "Fall",
    "fall frontal": "Fall",
    "fall crouch": "Fall",
    "fall rolling": "Fall",
    "sit up from lying": "ADL",
    "stand still": "ADL",
    "lie down from sitting": "ADL",
    "sit down from standing": "ADL",
    "stand up from floor": "ADL",
    "rolling bed": "ADL",
    "sit still": "ADL",
    "stand up from sit": "ADL",
    "walking": "ADL",
    "pick up object": "ADL",
    "lie down on the floor": "ADL",
    "lie still": "Lying",
}

COMPOSITE_SEP = r"[-/|]"  # separators between micro-activities in a single frame label

def _normalize_label(s: str) -> str:
    """Lowercase, replace underscores with spaces, and collapse multiple spaces."""
    s = s.lower().replace("_", " ").strip()
    return re.sub(r"\s+", " ", s)

def to_macro(label: str) -> str:
    """
    Map a micro-activity (possibly composite) to a macro class.
    Rules:
      1) Try exact lookup after normalization.
      2) If not found, split on COMPOSITE_SEP and map each part individually.
      3) Priority when combining: Fall > Lying > ADL.
    """
    s = _normalize_label(label)

    # 1) Exact match
    if s in OFFICIAL_MAP:
        return OFFICIAL_MAP[s]

    # 2) Composite handling: e.g., "lie still-lie down on the floor"
    parts = [p.strip() for p in re.split(COMPOSITE_SEP, s) if p.strip()]
    if parts:
        prio = {"Fall": 3, "Lying": 2, "ADL": 1}
        best = "ADL"
        for p in parts:
            cls = OFFICIAL_MAP.get(p, "ADL")
            if prio[cls] > prio[best]:
                best = cls
            # early exit if we already hit highest priority
            if best == "Fall":
                break
        return best

    # 3) Default
    return "ADL"

# Macro -> numeric id
MACRO_TO_ID = {"ADL": 0, "Fall": 1, "Lying": 2}
ID_TO_MACRO = {v: k for k, v in MACRO_TO_ID.items()}

# ==================================
# Helpers
# ==================================
def safe_frame_index(filename: str) -> int:
    """Extract a numeric frame index from names like 'frame_00012.jpg' (returns -1 if not found)."""
    try:
        core = os.path.splitext(os.path.basename(filename))[0]
        tokens = core.split("_")
        for tok in reversed(tokens):
            if tok.isdigit():
                return int(tok)
        digits = "".join(ch for ch in core if ch.isdigit())
        return int(digits) if digits else -1
    except Exception:
        return -1

def load_labels_dict(labels_root: str, activity_name: str) -> Dict[int, str]:
    """
    Load per-frame micro labels for a given activity as a dict[int -> label_str].
    Supports either {'root': {...}} or a flat dict. Non-numeric keys are skipped.
    """
    label_json_path = os.path.join(labels_root, activity_name, "ann", "per_frame_micro_activities.json")
    if not os.path.exists(label_json_path):
        return {}
    with open(label_json_path, "r", encoding="utf-8") as f:
        loaded_json = json.load(f)
        labels_dict = loaded_json.get("root", loaded_json)

    out = {}
    for k, v in labels_dict.items():
        try:
            out[int(k)] = v
        except Exception:
            continue
    return out

def map_micro_to_class(lbl: str) -> int:
    """Convert a micro label to macro via official map (with composite logic), then to numeric class id."""
    macro = to_macro(lbl)
    return MACRO_TO_ID[macro]

def majority_vote_class(class_ids: List[int], seq_len: int) -> int:
    """
    Majority voting on macro class ids for the sequence.
    Returns: 0=ADL, 1=Fall, 2=Lying
    Rule: if Fall or Lying reaches >= seq_len//2 votes, pick it; otherwise ADL.
    Priority tie-breaker: Fall > Lying > ADL.
    """
    counts = Counter(class_ids)
    half = seq_len // 2
    fall_ok = counts.get(1, 0) >= half
    lying_ok = counts.get(2, 0) >= half
    if fall_ok and lying_ok:
        return 1
    if fall_ok:
        return 1
    if lying_ok:
        return 2
    return 0

# ==================================
# Core
# ==================================
def generate_sequences_and_labels_official(
    frame_roots: List[str],
    labels_root: str,
    seq_len: int = 16,
    stride: int = 8,
) -> List[Tuple[List[str], int]]:
    """
    Build index-aware sequences using official micro->macro labeling (with composite handling).
    """
    assert seq_len > 0 and stride > 0

    data: List[Tuple[List[str], int]] = []

    for frames_root in frame_roots:
        if not os.path.isdir(frames_root):
            print(f"[WARN] Frames root not found: {frames_root}")
            continue

        cam_tag = os.path.basename(frames_root.rstrip("/"))

        for activity_name in sorted(os.listdir(frames_root)):
            activity_path = os.path.join(frames_root, activity_name)
            if not os.path.isdir(activity_path):
                continue

            labels_dict = load_labels_dict(labels_root, activity_name)
            if not labels_dict:
                print(f"[{cam_tag}] Missing or empty labels for activity: {activity_name}")
                continue

            for subfolder in sorted(os.listdir(activity_path)):
                frames_dir = os.path.join(activity_path, subfolder)
                if not os.path.isdir(frames_dir):
                    continue

                frame_files = [f for f in os.listdir(frames_dir) if f.lower().endswith(".jpg")]
                if not frame_files:
                    continue

                frame_files.sort(key=safe_frame_index)

                labeled_frames: List[Tuple[str, int]] = []
                skipped_no_label = 0
                skipped_bad_index = 0

                for fname in frame_files:
                    idx = safe_frame_index(fname)
                    if idx < 0:
                        skipped_bad_index += 1
                        continue
                    micro = labels_dict.get(idx)
                    if micro is None:
                        skipped_no_label += 1
                        continue
                    cls_id = map_micro_to_class(micro)
                    labeled_frames.append((os.path.join(frames_dir, fname), cls_id))

                if skipped_no_label or skipped_bad_index:
                    print(
                        f"[{cam_tag}] {activity_name}/{os.path.basename(frames_dir)}: "
                        f"kept={len(labeled_frames)}, skipped_no_label={skipped_no_label}, skipped_bad_index={skipped_bad_index}"
                    )

                n = len(labeled_frames)
                if n < seq_len:
                    continue

                for i in range(0, n - seq_len + 1, stride):
                    window = labeled_frames[i:i + seq_len]
                    seq_paths = [p for p, _ in window]
                    seq_class_ids = [c for _, c in window]
                    seq_label = majority_vote_class(seq_class_ids, seq_len)
                    data.append((seq_paths, seq_label))

    return data

# ==================================
# Run
# ==================================
if __name__ == "__main__":
    dataset = generate_sequences_and_labels_official(
        frame_roots=FRAMES_ROOTS,
        labels_root=LABELS_ROOT,
        seq_len=SEQUENCE_LENGTH,
        stride=STRIDE,
    )

    print(f"Total sequences: {len(dataset)}")
    dist = Counter([y for _, y in dataset])
    print("Class distribution (0=ADL, 1=Fall, 2=Lying):", dict(dist))

    out_name = f"seq_official_len{SEQUENCE_LENGTH}_stride{STRIDE}_new.pkl"
    with open(out_name, "wb") as f:
        pickle.dump(dataset, f)
    print(f"Saved pickle to {out_name}")




Total sequences: 33144
Class distribution (0=ADL, 1=Fall, 2=Lying): {0: 27336, 1: 5420, 2: 388}
Saved pickle to seq_official_len16_stride8_new.pkl


In [3]:
# make_video_windows_pkl.py
# Create a reproducible pre-windowed PKL dataset directly from videos
# Each item: ({"video_path": str, "frame_indices": List[int]}, label_id)
# label_id: 0 = ADL, 1 = Fall, 2 = Lying

import os
import re
import json
import pickle
from collections import Counter
from typing import Dict, List, Tuple, Any

# ===================== CONFIG =====================
FRAMES_ROOTS = [
    "./cam2-cleaned_frames",
    "./cam3-cleaned_frames",
]

LABELS_ROOT = "./micro_activity_output_cleaned"

SEQ_LEN = 16
STRIDE  = 8
OUT_PKL = f"./video_seq_official_len{SEQ_LEN}_stride{STRIDE}.pkl"

# ===================== MICRO → MACRO MAP =====================
OFFICIAL_MAP = {
    "fall lateral": "Fall",
    "fall frontal": "Fall",
    "fall crouch": "Fall",
    "fall rolling": "Fall",
    "sit up from lying": "ADL",
    "stand still": "ADL",
    "lie down from sitting": "ADL",
    "sit down from standing": "ADL",
    "stand up from floor": "ADL",
    "rolling bed": "ADL",
    "sit still": "ADL",
    "stand up from sit": "ADL",
    "walking": "ADL",
    "pick up object": "ADL",
    "lie down on the floor": "ADL",
    "lie still": "Lying",
}
MACRO_TO_ID = {"ADL": 0, "Fall": 1, "Lying": 2}

# ===================== HELPERS =====================
def _norm(s: str) -> str:
    """Normalize label: lowercase, replace '_' with space, collapse spaces."""
    s = s.lower().replace("_", " ").strip()
    return re.sub(r"\s+", " ", s)

def to_macro(lbl: str) -> str:
    """Convert micro label to macro label using OFFICIAL_MAP."""
    return OFFICIAL_MAP.get(_norm(lbl), "ADL")

def to_id(lbl: str) -> int:
    """Convert label string to numeric class id."""
    return MACRO_TO_ID[to_macro(lbl)]

def safe_idx(name: str) -> int:
    """Extract numeric frame index from filename like frame_000123.jpg."""
    base = os.path.splitext(os.path.basename(name))[0]
    for tok in reversed(base.split("_")):
        if tok.isdigit():
            return int(tok)
    digits = "".join(ch for ch in base if ch.isdigit())
    return int(digits) if digits else -1

def load_labels(activity: str) -> Dict[int, str]:
    """Load per-frame micro labels as dict[int -> label_str]."""
    p = os.path.join(LABELS_ROOT, activity, "ann", "per_frame_micro_activities.json")
    if not os.path.isfile(p):
        return {}
    with open(p, "r", encoding="utf-8") as f:
        js = json.load(f)
    d = js.get("root", js)
    out = {}
    for k, v in d.items():
        try:
            out[int(k)] = v
        except Exception:
            pass
    return out

def majority(ids: List[int], seq_len: int) -> int:
    """Majority voting for macro class ids (priority: Fall > Lying > ADL)."""
    c = Counter(ids)
    half = seq_len // 2
    fall_ok  = c.get(1, 0) >= half
    lying_ok = c.get(2, 0) >= half
    if fall_ok and lying_ok:
        return 1
    if fall_ok:
        return 1
    if lying_ok:
        return 2
    return 0

def map_frames_dir_to_video_path(frames_dir: str) -> str:
    """
    Map a frames directory to its corresponding video file path.

    Frame dirs look like:
        ./cam2-cleaned_frames/<Activity>/<clip_subdir>/
        ./cam3-cleaned_frames/<Activity>/<clip_subdir>/

    Video files look like:
        ./cam2-newdata/<Activity>/<Activity> CAM 2.mp4
        ./cam3-newdata/<Activity>/<Activity> CAM 3.mp4
    """
    norm = frames_dir.replace("\\", "/").rstrip("/")
    # Determine which camera (cam2 or cam3)
    if "/cam2-cleaned_frames/" in norm or norm.endswith("/cam2-cleaned_frames"):
        cam = 2
        new_root = norm.replace("cam2-cleaned_frames", "cam2-newdata")
    elif "/cam3-cleaned_frames/" in norm or norm.endswith("/cam3-cleaned_frames"):
        cam = 3
        new_root = norm.replace("cam3-cleaned_frames", "cam3-newdata")
    else:
        return ""

    parts = new_root.split("/")
    if len(parts) < 3:
        return ""

    # Activity folder is one level above the clip subdir
    # e.g., .../cam2-newdata/Actor_1_Bed/<clip_subdir>
    activity = parts[-2]

    # Activity directory (remove the last clip subdir)
    activity_dir = "/".join(parts[:-1])

    # Expected video filename
    vid_name = f"{activity} CAM {cam}.mp4"
    vpath = os.path.join(activity_dir, vid_name)

    if os.path.isfile(vpath):
        return vpath

    # Try alternate extensions if needed
    for ext in (".MP4", ".avi", ".mov", ".mkv"):
        alt = os.path.join(activity_dir, f"{activity} CAM {cam}{ext}")
        if os.path.isfile(alt):
            return alt

    return ""  # Video not found

# ===================== MAIN BUILDER =====================
def build_dataset() -> List[Tuple[Dict[str, Any], int]]:
    """
    Create a list of ({"video_path", "frame_indices"}, label_id)
    by scanning frame folders, matching them to videos, and using majority voting.
    """
    out: List[Tuple[Dict[str, Any], int]] = []

    for frames_root in FRAMES_ROOTS:
        if not os.path.isdir(frames_root):
            print(f"[WARN] frames root not found: {frames_root}")
            continue

        cam_tag = os.path.basename(frames_root.rstrip("/\\"))

        for activity in sorted(os.listdir(frames_root)):
            act_dir = os.path.join(frames_root, activity)
            if not os.path.isdir(act_dir):
                continue

            labels = load_labels(activity)
            if not labels:
                print(f"[{cam_tag}] No labels for activity: {activity}")
                continue

            for clip_dir in sorted(os.listdir(act_dir)):
                frames_dir = os.path.join(act_dir, clip_dir)
                if not os.path.isdir(frames_dir):
                    continue

                vpath = map_frames_dir_to_video_path(frames_dir)
                if not vpath:
                    print(f"[{cam_tag}] MISSING VIDEO for frames_dir: {frames_dir}")
                    continue

                jpgs = [f for f in os.listdir(frames_dir) if f.lower().endswith(".jpg")]
                if not jpgs:
                    continue
                jpgs.sort(key=safe_idx)

                labeled: List[Tuple[int, int]] = []  # (frame_index, class_id)
                miss_label = bad_idx = 0
                for fn in jpgs:
                    idx = safe_idx(fn)
                    if idx < 0:
                        bad_idx += 1
                        continue
                    micro = labels.get(idx)
                    if micro is None:
                        miss_label += 1
                        continue
                    labeled.append((idx, to_id(micro)))

                if len(labeled) < SEQ_LEN:
                    continue

                # Sliding window over labeled frames
                for i in range(0, len(labeled) - SEQ_LEN + 1, STRIDE):
                    win = labeled[i:i + SEQ_LEN]
                    inds = [fi for fi, _ in win]
                    ids  = [ci for _, ci in win]
                    y = majority(ids, SEQ_LEN)
                    out.append(({"video_path": vpath, "frame_indices": inds}, y))

                if miss_label or bad_idx:
                    print(f"[{cam_tag}] {activity}/{clip_dir}: "
                          f"kept={len(labeled)} miss={miss_label} bad={bad_idx}")

    return out

# ===================== RUN =====================
if __name__ == "__main__":
    dataset = build_dataset()
    print(f"\nTotal windows: {len(dataset)}")
    dist = Counter([y for _, y in dataset])
    print("Class dist (0=ADL, 1=Fall, 2=Lying):", dict(dist))

    # Print a few samples for sanity-check
    for i in range(min(3, len(dataset))):
        s, y = dataset[i]
        print(f"Sample {i}: {s['video_path']}  idx[:5]={s['frame_indices'][:5]}  label={y}")

    with open(OUT_PKL, "wb") as f:
        pickle.dump(dataset, f)
    print(f"\nSaved → {OUT_PKL}")



Total windows: 33144
Class dist (0=ADL, 1=Fall, 2=Lying): {0: 27364, 1: 5420, 2: 360}
Sample 0: ./cam2-newdata/Actor_1_Bed/Actor_1_Bed CAM 2.mp4  idx[:5]=[206, 207, 208, 209, 210]  label=0
Sample 1: ./cam2-newdata/Actor_1_Bed/Actor_1_Bed CAM 2.mp4  idx[:5]=[214, 215, 216, 217, 218]  label=0
Sample 2: ./cam2-newdata/Actor_1_Bed/Actor_1_Bed CAM 2.mp4  idx[:5]=[222, 223, 224, 225, 226]  label=0

Saved → ./video_seq_official_len16_stride8.pkl


In [1]:
#correct logic for videos

# make_video_windows_pkl.py
# Create a reproducible pre-windowed PKL dataset directly from videos
# Each item: ({"video_path": str, "frame_indices": List[int]}, label_id)
# label_id: 0 = ADL, 1 = Fall, 2 = Lying

import os
import re
import json
import pickle
from collections import Counter
from typing import Dict, List, Tuple, Any

# ===================== CONFIG =====================
FRAMES_ROOTS = [
    "./cam2-cleaned_frames",
    "./cam3-cleaned_frames",
]

LABELS_ROOT = "./micro_activity_output_cleaned"

SEQ_LEN = 16
STRIDE  = 8
OUT_PKL = f"./video_seq_official_len{SEQ_LEN}_stride{STRIDE}.pkl"

# ===================== MICRO → MACRO MAP =====================
OFFICIAL_MAP = {
    "fall lateral": "Fall",
    "fall frontal": "Fall",
    "fall crouch": "Fall",
    "fall rolling": "Fall",
    "sit up from lying": "ADL",
    "stand still": "ADL",
    "lie down from sitting": "ADL",
    "sit down from standing": "ADL",
    "stand up from floor": "ADL",
    "rolling bed": "ADL",
    "sit still": "ADL",
    "stand up from sit": "ADL",
    "walking": "ADL",
    "pick up object": "ADL",
    "lie down on the floor": "ADL",
    "lie still": "Lying",
}
MACRO_TO_ID = {"ADL": 0, "Fall": 1, "Lying": 2}
ID_TO_MACRO = {v: k for k, v in MACRO_TO_ID.items()}

# separators between multiple micro-activities recorded for the same frame
COMPOSITE_SEP = r"[-/|]"

# ===================== HELPERS =====================
def _norm(s: str) -> str:
    """Normalize label: lowercase, replace '_' with space, collapse spaces."""
    s = s.lower().replace("_", " ").strip()
    return re.sub(r"\s+", " ", s)

def to_macro(lbl: str) -> str:
    """
    Convert micro label (possibly composite) to macro label.
    Rules:
      1) Exact lookup after normalization.
      2) If not found, split by COMPOSITE_SEP and map parts individually.
      3) Combine with priority: Fall > Lying > ADL.
      4) Default to ADL.
    """
    s = _norm(lbl)

    # 1) exact match
    if s in OFFICIAL_MAP:
        return OFFICIAL_MAP[s]

    # 2) composite handling, e.g. "lie still-lie down on the floor"
    parts = [p.strip() for p in re.split(COMPOSITE_SEP, s) if p.strip()]
    if parts:
        prio = {"Fall": 3, "Lying": 2, "ADL": 1}
        best = "ADL"
        for p in parts:
            cls = OFFICIAL_MAP.get(p, "ADL")
            if prio[cls] > prio[best]:
                best = cls
            if best == "Fall":   # early exit: highest priority
                break
        return best

    # 3) default
    return "ADL"

def to_id(lbl: str) -> int:
    """Convert label string to numeric class id."""
    return MACRO_TO_ID[to_macro(lbl)]

def safe_idx(name: str) -> int:
    """Extract numeric frame index from filename like frame_000123.jpg."""
    base = os.path.splitext(os.path.basename(name))[0]
    for tok in reversed(base.split("_")):
        if tok.isdigit():
            return int(tok)
    digits = "".join(ch for ch in base if ch.isdigit())
    return int(digits) if digits else -1

def load_labels(activity: str) -> Dict[int, str]:
    """Load per-frame micro labels as dict[int -> label_str]."""
    p = os.path.join(LABELS_ROOT, activity, "ann", "per_frame_micro_activities.json")
    if not os.path.isfile(p):
        return {}
    with open(p, "r", encoding="utf-8") as f:
        js = json.load(f)
    d = js.get("root", js)
    out = {}
    for k, v in d.items():
        try:
            out[int(k)] = v
        except Exception:
            pass
    return out

def majority(ids: List[int], seq_len: int) -> int:
    """Majority voting for macro class ids (priority: Fall > Lying > ADL)."""
    c = Counter(ids)
    half = seq_len // 2
    fall_ok  = c.get(1, 0) >= half
    lying_ok = c.get(2, 0) >= half
    if fall_ok and lying_ok:
        return 1
    if fall_ok:
        return 1
    if lying_ok:
        return 2
    return 0

def map_frames_dir_to_video_path(frames_dir: str) -> str:
    """
    Map a frames directory to its corresponding video file path.

    Frame dirs:
        ./cam2-cleaned_frames/<Activity>/<clip_subdir>/
        ./cam3-cleaned_frames/<Activity>/<clip_subdir>/

    Video files:
        ./cam2-newdata/<Activity>/<Activity> CAM 2.mp4
        ./cam3-newdata/<Activity>/<Activity> CAM 3.mp4
    """
    norm = frames_dir.replace("\\", "/").rstrip("/")
    if "/cam2-cleaned_frames/" in norm or norm.endswith("/cam2-cleaned_frames"):
        cam = 2
        new_root = norm.replace("cam2-cleaned_frames", "cam2-newdata")
    elif "/cam3-cleaned_frames/" in norm or norm.endswith("/cam3-cleaned_frames"):
        cam = 3
        new_root = norm.replace("cam3-cleaned_frames", "cam3-newdata")
    else:
        return ""

    parts = new_root.split("/")
    if len(parts) < 3:
        return ""

    activity = parts[-2]
    activity_dir = "/".join(parts[:-1])
    vid_name = f"{activity} CAM {cam}.mp4"
    vpath = os.path.join(activity_dir, vid_name)
    if os.path.isfile(vpath):
        return vpath

    for ext in (".MP4", ".avi", ".mov", ".mkv"):
        alt = os.path.join(activity_dir, f"{activity} CAM {cam}{ext}")
        if os.path.isfile(alt):
            return alt

    return ""  # not found

# ===================== MAIN BUILDER =====================
def build_dataset() -> List[Tuple[Dict[str, Any], int]]:
    """
    Create a list of ({"video_path", "frame_indices"}, label_id)
    by scanning frame folders, matching them to videos, and using majority voting.
    """
    out: List[Tuple[Dict[str, Any], int]] = []

    for frames_root in FRAMES_ROOTS:
        if not os.path.isdir(frames_root):
            print(f"[WARN] frames root not found: {frames_root}")
            continue

        cam_tag = os.path.basename(frames_root.rstrip("/\\"))

        for activity in sorted(os.listdir(frames_root)):
            act_dir = os.path.join(frames_root, activity)
            if not os.path.isdir(act_dir):
                continue

            labels = load_labels(activity)
            if not labels:
                print(f"[{cam_tag}] No labels for activity: {activity}")
                continue

            for clip_dir in sorted(os.listdir(act_dir)):
                frames_dir = os.path.join(act_dir, clip_dir)
                if not os.path.isdir(frames_dir):
                    continue

                vpath = map_frames_dir_to_video_path(frames_dir)
                if not vpath:
                    print(f"[{cam_tag}] MISSING VIDEO for frames_dir: {frames_dir}")
                    continue

                jpgs = [f for f in os.listdir(frames_dir) if f.lower().endswith(".jpg")]
                if not jpgs:
                    continue
                jpgs.sort(key=safe_idx)

                labeled: List[Tuple[int, int]] = []  # (frame_index, class_id)
                miss_label = bad_idx = 0
                for fn in jpgs:
                    idx = safe_idx(fn)
                    if idx < 0:
                        bad_idx += 1
                        continue
                    micro = labels.get(idx)
                    if micro is None:
                        miss_label += 1
                        continue
                    labeled.append((idx, to_id(micro)))

                if len(labeled) < SEQ_LEN:
                    continue

                for i in range(0, len(labeled) - SEQ_LEN + 1, STRIDE):
                    win = labeled[i:i + SEQ_LEN]
                    inds = [fi for fi, _ in win]
                    ids  = [ci for _, ci in win]
                    y = majority(ids, SEQ_LEN)
                    out.append(({"video_path": vpath, "frame_indices": inds}, y))

                if miss_label or bad_idx:
                    print(f"[{cam_tag}] {activity}/{clip_dir}: kept={len(labeled)} miss={miss_label} bad={bad_idx}")

    return out

# ===================== RUN =====================
if __name__ == "__main__":
    dataset = build_dataset()
    print(f"\nTotal windows: {len(dataset)}")
    dist = Counter([y for _, y in dataset])
    print("Class dist (0=ADL, 1=Fall, 2=Lying):", dict(dist))

    # sanity prints
    for i in range(min(3, len(dataset))):
        s, y = dataset[i]
        print(f"Sample {i}: {s['video_path']}  idx[:5]={s['frame_indices'][:5]}  label={y} ({ID_TO_MACRO[y]})")

    with open(OUT_PKL, "wb") as f:
        pickle.dump(dataset, f)
    print(f"\nSaved → {OUT_PKL}")



Total windows: 33144
Class dist (0=ADL, 1=Fall, 2=Lying): {0: 27336, 1: 5420, 2: 388}
Sample 0: ./cam2-newdata/Actor_1_Bed/Actor_1_Bed CAM 2.mp4  idx[:5]=[206, 207, 208, 209, 210]  label=0 (ADL)
Sample 1: ./cam2-newdata/Actor_1_Bed/Actor_1_Bed CAM 2.mp4  idx[:5]=[214, 215, 216, 217, 218]  label=0 (ADL)
Sample 2: ./cam2-newdata/Actor_1_Bed/Actor_1_Bed CAM 2.mp4  idx[:5]=[222, 223, 224, 225, 226]  label=0 (ADL)

Saved → ./video_seq_official_len16_stride8.pkl
