# ACE Tennis Shot Classifier

Two-stage neural network system for classifying tennis shots from wrist-worn IMU data.

## Overview
- **Stage A**: Binary classification (Idle vs Swing)
- **Stage B**: Multiclass classification (Forehand vs Backhand vs Serve)

## Requirements
- TensorFlow/Keras
- scikit-learn
- pandas, numpy
- matplotlib, seaborn
- joblib

In [None]:
import os
import json
import numpy as np
import pandas as pd
from pathlib import Path
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
from tensorflow import keras

## Configuration

Update these paths to match your data directory structure.

In [None]:
# Data paths - update these to match your directory structure
DATA_DIR = 'UPDATE_HERE'
# One InfluxDB CSV export per recording class, all resolved under DATA_DIR
FOREHAND_CSV = f'{DATA_DIR}/Forehands_100Hz_500Batch_Wrist.csv'
BACKHAND_CSV = f'{DATA_DIR}/Backhands_100Hz_500Batch_Wrist.csv'
SERVE_CSV = f'{DATA_DIR}/Serves_100Hz_500Batch_Wrist.csv'
IDLE_CSV = f'{DATA_DIR}/Idle_100Hz_500Batch_Wrist.csv'

# Output directory for trained models
ARTIFACT_DIR = '../ACE_Trained_NN'

# Sampling configuration (100Hz IMU data)
SAMPLE_HZ = 100.0
# windows_from_swings slices PRE_SAMPLES rows before and POST_SAMPLES rows
# after each detected peak and keeps only slices of exactly WINDOW_SIZE rows,
# so WINDOW_SIZE must equal PRE_SAMPLES + POST_SAMPLES.
WINDOW_SIZE = 150  # 1.5 seconds
PRE_SAMPLES = 70   # 0.7s before peak
POST_SAMPLES = 80  # 0.8s after peak

# Detection thresholds
MOTION_TRIGGER = 21.0               # Acceleration threshold for swing detection (m/s²)
IDLE_MAX = MOTION_TRIGGER - 1       # Max acceleration for idle classification
IDLE_RATIO = 0.8                    # Proportion of samples below IDLE_MAX for idle window

# Model settings
RANDOM_STATE = 0  # Seed passed to train_test_split for both stages
# Class-name lists written to class_names_A.json / class_names_B.json when the
# artifacts are saved; list position corresponds to the integer label used in
# training (idle=0, swing=1 / backhand=0, forehand=1, serve=2).
CLASS_NAMES_A = ["idle", "swing"]
CLASS_NAMES_B = ["backhand", "forehand", "serve"]

## Data Loading and Preprocessing

In [None]:
def load_influx_csv(path: str) -> pd.DataFrame:
    """Read an InfluxDB CSV export and return only rows with a valid timestamp.

    The first three lines of the file are skipped, the first three columns
    are discarded, column names are stripped of surrounding whitespace, and
    the ``_time`` column is parsed to datetimes. Rows whose ``_time`` cannot
    be parsed are removed; the surviving rows keep their original index
    labels.

    Args:
        path: Location of the CSV file to read.

    Returns:
        The cleaned DataFrame.

    Raises:
        KeyError: If no ``_time`` column remains after trimming.
    """
    frame = (
        pd.read_csv(path, skiprows=3)
        .iloc[:, 3:]                 # leading metadata columns are not needed
        .rename(columns=str.strip)
    )
    # Unparseable timestamps become NaT so they can be filtered out below.
    parsed_time = pd.to_datetime(frame["_time"], errors="coerce")
    frame = frame.assign(_time=parsed_time)
    return frame.dropna(subset=["_time"])

## Feature Extraction

Extracts 35 optimized features from each 1.5-second window:
- **Statistical features (24)**: mean, std, max for 8 IMU channels
- **Biomechanical features (6)**: wrist rotation, velocity, follow-through
- **Temporal features (5)**: smoothness, energy, trajectory analysis

