# Data Augmentation of Keypoint CSV Data

- Three methods: **mirroring**, **rotation**, **stretching**
- Applied to MoveNet (2D) and MediaPipe (3D) keypoint CSVs

In [1]:
import numpy as np
import pandas as pd
from pathlib import Path

In [2]:
# Input folders (relative to the working directory). run_augmentation() reads
# every '*.csv' found directly inside the folder it is given.
MOVENET_DIR = Path('../data/processed/keypoints/movenet')
MEDIAPIPE_NORM_DIR = Path('../data/processed/keypoints/mediapipe_norm')
MEDIAPIPE_WORLD_DIR = Path('../data/processed/keypoints/mediapipe_world')

# Output folders. save_csv() creates one subfolder per augmentation method
# ('mirrored', 'rotated', 'stretched') underneath these on demand.
OUT_MOVENET_DIR = Path('../data/processed/keypoints_augmented/movenet')
OUT_MEDIAPIPE_NORM_DIR = Path('../data/processed/keypoints_augmented/mediapipe_norm')
OUT_MEDIAPIPE_WORLD_DIR = Path('../data/processed/keypoints_augmented/mediapipe_world')

# Joint-name prefixes. Coordinate columns are looked up as '<joint>_x',
# '<joint>_y' and, for 3D data, '<joint>_z'.
JOINTS = [
    'left_shoulder', 'right_shoulder',
    'left_elbow', 'right_elbow',
    'left_wrist', 'right_wrist',
    'left_hip', 'right_hip',
    'left_knee', 'right_knee',
    'left_ankle', 'right_ankle'
]

# Rotation angles in radians (about 25.7, -16.4 and -13.8 degrees);
# one rotated file is written per angle.
ROTATION_ANGLES = [np.pi / 7, -np.pi / 11, -np.pi / 13]
# Scale factors for the x columns and for the y columns; each factor is
# applied on its own (one axis at a time) and yields one output file.
STRETCH_FACTORS_X = [0.9, 1.1, 1.2]
STRETCH_FACTORS_Y = [0.9, 1.1, 1.2]

In [3]:
def is_3d(df):
    """Tell whether ``df`` carries depth data.

    Returns True as soon as any column name ends with '_z', False otherwise
    (including for a frame without columns).
    """
    return any(col.endswith('_z') for col in df.columns)


def get_columns(df, suffix):
    """List the joint columns of ``df`` that carry the given suffix.

    Candidate names are built as ``joint + suffix`` for every entry of
    ``JOINTS``; only names that actually exist in ``df`` are returned, in
    ``JOINTS`` order. The result is an empty list when none match.
    """
    candidates = (joint + suffix for joint in JOINTS)
    return [name for name in candidates if name in df.columns]

## Augmentation functions

### Mirroring
- Negate x, y, and/or z coordinates
- Mirror any axis combination

In [None]:
def mirror(df, mirror_x=True, mirror_y=False, mirror_z=False):
    """Return a copy of ``df`` with the selected joint coordinates negated.

    Args:
        df: Keypoint frame; only the joint columns found by ``get_columns``
            are touched, every other column is copied unchanged.
        mirror_x: Negate the '_x' joint columns (on by default).
        mirror_y: Negate the '_y' joint columns.
        mirror_z: Negate the '_z' joint columns; has no effect when the
            frame has none (2D data).

    Returns:
        A new DataFrame; ``df`` itself is not modified.

    NOTE(review): values are multiplied by -1, i.e. reflected about 0 -
    presumably the coordinates are centred on the origin; confirm.
    NOTE(review): left_/right_ column labels are not swapped after the
    flip - confirm this is the intended augmentation.
    """
    flipped = df.copy()

    axis_flags = (('_x', mirror_x), ('_y', mirror_y), ('_z', mirror_z))
    for suffix, enabled in axis_flags:
        if not enabled:
            continue
        cols = get_columns(df, suffix)
        if cols:
            flipped[cols] = flipped[cols] * -1

    return flipped

### Rotation
- **2D (MoveNet):** rotation matrix on (x, y)
- **3D (MediaPipe):** rotation about the Y-axis, applied to (x, z); y is left unchanged

In [5]:
def rotate(df, angle):
    """Return a copy of ``df`` with every joint rotated about the origin.

    3D data (as reported by ``is_3d``) is rotated about the Y-axis, which
    changes only the x and z columns:

        x' =  cos(a) * x + sin(a) * z
        z' = -sin(a) * x + cos(a) * z

    2D data is rotated in the xy-plane:

        x' = cos(a) * x - sin(a) * y
        y' = sin(a) * x + cos(a) * y

    A joint from ``JOINTS`` is rotated only when both of its required
    columns exist; other joints are skipped instead of raising ``KeyError``.
    This matches ``mirror`` and ``stretch``, which also ignore joints that
    are missing from the frame.

    Args:
        df: Keypoint frame with '<joint>_x', '<joint>_y' and, for 3D data,
            '<joint>_z' columns.
        angle: Rotation angle in radians.

    Returns:
        A new DataFrame; ``df`` itself is not modified.
    """
    result = df.copy()
    c = np.cos(angle)
    s = np.sin(angle)

    # The x column is always paired with z (3D, rotation about Y) or y (2D).
    three_d = is_3d(df)
    partner = '_z' if three_d else '_y'

    for joint in JOINTS:
        x_col = joint + '_x'
        p_col = joint + partner
        if x_col not in df.columns or p_col not in df.columns:
            # Incomplete joint: leave whatever columns exist untouched.
            continue

        # Read from the untouched input so the second equation never sees
        # an already-rotated x.
        x = df[x_col].values
        p = df[p_col].values
        if three_d:
            result[x_col] = c * x + s * p
            result[p_col] = -s * x + c * p
        else:
            result[x_col] = c * x - s * p
            result[p_col] = s * x + c * p

    return result

