# Unseen Data Evaluation
This notebook loads the trained HMM and preprocessing pipeline, extracts features from a cleaned dataset CSV (preferring `testdata/test_data.csv` if present, otherwise `cleaned_data.csv`), runs predictions, and reports per-class sensitivity, specificity, and overall accuracy.

In [60]:
from pathlib import Path
import sys
import pandas as pd
import numpy as np
from numpy.fft import rfft, rfftfreq

# Load model artifacts saved from training
try:
    import joblib
except ImportError:
    import subprocess
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'joblib', '-q'])
    import joblib

ARTIFACTS = Path('artifacts/model_bundle.joblib')
if not ARTIFACTS.exists():
    raise FileNotFoundError("artifacts/model_bundle.joblib not found. Open markov_model.ipynb and run the cell that saves artifacts.")

bundle = joblib.load(ARTIFACTS)
scaler = bundle['scaler']
pca = bundle.get('pca', None)
hmm = bundle['hmm']
STATES = bundle['STATES']
STATE_TO_IDX = bundle['STATE_TO_IDX']
IDX_TO_STATE = bundle['IDX_TO_STATE']
FEATURE_COLS_TRAIN = bundle['FEATURE_COLS']
window_seconds_default = bundle.get('window_seconds', 2.0)
overlap_default = bundle.get('overlap', 0.5)

print("Loaded artifacts:")
print({k: type(v).__name__ if hasattr(v, '__class__') else type(v) for k, v in bundle.items() if k in ['scaler','pca','hmm']})
print("States:", STATES)

Loaded artifacts:
{'scaler': 'StandardScaler', 'pca': 'PCA', 'hmm': 'GaussianHMM'}
States: ['being_still', 'jumping', 'standing', 'walking']


In [61]:
# Config and label normalization

activity_map = {
    'being still': 'being_still',
    'still': 'being_still',
    'jump': 'jumping',
    'jumping': 'jumping',
    'stand': 'standing',
    'standing': 'standing',
    'walk': 'walking',
    'walking': 'walking',
}

def normalize_activity(name: str) -> str:
    if pd.isna(name):
        return None
    s = str(name).strip().lower()
    return activity_map.get(s, s)

In [62]:
# Feature extraction utilities (aligned with training)
def compute_sampling_rate_hz(time_series: pd.Series) -> float:
    dt = pd.Series(time_series).diff().dropna().median()
    if dt is None or dt <= 0:
        raise ValueError("Non-positive or undefined sampling interval detected in test data.")
    return 1.0 / dt

ACC_COLUMNS = ["acc_x", "acc_y", "acc_z"]
GYRO_COLUMNS = ["gyro_x", "gyro_y", "gyro_z"]
SENSOR_COLUMNS = ACC_COLUMNS + GYRO_COLUMNS

def generate_windows(df: pd.DataFrame, sampling_rate_hz: float, window_seconds: float, overlap: float):
    window_samples = int(round(window_seconds * sampling_rate_hz))
    if window_samples < 2:
        raise ValueError("Increase window_seconds; not enough samples per window.")
    step_samples = max(1, int(round(window_samples * (1.0 - overlap))))
    for activity, group in df.groupby("activity", sort=False):
        group = group.reset_index(drop=True)
        if len(group) < 2:
            continue
        if len(group) <= window_samples:
            yield activity, group.copy()
            continue
        for start in range(0, len(group) - window_samples + 1, step_samples):
            end = start + window_samples
            segment = group.iloc[start:end].copy()
            yield activity, segment

def compute_time_features(window: pd.DataFrame) -> dict:
    feats = {}
    for col in SENSOR_COLUMNS:
        series = window[col].to_numpy()
        feats[f"{col}_mean"] = float(series.mean())
        feats[f"{col}_std"] = float(series.std(ddof=1))
        feats[f"{col}_var"] = float(series.var(ddof=1))
    sma = window[ACC_COLUMNS].abs().to_numpy().sum() / len(window)
    feats["acc_sma"] = float(sma)
    corr_pairs = [("acc_x", "acc_y"), ("acc_x", "acc_z"), ("acc_y", "acc_z")]
    for left, right in corr_pairs:
        corr_value = window[left].corr(window[right])
        feats[f"{left}_{right}_corr"] = float(corr_value) if not np.isnan(corr_value) else 0.0
    feats["acc_magnitude_mean"] = float(np.linalg.norm(window[ACC_COLUMNS], axis=1).mean())
    feats["gyro_magnitude_mean"] = float(np.linalg.norm(window[GYRO_COLUMNS], axis=1).mean())
    return feats