In [None]:
def extract_features(window_df: pd.DataFrame) -> np.ndarray:
    """Turn one IMU window into a flat float32 feature vector.

    The vector is laid out as three consecutive groups:

    * mean / std / max for each of Amag, Ax, Ay, Az, Gmag, Gx, Gy, Gz
      (24 values when all eight columns are present),
    * six motion descriptors derived from the gyro and accel axes,
    * five descriptors of how the acceleration magnitude evolves over time.

    Args:
        window_df: Window of samples containing the eight channel columns
            listed above.

    Returns:
        1-D ``float32`` array (35 values for a complete window) with NaN/inf
        replaced by finite numbers via ``np.nan_to_num``.

    Raises:
        KeyError: If any of the eight channel columns is missing.
    """
    channel_order = ("Amag", "Ax", "Ay", "Az", "Gmag", "Gx", "Gy", "Gz")

    # --- Group 1: per-channel summary statistics -------------------------
    stats = []
    for name in channel_order:
        if name in window_df.columns:
            samples = window_df[name].values
            stats += [np.mean(samples), np.std(samples), np.max(samples)]

    # Pull every channel out as a raw array (a missing column raises here).
    ax, ay, az, gx, gy, _gz, amag, _gmag = (
        window_df[name].values
        for name in ("Ax", "Ay", "Az", "Gx", "Gy", "Gz", "Amag", "Gmag")
    )

    # --- Group 2: motion descriptors ------------------------------------
    n_samples = len(amag)
    peak = np.argmax(amag)
    peak_is_last = peak >= n_samples - 1
    after_peak = amag[peak:]

    if peak_is_last:
        # Nothing follows the peak, so fall back to the final sample itself.
        follow_through = amag[-1]
    else:
        follow_through = np.mean(after_peak)

    motion = [
        np.sum(gx > 0) / len(gx),      # fraction of samples with positive Gx
        np.mean(np.abs(gy)),
        np.mean(np.abs(gx)),
        follow_through,
        np.mean(np.abs(ax)),
        np.mean(az),
    ]

    # --- Group 3: temporal descriptors ----------------------------------
    jerk = np.mean(np.abs(np.diff(amag)))

    # Mean angle between consecutive acceleration vectors; pairs that
    # include a zero-length vector have no defined angle and are skipped.
    vectors = np.column_stack([ax, ay, az])
    angles = []
    for previous, current in zip(vectors[:-1], vectors[1:]):
        len_previous = np.linalg.norm(previous)
        len_current = np.linalg.norm(current)
        if not (len_previous > 0 and len_current > 0):
            continue
        cosine = np.dot(previous, current) / (len_previous * len_current)
        angles.append(np.arccos(np.clip(cosine, -1.0, 1.0)))
    curvature = np.mean(angles) if angles else 0.0

    # Share of the post-peak samples that stay at or above 30% of the peak.
    if peak_is_last:
        follow_through_length = 0.0
    else:
        cutoff = np.max(amag) * 0.3
        follow_through_length = np.sum(after_peak >= cutoff) / len(after_peak)

    first_cut, second_cut = n_samples // 3, 2 * n_samples // 3
    middle = amag[first_cut:second_cut]
    energy_release = np.sum(middle) / (len(middle) + 1e-6)
    recovery_phase = np.mean(amag[second_cut:])

    temporal = [jerk, curvature, follow_through_length,
                energy_release, recovery_phase]

    return np.nan_to_num(np.array(stats + motion + temporal, dtype=np.float32))

## Window Extraction

In [None]:
def find_peaks_over_threshold(amag, threshold, neighborhood=2):
    """Return indices of local maxima that rise strictly above ``threshold``.

    A sample qualifies when its value exceeds ``threshold`` and equals the
    maximum of the window spanning ``neighborhood`` samples on each side.
    The first and last ``neighborhood`` samples are never reported, and
    every sample of a flat-topped maximum qualifies.

    Args:
        amag: Sequence of acceleration magnitudes.
        threshold: Value a sample must exceed to be considered.
        neighborhood: Half-width of the comparison window, in samples.

    Returns:
        Integer numpy array of peak indices in ascending order.
    """
    candidates = range(neighborhood, len(amag) - neighborhood)
    hits = [
        idx
        for idx in candidates
        if amag[idx] > threshold
        and amag[idx] == np.max(amag[idx - neighborhood:idx + neighborhood + 1])
    ]
    return np.array(hits, dtype=int)


def windows_from_swings(df: pd.DataFrame, label: str):
    """Build one labelled feature vector per detected swing peak.

    Peaks are located in the ``Amag`` column with ``MOTION_TRIGGER`` as the
    threshold. For each peak a slice starting ``PRE_SAMPLES`` rows before it
    and ending ``POST_SAMPLES`` rows after it is taken; peaks whose slice
    would run off either end of ``df`` are ignored.

    Args:
        df: Recording containing the IMU channel columns.
        label: Class label attached to every extracted window.

    Returns:
        List of ``(feature_vector, label)`` tuples.
    """
    total_rows = len(df)
    peak_indices = find_peaks_over_threshold(df["Amag"].values, MOTION_TRIGGER)

    samples = []
    for peak in peak_indices:
        start = peak - PRE_SAMPLES
        stop = peak + POST_SAMPLES
        if start >= 0 and stop <= total_rows:
            window = df.iloc[start:stop]
            # Only full-length windows are turned into samples.
            if len(window) == WINDOW_SIZE:
                samples.append((extract_features(window), label))
    return samples


