# Pushup Posture & Phase Classification Model

Multi-task CNN-LSTM model for classifying pushup phase and posture from IMU data.

**Labels:**
- **Phase**: top, moving-down, bottom, moving-up, not-in-pushup
- **Posture**: good-form, hips-sagging, hips-high, partial-rom

## 1. Setup and Imports

In [None]:
import numpy as np
import pandas as pd
import json
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import os
import subprocess

# Report the library versions in use so a run can be tied to a specific TF/Keras build.
print(f"TensorFlow version: {tf.__version__}")
print(f"Keras version: {keras.__version__}")

## 2. Upload Dataset (Colab)

In [None]:
# Colab-only cell: pick one or more pushup_data_YYYYMMDD_HHMMSS.json exports to upload.
from google.colab import files

for prompt_line in (
    "Please upload your pushup dataset JSON file(s)...",
    "You can select MULTIPLE files if you have data from different sessions",
):
    print(prompt_line)

uploaded = files.upload()

# The upload result is keyed by filename; keep the names for the loading cell.
DATA_FILES = [*uploaded.keys()]
print(f"\nUploaded {len(DATA_FILES)} file(s):")
for filename in DATA_FILES:
    print(f"  - {filename}")

## 3. Load and Explore Data

In [None]:
# Merge the 'sessions' list of every uploaded JSON file into one flat list.
all_sessions = []

for data_file in DATA_FILES:
    with open(data_file, 'r') as f:
        data = json.load(f)
    file_sessions = data['sessions']
    all_sessions.extend(file_sessions)
    print(f"Loaded {len(file_sessions)} sessions from {data_file}")

print(f"\nTotal sessions across all files: {len(all_sessions)}")

if not all_sessions:
    print("Warning: No sessions found!")
else:
    # Peek at the first session to show which fields each record carries.
    first_session = all_sessions[0]
    print(f"\nFirst session structure:")
    print(f"  Phase: {first_session['phase_label']}")
    print(f"  Posture: {first_session['posture_label']}")
    print(f"  Samples: {len(first_session['imu_data'])}")
    print(f"  Duration: {first_session['duration']:.2f}s")

    # Tally how many sessions carry each phase / posture label.
    phase_counts = {}
    posture_counts = {}

    for session in all_sessions:
        phase, posture = session['phase_label'], session['posture_label']
        phase_counts[phase] = 1 + phase_counts.get(phase, 0)
        posture_counts[posture] = 1 + posture_counts.get(posture, 0)

    print(f"\nPhase distribution: {phase_counts}")
    print(f"Posture distribution: {posture_counts}")

## 4. Preprocessing - Sliding Window Segmentation

In [None]:
# Hyperparameters
# A 100-sample window spans 1 s at 100 Hz (2 s if the data was recorded at 50 Hz).
WINDOW_SIZE = 100  # samples per window (~1-2 seconds at 50-100 Hz)
# STRIDE is the step between consecutive window starts, so 25 of 100 samples are new each time.
STRIDE = 25        # overlap of 75%
# SAMPLING_RATE is only written to model_metadata.json; the windowing code does not read it.
SAMPLING_RATE = 100  # Hz (adjust based on your data)

def create_windows(imu_data, window_size, stride):
    """
    Create sliding windows from IMU time series.

    Args:
        imu_data: List of dicts with keys 'ax', 'ay', 'az', 'gx', 'gy', 'gz'
        window_size: Number of samples per window
        stride: Step size between windows

    Returns:
        np.array of shape [num_windows, window_size, 6]. When the recording
        holds fewer than window_size samples (including an empty recording)
        the result has shape [0, window_size, 6], so the trailing dimensions
        stay consistent for callers that stack or concatenate results.

    Raises:
        KeyError: If a sample is missing one of the six channel keys.
        ValueError: If stride is 0 (raised by range()).
    """
    channels = ('ax', 'ay', 'az', 'gx', 'gy', 'gz')

    # Convert to numpy array [samples, 6]
    data_array = np.array([
        [sample[channel] for channel in channels]
        for sample in imu_data
    ])

    # Only start positions that leave room for a full window are used;
    # a trailing partial window is dropped.
    starts = range(0, len(data_array) - window_size + 1, stride)
    if not starts:
        return np.empty((0, window_size, len(channels)), dtype=data_array.dtype)

    return np.stack([data_array[start:start + window_size] for start in starts])

