# CNN/LSTM Parallel Two-Classifier Training

**Parallel Architecture for Silksong Gesture Recognition**

This notebook trains TWO CNN/LSTM models:
1. **Binary Classifier**: Walk vs Idle (locomotion - 5s samples)
2. **Multiclass Classifier**: Jump, Punch, Turn_Left, Turn_Right (actions - 1-2s samples)

These run in **PARALLEL** for simultaneous detection (e.g., walk + jump).

## Setup Requirements:
1. ✅ Enable GPU: Runtime > Change runtime type > GPU (T4 recommended)
2. ✅ Upload merged_training.zip to Google Drive
3. ✅ Unzip in: `My Drive/silksong_data/merged_training/`

## Expected Training Time:
- **With GPU (T4):** 30-60 minutes (2 models)
- **Without GPU:** Not recommended

---

## 1. Mount Google Drive & Check GPU

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

print("\n✅ Google Drive mounted!")
print("Your data should be in: /content/drive/MyDrive/silksong_data/merged_training/")

In [None]:
# Check GPU availability
import tensorflow as tf

print("TensorFlow version:", tf.__version__)
print("\nGPU Available:", tf.config.list_physical_devices('GPU'))

if tf.config.list_physical_devices('GPU'):
    print("\n✅ GPU is enabled! Training will be fast.")
else:
    print("\n⚠️  No GPU detected. Enable GPU: Runtime > Change runtime type > GPU")

In [None]:
# Import required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.utils import shuffle, class_weight
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import os
from pathlib import Path
from scipy import stats

print("✅ All imports successful!")

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

## 2. Configure Data Paths & Architecture

**CRITICAL**: Two separate models with different sample types!

In [None]:
# Data directory in Google Drive
DATA_DIR = '/content/drive/MyDrive/silksong_data/merged_training'

# ==================== PARALLEL ARCHITECTURE ====================

# BINARY CLASSIFIER: Locomotion states (5s samples, ~100-200 rows each)
BINARY_GESTURES = ['idle', 'walk']
BINARY_DATA_DIR = os.path.join(DATA_DIR, 'binary_classification')
BINARY_WINDOW_SIZE = 100  # 2 seconds at 50Hz (from 5s samples)
BINARY_STRIDE = 50        # 1 second stride

# MULTICLASS CLASSIFIER: Action gestures (1-2s samples, ~40-80 rows each)
MULTICLASS_GESTURES = ['jump', 'punch', 'turn_left', 'turn_right']
MULTICLASS_DATA_DIR = os.path.join(DATA_DIR, 'multiclass_classification')
MULTICLASS_WINDOW_SIZE = 50  # 1 second at 50Hz (from 1-2s samples)
MULTICLASS_STRIDE = 25        # 0.5 second stride

# Sensor features (merged data format)
NUM_FEATURES = 10  # accel_x/y/z, gyro_x/y/z, rot_w/x/y/z

print("📊 Training Configuration:")
print(f"\nBinary Classifier (Locomotion):")
print(f"  Gestures: {BINARY_GESTURES}")
print(f"  Window: {BINARY_WINDOW_SIZE} samples = {BINARY_WINDOW_SIZE/50:.1f}s")
print(f"  Data: {BINARY_DATA_DIR}")

print(f"\nMulticlass Classifier (Actions):")
print(f"  Gestures: {MULTICLASS_GESTURES}")
print(f"  Window: {MULTICLASS_WINDOW_SIZE} samples = {MULTICLASS_WINDOW_SIZE/50:.1f}s")
print(f"  Data: {MULTICLASS_DATA_DIR}")

## 3. Data Loading Functions

