# Feature Engineering for Activity Recognition

This notebook demonstrates feature extraction techniques for time-series sensor data:
- Time-domain features (statistical measures)
- Frequency-domain features (FFT-based)
- Windowing strategies for raw time-series

The UCI HAR dataset provides pre-computed features, but we also demonstrate how to extract features from raw sensor data.

In [None]:
import sys
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

# Add src to path
sys.path.insert(0, str(Path("../src").resolve()))

from fittrack.data.ingestion import HARDataLoader, ACTIVITY_LABELS
from fittrack.features.engineering import (
    FeatureConfig,
    FeatureExtractor,
    extract_time_domain_features,
    extract_frequency_domain_features,
    extract_har_features,
    create_windows,
    compute_sma,
    compute_zero_crossing_rate,
)

plt.style.use("seaborn-v0_8-whitegrid")
%matplotlib inline

FIGURES_DIR = Path("../docs/figures")
FIGURES_DIR.mkdir(parents=True, exist_ok=True)

## 1. Load UCI HAR Dataset

In [None]:
loader = HARDataLoader()
train_data, test_data = loader.load_all()

print(f"Training samples: {train_data.n_samples:,}")
print(f"Features per sample: {train_data.n_features}")

## 2. Feature Subset Extraction

The UCI HAR dataset has 561 pre-computed features. Let's explore different subsets.

In [None]:
# Extract feature subsets
time_features = extract_har_features(train_data.X, feature_subset="time")
body_acc_features = extract_har_features(train_data.X, feature_subset="body_acc")
gyro_features = extract_har_features(train_data.X, feature_subset="gyro")

print(f"Time-domain features: {time_features.shape[1]}")
print(f"Body acceleration features: {body_acc_features.shape[1]}")
print(f"Gyroscope features: {gyro_features.shape[1]}")

In [None]:
# Feature naming breakdown
all_features = train_data.X.columns.tolist()

# Count by type
feature_types = {
    "mean()": len([f for f in all_features if "mean()" in f]),
    "std()": len([f for f in all_features if "std()" in f]),
    "mad()": len([f for f in all_features if "mad()" in f]),
    "max()": len([f for f in all_features if "max()" in f]),
    "min()": len([f for f in all_features if "min()" in f]),
    "energy()": len([f for f in all_features if "energy()" in f]),
    "entropy()": len([f for f in all_features if "entropy()" in f]),
}

print("Feature type counts:")
for ftype, count in sorted(feature_types.items(), key=lambda x: -x[1]):
    print(f"  {ftype}: {count}")

## 3. Time-Domain Feature Extraction Demo

Demonstrate extracting time-domain features from a synthetic signal.

In [None]:
# Create synthetic accelerometer data (3-axis)
np.random.seed(42)
t = np.linspace(0, 2, 128)  # 2 seconds at 64Hz

# Simulate walking: periodic signal with noise
walking_x = 0.5 * np.sin(2 * np.pi * 2 * t) + 0.1 * np.random.randn(128)  # 2Hz stride
walking_y = 0.3 * np.sin(2 * np.pi * 2 * t + np.pi/4) + 0.1 * np.random.randn(128)
walking_z = 0.8 * np.sin(2 * np.pi * 4 * t) + 0.15 * np.random.randn(128)  # 4Hz vertical
walking_signal = np.column_stack([walking_x, walking_y, walking_z])

# Simulate sitting: mostly static with small noise
sitting_signal = 0.05 * np.random.randn(128, 3)
sitting_signal[:, 2] += 1.0  # Gravity component

fig, axes = plt.subplots(2, 1, figsize=(12, 6), sharex=True)

axes[0].plot(t, walking_signal)
axes[0].set_ylabel("Acceleration")
axes[0].set_title("Simulated Walking Signal")
axes[0].legend(["X", "Y", "Z"])

axes[1].plot(t, sitting_signal)
axes[1].set_ylabel("Acceleration")
axes[1].set_xlabel("Time (s)")
axes[1].set_title("Simulated Sitting Signal")
axes[1].legend(["X", "Y", "Z"])

plt.tight_layout()
plt.show()

In [None]:
# Extract time-domain features
walking_features = extract_time_domain_features(walking_signal, ["X", "Y", "Z"])
sitting_features = extract_time_domain_features(sitting_signal, ["X", "Y", "Z"])

# Compare key features
comparison_df = pd.DataFrame({
    "Feature": list(walking_features.keys()),
    "Walking": list(walking_features.values()),
    "Sitting": list(sitting_features.values()),
})
comparison_df["Difference"] = abs(comparison_df["Walking"] - comparison_df["Sitting"])