### Stretching
- Scale x, y, and/or z coordinates by a factor
- Z-stretch only applies to 3D data

In [None]:
def stretch(df, fx=1.0, fy=1.0, fz=1.0):
    """Return a copy of ``df`` with joint coordinates scaled per axis.

    Args:
        df: Keypoint frame; only the joint columns found by ``get_columns``
            are scaled, every other column is copied unchanged.
        fx: Multiplier for the '_x' joint columns.
        fy: Multiplier for the '_y' joint columns.
        fz: Multiplier for the '_z' joint columns; has no effect when the
            frame has none (2D data).

    A factor equal to 1.0 leaves its axis untouched.

    Returns:
        A new DataFrame; ``df`` itself is not modified.
    """
    scaled = df.copy()

    for suffix, factor in (('_x', fx), ('_y', fy), ('_z', fz)):
        if factor != 1.0:
            cols = get_columns(df, suffix)
            if cols:
                scaled[cols] = scaled[cols] * factor

    return scaled

In [7]:
def save_csv(df, output_dir, method, filename):
    """Write ``df`` to ``output_dir/method/filename`` as CSV without the index.

    The ``method`` subfolder (and any missing parents) is created first; an
    already existing folder is reused.
    """
    target = output_dir / method / filename
    target.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(target, index=False)


def run_augmentation(input_dir, output_dir):
    """Generate and save every augmented variant of each CSV in ``input_dir``.

    For each '*.csv' directly inside ``input_dir`` the following files are
    written below ``output_dir`` via ``save_csv``:

    - ``mirrored/<stem>_mirrored.csv`` (``mirror`` with its defaults)
    - ``rotated/<stem>_rot<deg>.csv`` for each angle in ``ROTATION_ANGLES``
      (degrees formatted with one decimal)
    - ``stretched/<stem>_stretchX<f>.csv`` for each ``STRETCH_FACTORS_X``
    - ``stretched/<stem>_stretchY<f>.csv`` for each ``STRETCH_FACTORS_Y``

    A one-line summary with the number of files written is printed at the
    end; nothing is returned.
    """
    def variants(df, name):
        # Lazily yields (method subfolder, output filename, augmented frame)
        # so each variant is computed right before it is saved.
        yield 'mirrored', f'{name}_mirrored.csv', mirror(df)

        for angle in ROTATION_ANGLES:
            deg = np.rad2deg(angle)
            yield 'rotated', f'{name}_rot{deg:.1f}.csv', rotate(df, angle)

        for f in STRETCH_FACTORS_X:
            yield 'stretched', f'{name}_stretchX{f}.csv', stretch(df, fx=f)

        for f in STRETCH_FACTORS_Y:
            yield 'stretched', f'{name}_stretchY{f}.csv', stretch(df, fy=f)

    count = 0
    # Materialise the listing up front, before anything is written.
    for file in list(input_dir.glob('*.csv')):
        source = pd.read_csv(file)
        for method, filename, augmented in variants(source, file.stem):
            save_csv(augmented, output_dir, method, filename)
            count += 1

    print(f'Done: {input_dir.name} -> {count} files generated')

## Run augmentation
- 1 mirrored + 3 rotated + 6 stretched (3 in x, 3 in y) = **10 variants per original file**
- Output saved in subfolders: `mirrored/`, `rotated/`, `stretched/`

### MoveNet (2D)

In [8]:
# Augment every CSV in MOVENET_DIR; variants are written under OUT_MOVENET_DIR.
run_augmentation(MOVENET_DIR, OUT_MOVENET_DIR)

Done: movenet -> 100 files generated


### MediaPipe Normalized (3D)

In [9]:
# Augment every CSV in MEDIAPIPE_NORM_DIR; variants are written under OUT_MEDIAPIPE_NORM_DIR.
run_augmentation(MEDIAPIPE_NORM_DIR, OUT_MEDIAPIPE_NORM_DIR)

Done: mediapipe_norm -> 100 files generated


### MediaPipe World (3D)

In [10]:
# Augment every CSV in MEDIAPIPE_WORLD_DIR; variants are written under OUT_MEDIAPIPE_WORLD_DIR.
run_augmentation(MEDIAPIPE_WORLD_DIR, OUT_MEDIAPIPE_WORLD_DIR)

Done: mediapipe_world -> 100 files generated


## Summary

In [11]:
# Report how many CSV files each augmentation method left on disk, per model.
print("File counts per method:")
print()

models = [
    ('MoveNet', OUT_MOVENET_DIR),
    ('MediaPipe Norm', OUT_MEDIAPIPE_NORM_DIR),
    ('MediaPipe World', OUT_MEDIAPIPE_WORLD_DIR),
]

for name, path in models:
    counts = []
    for method in ('mirrored', 'rotated', 'stretched'):
        method_path = path / method
        if not method_path.exists():
            # No folder for this method: it is left out of the report.
            continue
        n = sum(1 for _ in method_path.glob('*.csv'))
        print(f'  {name}/{method}: {n} files')
        counts.append(n)
    total = sum(counts)
    print(f'  {name} total: {total}')
    print()

File counts per method:

  MoveNet/mirrored: 10 files
  MoveNet/rotated: 30 files
  MoveNet/stretched: 60 files
  MoveNet total: 100

  MediaPipe Norm/mirrored: 10 files
  MediaPipe Norm/rotated: 30 files
  MediaPipe Norm/stretched: 60 files
  MediaPipe Norm total: 100

  MediaPipe World/mirrored: 10 files
  MediaPipe World/rotated: 30 files
  MediaPipe World/stretched: 60 files
  MediaPipe World total: 100

