In [10]:
import cv2
import mediapipe as mp
import numpy as np
import os
from collections import deque
import time
import sys
import argparse
import datetime
from scipy import interpolate
import json
import logging
from scipy.ndimage import gaussian_filter1d

# ── DETECT JUPYTER ENVIRONMENT ─────────
def is_jupyter():
    """Report whether the interpreter exposes ``get_ipython``.

    Returns:
        bool: True when calling ``get_ipython()`` succeeds, False when the
        name is not defined (a ``NameError`` is raised).
    """
    try:
        get_ipython()
    except NameError:
        # The name only exists when an IPython shell injected it.
        return False
    else:
        return True

# ── CONFIG FOR JUPYTER ────────────────
# Defaults used when running inside a notebook; they also serve as the
# command-line defaults below so the two entry points cannot drift apart.
GESTURE_NAME = "sos"  # Change this for each gesture
GESTURE_TYPE = "dynamic"  # "static" or "dynamic"
MAX_RECORDINGS = 5      # Number of gesture performances
NUM_VARIATIONS = 10     # Total files per performance (1 original + 9 augmented)
SEQUENCE_LENGTH = 60    # Target length of every saved sequence, in frames

# ── PARSE ARGUMENTS FOR COMMAND LINE ───
if not is_jupyter():
    parser = argparse.ArgumentParser(description="Record gesture sequences for dataset creation.")
    parser.add_argument("--gesture", type=str, required=True, help="Name of the gesture to record (e.g., swipe_right, peace).")
    parser.add_argument("--type", type=str, choices=["static", "dynamic"], required=True, help="Type of gesture: static or dynamic.")
    parser.add_argument("--samples", type=int, default=MAX_RECORDINGS, help="Number of gesture performances to record.")
    parser.add_argument("--variations", type=int, default=NUM_VARIATIONS, help="Number of files to save per performance (1 original + augmented).")
    parser.add_argument("--length", type=int, default=SEQUENCE_LENGTH, help="Target length of sequence in frames.")
    args = parser.parse_args()

    # Reject counts that would only fail later (e.g. a zero sequence length
    # divides by zero when the progress bar is drawn).
    for _flag, _value in (("--samples", args.samples),
                          ("--variations", args.variations),
                          ("--length", args.length)):
        if _value < 1:
            parser.error(f"{_flag} must be a positive integer (got {_value}).")

    GESTURE_NAME = args.gesture
    GESTURE_TYPE = args.type
    MAX_RECORDINGS = args.samples
    NUM_VARIATIONS = args.variations
    SEQUENCE_LENGTH = args.length

# ── CONFIG ─────────────────────────────
SEQUENCE_LENGTH_RANGE = 10  # Allow sequences to be +/- this many frames
BASE_SAVE_DIR = "dataset"
SAVE_DIR = f"{BASE_SAVE_DIR}/{GESTURE_TYPE}/{GESTURE_NAME}"
os.makedirs(SAVE_DIR, exist_ok=True)
MIN_VALID_FRAMES = int(SEQUENCE_LENGTH * 0.7)  # Lowered to 70% for faster gestures
MOVEMENT_THRESHOLD = 0.15 if GESTURE_TYPE == "dynamic" else 0.05
AUGMENTATION_NOISE = 0.02 if GESTURE_TYPE == "dynamic" else 0.01
SMOOTH_JITTER = True  # Apply light Gaussian smoothing to temporal jitter
LOG_FILE = f"{BASE_SAVE_DIR}/logs.txt"

# ── SETUP LOGGING ──────────────────────
# FileHandler opens LOG_FILE as soon as it is constructed, so the directory
# that holds it has to exist before the handler list below is built.
_log_dir = os.path.dirname(LOG_FILE)
if _log_dir:
    os.makedirs(_log_dir, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[
        logging.FileHandler(LOG_FILE, encoding='utf-8'),  # Use utf-8 for file
        logging.StreamHandler()
    ]
)
# Root logger: every message goes to both the log file and the console stream.
logger = logging.getLogger()