# Show features with largest differences
print("Features with largest activity differences:")
print(comparison_df.nlargest(10, "Difference").to_string(index=False))

In [None]:
# Visualize feature differences
key_features = ["std_X", "std_Y", "std_Z", "sma", "zcr_X", "zcr_Y", "energy_X"]

fig, ax = plt.subplots(figsize=(10, 5))
x = np.arange(len(key_features))
width = 0.35

walking_vals = [walking_features[f] for f in key_features]
sitting_vals = [sitting_features[f] for f in key_features]

ax.bar(x - width/2, walking_vals, width, label="Walking", color="steelblue")
ax.bar(x + width/2, sitting_vals, width, label="Sitting", color="coral")
ax.set_xlabel("Feature")
ax.set_ylabel("Value")
ax.set_title("Time-Domain Features: Walking vs Sitting")
ax.set_xticks(x)
ax.set_xticklabels(key_features, rotation=45, ha="right")
ax.legend()

plt.tight_layout()
plt.savefig(FIGURES_DIR / "time_domain_features.png", dpi=150, bbox_inches="tight")
plt.show()

## 4. Frequency-Domain Feature Extraction

In [None]:
# Extract frequency features
walking_freq_features = extract_frequency_domain_features(
    walking_signal, sampling_rate=64.0, n_coeffs=5, axis_names=["X", "Y", "Z"]
)
sitting_freq_features = extract_frequency_domain_features(
    sitting_signal, sampling_rate=64.0, n_coeffs=5, axis_names=["X", "Y", "Z"]
)

print("Walking - Dominant frequencies:")
print(f"  X: {walking_freq_features['dominant_freq_X']:.2f} Hz")
print(f"  Y: {walking_freq_features['dominant_freq_Y']:.2f} Hz")
print(f"  Z: {walking_freq_features['dominant_freq_Z']:.2f} Hz")

print("\nSitting - Dominant frequencies:")
print(f"  X: {sitting_freq_features['dominant_freq_X']:.2f} Hz")
print(f"  Y: {sitting_freq_features['dominant_freq_Y']:.2f} Hz")
print(f"  Z: {sitting_freq_features['dominant_freq_Z']:.2f} Hz")

In [None]:
# Visualize FFT
fig, axes = plt.subplots(2, 3, figsize=(14, 8))

for i, (signal, name) in enumerate([(walking_signal, "Walking"), (sitting_signal, "Sitting")]):
    for j, axis in enumerate(["X", "Y", "Z"]):
        ax = axes[i, j]
        fft_vals = np.fft.rfft(signal[:, j])
        freqs = np.fft.rfftfreq(len(signal), d=1/64.0)
        
        ax.plot(freqs[1:], np.abs(fft_vals[1:]), linewidth=1.5)
        ax.set_xlabel("Frequency (Hz)")
        ax.set_ylabel("Magnitude")
        ax.set_title(f"{name} - {axis}-axis FFT")
        ax.set_xlim(0, 20)

plt.tight_layout()
plt.savefig(FIGURES_DIR / "fft_comparison.png", dpi=150, bbox_inches="tight")
plt.show()

## 5. Windowing for Raw Time-Series

In [None]:
# Demonstrate windowing
raw_signal = np.random.randn(500, 3)  # 500 samples, 3 axes

# Different overlap strategies
windows_50 = create_windows(raw_signal, window_size=64, overlap=0.5)
windows_75 = create_windows(raw_signal, window_size=64, overlap=0.75)
windows_0 = create_windows(raw_signal, window_size=64, overlap=0.0)

print(f"500 samples with window_size=64:")
print(f"  50% overlap: {windows_50.shape[0]} windows")
print(f"  75% overlap: {windows_75.shape[0]} windows")
print(f"   0% overlap: {windows_0.shape[0]} windows")

In [None]:
# Visualize windowing
fig, ax = plt.subplots(figsize=(12, 4))

signal_1d = raw_signal[:200, 0]  # First 200 samples, first axis
ax.plot(signal_1d, 'k-', alpha=0.3, label="Raw signal")

# Show first 3 windows
colors = ['blue', 'green', 'red']
for i in range(3):
    start = i * 32  # 50% overlap with window_size=64
    end = start + 64
    ax.axvspan(start, end, alpha=0.2, color=colors[i], label=f"Window {i+1}")
    ax.plot(range(start, end), signal_1d[start:end], color=colors[i], linewidth=2)

