# Notebook demonstrating different methods for augmenting skeletons extracted from videos of people doing exercises

In [None]:
from __future__ import annotations
import os
from pathlib import Path
from typing import Iterable, Tuple

import numpy as np
import cv2

try:
    from tqdm import tqdm
except Exception:
    # tqdm is treated as optional: when it cannot be imported, progress bars
    # are disabled by substituting a pass-through with a compatible call shape.
    # NOTE(review): `except Exception` is broader than `ImportError`; it also
    # hides unrelated errors raised while importing tqdm — confirm this is intended.
    def tqdm(x, **kwargs):
        """Fallback for ``tqdm``: return ``x`` unchanged and ignore all keyword options."""
        return x

def ensure_dir(path: "Path | str") -> None:
    """Create directory ``path`` (including missing parents) if it does not exist.

    Args:
        path: Target directory, given as a ``pathlib.Path`` or as a plain
            string / ``os.PathLike``. Non-``Path`` values are wrapped in
            ``Path`` first, so callers no longer need to convert by hand.

    Returns:
        None. Calling it on an already existing directory is a no-op.
    """
    Path(path).mkdir(parents=True, exist_ok=True)

### Skeleton augmentations

Assumed skeleton array shape: (T, J, 2) — per-frame 2D joints in pixels.
We implement:
- Rotate around image center
- Scale from center
- Translate (pixels)
- Jitter (Gaussian noise)
- Horizontal flip (requires the image width `W`)
- Temporal resample/crop

If the original image width `W` isn’t known, either provide it or normalize the skeletons to [-1, 1] first and then flip via x -> -x.


In [26]:
from dataclasses import dataclass

@dataclass
class ImageInfo:
    """Frame dimensions used as the reference for skeleton coordinates.

    The augmentation helpers in this notebook read these values to locate the
    image center (``width / 2``, ``height / 2``) and to mirror x coordinates.
    """
    # Frame width, in the same units as the skeleton x coordinates
    # (pixels, per the notebook's stated assumption).
    width: int
    # Frame height, in the same units as the skeleton y coordinates.
    height: int


def skel_rotate(skel: np.ndarray, angle_deg: float, img: ImageInfo) -> np.ndarray:
    """Rotate 2D joints about the image center.

    Args:
        skel: Array whose last axis holds ``(x, y)``; leading axes
            (joints, frames) are broadcast over.
        angle_deg: Rotation angle in degrees. The standard 2D rotation matrix
            is used, i.e. counter-clockwise for a y-up axis (which looks
            clockwise when y grows downward, as in image coordinates).
        img: Frame size; its center is the pivot of the rotation.

    Returns:
        A new array with the same shape as ``skel`` holding rotated coordinates.
    """
    pivot = np.array([[img.width / 2.0, img.height / 2.0]], dtype=np.float32)
    rad = np.deg2rad(angle_deg)
    cos_a, sin_a = np.cos(rad), np.sin(rad)
    rotation = np.array([[cos_a, -sin_a],
                         [sin_a,  cos_a]], dtype=np.float32)
    # Row-vector convention: p' = p @ R^T, applied to pivot-relative points.
    return (skel - pivot) @ rotation.T + pivot


def skel_scale(skel: np.ndarray, scale: float, img: ImageInfo) -> np.ndarray:
    """Scale 2D joints uniformly about the image center.

    Args:
        skel: Array whose last axis holds ``(x, y)``; leading axes are broadcast over.
        scale: Multiplicative factor (``1.0`` leaves the skeleton unchanged).
        img: Frame size; its center stays fixed under the scaling.

    Returns:
        A new array with the same shape as ``skel``.
    """
    pivot = np.array([[img.width / 2.0, img.height / 2.0]], dtype=np.float32)
    offset_from_center = skel - pivot
    return offset_from_center * scale + pivot