# ── INIT ───────────────────────────────
# Make sure the directory that holds the log file exists.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

# MediaPipe hand tracker (non-static mode, at most one hand) plus the
# drawing helper used to overlay the detected landmarks.
mp_hands = mp.solutions.hands
mp_draw = mp.solutions.drawing_utils
hands = mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=1,
    min_detection_confidence=0.7,
    min_tracking_confidence=0.5,
)

# Capture device 0, with a 640x480 frame size requested.
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

# Recording state shared with the main loop.
buffer = []  # Plain list (not a deque) so no frame is dropped prematurely
quality_issues = []
last_valid_frame = None  # Most recent landmark row that was detected
recording = False
countdown_active = False
countdown_start = 0
sample_count = 0
valid_frames = 0
hands_visible_frames = 0

# FPS counter
frame_count = 0
start_time = time.time()

def log_message(message):
    """Emit ``message`` at INFO level through the module-level ``logger``.

    Timestamps come from the formatter configured on the logger, not from
    this function.
    """
    logger.log(logging.INFO, message)

def normalize_landmarks(landmarks):
    """Express landmarks relative to the first point, scaled by hand size.

    Args:
        landmarks: Flat sequence of coordinates, reshaped into rows of three.

    Returns:
        numpy.ndarray: Flat array in which row 0 has been subtracted from
        every row and, when the distance between rows 0 and 9 is positive,
        the result has been divided by that distance.
    """
    points = np.array(landmarks).reshape(-1, 3)
    origin = points[0]
    centered = points - origin

    # Distance from row 0 to row 9 serves as the size reference.
    hand_size = np.linalg.norm(points[9] - origin)
    if not hand_size > 0:
        # Degenerate reference: leave the translated points unscaled.
        return centered.flatten()
    return (centered / hand_size).flatten()

def check_hand_quality(landmarks):
    """List quality problems found in one frame of landmarks.

    Args:
        landmarks: Flat sequence of coordinates, reshaped into rows of three;
            only the first two columns are inspected.

    Returns:
        list[str]: One message per axis on which any value falls below 0.05
        or above 0.95 (horizontal first, then vertical); empty when neither
        axis does.
    """
    points = np.array(landmarks).reshape(-1, 3)
    lower, upper = 0.05, 0.95
    axis_checks = (
        (points[:, 0], "Hand near horizontal edge"),
        (points[:, 1], "Hand near vertical edge"),
    )
    return [
        message
        for coords, message in axis_checks
        if np.min(coords) < lower or np.max(coords) > upper
    ]

def check_movement(sequence, threshold=MOVEMENT_THRESHOLD):
    """Measure the largest per-column travel in the x and y columns.

    Args:
        sequence: 2-D sequence of frames; columns 0, 3, 6, ... are read as x
            and columns 1, 4, 7, ... as y.
        threshold: Minimum peak-to-peak range that counts as movement.

    Returns:
        tuple: ``(moved, amount)`` where ``amount`` is the larger of the
        widest x-column range and the widest y-column range, and ``moved``
        is whether ``amount`` reaches ``threshold``.
    """
    frames = np.array(sequence)
    widest_x = np.ptp(frames[:, 0::3], axis=0).max()
    widest_y = np.ptp(frames[:, 1::3], axis=0).max()
    amount = max(widest_x, widest_y)
    return amount >= threshold, amount

def preprocess_sequence(sequence, target_length=SEQUENCE_LENGTH):
    """Resample a sequence of frames to ``target_length`` frames.

    Frames are treated as samples spread evenly over [0, 1] and every column
    is linearly interpolated onto ``target_length`` evenly spaced points.

    Args:
        sequence: 2-D sequence shaped (frames, features).
        target_length: Number of frames wanted in the result.

    Returns:
        numpy.ndarray: Float array with ``target_length`` rows. A sequence
        that already has the right length is returned unchanged (as an array).

    Raises:
        ValueError: If ``sequence`` is empty and ``target_length`` is not 0.
    """
    sequence = np.asarray(sequence, dtype=float)
    num_frames = len(sequence)

    if num_frames == target_length:
        return sequence
    if num_frames == 0:
        raise ValueError("Cannot resample an empty sequence.")
    if num_frames == 1:
        # interp1d needs at least two samples; a lone frame can only be held.
        return np.repeat(sequence, target_length, axis=0)

    orig_time = np.linspace(0, 1, num_frames)
    new_time = np.linspace(0, 1, target_length)
    # axis=0 interpolates all feature columns in one call.
    return interpolate.interp1d(orig_time, sequence, axis=0)(new_time)