# Window every session and tag each window with its parent session's labels.
X_windows = []
y_phase = []
y_posture = []

for session in all_sessions:
    session_windows = create_windows(session['imu_data'], WINDOW_SIZE, STRIDE)
    window_count = len(session_windows)
    if not window_count:
        # Sessions shorter than one window contribute nothing.
        continue

    X_windows.extend(session_windows)
    y_phase.extend([session['phase_label']] * window_count)
    y_posture.extend([session['posture_label']] * window_count)

X = np.array(X_windows)
print(f"\nCreated {len(X)} windows of shape {X.shape}")
print(f"Expected shape: [num_windows, {WINDOW_SIZE}, 6]")

## 5. Encode Labels

In [None]:
# Phase labels: string -> integer id -> one-hot row
phase_encoder = LabelEncoder()
y_phase_encoded = phase_encoder.fit_transform(y_phase)
num_phase_classes = len(phase_encoder.classes_)
y_phase_cat = keras.utils.to_categorical(y_phase_encoded, num_phase_classes)

# Posture labels: same treatment with an independent encoder
posture_encoder = LabelEncoder()
y_posture_encoded = posture_encoder.fit_transform(y_posture)
num_posture_classes = len(posture_encoder.classes_)
y_posture_cat = keras.utils.to_categorical(y_posture_encoded, num_posture_classes)

# classes_ lists the label strings in the order of their integer ids.
print(f"Phase classes ({num_phase_classes}): {phase_encoder.classes_}")
print(f"Posture classes ({num_posture_classes}): {posture_encoder.classes_}")

## 6. Train/Test Split

In [None]:
# Split data (80/20)
# Stratification uses the phase labels only; posture class balance is not enforced.
# random_state=42 makes the split reproducible.
# NOTE(review): the split is over individual windows, and windows cut from the same
# session overlap by 75% (STRIDE=25, WINDOW_SIZE=100). Near-identical windows can
# therefore land in both train and test, so test accuracy is likely optimistic.
# Consider splitting by session instead.
X_train, X_test, y_phase_train, y_phase_test, y_posture_train, y_posture_test = train_test_split(
    X, y_phase_cat, y_posture_cat, test_size=0.2, random_state=42, stratify=y_phase_encoded
)

print(f"Training samples: {len(X_train)}")
print(f"Testing samples: {len(X_test)}")
print(f"Input shape: {X_train.shape}")

## 7. Normalize Data

In [None]:
# Per-channel statistics are taken from the training split only and then
# applied unchanged to the test split.
train_samples = X_train.reshape(-1, 6)  # [windows, window_size, 6] -> [samples, 6]
mean = train_samples.mean(axis=0)
std = train_samples.std(axis=0)

# The small epsilon keeps the division finite for a channel with zero variance.
scale = std + 1e-8
X_train_norm = (X_train - mean) / scale
X_test_norm = (X_test - mean) / scale

print(f"Mean: {mean}")
print(f"Std: {std}")

# Persist the statistics so the same scaling can be reproduced outside this notebook.
norm_params = dict(mean=mean.tolist(), std=std.tolist())
with open('norm_params.json', 'w') as f:
    json.dump(norm_params, f)

## 8. Build Multi-Task Model