def compute_frequency_features(window: pd.DataFrame, sampling_rate_hz: float) -> dict:
    feats = {}
    n = len(window)
    if n < 2:
        raise ValueError("Each window must contain at least two samples for FFT features.")
    sample_spacing = 1.0 / sampling_rate_hz
    freqs = rfftfreq(n, d=sample_spacing)
    for col in SENSOR_COLUMNS:
        signal = window[col].to_numpy()
        demeaned = signal - signal.mean()
        spectrum = rfft(demeaned)
        magnitudes = np.abs(spectrum)
        if len(magnitudes) > 1:
            dominant_index = int(np.argmax(magnitudes[1:]) + 1)
            dominant_frequency = float(freqs[dominant_index])
        else:
            dominant_frequency = 0.0
        spectral_energy = float(np.sum(magnitudes ** 2) / n)
        feats[f"{col}_dom_freq"] = dominant_frequency
        feats[f"{col}_spectral_energy"] = spectral_energy
    return feats

def extract_features(df: pd.DataFrame, window_seconds: float, overlap: float) -> pd.DataFrame:
    sampling_rate_hz = compute_sampling_rate_hz(df['time_acc'])
    rows = []
    for activity, window in generate_windows(df, sampling_rate_hz, window_seconds, overlap):
        rec = {
            'activity': activity,
            'window_start_time': float(window['time_acc'].iloc[0]),
            'window_end_time': float(window['time_acc'].iloc[-1]),
            'samples_in_window': len(window),
            'window_duration_s': float(len(window) / sampling_rate_hz),
        }
        rec.update(compute_time_features(window))
        rec.update(compute_frequency_features(window, sampling_rate_hz))
        rows.append(rec)
    return pd.DataFrame(rows)

In [63]:
# Load cleaned dataset (prefer testdata/test_data.csv, else cleaned_data.csv)
from pathlib import Path

candidates = [
    Path('testdata') / 'test_data.csv',
    Path('cleaned_data.csv'),
]

data_path = next((p for p in candidates if p.exists()), None)
if data_path is None:
    print("No cleaned CSV found. Expected testdata/test_data.csv or cleaned_data.csv. Setting empty test_df.")
    test_df = pd.DataFrame()
else:
    print(f"Using test data from: {data_path}")
    df = pd.read_csv(data_path)
    # Expected columns: time_acc, acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z, activity
    expected_cols = {'time_acc','acc_x','acc_y','acc_z','gyro_x','gyro_y','gyro_z','activity'}
    missing = expected_cols - set(df.columns)
    if missing:
        print(f"Warning: Missing expected columns in {data_path.name}: {sorted(missing)}. Test set may be unusable.")
    # Normalize activity labels and filter to STATES from the trained model
    if 'activity' in df.columns:
        df['activity'] = df['activity'].apply(normalize_activity)
        df = df[df['activity'].isin(STATES)].copy()
    # Ensure numeric types for sensor/time columns
    for col in ['time_acc','acc_x','acc_y','acc_z','gyro_x','gyro_y','gyro_z']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    df = df.dropna(subset=['time_acc'])
    # Sort by time and reset index
    test_df = df.sort_values('time_acc').reset_index(drop=True)
    # Quick summary
    if not test_df.empty and 'activity' in test_df.columns:
        print("Activities present:")
        print(test_df['activity'].value_counts())

Using test data from: testdata\test_data.csv
Activities present:
activity
being_still    1116
jumping        1102
standing        745
walking         669
Name: count, dtype: int64


In [64]:
# Extract features on test set
if test_df.empty:
    print("Test dataframe is empty after loading; nothing to evaluate.")
    features_test = pd.DataFrame()
else:
    # We will use default window params from training bundle
    features_test = extract_features(test_df.sort_values('time_acc').reset_index(drop=True),
                                     window_seconds=window_seconds_default,
                                     overlap=overlap_default)
    print(f"Extracted {len(features_test)} windows from test data.")
    print(features_test['activity'].value_counts())
features_test.head()

Extracted 27 windows from test data.
activity
being_still    9
jumping        9
standing       5
walking        4
Name: count, dtype: int64


Unnamed: 0,activity,window_start_time,window_end_time,samples_in_window,window_duration_s,acc_x_mean,acc_x_std,acc_x_var,acc_y_mean,acc_y_std,...,acc_y_dom_freq,acc_y_spectral_energy,acc_z_dom_freq,acc_z_spectral_energy,gyro_x_dom_freq,gyro_x_spectral_energy,gyro_y_dom_freq,gyro_y_spectral_energy,gyro_z_dom_freq,gyro_z_spectral_energy
0,walking,0.073426,3.534859,346,0.99872,0.573608,1.649239,2.719989,0.639106,2.531528,...,170.217842,1106.690555,7.00897,1180.026559,170.217842,96.783234,173.221687,157.932726,173.221687,496.524599
1,walking,0.938666,4.400097,346,0.99872,0.479255,1.86496,3.478077,0.456438,2.531758,...,170.217842,1119.774272,7.00897,1337.681334,170.217842,102.108669,173.221687,161.441119,173.221687,478.540163
2,walking,1.803906,5.265337,346,0.99872,0.293886,1.910185,3.648808,0.436314,2.545961,...,170.217842,1133.375428,7.00897,1212.870721,170.217842,94.899141,173.221687,163.331555,173.221687,419.02482
3,walking,2.669148,6.130578,346,0.99872,0.020809,1.93842,3.757474,0.466903,2.479065,...,170.217842,1084.71823,6.007689,1236.699251,169.216561,106.742887,173.221687,147.868693,173.221687,363.567621
4,standing,0.084621,3.553686,346,0.99872,0.049771,0.244237,0.059652,0.003244,0.165846,...,161.206309,4.752761,160.205028,40.667051,160.205028,1.80003,160.205028,2.065773,2.002563,1.925089