In [None]:
def load_gesture_samples(data_dir, gestures, feature_cols=None):
    """
    Load CSV files for each gesture class (merged format).

    Args:
        data_dir: Path to binary_classification or multiclass_classification folder
        gestures: List of gesture names
        feature_cols: Columns to use as features

    Returns:
        X: List of samples (each is a 2D array: timesteps × features)
        y: List of labels (gesture indices)
    """
    if feature_cols is None:
        feature_cols = ['accel_x', 'accel_y', 'accel_z',
                       'gyro_x', 'gyro_y', 'gyro_z',
                       'rot_w', 'rot_x', 'rot_y', 'rot_z']

    all_samples = []
    all_labels = []

    for gesture_idx, gesture in enumerate(gestures):
        gesture_path = Path(data_dir) / gesture

        if not gesture_path.exists():
            print(f"⚠️  Warning: {gesture} folder not found at {gesture_path}")
            continue

        csv_files = list(gesture_path.glob("*.csv"))
        print(f"📂 Loading {len(csv_files)} samples for '{gesture}'...")

        for csv_file in csv_files:
            try:
                df = pd.read_csv(csv_file)

                # Extract features (merged format has all sensors in one row)
                if all(col in df.columns for col in feature_cols):
                    sample = df[feature_cols].values  # Shape: (n_timesteps, n_features)
                    all_samples.append(sample)
                    all_labels.append(gesture_idx)
                else:
                    print(f"⚠️  Skipping {csv_file.name}: missing columns")

            except Exception as e:
                print(f"❌ Error loading {csv_file.name}: {e}")

    print(f"\n✅ Loaded {len(all_samples)} total samples")
    return all_samples, all_labels


def create_windows(samples, labels, window_size, stride):
    """
    Create sliding windows from variable-length samples.

    Args:
        samples: List of 2D arrays (each: timesteps × features)
        labels: List of gesture indices
        window_size: Number of timesteps per window
        stride: Step size for sliding window

    Returns:
        X: Array of shape (n_windows, window_size, n_features)
        y: Array of shape (n_windows,)
    """
    X_windows = []
    y_windows = []

    for sample, label in zip(samples, labels):
        # Slide window across this sample
        for start_idx in range(0, len(sample) - window_size + 1, stride):
            window = sample[start_idx:start_idx + window_size]
            X_windows.append(window)
            y_windows.append(label)

    X = np.array(X_windows)
    y = np.array(y_windows)

    print(f"\n📊 Created {len(X)} windows from {len(samples)} samples")
    print(f"   Window shape: {X.shape}")
    print(f"   Label distribution: {np.bincount(y)}")

    return X, y

## 4. Model Architecture

In [None]:
def create_cnn_lstm_model(input_shape, num_classes, name="gesture_model"):
    """
    Create CNN/LSTM hybrid model.

    Args:
        input_shape: (window_size, num_features)
        num_classes: Number of gesture classes
        name: Model name

    Returns:
        Compiled Keras model
    """
    model = keras.Sequential([
        # Input layer
        keras.Input(shape=input_shape, name='sensor_input'),

        # CNN layers for feature extraction
        layers.Conv1D(64, kernel_size=3, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.3),

        layers.Conv1D(128, kernel_size=3, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.3),

        # LSTM layers for temporal patterns
        layers.LSTM(128, return_sequences=True),
        layers.Dropout(0.3),

        layers.LSTM(64),
        layers.Dropout(0.3),

        # Dense classification layers
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),

        layers.Dense(num_classes, activation='softmax', name='output')
    ], name=name)

    # Compile model
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

## 5. Train Binary Classifier (Walk vs Idle)

In [None]:
# Load binary data
print("\n" + "="*60)
print("BINARY CLASSIFIER TRAINING")
print("="*60)

binary_samples, binary_labels = load_gesture_samples(
    BINARY_DATA_DIR,
    BINARY_GESTURES
)

# Create windows
X_binary, y_binary = create_windows(
    binary_samples,
    binary_labels,
    BINARY_WINDOW_SIZE,
    BINARY_STRIDE
)

# Split train/test
X_binary_train, X_binary_test, y_binary_train, y_binary_test = train_test_split(
    X_binary, y_binary, test_size=0.2, random_state=42, stratify=y_binary
)

