# Silksong Gesture Recognition - CNN/LSTM Training

**Training for Hollow Knight: Silksong Voice-Controlled Watch Interface**

This notebook trains a CNN/LSTM deep learning model for real-time gesture recognition.

## Setup Requirements:
1. ✅ Enable GPU: Runtime > Change runtime type > GPU (T4 recommended)
2. ✅ Upload your data to Google Drive in: `My Drive/silksong_data/`
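3. ✅ Each session folder should contain:
   - `sensor_data.csv` (a `timestamp` column plus the sensor feature columns, e.g. accelerometer + gyroscope axes; every non-`timestamp` column is fed to the model, so there must be exactly `NUM_FEATURES` of them)
   - `[session]_labels.csv`, where `[session]` is the session folder name (gesture labels with `timestamp`, `duration` and `gesture` columns)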

## Expected Training Time:
- **With GPU (T4):** 20-40 minutes
- **Without GPU (CPU):** 2-4 hours (not recommended)

---

## 1. Mount Google Drive & Install Dependencies

In [None]:
# Attach Google Drive to the Colab filesystem at /content/drive
from google.colab import drive

drive.mount('/content/drive')

# Confirm the mount and remind the user where the data is expected
print(
    "\n✅ Google Drive mounted!",
    "Your data should be in: /content/drive/MyDrive/silksong_data/",
    sep="\n",
)

In [None]:
# Report the TensorFlow version and whether TensorFlow can see a GPU
import tensorflow as tf

print("TensorFlow version:", tf.__version__)

# Query the device list once and reuse it for both the report and the branch
gpu_devices = tf.config.list_physical_devices('GPU')
print("\nGPU Available:", gpu_devices)

if not gpu_devices:
    print("\n⚠️  No GPU detected. Training will be slow.")
    print("   Enable GPU: Runtime > Change runtime type > GPU")
else:
    print("\n✅ GPU is enabled! Training will be fast.")

In [None]:
# Import required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import os
from pathlib import Path

print("✅ All imports successful!")

# Seed NumPy's and TensorFlow's global random generators with the same fixed
# value so repeated runs start from the same random state
for seed_rng in (np.random.seed, tf.random.set_seed):
    seed_rng(42)

## 2. Configure Data Paths

**Update this cell with your session folder names!**

In [None]:
# Data directory in Google Drive (each session folder is looked up beneath it)
DATA_DIR = '/content/drive/MyDrive/silksong_data'

# List your session folders here.
# Each name is joined onto DATA_DIR and is also used as the prefix of that
# session's `<name>_labels.csv` file.
SESSION_FOLDERS = [
    '20251017_125600_session',
    '20251017_135458_session',
    '20251017_141539_session',
    '20251017_143217_session',
    '20251017_143627_session',
]

# Model configuration
WINDOW_SIZE = 50  # 1 second at 50Hz sampling rate
STRIDE = 25       # 50% overlap between windows
# Width of the model input. The loading cell uses every non-`timestamp` column
# of sensor_data.csv as a feature, so that column count must equal this value.
NUM_FEATURES = 9  # 3 sensors × 3 axes (accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z, etc.)

# Gesture classes; a gesture's position in this list is its integer class label
GESTURES = ['jump', 'punch', 'turn', 'walk', 'noise']
NUM_CLASSES = len(GESTURES)

print(f"Configured {len(SESSION_FOLDERS)} sessions for training")
print(f"Gestures: {GESTURES}")

## 3. Load and Prepare Training Data

In [None]:
def load_session_data(session_folder):
    """Read the two CSV files that make up one recording session.

    Looks inside ``DATA_DIR/<session_folder>`` for ``sensor_data.csv`` and
    ``<session_folder>_labels.csv`` and parses both with ``pd.read_csv``.

    Args:
        session_folder: Name of the session directory under ``DATA_DIR``.

    Returns:
        A ``(sensor_data, labels_data)`` tuple of DataFrames.
    """
    base_dir = os.path.join(DATA_DIR, session_folder)
    sensor_csv, labels_csv = (
        os.path.join(base_dir, file_name)
        for file_name in ('sensor_data.csv', f'{session_folder}_labels.csv')
    )

    # The sensor file is read first, then the labels file
    return pd.read_csv(sensor_csv), pd.read_csv(labels_csv)


def create_label_vector(sensor_data, labels_data, sample_rate=50.0, gestures=None):
    """Expand segment-level gesture annotations into one class index per sample.

    Each annotation is a ``(timestamp, duration, gesture)`` triple. Its time
    span is converted to a sample-index range using ``sample_rate`` and that
    range of the output is set to the gesture's class index. Samples not
    covered by any annotation keep the value ``-1``. When annotations
    overlap, the later row overwrites the earlier one.

    NOTE(review): the conversion assumes annotation timestamps are in seconds
    measured from the first sensor sample and that the sensor stream is
    uniformly sampled at ``sample_rate`` - confirm against the recorder.

    Args:
        sensor_data: Sized container with one entry per sensor sample; only
            its length is used.
        labels_data: Mapping/DataFrame with ``'timestamp'``, ``'duration'``
            and ``'gesture'`` columns of equal length.
        sample_rate: Samples per second used to turn seconds into indices.
            Defaults to 50.0, the rate this notebook was written for.
        gestures: Ordered class names; a gesture's position is its class
            index. Defaults to the module-level ``GESTURES``.

    Returns:
        1-D integer ``np.ndarray`` of length ``len(sensor_data)``.
    """
    if gestures is None:
        gestures = GESTURES

    num_samples = len(sensor_data)
    label_vector = np.full(num_samples, -1, dtype=int)

    # Walk the three columns in lockstep rather than DataFrame.iterrows(),
    # which materialises a Series object for every row.
    segments = zip(
        labels_data['timestamp'],
        labels_data['duration'],
        labels_data['gesture'],
    )
    for start_time, duration, gesture in segments:
        # Annotations for gestures outside the configured class list are ignored
        if gesture not in gestures:
            continue
        gesture_idx = gestures.index(gesture)

        # Convert time to sample indices
        start_idx = int(start_time * sample_rate)
        end_idx = int((start_time + duration) * sample_rate)

        # Clip to the valid range so segments running past either end of the
        # recording are truncated instead of wrapping around
        start_idx = max(0, min(start_idx, num_samples))
        end_idx = max(0, min(end_idx, num_samples))

        label_vector[start_idx:end_idx] = gesture_idx

    return label_vector


def create_windows(sensor_data, labels, window_size, stride):
    """Cut a continuous recording into fixed-length, fully-labelled windows.

    Windows start at 0, ``stride``, ``2 * stride``, ... and every start for
    which a complete ``window_size``-sample window fits is considered,
    including the one ending exactly on the last sample. A window is dropped
    if any of its samples is unlabelled (``-1``); otherwise its label is the
    most frequent per-sample label (ties go to the lowest class index).

    Args:
        sensor_data: Array-like of shape ``(num_samples, ...)``.
        labels: Array-like of ``num_samples`` integer class indices, with
            ``-1`` marking unlabelled samples.
        window_size: Number of consecutive samples per window.
        stride: Step, in samples, between consecutive window starts.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(num_windows, window_size, ...)``
        and ``y`` has shape ``(num_windows,)``. When no window qualifies, both
        are empty but keep that dimensionality, so results from several
        recordings can still be concatenated.
    """
    sensor_data = np.asarray(sensor_data)
    labels = np.asarray(labels)
    num_samples = len(sensor_data)

    X = []
    y = []

    # "+ 1" keeps the last full window, whose start is num_samples - window_size
    for start in range(0, num_samples - window_size + 1, stride):
        stop = start + window_size
        window_labels = labels[start:stop]

        # Skip if window contains unlabeled data
        if np.any(window_labels == -1):
            continue

        X.append(sensor_data[start:stop])
        # Majority vote over the per-sample labels
        y.append(np.bincount(window_labels).argmax())

    if not X:
        # np.array([]) would be 1-D and could not be concatenated with the
        # window arrays of other recordings, so build the empty shape explicitly
        empty_shape = (0, window_size) + sensor_data.shape[1:]
        return (
            np.empty(empty_shape, dtype=sensor_data.dtype),
            np.empty((0,), dtype=int),
        )

    return np.array(X), np.array(y)


# Printed once the helper definitions in this cell have executed
print("✅ Helper functions defined")

In [None]:
# Load and process all sessions
all_X = []
all_y = []

for session_folder in SESSION_FOLDERS:
    print(f"\nProcessing {session_folder}...")

    try:
        sensor_data, labels_data = load_session_data(session_folder)
        print(f"  Sensor samples: {len(sensor_data)}")
        print(f"  Label segments: {len(labels_data)}")

        # Every column except 'timestamp' is used as a model input feature
        feature_cols = [col for col in sensor_data.columns if col != 'timestamp']
        features = sensor_data[feature_cols].values

        # Create per-sample labels
        label_vector = create_label_vector(sensor_data, labels_data)

        # Create sliding windows
        X, y = create_windows(features, label_vector, WINDOW_SIZE, STRIDE)
        print(f"  Generated {len(X)} windows")

    except Exception as e:
        # Best-effort: one unreadable or malformed session is reported and
        # skipped so the remaining sessions can still be used
        print(f"  ❌ Error: {e}")
        continue

    # A session without any fully-labelled window contributes nothing, and the
    # empty array returned for it may not have the 3-D shape that
    # np.concatenate needs to combine it with the other sessions.
    if len(X) == 0:
        print("  ⚠️  No labelled windows in this session; skipping it")
        continue

    all_X.append(X)
    all_y.append(y)

# Combine all sessions
if all_X:
    X_combined = np.concatenate(all_X, axis=0)
    y_combined = np.concatenate(all_y, axis=0)

    print(f"\n✅ Total training windows: {len(X_combined)}")
    print(f"   Input shape: {X_combined.shape}")
    print(f"   Labels shape: {y_combined.shape}")

    # Show class distribution
    print("\n   Class distribution:")
    for i, gesture in enumerate(GESTURES):
        count = np.sum(y_combined == i)
        percentage = count / len(y_combined) * 100
        print(f"     {gesture}: {count} ({percentage:.1f}%)")
else:
    print("\n❌ No data loaded! Check your data paths.")

## 4. Split Train/Validation/Test Sets

In [None]:
# Randomise the window order before splitting
X_combined, y_combined = shuffle(X_combined, y_combined, random_state=42)

# NOTE(review): windows overlap (STRIDE < WINDOW_SIZE), so a random split can
# place windows that share samples in different sets - consider whether the
# reported test accuracy is optimistic because of this.

# Split: 70% train, 15% validation, 15% test (stratified by class at each step)
# Step 1: hold out the test set
X_temp, X_test, y_temp, y_test = train_test_split(
    X_combined,
    y_combined,
    stratify=y_combined,
    random_state=42,
    test_size=0.15,
)

# Step 2: carve the validation set out of the remainder
# (0.176 of 0.85 ≈ 0.15 overall)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp,
    y_temp,
    stratify=y_temp,
    random_state=42,
    test_size=0.176,
)

print(f"Training set:   {len(X_train)} samples ({len(X_train)/len(X_combined)*100:.1f}%)")
print(f"Validation set: {len(X_val)} samples ({len(X_val)/len(X_combined)*100:.1f}%)")
print(f"Test set:       {len(X_test)} samples ({len(X_test)/len(X_combined)*100:.1f}%)")

## 5. Build CNN/LSTM Model

In [None]:
def create_cnn_lstm_model(input_shape, num_classes):
    """Assemble the (uncompiled) CNN/LSTM gesture classifier.

    Layer order: two Conv1D -> BatchNormalization -> MaxPooling1D stages
    (64 filters / kernel 5, then 128 filters / kernel 3), an LSTM(128) that
    returns the full sequence, an LSTM(64), a Dense(64) ReLU layer and a
    softmax output. Dropout(0.3) follows each LSTM and the hidden Dense layer.

    Args:
        input_shape: Shape of one input window, passed to ``layers.Input``.
        num_classes: Number of units in the softmax output layer.

    Returns:
        A ``keras.Sequential`` model.
    """
    # (filters, kernel_size) for each convolutional feature-extraction stage
    conv_stages = ((64, 5), (128, 3))

    stack = [layers.Input(shape=input_shape)]

    for n_filters, kernel_width in conv_stages:
        stack.append(
            layers.Conv1D(
                filters=n_filters,
                kernel_size=kernel_width,
                padding='same',
                activation='relu',
            )
        )
        stack.append(layers.BatchNormalization())
        stack.append(layers.MaxPooling1D(pool_size=2))

    # Temporal modelling: the first LSTM keeps the sequence for the second one
    stack.append(layers.LSTM(128, return_sequences=True))
    stack.append(layers.Dropout(0.3))
    stack.append(layers.LSTM(64))
    stack.append(layers.Dropout(0.3))

    # Classification head
    stack.append(layers.Dense(64, activation='relu'))
    stack.append(layers.Dropout(0.3))
    stack.append(layers.Dense(num_classes, activation='softmax'))

    return keras.Sequential(stack)


# Create model
input_shape = (WINDOW_SIZE, NUM_FEATURES)

# The feature count of the windows comes from the CSV columns, not from
# NUM_FEATURES. Fail fast with a readable message if the two disagree, rather
# than letting the mismatch surface later when the data is fed to the model.
data_shape = tuple(X_train.shape[1:])
if data_shape != input_shape:
    raise ValueError(
        f"Training windows have shape {data_shape} but the model expects "
        f"{input_shape}; check WINDOW_SIZE/NUM_FEATURES against the columns "
        "of sensor_data.csv"
    )

model = create_cnn_lstm_model(input_shape, NUM_CLASSES)

# Compile model
# The sparse loss matches the integer class labels produced by the windowing step
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()

## 6. Train Model

In [None]:
# Training callbacks: each one is built on its own, then collected for model.fit

# Stop when val_loss has not improved for 10 epochs and restore the best weights
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1,
)

# Halve the learning rate after 5 epochs without val_loss improvement (floor 1e-6)
lr_schedule = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-6,
    verbose=1,
)

# Write 'best_model.h5' whenever val_accuracy reaches a new best
checkpoint = keras.callbacks.ModelCheckpoint(
    'best_model.h5',
    monitor='val_accuracy',
    save_best_only=True,
    verbose=1,
)

callbacks = [early_stopping, lr_schedule, checkpoint]

print("✅ Callbacks configured")

In [None]:
# Fit the model on the training split, validating on the validation split
print("🚀 Starting training...\n")

fit_options = {
    'validation_data': (X_val, y_val),
    'epochs': 100,  # upper bound; the EarlyStopping callback may end training sooner
    'batch_size': 32,
    'callbacks': callbacks,
    'verbose': 1,
}
history = model.fit(X_train, y_train, **fit_options)

print("\n✅ Training complete!")

## 7. Evaluate Model

In [None]:
# Plot training history: accuracy on the left panel, loss on the right
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# (axes, training-metric key, validation-metric key, y-axis label, title)
panels = (
    (ax1, 'accuracy', 'val_accuracy', 'Accuracy', 'Model Accuracy'),
    (ax2, 'loss', 'val_loss', 'Loss', 'Model Loss'),
)

for axis, train_key, val_key, y_label, title in panels:
    axis.plot(history.history[train_key], label='Train')
    axis.plot(history.history[val_key], label='Validation')
    axis.set_xlabel('Epoch')
    axis.set_ylabel(y_label)
    axis.set_title(title)
    axis.legend()
    axis.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Evaluate on test set
test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
print(f"\n📊 Test Accuracy: {test_accuracy*100:.2f}%")
print(f"   Test Loss: {test_loss:.4f}")

# Predictions: class probabilities per window, then the most likely class
y_pred = model.predict(X_test, verbose=0)
y_pred_classes = np.argmax(y_pred, axis=1)

# Classification report
print("\n" + "="*60)
print("CLASSIFICATION REPORT")
print("="*60)
# Pass the full label set explicitly. Without it, the report infers the classes
# from the data and raises if a configured gesture is absent, because the
# number of classes no longer matches len(target_names).
# zero_division=0 reports 0 instead of warning for classes with no samples.
print(classification_report(
    y_test,
    y_pred_classes,
    labels=list(range(len(GESTURES))),
    target_names=GESTURES,
    zero_division=0,
))

In [None]:
# Confusion matrix
# Fix the label set so the matrix always has one row/column per configured
# gesture and lines up with the tick labels, even if a class is absent.
class_ids = np.arange(len(GESTURES))
cm = confusion_matrix(y_test, y_pred_classes, labels=class_ids)

# Row-normalise (each row = one true class). Rows with no samples stay all-zero
# instead of producing a division-by-zero NaN.
row_totals = cm.sum(axis=1, keepdims=True)
cm_normalized = np.divide(
    cm,
    row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals > 0,
)

plt.figure(figsize=(10, 8))
sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues',
            xticklabels=GESTURES, yticklabels=GESTURES)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix (Normalized)')
plt.tight_layout()
plt.show()

## 8. Save Trained Model

In [None]:
# Save to Google Drive
# Build the path from DATA_DIR instead of repeating the folder literally, so
# that changing DATA_DIR in the configuration cell cannot leave this save
# pointing at a different location after training has already run.
model_save_path = os.path.join(DATA_DIR, 'cnn_lstm_gesture.h5')
model.save(model_save_path)

print(f"✅ Model saved to: {model_save_path}")
print("\nDownload this file to your local project and place it in the 'models/' directory")
print("Then run: python src/udp_listener_v3.py")

## ✅ Training Complete!

### Next Steps:

1. **Download the trained model:**
   - Right-click on the file in Google Drive: `silksong_data/cnn_lstm_gesture.h5`
   - Download to your local machine

2. **Place model in your project:**
   ```bash
   # Move to your project's models directory
   mv ~/Downloads/cnn_lstm_gesture.h5 /path/to/project/models/
   ```

3. **Test real-time recognition:**
   ```bash
   cd src
   python udp_listener_v3.py
   ```

4. **Expected performance:**
   - Latency: 10-30ms per prediction
   - Accuracy: 90-98%
   - Much faster than Phase IV SVM model!

---

**Questions or issues?** Check the documentation in `docs/Phase_V/README.md`