In [None]:
def build_multitask_model(input_shape, num_phase_classes, num_posture_classes):
    """
    Build the CNN-LSTM network with one shared trunk and two softmax heads.

    Layout:
    - two Conv1D -> MaxPooling1D -> Dropout stages (32 then 64 filters)
    - one LSTM(64) that returns only its final state, followed by Dropout
    - one shared Dense(128) layer, followed by Dropout
    - output heads named 'phase' and 'posture'

    Args:
        input_shape: Shape of one input window, passed to keras.Input
        num_phase_classes: Width of the 'phase' softmax head
        num_posture_classes: Width of the 'posture' softmax head

    Returns:
        Uncompiled keras.Model with outputs [phase, posture]
    """
    inputs = keras.Input(shape=input_shape, name='imu_input')

    # Convolutional feature extraction; each stage halves the time axis.
    features = inputs
    for filters in (32, 64):
        features = layers.Conv1D(filters, kernel_size=5, activation='relu', padding='same')(features)
        features = layers.MaxPooling1D(pool_size=2)(features)
        features = layers.Dropout(0.3)(features)

    # Temporal summary of the convolutional features.
    features = layers.LSTM(64, return_sequences=False)(features)
    features = layers.Dropout(0.3)(features)

    # Representation shared by both tasks.
    shared = layers.Dense(128, activation='relu')(features)
    shared = layers.Dropout(0.3)(shared)

    heads = [
        layers.Dense(num_phase_classes, activation='softmax', name='phase')(shared),
        layers.Dense(num_posture_classes, activation='softmax', name='posture')(shared),
    ]

    return keras.Model(inputs=inputs, outputs=heads)

# Build model
# input_shape is (time steps, channels): WINDOW_SIZE samples of the 6 IMU channels.
# The head widths come from the fitted label encoders.
model = build_multitask_model(
    input_shape=(WINDOW_SIZE, 6),
    num_phase_classes=num_phase_classes,
    num_posture_classes=num_posture_classes
)

model.summary()

### 8.1 Alternative: TFLite Micro Compatible Model (No LSTM)

**IMPORTANT:** The LSTM model above won't work on ESP32 because TFLite Micro doesn't support LSTM ops.

Use this simpler CNN-only model instead:

In [None]:
def build_tflite_micro_compatible_model(input_shape, num_phase_classes, num_posture_classes):
    """
    Build the CNN-only variant of the multi-task network (no recurrent layer).

    Layout:
    - three Conv1D -> MaxPooling1D -> Dropout stages (32, 64, 128 filters)
    - GlobalAveragePooling1D to collapse the time axis
    - Dense(128) and Dense(64), each followed by Dropout
    - output heads named 'phase' and 'posture'

    Only convolution, pooling, dense and dropout layers are used; the
    notebook targets TFLite Micro on ESP32 with this model.

    Args:
        input_shape: Shape of one input window, passed to keras.Input
        num_phase_classes: Width of the 'phase' softmax head
        num_posture_classes: Width of the 'posture' softmax head

    Returns:
        Uncompiled keras.Model with outputs [phase, posture]
    """
    inputs = keras.Input(shape=input_shape, name='imu_input')

    # Deeper convolutional stack stands in for the LSTM of the other model.
    features = inputs
    for filters, kernel in ((32, 5), (64, 5), (128, 3)):
        features = layers.Conv1D(filters, kernel_size=kernel, activation='relu', padding='same')(features)
        features = layers.MaxPooling1D(pool_size=2)(features)
        features = layers.Dropout(0.3)(features)

    # Average over time so the dense layers see a fixed-size vector.
    features = layers.GlobalAveragePooling1D()(features)

    for units in (128, 64):
        features = layers.Dense(units, activation='relu')(features)
        features = layers.Dropout(0.3)(features)

    heads = [
        layers.Dense(num_phase_classes, activation='softmax', name='phase')(features),
        layers.Dense(num_posture_classes, activation='softmax', name='posture')(features),
    ]

    return keras.Model(inputs=inputs, outputs=heads)

# Instantiate the CNN-only variant next to the LSTM model so either can be trained.
print("Building TFLite Micro compatible model (CNN-only, no LSTM)...")
model_micro = build_tflite_micro_compatible_model(
    input_shape=(WINDOW_SIZE, 6),
    num_phase_classes=num_phase_classes,
    num_posture_classes=num_posture_classes,
)
model_micro.summary()