print(f"\n📊 Binary Split:")
print(f"   Train: {len(X_binary_train)} windows")
print(f"   Test:  {len(X_binary_test)} windows")

In [None]:
# Create binary model
binary_model = create_cnn_lstm_model(
    input_shape=(BINARY_WINDOW_SIZE, NUM_FEATURES),
    num_classes=len(BINARY_GESTURES),
    name="binary_classifier"
)

binary_model.summary()

In [None]:
# Calculate class weights
binary_class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_binary_train),
    y=y_binary_train
)
binary_class_weights_dict = {i: w for i, w in enumerate(binary_class_weights)}

print(f"\nClass weights: {binary_class_weights_dict}")

# Train binary model
binary_history = binary_model.fit(
    X_binary_train, y_binary_train,
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    class_weight=binary_class_weights_dict,
    callbacks=[
        keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(patience=5, factor=0.5)
    ],
    verbose=1
)

In [None]:
# Evaluate binary model
binary_test_loss, binary_test_acc = binary_model.evaluate(X_binary_test, y_binary_test)
print(f"\n✅ Binary Test Accuracy: {binary_test_acc*100:.2f}%")

# Predictions
y_binary_pred = np.argmax(binary_model.predict(X_binary_test), axis=1)

# Classification report
print("\nClassification Report (Binary):")
print(classification_report(y_binary_test, y_binary_pred, target_names=BINARY_GESTURES))

# Confusion matrix
cm_binary = confusion_matrix(y_binary_test, y_binary_pred)
plt.figure(figsize=(6, 5))
sns.heatmap(cm_binary, annot=True, fmt='d', cmap='Blues',
            xticklabels=BINARY_GESTURES, yticklabels=BINARY_GESTURES)
plt.title('Binary Classifier Confusion Matrix')
plt.ylabel('True')
plt.xlabel('Predicted')
plt.show()

## 6. Train Multiclass Classifier (Jump, Punch, Turns)

In [None]:
# Load multiclass data
print("\n" + "="*60)
print("MULTICLASS CLASSIFIER TRAINING")
print("="*60)

multiclass_samples, multiclass_labels = load_gesture_samples(
    MULTICLASS_DATA_DIR,
    MULTICLASS_GESTURES
)

# Create windows
X_multiclass, y_multiclass = create_windows(
    multiclass_samples,
    multiclass_labels,
    MULTICLASS_WINDOW_SIZE,
    MULTICLASS_STRIDE
)

# Split train/test
X_multi_train, X_multi_test, y_multi_train, y_multi_test = train_test_split(
    X_multiclass, y_multiclass, test_size=0.2, random_state=42, stratify=y_multiclass
)

print(f"\n📊 Multiclass Split:")
print(f"   Train: {len(X_multi_train)} windows")
print(f"   Test:  {len(X_multi_test)} windows")

In [None]:
# Create multiclass model
multiclass_model = create_cnn_lstm_model(
    input_shape=(MULTICLASS_WINDOW_SIZE, NUM_FEATURES),
    num_classes=len(MULTICLASS_GESTURES),
    name="multiclass_classifier"
)

multiclass_model.summary()

In [None]:
# Calculate class weights
multi_class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_multi_train),
    y=y_multi_train
)
multi_class_weights_dict = {i: w for i, w in enumerate(multi_class_weights)}

print(f"\nClass weights: {multi_class_weights_dict}")

# Train multiclass model
multi_history = multiclass_model.fit(
    X_multi_train, y_multi_train,
    epochs=50,
    batch_size=32,
    validation_split=0.2,
    class_weight=multi_class_weights_dict,
    callbacks=[
        keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(patience=5, factor=0.5)
    ],
    verbose=1
)

In [None]:
# Evaluate multiclass model
multi_test_loss, multi_test_acc = multiclass_model.evaluate(X_multi_test, y_multi_test)
print(f"\n✅ Multiclass Test Accuracy: {multi_test_acc*100:.2f}%")