def skel_translate(skel: np.ndarray, dx: float, dy: float) -> np.ndarray:
    """Shift every joint by ``(dx, dy)``.

    Args:
        skel: Array whose last axis holds ``(x, y)``; leading axes are broadcast over.
        dx: Offset added to all x coordinates.
        dy: Offset added to all y coordinates.

    Returns:
        A new array with the same shape as ``skel``.
    """
    shift = np.asarray((dx, dy), dtype=np.float32).reshape(1, 2)
    return skel + shift


def skel_jitter(skel: np.ndarray, sigma: float=2.0,
                rng: "np.random.Generator | None" = None) -> np.ndarray:
    """Add zero-mean Gaussian noise to every coordinate.

    Args:
        skel: Coordinate array of any shape; noise of the same shape is added.
        sigma: Standard deviation of the noise (same units as ``skel``).
        rng: Optional random generator (e.g. ``np.random.default_rng(seed)``)
            used to draw the noise, which makes a single call reproducible
            without touching global state. When ``None`` the global NumPy
            RNG is used, exactly as before.

    Returns:
        A new array ``skel + noise``; the noise is drawn in float32.
    """
    draw_normal = np.random.normal if rng is None else rng.normal
    noise = draw_normal(0, sigma, size=skel.shape).astype(np.float32)
    return skel + noise


def skel_flip_horizontal(skel: np.ndarray, img: ImageInfo,
                         swap_pairs: "Iterable[Tuple[int, int]] | None" = None) -> np.ndarray:
    """Mirror a skeleton left-to-right inside the image.

    Args:
        skel: Array shaped ``(..., J, C)`` with x stored in channel 0; extra
            channels (y, confidence, ...) are left untouched by the mirroring.
        img: Frame size; x is mapped to ``(img.width - 1) - x``.
        swap_pairs: Optional iterable of ``(left_idx, right_idx)`` joint index
            pairs. After mirroring, a joint labelled "left" ends up on the
            right side of the body, so for models that rely on joint identity
            the two rows of each pair should be exchanged. Pass the pairs for
            your skeleton layout to do that; ``None`` (default) keeps the
            joint order as-is, which is the previous behaviour.

    Returns:
        A flipped copy; the input array is not modified.
    """
    out = skel.copy()
    out[..., 0] = (img.width - 1) - out[..., 0]
    if swap_pairs is not None:
        pairs = np.asarray(list(swap_pairs), dtype=np.intp).reshape(-1, 2)
        if pairs.size:
            dst = np.concatenate([pairs[:, 0], pairs[:, 1]])
            src = np.concatenate([pairs[:, 1], pairs[:, 0]])
            # Advanced indexing on the right-hand side yields a copy, so the
            # simultaneous exchange is safe.
            out[..., dst, :] = out[..., src, :]
    return out


def skel_time_crop(skel: np.ndarray, ratio: float=0.8,
                   rng: "np.random.Generator | None" = None) -> np.ndarray:
    """Keep a random contiguous window of frames.

    Args:
        skel: Array indexed by frame along axis 0; trailing axes are kept as-is.
        ratio: Fraction of frames to keep. The window length is
            ``max(1, int(T * ratio))``.
        rng: Optional ``np.random.Generator`` used to pick the window start,
            making the crop reproducible per call. When ``None`` the global
            NumPy RNG is used, exactly as before.

    Returns:
        ``skel[start:start + new_T]`` (a view). If the window would not be
        shorter than the sequence, ``skel`` itself is returned unchanged.
    """
    T = skel.shape[0]
    new_T = max(1, int(T * ratio))
    if new_T >= T:
        return skel
    n_starts = T - new_T + 1  # number of valid window positions
    if rng is None:
        start = np.random.randint(0, n_starts)
    else:
        start = int(rng.integers(0, n_starts))
    return skel[start:start+new_T]