def augment_sequence(sequence, noise_level=AUGMENTATION_NOISE, clip_range=None):
    """Generate one randomly augmented copy of a landmark sequence.

    One of four transformations is picked at random:

    0. add uniform noise in ``[-noise_level, noise_level]``;
    1. shift the sequence by up to two frames in time, padding with the
       first/last frame (lightly smoothed when ``SMOOTH_JITTER`` is set);
    2. scale all values by a factor in ``[0.95, 1.05]``;
    3. zero out one randomly chosen landmark (never landmark 0).

    Args:
        sequence: 2-D sequence shaped (frames, features), three features per
            landmark.
        noise_level: Half-width of the uniform noise used by option 0.
        clip_range: Optional ``(low, high)`` pair. When given, the x and y
            columns are clipped to it. The default is no clipping, because
            wrist-relative coordinates such as those produced by
            ``normalize_landmarks`` are legitimately negative or larger
            than 1, and clipping them to [0, 1] would destroy the pose.

    Returns:
        numpy.ndarray: Augmented float array with the same shape as the input.
    """
    sequence = np.array(sequence, dtype=float)
    choice = np.random.randint(0, 4)

    if choice == 0:
        noise = np.random.uniform(-noise_level, noise_level, sequence.shape)
        augmented = sequence + noise

    elif choice == 1:
        augmented = np.copy(sequence)
        shift = np.random.randint(-2, 3)
        if shift > 0:
            # Delay: move frames later and hold the first frame at the start.
            augmented[shift:] = sequence[:-shift]
            augmented[:shift] = sequence[0]
        elif shift < 0:
            # Advance: move frames earlier and hold the last frame at the end.
            augmented[:shift] = sequence[-shift:]
            augmented[shift:] = sequence[-1]
        if SMOOTH_JITTER:
            # Smooth every feature column along the time axis.
            augmented = gaussian_filter1d(augmented, sigma=0.5, axis=0)

    elif choice == 2:
        scale_factor = np.random.uniform(0.95, 1.05)
        augmented = sequence * scale_factor

    else:
        augmented = np.copy(sequence)
        num_landmarks = sequence.shape[1] // 3
        # Landmark 0 is never dropped; with a single landmark there is
        # nothing left to drop.
        if num_landmarks > 1:
            landmark_to_zero = np.random.randint(1, num_landmarks)
            start = landmark_to_zero * 3
            augmented[:, start:start + 3] = 0

    if clip_range is not None:
        low, high = clip_range
        augmented[:, ::3] = np.clip(augmented[:, ::3], low, high)
        augmented[:, 1::3] = np.clip(augmented[:, 1::3], low, high)
    return augmented

def draw_hand_box(frame, landmarks):
    """Draw a padded bounding box and a status dot around the hand.

    Does nothing when ``landmarks`` is empty or None. Otherwise the first two
    columns of the landmarks are scaled by the frame width and height, padded
    by 10 pixels, and drawn onto ``frame`` in place as a green rectangle.
    A dot above the top-left corner is green, or orange when the padded box
    comes within 10 pixels of any frame edge.
    """
    if not landmarks:
        return

    points = np.array(landmarks).reshape(-1, 3)
    frame_h, frame_w, _ = frame.shape
    xs, ys = points[:, 0], points[:, 1]

    pad = 10
    left = int(np.min(xs) * frame_w) - pad
    right = int(np.max(xs) * frame_w) + pad
    top = int(np.min(ys) * frame_h) - pad
    bottom = int(np.max(ys) * frame_h) + pad
    cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)

    near_edge = (
        left < 10
        or top < 10
        or right > frame_w - 10
        or bottom > frame_h - 10
    )
    dot_color = (0, 165, 255) if near_edge else (0, 255, 0)
    cv2.circle(frame, (left, top - 10), 5, dot_color, -1)

