# Paper-Style Sensor Modality Classifier

Based on Čulić Gambiroža et al. (2025) "Lost in data: recognizing type of time series sensor data using signal pattern classification"

This notebook implements a simplified multi-class Random Forest classifier using 5 basic statistical features (min, max, avg, median, std) from sample-based windows to classify sensor modality types.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import glob
import os
import sys
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, classification_report

sys.path.insert(0, os.path.abspath('../src'))

from SensorDataLoader import SensorDataLoader
from signal_pattern_classification.statistical_features import (
    streaming_windows,
    stored_windows,
    extract_features_from_windows,
    top_k_accuracy,
    FEATURE_NAMES
)

# Global plot appearance: select the 'seaborn-v0_8-whitegrid' matplotlib style
# and the seaborn 'husl' colour palette before any figure is created.
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette('husl')

In [None]:
# Class label (sensor modality) -> sensor keys that load_all_pamap2_data looks
# up in the dict returned by the PAMAP2 loader. Keys missing from a file are
# skipped by the loader loop.
MODALITY_MAP = {
    'accelerometer': ['hand_accel_16g', 'chest_accel_16g', 'ankle_accel_16g'],
    'gyroscope': ['hand_gyro', 'chest_gyro', 'ankle_gyro'],
    'magnetometer': ['hand_mag', 'chest_mag', 'ankle_mag'],
    'temperature': ['hand_temp', 'chest_temp', 'ankle_temp']
}

# Window lengths (in samples); one classifier is trained per entry.
WINDOW_SIZES = [10, 20, 50, 100]
# Evaluation windowing modes. prepare_dataset uses streaming_windows for
# 'streaming' and stored_windows for any other value.
MODES = ['streaming', 'stored']

## 1. Load PAMAP2 Data

In [None]:
def load_all_pamap2_data(data_dir: str) -> dict:
    """Collect cleaned single-channel signals per modality from all subject files.

    Every ``subject*.dat`` file found in ``data_dir`` is loaded, reduced to the
    segments returned by ``get_stationary_segments`` for activities 2 and 3,
    and broken into individual channels. NaN samples are removed from each
    channel, and channels with fewer than 100 remaining samples are dropped.

    Args:
        data_dir: Directory that holds the ``subject*.dat`` files.

    Returns:
        Dict mapping every key of ``MODALITY_MAP`` to a list of 1-D arrays
        (possibly empty when nothing qualified for that modality).
    """
    loader = SensorDataLoader(seed=42)
    subject_files = sorted(glob.glob(os.path.join(data_dir, 'subject*.dat')))

    modality_signals = {name: [] for name in MODALITY_MAP}

    for subject_path in subject_files:
        print(f"Loading {os.path.basename(subject_path)}...")
        sensors = loader.load_pamap2(subject_path)
        sensors = loader.get_stationary_segments(sensors, activities=[2, 3])

        for modality, sensor_keys in MODALITY_MAP.items():
            bucket = modality_signals[modality]
            for sensor_key in sensor_keys:
                if sensor_key not in sensors:
                    continue
                readings = sensors[sensor_key]

                # A 1-D array is one channel; otherwise each column is a channel.
                if readings.ndim == 1:
                    channels = [readings]
                else:
                    channels = [readings[:, col] for col in range(readings.shape[1])]

                for channel in channels:
                    valid = channel[~np.isnan(channel)]
                    if len(valid) >= 100:
                        bucket.append(valid)

    return modality_signals

In [None]:
data_dir = '../datasets/PAMAP2_Dataset/Protocol'
modality_data = load_all_pamap2_data(data_dir)

# Per-modality overview: how many signals were kept and their combined length.
print("\nData summary:")
for modality_name, signal_list in modality_data.items():
    sample_total = sum(map(len, signal_list))
    print(f"  {modality_name}: {len(signal_list)} signals, {sample_total:,} samples")

## 2. Create Train/Val/Eval Splits

In [None]:
def create_data_splits(modality_data: dict, random_state: int = 42) -> tuple:
    """Split each modality's pooled samples into train / validation / eval sets.

    All signals of a modality are concatenated into one array, shuffled
    sample-by-sample, and cut into three parts. The eval and validation parts
    each contain ``int(0.2 * smallest)`` samples, where ``smallest`` is the
    sample count of the smallest non-empty modality, so they have the same
    size for every modality; all remaining samples go to train.

    Modalities without any samples are skipped: they keep empty lists in all
    three outputs and do not influence the split size. (Previously such a
    modality made ``np.concatenate`` fail, or forced the split size to 0.)

    The output dicts are keyed by ``modality_data`` itself, so the function
    does not depend on notebook-level globals.

    NOTE(review): the shuffle operates on individual samples, so the returned
    arrays no longer preserve temporal order; windows cut from them afterwards
    are not runs of consecutive sensor readings. Confirm this is intended.

    Args:
        modality_data: Dict mapping modality name to a list of 1-D arrays.
        random_state: Seed for the NumPy generator that drives the shuffle.

    Returns:
        Tuple ``(train_data, val_data, eval_data)``. Each is a dict mapping
        modality name to a list holding a single array (or an empty list for a
        modality without samples).

    Raises:
        ValueError: If no modality contains any sample.
    """
    rng = np.random.default_rng(random_state)

    total_samples = {m: sum(len(s) for s in signals) for m, signals in modality_data.items()}
    non_empty_counts = [count for count in total_samples.values() if count > 0]
    if not non_empty_counts:
        raise ValueError("modality_data contains no samples to split")
    min_samples = min(non_empty_counts)
    split_size = int(0.2 * min_samples)

    print(f"Samples per modality: {total_samples}")
    print(f"Smallest class: {min_samples}, split size: {split_size}")

    train_data = {m: [] for m in modality_data}
    val_data = {m: [] for m in modality_data}
    eval_data = {m: [] for m in modality_data}

    for modality, signals in modality_data.items():
        if total_samples[modality] == 0:
            # Nothing to split; leave the empty lists in place.
            continue

        # np.concatenate returns a new array, so the in-place shuffle below
        # never touches the caller's signals.
        all_samples = np.concatenate(signals)
        rng.shuffle(all_samples)

        eval_data[modality] = [all_samples[:split_size]]
        val_data[modality] = [all_samples[split_size:2*split_size]]
        train_data[modality] = [all_samples[2*split_size:]]

    return train_data, val_data, eval_data

In [None]:
# Split every modality into train / validation / eval sets (default seed 42).
# NOTE(review): val_data is not referenced again in the cells visible here.
train_data, val_data, eval_data = create_data_splits(modality_data)

## 3. Prepare Dataset and Train Classifier

In [None]:
def prepare_dataset(data: dict, window_size: int, mode: str, random_state: int = 42) -> tuple:
    """Turn per-modality signals into a feature matrix and a label vector.

    Each signal is cut into windows (``streaming_windows`` when ``mode`` is
    ``'streaming'``, ``stored_windows`` for any other value), features are
    extracted per window, and every feature row is labelled with the modality
    the signal belongs to.

    Args:
        data: Dict mapping modality name to a list of signals.
        window_size: Number of samples per window.
        mode: ``'streaming'`` or anything else (treated as stored mode).
        random_state: Seed forwarded to ``stored_windows``.

    Returns:
        Tuple ``(X, y)``: stacked feature rows and the matching label array.
        When no window could be produced, ``X`` has shape ``(0, 5)`` and ``y``
        is empty.
    """
    def _windows_for(signal):
        # Streaming mode delegates directly; stored mode requests as many
        # windows as fit into the signal and yields nothing if none fit.
        if mode == 'streaming':
            return streaming_windows(signal, window_size)
        window_count = len(signal) // window_size
        if window_count == 0:
            return []
        return stored_windows(signal, window_size, window_count, random_state)

    feature_blocks = []
    labels = []

    for modality, signals in data.items():
        for signal in signals:
            windows = _windows_for(signal)
            if len(windows) == 0:
                continue

            block = extract_features_from_windows(windows)
            feature_blocks.append(block)
            labels.extend([modality] * len(block))

    if feature_blocks:
        return np.vstack(feature_blocks), np.array(labels)
    return np.array([]).reshape(0, 5), np.array([])


def train_classifier(train_data: dict, window_size: int) -> RandomForestClassifier:
    """Fit a Random Forest on streaming-window features of the training split.

    Args:
        train_data: Dict mapping modality name to a list of training signals.
        window_size: Number of samples per window.

    Returns:
        The fitted ``RandomForestClassifier`` (100 trees, seed 42, all cores).
    """
    features, labels = prepare_dataset(train_data, window_size, 'streaming')

    forest = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    forest.fit(features, labels)

    print(f"Trained on {len(labels):,} samples")
    return forest


def run_experiment(clf, eval_data: dict, window_size: int, mode: str, random_state: int = 42) -> dict:
    """Evaluate a fitted classifier on one (window size, mode) configuration.

    Args:
        clf: Fitted classifier exposing ``predict``, ``predict_proba`` and
            ``classes_``.
        eval_data: Dict mapping modality name to a list of evaluation signals.
        window_size: Number of samples per window.
        mode: Windowing mode forwarded to ``prepare_dataset``.
        random_state: Seed forwarded to ``prepare_dataset``.

    Returns:
        Dict with keys ``top1``, ``top2``, ``top3`` (top-k accuracies),
        ``y_true``, ``y_pred``, ``confusion_matrix`` and ``classes``.
    """
    features, labels = prepare_dataset(eval_data, window_size, mode, random_state)

    predictions = clf.predict(features)
    probabilities = clf.predict_proba(features)

    # Build the top-k entries first so the dict keeps its original key order.
    outcome = {}
    for k in (1, 2, 3):
        outcome[f'top{k}'] = top_k_accuracy(labels, probabilities, k=k, classes=clf.classes_)

    outcome['y_true'] = labels
    outcome['y_pred'] = predictions
    outcome['confusion_matrix'] = confusion_matrix(labels, predictions, labels=clf.classes_)
    outcome['classes'] = clf.classes_
    return outcome

## 4. Run All Experiments

In [None]:
results = {}
classifiers = {}

# Phase 1: one classifier per window size, trained on the training split.
for ws in WINDOW_SIZES:
    print(f"\nTraining classifier for window_size={ws}...")
    classifiers[ws] = train_classifier(train_data, ws)

# Phase 2: evaluate every (window size, mode) pair on the eval split.
print("\nRunning experiments...")
for ws, mode in ((w, m) for w in WINDOW_SIZES for m in MODES):
    key = (ws, mode)
    outcome = run_experiment(classifiers[ws], eval_data, ws, mode)
    results[key] = outcome
    print(f"  Window={ws:3d}, Mode={mode:9s}: "
          f"Top-1={outcome['top1']*100:5.1f}%, "
          f"Top-2={outcome['top2']*100:5.1f}%, "
          f"Top-3={outcome['top3']*100:5.1f}%")

## 5. Results Table

In [None]:
# One table row per (window size, mode); accuracies as one-decimal percentages.
rows = []
for ws, mode in ((w, m) for w in WINDOW_SIZES for m in MODES):
    r = results[(ws, mode)]
    row = {'Window Size': ws, 'Mode': mode}
    for k in (1, 2, 3):
        row[f'Top-{k} (%)'] = format(r[f'top{k}'] * 100, '.1f')
    rows.append(row)

results_df = pd.DataFrame(rows)
results_df

## 6. Confusion Matrix (Window=20, Streaming)

In [None]:
key = (20, 'streaming')
selected = results[key]
cm, classes = selected['confusion_matrix'], selected['classes']

# Heatmap of raw counts; both axes are labelled with the classifier's classes.
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=classes,
    yticklabels=classes,
    ax=ax,
)
ax.set(
    xlabel='Predicted',
    ylabel='Actual',
    title='Confusion Matrix (Window=20, Streaming Mode)',
)
plt.tight_layout()
plt.show()

## 7. Per-Class Accuracy

In [None]:
key = (20, 'streaming')
selected = results[key]
cm, classes = selected['confusion_matrix'], selected['classes']

# Per-class accuracy: diagonal count divided by the row total of the matrix.
per_class_acc = np.diag(cm) / cm.sum(axis=1)

fig, ax = plt.subplots(figsize=(10, 5))
palette = sns.color_palette('husl', len(classes))
bars = ax.bar(classes, per_class_acc * 100, color=palette)
ax.set(
    ylabel='Accuracy (%)',
    xlabel='Modality',
    title='Per-Class Accuracy (Window=20, Streaming Mode)',
    ylim=(0, 105),
)

# Write the percentage just above each bar.
for rect, acc in zip(bars, per_class_acc):
    label_x = rect.get_x() + rect.get_width()/2
    label_y = rect.get_height() + 1
    ax.text(label_x, label_y, f'{acc*100:.1f}%', ha='center', va='bottom', fontsize=10)

plt.tight_layout()
plt.show()

## 8. Accuracy vs Window Size

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Bar geometry is identical for both panels, so compute it once.
x = np.arange(len(WINDOW_SIZES))
width = 0.25

for ax, mode in zip(axes, MODES):
    top1_vals = [results[(ws, mode)]['top1']*100 for ws in WINDOW_SIZES]
    top2_vals = [results[(ws, mode)]['top2']*100 for ws in WINDOW_SIZES]
    top3_vals = [results[(ws, mode)]['top3']*100 for ws in WINDOW_SIZES]

    # Grouped bars: top-1 / top-2 / top-3 side by side for each window size.
    ax.bar(x - width, top1_vals, width, label='Top-1', color='steelblue')
    ax.bar(x, top2_vals, width, label='Top-2', color='seagreen')
    ax.bar(x + width, top3_vals, width, label='Top-3', color='coral')

    ax.set_xlabel('Window Size (samples)')
    ax.set_ylabel('Accuracy (%)')
    ax.set_title(f'{mode.capitalize()} Mode')
    ax.set_xticks(x)
    ax.set_xticklabels(WINDOW_SIZES)
    ax.set_ylim(0, 105)

    # Draw the reference line BEFORE building the legend: legend() only lists
    # artists that already exist, so calling it first dropped this label.
    ax.axhline(y=75, color='gray', linestyle='--', alpha=0.5, label='Paper baseline (~75%)')
    ax.legend()

plt.suptitle('Accuracy by Window Size and Mode', fontsize=14)
plt.tight_layout()
plt.show()

## 9. Comparison to SHIELD Baseline

In [None]:
# Side-by-side comparison of the SHIELD baseline and this notebook's
# window-size-20 streaming result.
paper_ws20 = results[(20, 'streaming')]

comparison = {
    'Approach': ['SHIELD Physics-Informed', 'Paper Style (ws=20)'],
    'Features': ['~15-20', '5'],
    'Window': ['2.0s (200 samples)', '20 samples'],
    # Rounded to one decimal so the column matches the precision used for
    # Top-2 here and in the results table, while staying numeric.
    'Top-1 (%)': [99.2, round(paper_ws20['top1']*100, 1)],
    'Top-2 (%)': ['N/A', f"{paper_ws20['top2']*100:.1f}"]
}

comparison_df = pd.DataFrame(comparison)
comparison_df

In [None]:
fig, ax = plt.subplots(figsize=(8, 5))

approaches = ['SHIELD\n(Physics-Informed)', 'Paper Style\n(5 Features)']
top1_values = [99.2, results[(20, 'streaming')]['top1']*100]

bars = ax.bar(approaches, top1_values, color=['steelblue', 'coral'], width=0.5)
ax.set(
    ylabel='Top-1 Accuracy (%)',
    title='Comparison: SHIELD vs Paper-Style Classifier',
    ylim=(0, 105),
)

# Print each bar's value just above it.
for rect, value in zip(bars, top1_values):
    label_x = rect.get_x() + rect.get_width()/2
    label_y = rect.get_height() + 1
    ax.text(label_x, label_y, f'{value:.1f}%',
            ha='center', va='bottom', fontsize=12, fontweight='bold')

# Dashed reference line with its caption placed next to it.
ax.axhline(y=75, color='gray', linestyle='--', alpha=0.5)
ax.text(1.3, 76, 'Paper baseline (~75%)', fontsize=9, color='gray')

plt.tight_layout()
plt.show()

## 10. Summary

This notebook implemented the paper-style classifier methodology from Čulić Gambiroža et al. (2025):

- **5 simple features**: min, max, avg, median, std
- **Sample-based windows**: 10, 20, 50, 100 samples
- **Two evaluation modes**: streaming (consecutive) and stored (random)
- **Multi-class Random Forest** with 100 trees (`n_estimators=100`), a fixed seed (`random_state=42`), and otherwise default sklearn parameters

Key findings:
- The paper methodology achieves competitive results with fewer features
- Window size has minimal impact on accuracy (per paper findings)
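- Streaming and stored modes produce similar results; note that `create_data_splits` shuffles individual samples before windowing, so the streaming windows in this notebook do not contain temporally consecutive readings, which may contribute to the similarity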
- Top-2 accuracy is significantly higher than top-1, useful for narrowing down sensor types