def skel_time_resample(skel: np.ndarray, new_T: int) -> np.ndarray:
    """Resample a sequence to ``new_T`` frames by nearest-frame selection.

    Frame indices are ``new_T`` evenly spaced positions between the first and
    the last frame, rounded to the nearest integer; frames are picked, not
    interpolated, so up-sampling repeats frames.

    Args:
        skel: Array indexed by frame along axis 0; trailing axes are kept as-is.
        new_T: Desired number of frames.

    Returns:
        ``skel[idx]`` with ``new_T`` frames. ``skel`` itself is returned
        unchanged when ``new_T <= 0`` or when the sequence is empty (there is
        no frame to pick from, and indexing would otherwise raise).
    """
    T = skel.shape[0]
    if new_T <= 0 or T == 0:
        return skel
    idx = np.linspace(0, T-1, new_T).round().astype(int)
    idx = np.clip(idx, 0, T-1)
    return skel[idx]


def demo_skeleton_augs():
    """Build a small synthetic skeleton sequence and run every augmentation on it.

    The sequence has 10 frames of 5 joints: joint ``j`` in frame ``t`` sits at
    ``(100 + 2*t + 5*j, 50 + 10*j)``. Spatial augmentations are applied frame
    by frame, temporal ones on the whole sequence.

    Returns:
        Dict mapping an augmentation name to its result array; ``"base"``
        holds the un-augmented sequence.
    """
    T, J = 10, 5
    img = ImageInfo(width=640, height=360)

    # Synthetic coordinates via broadcasting: frames on axis 0, joints on axis 1.
    frame_ids = np.arange(T, dtype=np.float32)[:, None]
    joint_ids = np.arange(J, dtype=np.float32)[None, :]
    xs = 100 + frame_ids * 2 + joint_ids * 5
    ys = np.broadcast_to(50 + joint_ids * 10, (T, J))
    base = np.stack([xs, ys], axis=-1).astype(np.float32)

    def per_frame(fn, *args):
        # Apply a single-frame augmentation to each frame and re-stack along time.
        return np.stack([fn(frame, *args) for frame in base], axis=0)

    # Dict values are evaluated top to bottom, so jitter draws from the global
    # RNG before the temporal crop does.
    return {
        "base": base,
        "rot": per_frame(skel_rotate, 10.0, img),
        "scale": per_frame(skel_scale, 1.1, img),
        "translate": per_frame(skel_translate, 5.0, -3.0),
        "jitter": per_frame(skel_jitter, 1.5),
        "flip": per_frame(skel_flip_horizontal, img),
        "time_crop": skel_time_crop(base, 0.7),
        "time_resample": skel_time_resample(base, 6),
    }


### Batch skeleton augmentation from .npy files

This section loads skeleton arrays saved as `.npy` (shape (T, J, 2) or (T, J, 3) with confidence), applies selected augmentations using known frame width/height, and saves augmented copies while preserving dtype and confidence channel if present.

Edit the paths and the `IMG_W`, `IMG_H` values below to match your data.


#### How the batch skeleton augmentation (.npy) cell works

1) Configuration: set `SKELETON_DIR` (where input `.npy` files live), `OUT_DIR` (where to save), and frame size `IMG_W`, `IMG_H`. The output directory is created if missing.
2) Define `AUGS`: a dict mapping a filename suffix to a function that takes coordinates of shape (T, J, 2) and returns augmented coordinates. Included: rotate, scale, translate, jitter, horizontal flip, temporal crop, temporal resample.
3) Scan `SKELETON_DIR` for `.npy` files.
4) For each file:
   - Load `arr` and validate shape (T, J, 2|3). Keep the original `dtype`.
   - Detect confidence channel (`has_conf` when C=3). Split into `coords = arr[..., :2]` and `conf = arr[..., 2:]` if present.
   - For each augmentation compute `aug_xy` (x,y only). If `conf` exists and the time length T changed (crop/resample), align `conf` via `skel_time_resample`, then concatenate back to get (T, J, 3). Otherwise, concatenate the original `conf`.
   - Cast back to the original `dtype` and save to `OUT_DIR` with a suffix (e.g., `_rot10`, `_tcrop`).