def windows_from_idle(df: pd.DataFrame, label: str = "idle"):
    """Collect non-overlapping low-motion windows from a recording.

    The recording is scanned from the start. A ``WINDOW_SIZE``-row segment is
    accepted when at least ``IDLE_RATIO`` of its ``Amag`` samples are below
    ``IDLE_MAX``; the scan then jumps past the accepted segment. Otherwise
    the scan advances by a single row and tries again.

    Args:
        df: Recording containing the IMU channel columns.
        label: Class label attached to every extracted window.

    Returns:
        List of ``(feature_vector, label)`` tuples.
    """
    collected = []
    total_rows = len(df)
    start = 0
    while start + WINDOW_SIZE <= total_rows:
        stop = start + WINDOW_SIZE
        segment = df.iloc[start:stop]
        quiet_fraction = np.mean(segment["Amag"].values < IDLE_MAX)
        if not (quiet_fraction >= IDLE_RATIO):
            # Too much motion here: slide forward one sample and retry.
            start += 1
            continue
        collected.append((extract_features(segment), label))
        start = stop
    return collected

## Dataset Creation

In [None]:
def make_datasets():
    """Read the four recordings and assemble the Stage A and Stage B datasets.

    Returns:
        ``((X_A, y_A), (X_B, y_B))`` where

        * ``X_A`` / ``y_A`` stack the idle windows followed by all swing
          windows, labelled ``"idle"`` or ``"swing"``;
        * ``X_B`` / ``y_B`` contain the swing windows only, labelled
          ``"forehand"``, ``"backhand"`` or ``"serve"``.

        Each ``X`` is a 2-D array with one feature vector per row and each
        ``y`` is an array of string labels.
    """
    # Load every file up front so a bad path fails before any processing.
    swing_sources = (
        ("forehand", FOREHAND_CSV),
        ("backhand", BACKHAND_CSV),
        ("serve", SERVE_CSV),
    )
    swing_frames = [(stroke, load_influx_csv(csv)) for stroke, csv in swing_sources]
    idle_frame = load_influx_csv(IDLE_CSV)

    swing_rows = [
        row
        for stroke, frame in swing_frames
        for row in windows_from_swings(frame, stroke)
    ]
    idle_rows = windows_from_idle(idle_frame, "idle")

    # Stage A collapses every stroke type into a single "swing" class.
    features_a = [vec for vec, _ in idle_rows] + [vec for vec, _ in swing_rows]
    labels_a = ["idle"] * len(idle_rows) + ["swing"] * len(swing_rows)

    # Stage B keeps the individual stroke labels.
    features_b = [vec for vec, _ in swing_rows]
    labels_b = [stroke for _, stroke in swing_rows]

    stage_a = (np.vstack(features_a), np.array(labels_a))
    stage_b = (np.vstack(features_b), np.array(labels_b))
    return stage_a, stage_b

## Model Architecture

In [None]:
def build_model(input_size, num_classes):
    """Build and compile a small dense classifier.

    The network is: input -> Dense(32, relu) -> Dense(num_classes, softmax),
    compiled with Adam and sparse categorical cross-entropy, so targets must
    be integer class indices.

    Args:
        input_size: Number of features in each input vector.
        num_classes: Number of output classes.

    Returns:
        The compiled ``keras.Sequential`` model.
    """
    model = keras.Sequential([
        # Declare the input explicitly; passing `input_shape=` to the first
        # layer is deprecated in current Keras and emits a warning.
        keras.Input(shape=(input_size,)),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(num_classes, activation='softmax'),
    ])

    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

## Training

Train both classifier stages with proper train/test splitting and early stopping.

In [None]:
# Load and prepare datasets
print("Loading data...")
(X_A, y_A), (X_B, y_B) = make_datasets()

print(f"Stage A distribution: {Counter(y_A.tolist())}")
print(f"Stage B distribution: {Counter(y_B.tolist())}")

# ===============================================================
# SHARED SCALER: Fit on Stage A training data
# ===============================================================
print("\n⚙️  Fitting shared scaler on Stage A training data...")
print("   This scaler will be used for BOTH Stage A and Stage B")
print("   to ensure consistent normalization across all models.\n")