# Summary of the layer types the CNN-only model is built from.
for note in (
    "\nThis model uses ONLY TFLite-compatible ops:",
    "- Conv1D",
    "- MaxPooling1D",
    "- GlobalAveragePooling1D",
    "- Dense",
    "- Dropout (removed during conversion)",
    "\nIt will work on ESP32 TFLite Micro!",
):
    print(note)

## 9. Compile and Train

In [None]:
# IMPORTANT: Use the TFLite Micro compatible model!
# Change this line to use model_micro instead of model
# model = model_micro

# For now, train the LSTM model for comparison
# But remember: You MUST use model_micro for ESP32 deployment!

# Compile with separate losses for each output
# The dict keys match the output layer names ('phase', 'posture') set in the model builders.
# Both loss weights are 1.0, so the two head losses contribute equally to the total loss.
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss={
        'phase': 'categorical_crossentropy',
        'posture': 'categorical_crossentropy'
    },
    metrics={
        'phase': ['accuracy'],
        'posture': ['accuracy']
    },
    loss_weights={'phase': 1.0, 'posture': 1.0}
)

# Train
# NOTE(review): validation_data is the held-out test split, so the val_* curves and the
# later test metrics are computed on the same windows; there is no separate validation set.
history = model.fit(
    X_train_norm,
    {'phase': y_phase_train, 'posture': y_posture_train},
    validation_data=(
        X_test_norm,
        {'phase': y_phase_test, 'posture': y_posture_test}
    ),
    epochs=50,
    batch_size=32,
    verbose=1
)

print("\nTraining complete!")
# These two messages are printed unconditionally, even if `model` was switched to model_micro above.
print("\n⚠️  WARNING: This LSTM model won't work on ESP32!")
print("For ESP32 deployment, retrain using model_micro instead.")

## 10. Evaluate Model

In [None]:
# 2x2 grid of training curves: one row per task, columns are loss and accuracy.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# (axis, training-history key, validation-history key, panel title, y-axis label)
panels = [
    (axes[0, 0], 'phase_loss', 'val_phase_loss', 'Phase Loss', 'Loss'),
    (axes[0, 1], 'phase_accuracy', 'val_phase_accuracy', 'Phase Accuracy', 'Accuracy'),
    (axes[1, 0], 'posture_loss', 'val_posture_loss', 'Posture Loss', 'Loss'),
    (axes[1, 1], 'posture_accuracy', 'val_posture_accuracy', 'Posture Accuracy', 'Accuracy'),
]

for ax, train_key, val_key, title, y_label in panels:
    ax.plot(history.history[train_key], label='Train')
    ax.plot(history.history[val_key], label='Val')
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    ax.legend()
    ax.grid(True)

plt.tight_layout()
# Save before show() so the figure is written to disk as well as displayed.
plt.savefig('training_history.png', dpi=150, bbox_inches='tight')
plt.show()

# Final evaluation
# return_dict=True keys every value by metric name, so the printed accuracies do not
# depend on the position of each metric in the list evaluate() returns by default.
# The key names are the same ones the history plots read ('phase_accuracy', ...).
results = model.evaluate(
    X_test_norm,
    {'phase': y_phase_test, 'posture': y_posture_test},
    verbose=0,
    return_dict=True,
)
print(f"\nTest Results:")
print(f"Phase Accuracy: {results['phase_accuracy']:.4f}")
print(f"Posture Accuracy: {results['posture_accuracy']:.4f}")

## 11. Save Keras Model

In [None]:
# Save as .h5 file
MODEL_FILE = 'pushup_model.h5'
model.save(MODEL_FILE)
print(f"Saved model to {MODEL_FILE}")

# Save the class-name lists and windowing settings (not the encoder objects themselves).
# The class lists are in encoder order, i.e. list position == output index of each head.
# Normalization mean/std are NOT part of this file; they are written to norm_params.json.
metadata = {
    'phase_classes': phase_encoder.classes_.tolist(),
    'posture_classes': posture_encoder.classes_.tolist(),
    'window_size': WINDOW_SIZE,
    'stride': STRIDE,
    'sampling_rate': SAMPLING_RATE,
    'num_phase_classes': num_phase_classes,
    'num_posture_classes': num_posture_classes
}