def save_metadata(sample_index, movement_amount=None):
    """Write a JSON metadata file for one recorded sample.

    The file is ``<SAVE_DIR>/<sample_index>_metadata.json`` and holds the
    gesture name and type, the current local timestamp, the configured
    sequence length, the module-level ``quality_issues`` list and the
    supplied ``movement_amount``.

    Args:
        sample_index: Index used as the file-name prefix.
        movement_amount: Movement value to store, or None.
    """
    recorded_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    metadata = dict(
        gesture_name=GESTURE_NAME,
        gesture_type=GESTURE_TYPE,
        date_recorded=recorded_at,
        sequence_length=SEQUENCE_LENGTH,
        quality_issues=quality_issues,
        movement_amount=movement_amount,
    )
    target_path = f"{SAVE_DIR}/{sample_index}_metadata.json"
    with open(target_path, "w", encoding='utf-8') as handle:
        json.dump(metadata, handle, indent=2)

# Session banner: the configuration goes through the logger, the key bindings
# go straight to stdout.
log_message(
    f"Recording {MAX_RECORDINGS} performances for {GESTURE_TYPE} gesture: {GESTURE_NAME}"
)
log_message(
    f"Each performance will save {NUM_VARIATIONS} files (1 original + {NUM_VARIATIONS-1} augmented)."
)
log_message(
    f"Files will be saved in: {SAVE_DIR}"
)
print("\n".join([
    "Press 's' to start recording a gesture.",
    "Press 'i' to show info about the current gesture",
    "Press 'q' to quit.",
]))