# Derive the label encoding from CLASS_NAMES_A so the integer indices the
# model is trained on can never drift from the class-name list that is
# exported next to the model.
label_map_A = {name: idx for idx, name in enumerate(CLASS_NAMES_A)}
names_A = list(label_map_A)
ids_A = list(label_map_A.values())
y_A_num = np.array([label_map_A[y] for y in y_A])

X_train_A, X_test_A, y_train_A, y_test_A = train_test_split(
    X_A, y_A_num, test_size=0.2, random_state=RANDOM_STATE, stratify=y_A_num
)

# Fit ONE shared scaler on Stage A training data (includes idle + swing)
shared_scaler = StandardScaler().fit(X_train_A)

# ===============================================================
# Stage A: Idle vs Swing
# ===============================================================
print("🎾 Stage A: Idle vs Swing")

# Apply shared scaler to Stage A data
X_train_A_scaled = shared_scaler.transform(X_train_A)
X_test_A_scaled = shared_scaler.transform(X_test_A)

modelA = build_model(X_A.shape[1], len(label_map_A))

# Add early stopping to stop training when validation loss stops improving
early_stop = keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)

print("\n🚀 Training Stage A...")
historyA = modelA.fit(
    X_train_A_scaled, y_train_A,
    epochs=50,
    batch_size=16,
    validation_split=0.2,
    callbacks=[early_stop],
    verbose=1
)

y_pred_A = np.argmax(modelA.predict(X_test_A_scaled), axis=1)
accA = accuracy_score(y_test_A, y_pred_A)
print(f"\n✅ Stage A Accuracy: {accA:.3f}")
# Pass the full label set explicitly: without it, a class that is absent from
# both y_test_A and y_pred_A is dropped and the report no longer lines up
# with target_names.
print(classification_report(y_test_A, y_pred_A, labels=ids_A, target_names=names_A))

# Detailed Metrics Table for Stage A (one row per class, in label order)
precision_A, recall_A, f1_A, support_A = precision_recall_fscore_support(
    y_test_A, y_pred_A, labels=ids_A
)
metrics_df_A = pd.DataFrame({
    'Class': names_A,
    'Precision': [f"{p:.4f}" for p in precision_A],
    'Recall': [f"{r:.4f}" for r in recall_A],
    'F1-Score': [f"{f:.4f}" for f in f1_A],
    'Support': support_A
})
print("\n📊 Stage A Detailed Metrics:")
print("=" * 60)
print(metrics_df_A.to_string(index=False))
print("=" * 60)

In [None]:
# Confusion Matrix
# Fix the row/column order to the label indices so the matrix always has one
# row and column per class and lines up with the tick labels, even if a class
# is missing from both the test labels and the predictions.
cmA = confusion_matrix(y_test_A, y_pred_A, labels=list(label_map_A.values()))
plt.figure(figsize=(4, 4))
sns.heatmap(cmA, annot=True, fmt="d", cmap="Blues",
            xticklabels=list(label_map_A.keys()),
            yticklabels=list(label_map_A.keys()))
plt.title(f"Stage A Confusion Matrix (Acc={accA:.2f})", color="darkorange")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Plot training vs. validation accuracy per epoch
plt.figure(figsize=(6, 4))
plt.plot(historyA.history["accuracy"], label="Train Accuracy", color="darkorange")
plt.plot(historyA.history["val_accuracy"], label="Val Accuracy", color="black")
plt.title("Stage A Training Progress")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

In [None]:
# ===============================================================
# Stage B: Stroke Type (Forehand / Backhand / Serve)
# ===============================================================
print("\n🏸 Stage B: Forehand / Backhand / Serve")

# Derive the label encoding from CLASS_NAMES_B so the integer indices the
# model is trained on can never drift from the class-name list that is
# exported next to the model.
label_map_B = {name: idx for idx, name in enumerate(CLASS_NAMES_B)}
names_B = list(label_map_B)
ids_B = list(label_map_B.values())
y_B_num = np.array([label_map_B[y] for y in y_B])

# Split Stage B data
X_train_B, X_test_B, y_train_B, y_test_B = train_test_split(
    X_B, y_B_num, test_size=0.2, random_state=RANDOM_STATE, stratify=y_B_num
)

# Apply SAME shared scaler to Stage B data
X_train_B_scaled = shared_scaler.transform(X_train_B)
X_test_B_scaled = shared_scaler.transform(X_test_B)

print("✅ Using shared scaler for Stage B (same as Stage A)")

modelB = build_model(X_B.shape[1], len(label_map_B))

# Stop when validation loss has not improved for 5 epochs and roll back to
# the best weights seen.
early_stop_B = keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=5, restore_best_weights=True
)