# Predictions
y_multi_pred = np.argmax(multiclass_model.predict(X_multi_test), axis=1)

# Classification report
print("\nClassification Report (Multiclass):")
print(classification_report(y_multi_test, y_multi_pred, target_names=MULTICLASS_GESTURES))

# Confusion matrix
cm_multi = confusion_matrix(y_multi_test, y_multi_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm_multi, annot=True, fmt='d', cmap='Greens',
            xticklabels=MULTICLASS_GESTURES, yticklabels=MULTICLASS_GESTURES)
plt.title('Multiclass Classifier Confusion Matrix')
plt.ylabel('True')
plt.xlabel('Predicted')
plt.show()

## 7. Save Models

In [None]:
# Save to Google Drive
model_save_dir = '/content/drive/MyDrive/silksong_models'
os.makedirs(model_save_dir, exist_ok=True)

# Save binary model
binary_model_path = os.path.join(model_save_dir, 'binary_cnn_lstm.h5')
binary_model.save(binary_model_path)
print(f"✅ Binary model saved: {binary_model_path}")

# Save multiclass model
multi_model_path = os.path.join(model_save_dir, 'multiclass_cnn_lstm.h5')
multiclass_model.save(multi_model_path)
print(f"✅ Multiclass model saved: {multi_model_path}")

# Save gesture mappings
import json

mappings = {
    'binary_gestures': BINARY_GESTURES,
    'multiclass_gestures': MULTICLASS_GESTURES,
    'binary_window_size': BINARY_WINDOW_SIZE,
    'multiclass_window_size': MULTICLASS_WINDOW_SIZE,
    'num_features': NUM_FEATURES
}

mapping_path = os.path.join(model_save_dir, 'model_config.json')
with open(mapping_path, 'w') as f:
    json.dump(mappings, f, indent=2)
print(f"✅ Config saved: {mapping_path}")

## 8. Training History Visualization

In [None]:
# Plot binary training history
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Binary accuracy
ax1.plot(binary_history.history['accuracy'], label='Train')
ax1.plot(binary_history.history['val_accuracy'], label='Validation')
ax1.set_title('Binary Classifier Accuracy')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Accuracy')
ax1.legend()
ax1.grid(True)

# Binary loss
ax2.plot(binary_history.history['loss'], label='Train')
ax2.plot(binary_history.history['val_loss'], label='Validation')
ax2.set_title('Binary Classifier Loss')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Loss')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Plot multiclass training history
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Multiclass accuracy
ax1.plot(multi_history.history['accuracy'], label='Train')
ax1.plot(multi_history.history['val_accuracy'], label='Validation')
ax1.set_title('Multiclass Classifier Accuracy')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Accuracy')
ax1.legend()
ax1.grid(True)

# Multiclass loss
ax2.plot(multi_history.history['loss'], label='Train')
ax2.plot(multi_history.history['val_loss'], label='Validation')
ax2.set_title('Multiclass Classifier Loss')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Loss')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.show()

## 9. Download Models to Local Machine

**After training, download the models:**

1. Go to Files tab (left sidebar)
2. Navigate to `drive/MyDrive/silksong_models/`
3. Download:
   - `binary_cnn_lstm.h5`
   - `multiclass_cnn_lstm.h5`
   - `model_config.json`
4. Place in your local `models/` directory

**Or use Google Drive sync** to automatically sync to your computer.

---

## Summary

✅ **Two models trained:**
- Binary: Walk vs Idle (locomotion)
- Multiclass: Jump, Punch, Turn_Left, Turn_Right (actions)

✅ **Parallel architecture** - Both run simultaneously in controller

✅ **Merged sensor data** - Clean, non-zero-inflated features

✅ **Expected accuracy**: 85-95% (vs 60-70% with old single model)

**Next:** Test models with your controller! 🎮