In [65]:
# Prepare X_test and y_test using the training feature columns
from sklearn.metrics import confusion_matrix
if features_test.empty:
    print("No features to evaluate.")
    y_true_idx = np.array([], dtype=int)
    y_pred_idx = np.array([], dtype=int)
else:
    # Align feature columns with training set (drop extras, fill missing with zeros)
    feat_cols = [c for c in FEATURE_COLS_TRAIN if c in features_test.columns]
    missing_cols = [c for c in FEATURE_COLS_TRAIN if c not in features_test.columns]
    if missing_cols:
        print("Warning: missing feature columns in test set; filling zeros:", missing_cols)
        for c in missing_cols:
            features_test[c] = 0.0
        feat_cols = FEATURE_COLS_TRAIN
    X_test_raw = features_test[feat_cols].values
    y_true_idx = features_test['activity'].map(STATE_TO_IDX).values
    # Apply scaler and PCA (if present)
    X_test_scaled = scaler.transform(X_test_raw)
    if pca is not None:
        X_test_scaled = pca.transform(X_test_scaled)
    # Predict state indices
    y_pred_idx = hmm.predict(X_test_scaled)
    print("Predicted first 20:", [IDX_TO_STATE[i] for i in y_pred_idx[:20]])

# Compute metrics
def compute_metrics(y_true: np.ndarray, y_pred: np.ndarray, labels: list):
    if len(y_true) == 0:
        # Always return a tuple (DataFrame, overall_accuracy)
        empty_df = pd.DataFrame(columns=['State','NumSamples','Sensitivity','Specificity'])
        return empty_df, np.nan
    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(labels))))
    # cm[i,i] = TP for class i
    metrics = []
    total = cm.sum()
    for i, state in enumerate(labels):
        TP = cm[i,i]
        FN = cm[i,:].sum() - TP
        FP = cm[:,i].sum() - TP
        TN = total - TP - FN - FP
        sens = TP / (TP + FN) if (TP + FN) > 0 else np.nan
        spec = TN / (TN + FP) if (TN + FP) > 0 else np.nan
        support = TP + FN
        metrics.append({'State': state, 'NumSamples': int(support), 'Sensitivity': sens, 'Specificity': spec})
    acc = (y_true == y_pred).mean() if len(y_true) else np.nan
    return pd.DataFrame(metrics), acc

metrics_df, overall_acc = compute_metrics(y_true_idx, y_pred_idx, STATES)
print("\nPer-class metrics:")
display(metrics_df)
print(f"\nOverall Accuracy: {overall_acc:.3f}" if not np.isnan(overall_acc) else "Overall Accuracy: N/A")

Predicted first 20: ['walking', 'walking', 'walking', 'walking', 'standing', 'being_still', 'standing', 'being_still', 'standing', 'being_still', 'being_still', 'being_still', 'being_still', 'standing', 'being_still', 'standing', 'being_still', 'standing', 'jumping', 'jumping']

Per-class metrics:


Unnamed: 0,State,NumSamples,Sensitivity,Specificity
0,being_still,9,0.666667,0.888889
1,jumping,9,0.333333,1.0
2,standing,5,0.6,0.863636
3,walking,4,1.0,0.73913



Overall Accuracy: 0.593


In [66]:
# Save metrics table for the report and a formatted version matching the assignment table
out_dir = Path('artifacts')
out_dir.mkdir(exist_ok=True)
metrics_path = out_dir / 'unseen_eval_metrics.csv'
metrics_df.to_csv(metrics_path, index=False)
print(f"Saved per-class metrics to {metrics_path}")

# Create assignment-style table
assign_table = metrics_df.copy()
assign_table.rename(columns={'State':'State (Activity)','NumSamples':'Number of Samples'}, inplace=True)
assign_table['Sensitivity'] = assign_table['Sensitivity'].round(3)
assign_table['Specificity'] = assign_table['Specificity'].round(3)
assign_table['Overall Accuracy'] = overall_acc if 'overall_acc' in locals() and not np.isnan(overall_acc) else np.nan
assign_table['Overall Accuracy'] = assign_table['Overall Accuracy'].round(3) if not assign_table['Overall Accuracy'].isna().all() else assign_table['Overall Accuracy']
display(assign_table)

Saved per-class metrics to artifacts\unseen_eval_metrics.csv


Unnamed: 0,State (Activity),Number of Samples,Sensitivity,Specificity,Overall Accuracy
0,being_still,9,0.667,0.889,0.593
1,jumping,9,0.333,1.0,0.593
2,standing,5,0.6,0.864,0.593
3,walking,4,1.0,0.739,0.593