# ── MAIN LOOP ──────────────────────────
# Per frame: grab an image, detect the hand, then act on the current state
# (countdown / recording / idle), draw the overlay and handle key presses.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # No frame could be read; report it instead of ending silently.
            log_message("Could not read a frame from the camera. Stopping.")
            break

        # Frames counted per one-second window.
        frame_count += 1
        if time.time() - start_time >= 1.0:
            fps = frame_count
            frame_count = 0
            start_time = time.time()

        # Flip horizontally, then convert BGR to RGB for the hand detector.
        frame = cv2.flip(frame, 1)
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        result = hands.process(rgb)

        landmark_row = None
        raw_landmarks = None
        hand_issues = []

        if result.multi_hand_landmarks:
            # Only the first detected hand is used.
            hand_landmarks = result.multi_hand_landmarks[0]
            mp_draw.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)
            raw_landmarks = [
                coord
                for lm in hand_landmarks.landmark
                for coord in (lm.x, lm.y, lm.z)
            ]
            landmark_row = normalize_landmarks(raw_landmarks).tolist()
            # Quality and the on-screen box use the un-normalized landmarks.
            hand_issues = check_hand_quality(raw_landmarks)
            draw_hand_box(frame, raw_landmarks)
            last_valid_frame = landmark_row  # Update last valid frame

        # ── COUNTDOWN LOGIC ───────────
        if countdown_active:
            elapsed = time.time() - countdown_start
            countdown_secs = 3 - int(elapsed)
            if countdown_secs > 0:
                cv2.putText(frame, str(countdown_secs),
                           (frame.shape[1]//2-50, frame.shape[0]//2),
                           cv2.FONT_HERSHEY_SIMPLEX, 4, (255, 0, 0), 5)
            else:
                # Countdown finished: start a fresh take.
                countdown_active = False
                recording = True
                buffer = []
                valid_frames = 0
                hands_visible_frames = 0
                quality_issues = []
                last_valid_frame = None
                log_message("Started recording... Perform the gesture now.")

        # ── RECORDING GESTURE ───────────
        elif recording:
            if landmark_row:
                buffer.append(landmark_row)
                valid_frames += 1
                hands_visible_frames += 1
                cv2.putText(frame, f"Recording ({len(buffer)}/{SEQUENCE_LENGTH} frames)",
                           (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                if hand_issues:
                    quality_issues.extend(hand_issues)
                    issue_text = ", ".join(hand_issues)
                    cv2.putText(frame, f"Issue: {issue_text}",
                               (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 2)
            else:
                # Use last valid frame if available, else zeros
                frame_to_append = last_valid_frame if last_valid_frame else [0] * 63
                buffer.append(frame_to_append)
                cv2.putText(frame, "No hand detected!",
                           (10, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            # Check if we should stop recording
            min_frames = SEQUENCE_LENGTH - SEQUENCE_LENGTH_RANGE
            max_frames = SEQUENCE_LENGTH + SEQUENCE_LENGTH_RANGE

            if len(buffer) >= min_frames and hands_visible_frames >= MIN_VALID_FRAMES:
                sequence = buffer[:max_frames]  # Cap to max frames

                # Movement only matters for dynamic gestures; measure it once.
                has_movement, movement_amount = True, None
                if GESTURE_TYPE == "dynamic":
                    has_movement, movement_amount = check_movement(sequence)

                stop_recording = False
                if len(buffer) >= max_frames:
                    stop_recording = True
                elif GESTURE_TYPE == "static" and len(buffer) >= SEQUENCE_LENGTH:
                    stop_recording = True
                elif (GESTURE_TYPE == "dynamic" and has_movement
                      and len(buffer) >= MIN_VALID_FRAMES):
                    # Early stop for dynamic gestures with sufficient movement
                    stop_recording = True

                if stop_recording:
                    if not has_movement:
                        # Only reachable for dynamic gestures that hit max_frames.
                        log_message(f"Not enough movement (max range: {movement_amount:.3f} < {MOVEMENT_THRESHOLD}). Try again.")
                        buffer = []
                        valid_frames = 0
                        hands_visible_frames = 0
                        recording = False
                        continue

                    sequence = preprocess_sequence(sequence, SEQUENCE_LENGTH)
                    np.savetxt(f"{SAVE_DIR}/{sample_count}_original.csv", sequence, delimiter=",")
                    log_message(f"Saved original sequence: {SAVE_DIR}/{sample_count}_original.csv")

                    for i in range(1, NUM_VARIATIONS):
                        augmented_sequence = augment_sequence(sequence)
                        np.savetxt(f"{SAVE_DIR}/{sample_count}_{i}.csv", augmented_sequence, delimiter=",")

                    if quality_issues:
                        unique_issues = list(set(quality_issues))
                        log_message(f"Recording had some quality issues: {', '.join(unique_issues)}")

                    save_metadata(sample_count, movement_amount)
                    sample_count += 1
                    buffer = []
                    valid_frames = 0
                    hands_visible_frames = 0
                    recording = False

                    if sample_count >= MAX_RECORDINGS:
                        log_message("All recordings done!")
                        break
                    else:
                        log_message("Stopped recording. Press 's' for next sequence.")

            elif len(buffer) >= max_frames:
                # The hand was missing for too much of the take. Without this
                # abort the buffer would keep growing and, once enough frames
                # with a hand finally arrived, the saved slice
                # buffer[:max_frames] would hold mostly padding frames.
                log_message(
                    f"Hand visible in only {hands_visible_frames}/{len(buffer)} frames "
                    f"(need at least {MIN_VALID_FRAMES}). Try again."
                )
                buffer = []
                valid_frames = 0
                hands_visible_frames = 0
                recording = False

            # Progress bar along the bottom edge of the frame.
            progress = min(len(buffer) / SEQUENCE_LENGTH, 1.0)
            cv2.rectangle(frame, (0, frame.shape[0]-10),
                         (int(frame.shape[1] * progress), frame.shape[0]),
                         (0, 255, 0), -1)
        else:
            status = "Idle - Press 's' to record"
            cv2.putText(frame, status, (10, 40),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (100, 100, 255), 2)
            if landmark_row and hand_issues:
                issue_text = ", ".join(hand_issues)
                cv2.putText(frame, f"Issue: {issue_text}",
                           (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 2)

        # Overlay shown in every state: sample counter and gesture label.
        cv2.putText(frame, f"Recorded: {sample_count}/{MAX_RECORDINGS}",
                   (frame.shape[1]-250, 30), cv2.FONT_HERSHEY_SIMPLEX,
                   0.7, (255, 255, 255), 2)
        gesture_info = f"Gesture: {GESTURE_NAME} ({GESTURE_TYPE})"
        cv2.putText(frame, gesture_info,
                   (10, frame.shape[0]-20), cv2.FONT_HERSHEY_SIMPLEX,
                   0.7, (255, 255, 255), 2)
        cv2.imshow(f"Gesture Recorder: {GESTURE_NAME} ({GESTURE_TYPE})", frame)

        # ── KEYBOARD ────────────────────
        key = cv2.waitKey(1) & 0xFF
        if key == ord('s') and not recording and not countdown_active and sample_count < MAX_RECORDINGS:
            countdown_active = True
            countdown_start = time.time()
            print("Countdown started... Get ready!")
        elif key == ord('i'):
            print(f"\n=== GESTURE INFO ===")
            print(f"Name: {GESTURE_NAME}")
            print(f"Type: {GESTURE_TYPE}")
            print(f"Progress: {sample_count}/{MAX_RECORDINGS} recordings")
            print(f"Saving to: {SAVE_DIR}")
            print(f"Min valid frames: {MIN_VALID_FRAMES}")
            if GESTURE_TYPE == "dynamic":
                print(f"Movement threshold: {MOVEMENT_THRESHOLD}")
            print("===================\n")
        elif key == ord('q'):
            if recording:
                # Blocks on console input until the user answers.
                confirm = input("Recording in progress. Are you sure you want to quit? (y/n): ")
                if confirm.lower() != 'y':
                    continue
            break

finally:
    # Always release the camera, the window and the detector, even on error.
    cap.release()
    cv2.destroyAllWindows()
    hands.close()

log_message(f"Session ended. Recorded {sample_count}/{MAX_RECORDINGS} samples for {GESTURE_NAME}.")

[2025-05-13 19:22:44] Recording 5 performances for dynamic gesture: sos
[2025-05-13 19:22:44] Each performance will save 10 files (1 original + 9 augmented).
[2025-05-13 19:22:44] Files will be saved in: dataset/dynamic/sos


Press 's' to start recording a gesture.
Press 'i' to show info about the current gesture
Press 'q' to quit.
Countdown started... Get ready!


[2025-05-13 19:22:50] Started recording... Perform the gesture now.
[2025-05-13 19:22:52] Saved original sequence: dataset/dynamic/sos/0_original.csv
[2025-05-13 19:22:52] Stopped recording. Press 's' for next sequence.


Countdown started... Get ready!


[2025-05-13 19:22:57] Started recording... Perform the gesture now.
[2025-05-13 19:22:59] Saved original sequence: dataset/dynamic/sos/1_original.csv
[2025-05-13 19:22:59] Stopped recording. Press 's' for next sequence.


Countdown started... Get ready!


[2025-05-13 19:23:04] Started recording... Perform the gesture now.
[2025-05-13 19:23:06] Saved original sequence: dataset/dynamic/sos/2_original.csv
[2025-05-13 19:23:06] Stopped recording. Press 's' for next sequence.


Countdown started... Get ready!


[2025-05-13 19:23:10] Started recording... Perform the gesture now.
[2025-05-13 19:23:12] Saved original sequence: dataset/dynamic/sos/3_original.csv
[2025-05-13 19:23:12] Stopped recording. Press 's' for next sequence.


Countdown started... Get ready!


[2025-05-13 19:23:17] Started recording... Perform the gesture now.
[2025-05-13 19:23:19] Saved original sequence: dataset/dynamic/sos/4_original.csv
[2025-05-13 19:23:19] All recordings done!
[2025-05-13 19:23:19] Session ended. Recorded 5/5 samples for sos.