with open('model_metadata.json', 'w') as f:
    json.dump(metadata, f, indent=2)

print("Saved metadata to model_metadata.json")

## 12. Convert to TensorFlow Lite

### 12.1 Float32 Model

In [None]:
# Export as TensorFlow SavedModel format first (for TFLite conversion)
SAVED_MODEL_DIR = 'saved_model'
model.export(SAVED_MODEL_DIR)

# Defined before the conversion attempt so later cells can still reference these
# names when the conversion fails (float_size stays None in that case).
FLOAT_MODEL_FILE = 'pushup_model_float32.tflite'
float_size = None  # model size in KB, set after a successful conversion

# Convert to TFLite (Float32)
converter = tf.lite.TFLiteConverter.from_saved_model(SAVED_MODEL_DIR)

# IMPORTANT: TFLite Micro on ESP32 doesn't support SELECT_TF_OPS
# We need to use only built-in TFLite ops
# This means we CANNOT use native LSTM layers on embedded devices

# Only warn about LSTM when the model being converted actually contains one;
# `model` may have been switched to the CNN-only model_micro before training.
if any(isinstance(layer, layers.LSTM) for layer in model.layers):
    print("WARNING: This model uses LSTM which requires SELECT_TF_OPS")
    print("These ops are NOT supported on TFLite Micro (ESP32)")
    print("Converting anyway for reference, but you'll need to simplify the model")
    print("Recommendation: Replace LSTM with 1D Conv layers or SimpleRNN")

converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,  # Use only TFLite built-in ops
]

# Keep the try body to the conversion itself so that an unrelated failure
# (e.g. writing the file) is not misreported as an incompatible architecture.
try:
    tflite_model_float = converter.convert()
except Exception as e:
    # Best effort: report and let the notebook continue; float_size stays None.
    print(f"\nERROR during conversion: {e}")
    print("\nThe model architecture is not compatible with TFLite Micro.")
    print("You need to rebuild the model without LSTM layers.")
    print("See section 8.1 above for a CNN-only alternative.")
else:
    # Save float32 model
    with open(FLOAT_MODEL_FILE, 'wb') as f:
        f.write(tflite_model_float)

    float_size = len(tflite_model_float) / 1024
    print(f"\nFloat32 TFLite model size: {float_size:.2f} KB")
    print(f"Saved to {FLOAT_MODEL_FILE}")

### 12.2 INT8 Quantized Model

In [None]:
# Calibration data generator handed to the TFLite converter
def representative_dataset():
    """Yield 100 single-window float32 batches picked at random from X_train_norm.

    Indices are drawn independently with np.random.randint, so the same
    window may be yielded more than once. Each item is a one-element list
    holding an array with a leading batch dimension of 1.
    """
    pool_size = len(X_train_norm)
    for _ in range(100):
        pick = np.random.randint(0, pool_size)
        # Slice (not index) to keep the batch dimension.
        yield [X_train_norm[pick:pick + 1].astype(np.float32)]

# Convert to TFLite with INT8 quantization
converter = tf.lite.TFLiteConverter.from_saved_model(SAVED_MODEL_DIR)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# Calibration samples the converter runs through the model during quantization
converter.representative_dataset = representative_dataset
# Restrict the converted graph to the INT8 built-in op set
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Model input and outputs are int8 too, so callers must quantize inputs and
# dequantize outputs themselves (see evaluate_tflite_model).
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

# NOTE(review): unlike the float32 conversion, this convert() is not wrapped in
# try/except, so a conversion error stops the notebook here.
tflite_model_quantized = converter.convert()

print("INT8 quantization complete!")

### 12.3 Save Quantized Model

In [None]:
# Save quantized model
QUANTIZED_MODEL_FILE = 'pushup_model_quantized.tflite'
with open(QUANTIZED_MODEL_FILE, 'wb') as f:
    f.write(tflite_model_quantized)