Result: for each input `.npy` one output per augmentation is produced; the original `dtype` is preserved; if confidence exists, the third channel is preserved as well.


In [27]:
from typing import List

# --- Configuration -----------------------------------------------------------
BASE_DIR = Path.cwd().parent

SKELETON_DIR = BASE_DIR / "data/skeletons"
OUT_DIR = BASE_DIR / "data/augmented_skeletons"
IMG_W, IMG_H = 640, 360
ensure_dir(OUT_DIR)

# Spatial augmentations: change (x, y) only and keep the number of frames.
# The skel_* helpers broadcast over leading axes, so a whole (T, J, 2) sequence
# is processed in one call (this also works for T == 0, where stacking per-frame
# results would fail). ImageInfo is built at call time so that later changes to
# IMG_W / IMG_H are still picked up.
SPATIAL_AUGS = {
    "rot10": lambda arr: skel_rotate(arr[..., :2], 10.0, ImageInfo(IMG_W, IMG_H)),
    "scale110": lambda arr: skel_scale(arr[..., :2], 1.1, ImageInfo(IMG_W, IMG_H)),
    "trans_5_-3": lambda arr: skel_translate(arr[..., :2], 5.0, -3.0),
    "jitter": lambda arr: skel_jitter(arr[..., :2], 1.5),
    "flip": lambda arr: skel_flip_horizontal(arr[..., :2], ImageInfo(IMG_W, IMG_H)),
}

# Temporal augmentations: select frames along axis 0 and keep every channel,
# so they can be applied to the full array (coordinates + confidence) at once.
TEMPORAL_AUGS = {
    "tcrop": lambda arr: skel_time_crop(arr, 0.8),
    "tresamp6": lambda arr: skel_time_resample(arr, 6),
}

# Combined mapping: suffix -> function taking (T, J, 2) coordinates.
AUGS = {**SPATIAL_AUGS, **TEMPORAL_AUGS}

# --- Collect input files -----------------------------------------------------
sk_files: List[Path] = []
if SKELETON_DIR.exists():
    sk_files = sorted(SKELETON_DIR.glob("*.npy"))
else:
    print("Skeleton dir does not exist:", SKELETON_DIR)

# --- Augment and save --------------------------------------------------------
for npy_path in tqdm(sk_files):
    arr = np.load(npy_path)
    if arr.ndim != 3 or arr.shape[-1] not in (2, 3):
        print("Skip incompatible shape:", npy_path, arr.shape)
        continue
    if arr.shape[0] == 0:
        # Nothing to augment, and temporal resampling cannot pick frames from an empty sequence.
        print("Skip empty sequence:", npy_path, arr.shape)
        continue
    dtype = arr.dtype
    has_conf = (arr.shape[-1] == 3)
    coords = arr[..., :2].astype(np.float32)
    conf = arr[..., 2:] if has_conf else None

    for suf, fn in SPATIAL_AUGS.items():
        aug_xy = fn(coords)
        # Frame count is unchanged, so the original confidence still lines up.
        aug = np.concatenate([aug_xy, conf], axis=-1) if has_conf else aug_xy
        aug = aug.astype(dtype, copy=False)
        np.save(OUT_DIR / f"{npy_path.stem}_{suf}.npy", aug)

    for suf, fn in TEMPORAL_AUGS.items():
        # Apply frame selection to the full array so the confidence channel is
        # taken from exactly the same frames as the coordinates. (Re-aligning
        # confidence separately, e.g. by evenly resampling it, would pair a
        # randomly cropped window with confidences from other frames.)
        aug = fn(arr).astype(dtype, copy=False)
        np.save(OUT_DIR / f"{npy_path.stem}_{suf}.npy", aug)
print("Saved augmented skeletons to:", OUT_DIR)


100%|██████████| 3/3 [00:00<00:00, 122.80it/s]

Saved augmented skeletons to: /Users/victor/Documents/vs_files/Action_Recognition/data/augmented_skeletons