ax.set_xlabel("Sample Index")
ax.set_ylabel("Value")
ax.set_title("Sliding Window with 50% Overlap")
ax.legend()
ax.set_xlim(0, 200)

plt.tight_layout()
plt.savefig(FIGURES_DIR / "windowing_demo.png", dpi=150, bbox_inches="tight")
plt.show()

## 6. Full Feature Extraction Pipeline

In [None]:
# Configure feature extractor
config = FeatureConfig(
    include_time_domain=True,
    include_frequency_domain=True,
    window_size=128,
    window_overlap=0.5,
    n_fft_coeffs=10,
)

extractor = FeatureExtractor(config)

# Generate longer synthetic data
np.random.seed(42)
t_long = np.linspace(0, 10, 640)  # 10 seconds

# Walking signal
walking_long = np.column_stack([
    0.5 * np.sin(2 * np.pi * 2 * t_long) + 0.1 * np.random.randn(640),
    0.3 * np.sin(2 * np.pi * 2 * t_long + np.pi/4) + 0.1 * np.random.randn(640),
    0.8 * np.sin(2 * np.pi * 4 * t_long) + 0.15 * np.random.randn(640),
])

# Extract features
features_df = extractor.extract_features(
    walking_long,
    axis_names=["X", "Y", "Z"],
    sampling_rate=64.0,
)

print(f"Extracted {len(features_df)} windows")
print(f"Features per window: {len(features_df.columns)}")
print(f"\nFeature columns:")
print(features_df.columns.tolist()[:20], "...")

In [None]:
# Show feature statistics across windows
print("\nFeature statistics across windows:")
features_df.describe().T.head(15)

## 7. Feature Importance Analysis (UCI HAR)

Analyze which pre-computed features in UCI HAR are most discriminative.

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder

# Encode labels
le = LabelEncoder()
y_encoded = le.fit_transform(train_data.y["activity"])

# Quick RF for feature importance (subset for speed)
rf = RandomForestClassifier(n_estimators=50, random_state=42, n_jobs=-1)
rf.fit(train_data.X, y_encoded)

# Get feature importances
importance_df = pd.DataFrame({
    "feature": train_data.X.columns,
    "importance": rf.feature_importances_
}).sort_values("importance", ascending=False)

print("Top 20 most important features:")
print(importance_df.head(20).to_string(index=False))

In [None]:
# Plot top features
fig, ax = plt.subplots(figsize=(10, 8))

top_20 = importance_df.head(20)
ax.barh(range(len(top_20)), top_20["importance"].values, color="steelblue")
ax.set_yticks(range(len(top_20)))
ax.set_yticklabels(top_20["feature"].values)
ax.invert_yaxis()
ax.set_xlabel("Feature Importance")
ax.set_title("Top 20 Most Important Features (Random Forest)")

plt.tight_layout()
plt.savefig(FIGURES_DIR / "feature_importance.png", dpi=150, bbox_inches="tight")
plt.show()

In [None]:
# Group importance by feature type
def categorize_feature(name):
    if "Acc" in name:
        if "Gravity" in name:
            return "Gravity Acc"
        elif "Jerk" in name:
            return "Body Acc Jerk"
        else:
            return "Body Acc"
    elif "Gyro" in name:
        if "Jerk" in name:
            return "Gyro Jerk"
        else:
            return "Gyro"
    elif "angle" in name:
        return "Angle"
    else:
        return "Other"

importance_df["category"] = importance_df["feature"].apply(categorize_feature)
category_importance = importance_df.groupby("category")["importance"].sum().sort_values(ascending=False)

fig, ax = plt.subplots(figsize=(8, 5))
category_importance.plot(kind="bar", ax=ax, color="steelblue")
ax.set_xlabel("Feature Category")
ax.set_ylabel("Cumulative Importance")
ax.set_title("Feature Importance by Category")
plt.xticks(rotation=45, ha="right")

plt.tight_layout()
plt.savefig(FIGURES_DIR / "feature_importance_by_category.png", dpi=150, bbox_inches="tight")
plt.show()

## 8. Summary

Key insights from feature engineering:

1. **Time-domain features** (mean, std, SMA, ZCR) effectively distinguish static vs dynamic activities
2. **Frequency-domain features** capture periodic patterns like walking stride frequency
3. **Gravity acceleration features** are most important for distinguishing activity types
4. **Body acceleration magnitude** and **angle features** are highly discriminative
5. **50% window overlap** is a good balance between data augmentation and computational cost

In [None]:
# List saved figures
print("Saved figures:")
for fig_path in sorted(FIGURES_DIR.glob("*.png")):
    print(f"  - {fig_path.name}")