quantized_size = len(tflite_model_quantized) / 1024

# float_size may be missing (or not a number) when the float32 conversion did not
# complete; in that case skip the ratio instead of aborting after the file was saved.
try:
    compression_ratio = float_size / quantized_size
except (NameError, TypeError):
    compression_ratio = None

print(f"\nQuantized TFLite model size: {quantized_size:.2f} KB")
if compression_ratio is not None:
    print(f"Compression ratio: {compression_ratio:.2f}x")
else:
    print("Compression ratio: n/a (float32 model size unavailable)")
print(f"Saved to {QUANTIZED_MODEL_FILE}")

## 13. Test TFLite Model Accuracy

In [None]:
def _quantize_input(values, scale, zero_point, dtype):
    """Map float values onto the integer grid of a quantized TFLite input tensor."""
    limits = np.iinfo(dtype)
    # Round to nearest and saturate; a bare astype() would truncate toward zero
    # and wrap around for values outside the integer range.
    quantized = np.round(values / scale) + zero_point
    return np.clip(quantized, limits.min, limits.max).astype(dtype)


def evaluate_tflite_model(tflite_model_path, X_test, y_phase_test, y_posture_test, is_quantized=False):
    """
    Evaluate TFLite model accuracy on test set.

    Args:
        tflite_model_path: Path of the .tflite file to load
        X_test: Float test windows, one window per row along the first axis
        y_phase_test: One-hot phase labels, shape [num_windows, num_phase_classes]
        y_posture_test: One-hot posture labels, shape [num_windows, num_posture_classes]
        is_quantized: True when the model takes quantized integer input and
            produces quantized output; inputs are then quantized and outputs
            dequantized with the tensors' own scale / zero point

    Returns:
        Tuple (phase_accuracy, posture_accuracy), each the fraction of windows
        whose argmax prediction equals the argmax of the one-hot label

    Raises:
        ValueError: If is_quantized is True but the input tensor reports a
            quantization scale of 0 (i.e. the model is not quantized)
    """
    # Load TFLite model
    interpreter = tf.lite.Interpreter(model_path=tflite_model_path)
    interpreter.allocate_tensors()

    input_info = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()

    input_scale, input_zero_point = input_info['quantization']
    if is_quantized and input_scale == 0:
        raise ValueError(
            "is_quantized=True but the model input has no quantization parameters"
        )

    phase_true = np.argmax(y_phase_test, axis=1)
    posture_true = np.argmax(y_posture_test, axis=1)

    # The heads are read as output 0 = phase, output 1 = posture. If the class
    # counts differ and the reported output widths show the two are swapped,
    # swap them back.
    # NOTE(review): when both heads have the same number of classes the order
    # cannot be checked this way - confirm it against the converted model.
    phase_out, posture_out = output_details[0], output_details[1]
    num_phase = np.shape(y_phase_test)[1]
    num_posture = np.shape(y_posture_test)[1]
    if (num_phase != num_posture
            and phase_out['shape'][-1] == num_posture
            and posture_out['shape'][-1] == num_phase):
        phase_out, posture_out = posture_out, phase_out

    phase_predictions = []
    posture_predictions = []

    for i in range(len(X_test)):
        # Slice (not index) to keep the batch dimension of 1.
        input_data = X_test[i:i+1].astype(np.float32)

        if is_quantized:
            input_data = _quantize_input(
                input_data, input_scale, input_zero_point, input_info['dtype']
            )

        interpreter.set_tensor(input_info['index'], input_data)
        interpreter.invoke()

        phase_output = interpreter.get_tensor(phase_out['index'])[0]
        posture_output = interpreter.get_tensor(posture_out['index'])[0]

        # Dequantize outputs if needed
        if is_quantized:
            phase_scale, phase_zero_point = phase_out['quantization']
            posture_scale, posture_zero_point = posture_out['quantization']
            phase_output = (phase_output.astype(np.float32) - phase_zero_point) * phase_scale
            posture_output = (posture_output.astype(np.float32) - posture_zero_point) * posture_scale

        phase_predictions.append(np.argmax(phase_output))
        posture_predictions.append(np.argmax(posture_output))

    # Calculate accuracy
    phase_accuracy = np.mean(np.array(phase_predictions) == phase_true)
    posture_accuracy = np.mean(np.array(posture_predictions) == posture_true)

    return phase_accuracy, posture_accuracy