#### Test with 3 files

In [36]:
# Targeted test on three specific skeleton files from data/skeletons
BASE_DIR = Path.cwd().parent

TEST_FILES = [
    Path(BASE_DIR / "data/skeletons/0b3c6ea5f943fbc80b9c6d20373cc3bf.npy"),
    Path(BASE_DIR / "data/skeletons/0b4dd5fa651633b90bb7e4a455caaf4c.npy"),
    Path(BASE_DIR / "data/skeletons/0b9d78bb7ad1f347ed0a6af7262da572.npy"),
]

IMG_W, IMG_H = 640, 360
OUT_DIR = Path(BASE_DIR / "data/augmented_skeletons_test")
ensure_dir(OUT_DIR)

for npy_path in TEST_FILES:
    arr = np.load(npy_path)
    if arr.ndim != 3 or arr.shape[-1] not in (2, 3):
        print("Skip incompatible shape:", npy_path, arr.shape)
        continue
    n_frames = arr.shape[0]
    if n_frames == 0:
        # Nothing to augment; frame-wise stacking / resampling cannot handle an empty sequence.
        print("Skip empty sequence:", npy_path, arr.shape)
        continue
    dtype = arr.dtype
    has_conf = (arr.shape[-1] == 3)
    coords = arr[..., :2].astype(np.float32)
    conf = arr[..., 2:] if has_conf else None

    # Probe array of shape (T, 1, 2) whose values are the frame index. Pushing
    # it through a temporal augmentation reveals which frames were kept.
    frame_probe = np.repeat(np.arange(n_frames, dtype=np.float32)[:, None, None], 2, axis=-1)

    for suf, fn in AUGS.items():
        # Snapshot the global RNG so a random temporal augmentation can be
        # replayed on the probe with the identical random choice.
        # Assumes the augmentations draw from NumPy's global RNG.
        rng_state = np.random.get_state()
        aug_xy = fn(coords)
        if has_conf:
            if aug_xy.shape[0] != n_frames:
                # Frame count changed: take confidence from exactly the frames
                # the augmentation selected. (Evenly resampling the confidence
                # instead would not match a randomly cropped window.)
                np.random.set_state(rng_state)
                kept_frames = np.rint(fn(frame_probe)[:, 0, 0]).astype(int)
                conf_new = conf[kept_frames]
                aug = np.concatenate([aug_xy, conf_new], axis=-1)
            else:
                aug = np.concatenate([aug_xy, conf], axis=-1)
        else:
            aug = aug_xy
        aug = aug.astype(dtype, copy=False)
        out_path = OUT_DIR / f"{npy_path.stem}_{suf}.npy"
        np.save(out_path, aug)
    print("Augmented:", npy_path.name, "->", OUT_DIR)


Augmented: 0b3c6ea5f943fbc80b9c6d20373cc3bf.npy -> /Users/victor/Documents/vs_files/Action_Recognition/data/augmented_skeletons_test
Augmented: 0b4dd5fa651633b90bb7e4a455caaf4c.npy -> /Users/victor/Documents/vs_files/Action_Recognition/data/augmented_skeletons_test
Augmented: 0b9d78bb7ad1f347ed0a6af7262da572.npy -> /Users/victor/Documents/vs_files/Action_Recognition/data/augmented_skeletons_test


#### Check type

In [37]:
# Verify shape and dtype of the augmented copies of one reference file.
BASE_DIR = Path.cwd().parent

orig = Path(BASE_DIR / "data/skeletons/0b3c6ea5f943fbc80b9c6d20373cc3bf.npy")

IMG_W, IMG_H = 640, 360
out_dir = Path(BASE_DIR / "data/augmented_skeletons_test")
aug_paths = sorted(out_dir.glob(f"{orig.stem}_*.npy"))
# Without this guard the loop below would pass vacuously when nothing was generated.
assert aug_paths, f"No augmented files found for {orig.stem} in {out_dir}"