print("\n🚀 Training Stage B...")
historyB = modelB.fit(
    X_train_B_scaled, y_train_B,
    epochs=50,
    batch_size=16,
    validation_split=0.2,
    callbacks=[early_stop_B],
    verbose=1
)

y_pred_B = np.argmax(modelB.predict(X_test_B_scaled), axis=1)
accB = accuracy_score(y_test_B, y_pred_B)
print(f"\n✅ Stage B Accuracy: {accB:.3f}")
# Pass the full label set explicitly: without it, a class that is absent from
# both y_test_B and y_pred_B is dropped and the report no longer lines up
# with target_names.
print(classification_report(y_test_B, y_pred_B, labels=ids_B, target_names=names_B))

# Detailed Metrics Table for Stage B (one row per class, in label order)
precision_B, recall_B, f1_B, support_B = precision_recall_fscore_support(
    y_test_B, y_pred_B, labels=ids_B
)
metrics_df_B = pd.DataFrame({
    'Class': names_B,
    'Precision': [f"{p:.4f}" for p in precision_B],
    'Recall': [f"{r:.4f}" for r in recall_B],
    'F1-Score': [f"{f:.4f}" for f in f1_B],
    'Support': support_B
})
print("\n📊 Stage B Detailed Metrics:")
print("=" * 60)
print(metrics_df_B.to_string(index=False))
print("=" * 60)

In [None]:
# Confusion Matrix
# Fix the row/column order to the label indices so the matrix always has one
# row and column per class and lines up with the tick labels, even if a class
# is missing from both the test labels and the predictions.
cmB = confusion_matrix(y_test_B, y_pred_B, labels=list(label_map_B.values()))
plt.figure(figsize=(5, 4))
sns.heatmap(cmB, annot=True, fmt="d", cmap="Oranges",
            xticklabels=list(label_map_B.keys()),
            yticklabels=list(label_map_B.keys()))
plt.title(f"Stage B Confusion Matrix (Acc={accB:.2f})", color="darkorange")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

# Plot training vs. validation accuracy per epoch
plt.figure(figsize=(6, 4))
plt.plot(historyB.history["accuracy"], label="Train Accuracy", color="darkorange")
plt.plot(historyB.history["val_accuracy"], label="Val Accuracy", color="black")
plt.title("Stage B Training Progress")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

## Save Models and Artifacts

In [None]:
# Write both trained models, the shared scaler and the class-name lists into
# ARTIFACT_DIR, creating the directory (and any parents) if needed.
out = Path(ARTIFACT_DIR)
out.mkdir(parents=True, exist_ok=True)

modelA.save(out / "idle_swing_model.keras")
modelB.save(out / "stroke_type_model.keras")
joblib.dump(shared_scaler, out / "scaler.pkl")
# List position in each JSON file is the integer class index the models emit.
(out / "class_names_A.json").write_text(json.dumps(CLASS_NAMES_A))
(out / "class_names_B.json").write_text(json.dumps(CLASS_NAMES_B))

# Status message: the check-mark emoji was stored as mis-decoded bytes.
print(f"\n✅ Saved all artifacts to {out}")

## Export Normalization Parameters for ESP32

These parameters are used for feature normalization on the embedded device.

In [None]:
# ===============================================================
# Export Normalization Parameters for ESP32 and paste into Arduino code
# ===============================================================
means = shared_scaler.mean_.astype(float)
# Export the scaler's own scale_ rather than sqrt(var_). StandardScaler
# substitutes 1.0 for the scale of constant (zero-variance) features, so
# sqrt(var_) would hand the firmware a 0.0 divisor for such a feature and
# would not reproduce the normalization the models were trained with.
scales = shared_scaler.scale_.astype(float)

# Values are printed as C float literals (trailing "f"), one per feature, in
# the same order as the feature vector.
print("\n=== SHARED SCALER PARAMETERS (for ESP32) ===")
print("PASTE THESE INTO ESP32 CODE")
print("\n=== FEATURE MEANS ===")
print(", ".join([f"{m:.6f}f" for m in means]))
print('length = ', len(means))

print("\n=== FEATURE SCALES ===")
print(", ".join([f"{s:.6f}f" for s in scales]))
print('length = ', len(scales))

## Convert Models to TensorFlow Lite for ESP32

To deploy these models on an ESP32 microcontroller, convert them to TensorFlow Lite format:

```bash
# Convert both models to TensorFlow Lite (.tflite)
python export_tflite.py


# Convert TFLite models to C header files for Arduino
python convert_to_header.py
```

This will generate:
- `idle_swing_model.tflite` and `stroke_type_model.tflite`
- Header files that can be included in your ESP32 firmware