# Test Float32 model
# Both evaluations receive the same normalized float test windows; the function
# quantizes the input itself when is_quantized=True.
print("Evaluating Float32 TFLite model...")
float_phase_acc, float_posture_acc = evaluate_tflite_model(
    FLOAT_MODEL_FILE, X_test_norm, y_phase_test, y_posture_test, is_quantized=False
)
print(f"  Phase Accuracy: {float_phase_acc:.4f}")
print(f"  Posture Accuracy: {float_posture_acc:.4f}")

# Test Quantized model
print("\nEvaluating INT8 Quantized TFLite model...")
quant_phase_acc, quant_posture_acc = evaluate_tflite_model(
    QUANTIZED_MODEL_FILE, X_test_norm, y_phase_test, y_posture_test, is_quantized=True
)
print(f"  Phase Accuracy: {quant_phase_acc:.4f}")
print(f"  Posture Accuracy: {quant_posture_acc:.4f}")

# Accuracy drop
# Reported in percentage points; a positive value means the quantized model is worse.
print("\nAccuracy drop due to quantization:")
print(f"  Phase: {(float_phase_acc - quant_phase_acc)*100:.2f}%")
print(f"  Posture: {(float_posture_acc - quant_posture_acc)*100:.2f}%")

## 14. Convert TFLite to C Array (.cc file)

In [None]:
# Use xxd to convert binary to C array
CC_MODEL_FILE = 'pushup_model_quantized.cc'

print(f"Converting {QUANTIZED_MODEL_FILE} to C array...")

# `xxd -i` writes the C source to stdout, which is captured as text.
# subprocess.run raises FileNotFoundError when the executable is missing, so
# report that the same way as a failing exit code instead of a traceback.
try:
    result = subprocess.run(
        ['xxd', '-i', QUANTIZED_MODEL_FILE],
        capture_output=True,
        text=True
    )
except FileNotFoundError:
    print("Error running xxd: xxd executable not found on PATH")
else:
    if result.returncode == 0:
        # Write output to .cc file
        with open(CC_MODEL_FILE, 'w') as f:
            f.write(result.stdout)

        print(f"Converted {QUANTIZED_MODEL_FILE} to {CC_MODEL_FILE}")
        print(f"Model can now be embedded in Arduino/ESP32 firmware")

        # Preview straight from the captured output; no need to re-read the file.
        preview = result.stdout.splitlines(keepends=True)[:10]
        print("\nFirst 10 lines of .cc file:")
        print(''.join(preview))
    else:
        print(f"Error running xxd: {result.stderr}")

## 15. Download All Files

In [None]:
# Download all generated files
from google.colab import files

files_to_download = [
    MODEL_FILE,                # pushup_model.h5
    'model_metadata.json',     # Class lists and windowing config
    'norm_params.json',        # Per-channel mean/std; the model was trained on inputs scaled with these
    FLOAT_MODEL_FILE,          # Float32 TFLite
    QUANTIZED_MODEL_FILE,      # INT8 quantized TFLite
    CC_MODEL_FILE              # C array for embedded deployment
]

# Files from cells that failed or were skipped are reported, not treated as errors.
missing = []

print("Downloading files...")
for filename in files_to_download:
    if os.path.exists(filename):
        files.download(filename)
        print(f"  Downloaded: {filename}")
    else:
        missing.append(filename)
        print(f"  Warning: {filename} not found")

# Only claim success when every expected file was actually downloaded.
if missing:
    print(f"\nFinished with {len(missing)} file(s) missing: {', '.join(missing)}")
else:
    print("\nAll files downloaded!")