arr0 = np.load(orig)
T0, J0, C0 = arr0.shape
print("orig:", arr0.shape, arr0.dtype)

for p in aug_paths:
    arr = np.load(p)
    print(p.name, "->", arr.shape, arr.dtype)
    suffix = p.stem[len(orig.stem) + 1:]  # part after "<stem>_"
    if suffix == "tcrop":
        # A crop shortens the sequence unless there is at most one frame to begin with.
        assert arr.shape[0] < T0 or T0 <= 1
    elif suffix == "tresamp6":
        assert arr.shape[0] == 6
    else:
        assert arr.shape[0] == T0
    # Joint count and channel count (2, or 3 with confidence) must be preserved.
    assert arr.shape[1:] == (J0, C0)
    # The original dtype must be preserved.
    assert arr.dtype == arr0.dtype

orig: (49, 17, 3) float32
0b3c6ea5f943fbc80b9c6d20373cc3bf_flip.npy -> (49, 17, 3) float32
0b3c6ea5f943fbc80b9c6d20373cc3bf_jitter.npy -> (49, 17, 3) float32
0b3c6ea5f943fbc80b9c6d20373cc3bf_rot10.npy -> (49, 17, 3) float32
0b3c6ea5f943fbc80b9c6d20373cc3bf_scale110.npy -> (49, 17, 3) float32
0b3c6ea5f943fbc80b9c6d20373cc3bf_tcrop.npy -> (39, 17, 3) float32
0b3c6ea5f943fbc80b9c6d20373cc3bf_trans_5_-3.npy -> (49, 17, 3) float32
0b3c6ea5f943fbc80b9c6d20373cc3bf_tresamp6.npy -> (6, 17, 3) float32


In [38]:
# Print the first frame of one original file and of its horizontally flipped copy.
BASE_DIR = Path.cwd().parent


def _show_first_frame(header, skel):
    """Print the array shape, a header line, the first frame and its first joint."""
    print(skel.shape)
    print(header)
    print(skel[0])
    print(skel[0, 0])


data = np.load(BASE_DIR / "data/skeletons/0b3c6ea5f943fbc80b9c6d20373cc3bf.npy")
_show_first_frame("First frame (orig):", data)

data1 = np.load(BASE_DIR / "data/augmented_skeletons_test/0b3c6ea5f943fbc80b9c6d20373cc3bf_flip.npy")
_show_first_frame("\nFirst frame (augmented):", data1)

(49, 17, 3)
First frame (orig):
[[1.71322098e+02 1.21599564e+02 9.88917470e-01]
 [1.76614304e+02 1.18680008e+02 9.85222280e-01]
 [1.68325943e+02 1.18370483e+02 9.20247972e-01]
 [1.87045685e+02 1.22255722e+02 9.36038315e-01]
 [1.66081482e+02 1.21657700e+02 3.41832161e-01]
 [1.98538727e+02 1.39886856e+02 9.89978433e-01]
 [1.57727875e+02 1.38593582e+02 9.80428636e-01]
 [2.17625824e+02 1.06342621e+02 9.63149011e-01]
 [1.43540894e+02 1.04614494e+02 9.57552075e-01]
 [2.20913544e+02 7.39672165e+01 9.40750062e-01]
 [1.44039810e+02 7.45982971e+01 9.43886578e-01]
 [1.85741470e+02 2.15389664e+02 8.91559243e-01]
 [1.56021591e+02 2.14411423e+02 8.84795666e-01]
 [1.92367172e+02 2.40000000e+02 1.20780841e-01]
 [1.47710693e+02 2.40000000e+02 1.21171385e-01]
 [1.92333771e+02 2.22976364e+02 7.96720199e-03]
 [1.55217239e+02 2.26019165e+02 8.34085606e-03]]
[171.3221     121.59956      0.98891747]
(49, 17, 3)

First frame (augmented):
[[4.67677917e+02 1.21599564e+02 9.88917470e-01]
 [4.62385681e+02 1.18680