# Human Activity Recognition – Assignment 1  
### 5ARE0, Academic Year 2025–2026  

**Group 13**  

**Contributors:**  
- Stan Lamerikx
- Simon Lammertink
- Philip Offermans

---

## Import Modules

In [1]:
import zipfile
import os
import pandas as pd
from pathlib import Path
import glob
import numpy as np
import matplotlib.pyplot as plt
import math
from typing import Iterable, Dict, List, Optional, Tuple, Union
from scipy.signal import butter, filtfilt
from collections import Counter
import random
from __future__ import annotations
from typing import Dict, List
import pandas as pd
import numpy as np

## Load Data

In [2]:
# Unpack every .zip archive found in `zip_folder` into a sibling directory
# named after the archive (archive path minus the ".zip" extension).
# Folder where your zip files are stored
zip_folder = "./data"

# Extraction directories, one per archive, in the order os.listdir yields them
data_dirs = []
# Loop through all files in the folder
for file in os.listdir(zip_folder):
    if file.endswith(".zip"):
        zip_path = os.path.join(zip_folder, file)
        extract_dir = os.path.splitext(zip_path)[0]  # remove .zip
        
        # Add folder to list of folders with data
        data_dirs.append(extract_dir)

        # Create the directory if it doesn't exist
        os.makedirs(extract_dir, exist_ok=True)

        # Extract contents
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        print(f"Extracted {zip_path} to {extract_dir}")


Extracted ./data/running_2.zip to ./data/running_2
Extracted ./data/walking_2.zip to ./data/walking_2
Extracted ./data/sittingDown+StandingUp_2.zip to ./data/sittingDown+StandingUp_2
Extracted ./data/climbingStairs_1.zip to ./data/climbingStairs_1
Extracted ./data/running_1.zip to ./data/running_1
Extracted ./data/climbingStairs_3.zip to ./data/climbingStairs_3
Extracted ./data/climbingStairs_2.zip to ./data/climbingStairs_2
Extracted ./data/sittingDown+StandingUp_1.zip to ./data/sittingDown+StandingUp_1
Extracted ./data/walking_3.zip to ./data/walking_3
Extracted ./data/sittingDown+StandingUp_3.zip to ./data/sittingDown+StandingUp_3
Extracted ./data/running_3.zip to ./data/running_3
Extracted ./data/walking_1.zip to ./data/walking_1


In [3]:
def _csv_to_df(csv_path: Path) -> pd.DataFrame:
    """
    Load a CSV file into a DataFrame and discard its 'time' column when the
    file has one. Every other column is returned untouched.

    NOTE: a later cell defines `_csv_to_df` again; once the notebook has been
    run top to bottom, that later definition is the one in effect.
    """
    frame = pd.read_csv(csv_path)
    return frame.drop(columns=["time"]) if "time" in frame.columns else frame

## Load data into a nested dictionary and trim the start and end of each recording

In [4]:
def _csv_to_df(csv_path: Path) -> pd.DataFrame:
    """
    Load a CSV file into a DataFrame, without its 'time' column if it has one.

    When the file contains missing values, "MISSING VALUE" is printed, gaps
    are filled by linear interpolation, and rows that still hold a NaN
    afterwards are dropped.
    """
    frame = pd.read_csv(csv_path)

    # 'time' is treated as redundant and removed when present
    frame = frame.drop(columns=["time"], errors="ignore")

    has_gaps = frame.isna().to_numpy().any()
    if not has_gaps:
        return frame

    print("MISSING VALUE")
    return frame.interpolate(method="linear").dropna()


def _cut(df: pd.DataFrame, t_min: float, t_max: float, reset_zero: bool = True) -> pd.DataFrame:
    """
    Keep the rows whose 'seconds_elapsed' lies in [t_min, t_max] (both ends
    inclusive), ordered by time with a fresh 0..n-1 index.

    With `reset_zero`, 'seconds_elapsed' is shifted so the first kept row is
    at 0. The input frame is not modified.
    """
    elapsed = df["seconds_elapsed"]
    window = (
        df.loc[elapsed.between(t_min, t_max)]
        .sort_values("seconds_elapsed", kind="stable")
        .reset_index(drop=True)
    )
    if reset_zero and not window.empty:
        first = window["seconds_elapsed"].iloc[0]
        window["seconds_elapsed"] = window["seconds_elapsed"] - first
    return window


# Sensor names. load_data reads one "<sensor>.csv" file per entry from each
# recording directory.
SENSORS: list[str] = [
    "AccelerometerUncalibrated",
    "Accelerometer",
    "Gyroscope",
    "GyroscopeUncalibrated",
    "Gravity",
]

# Action names. load_data substitutes each one into its `location` glob
# pattern to find that action's recording directories.
ACTIONS: list[str] = [
    "climbingStairs",
    "running",
    "sittingDown+StandingUp",
    "walking",
]

def load_data(sensors: list[str], actions: list[str], location: str = "./data/{action}*/") -> dict:
    """
    Load every recording of every action and trim it to a fixed time span.

    For each action, `location` is formatted with the action name and expanded
    with glob to find the recording directories. From each directory one
    "<sensor>.csv" file per requested sensor is read, cut to the action's time
    window, and its 'seconds_elapsed' column is shifted to start at 0.

    Args:
        sensors: sensor names; "<sensor>.csv" must exist in every recording
            directory.
        actions: action names substituted for "{action}" in `location`.
        location: glob pattern containing an "{action}" placeholder.

    Returns:
        cut_data[action][sensor] -> list of trimmed DataFrames, one per
        recording directory, in sorted directory order. The order is the same
        for every sensor of an action.
    """
    # Create empty dict to store data
    cut_data = {}

    # Go through all actions
    for action in actions:
        # Find all recordings for an action. glob returns matches in arbitrary
        # order, so sort to keep recording indices reproducible; keep only
        # directories so a pattern without a trailing slash cannot pick up
        # the .zip archives that sit next to them.
        matches = sorted(glob.glob(location.format(action=action)))
        rec_dirs = [d for d in matches if os.path.isdir(d)]

        # Set the cut moments (different for the sittingDown+StandingUp action)
        if action == "sittingDown+StandingUp":
            t_min, t_max = 25.0, 565.1
        else:
            t_min, t_max = 25.0, 295.1

        cut_data[action] = {}
        # Go through all recordings
        for rec in rec_dirs:
            # Go through all sensors
            for sensor in sensors:
                # os.path.join works with or without a trailing separator on `rec`
                csv_path = os.path.join(rec, f"{sensor}.csv")
                df = _csv_to_df(csv_path)
                df_cut = _cut(df, t_min, t_max, reset_zero=True)
                cut_data[action].setdefault(sensor, []).append(df_cut)
    return cut_data

# Trimmed, still unnormalized recordings: data_raw[action][sensor] -> list of DataFrames
data_raw = load_data(sensors=SENSORS, actions=ACTIONS, location="./data/{action}*/")

## Normalize Data

In [5]:
def normalize_data(data: dict) -> dict:
    """
    Z-score the x/y/z columns of every recording.

    Mean and standard deviation are computed per sensor and per axis over the
    values of all actions and recordings pooled together. An axis whose pooled
    standard deviation is not greater than 0 is left unchanged. The input is
    not modified; a dict with the same action/sensor/recording layout is
    returned.
    """
    axes = ("x", "y", "z")

    # Pass 1: pool the raw values per sensor and axis across all actions
    pooled = {}
    for sensors in data.values():
        for sensor, recordings in sensors.items():
            per_axis = pooled.setdefault(sensor, {axis: [] for axis in axes})
            for df in recordings:
                for axis in axes:
                    per_axis[axis] += df[axis].to_list()

    # Pass 2: reduce each pool to (mean, std)
    stats = {}
    for sensor, per_axis in pooled.items():
        stats[sensor] = {}
        for axis, values in per_axis.items():
            arr = np.array(values)
            stats[sensor][axis] = (arr.mean(), arr.std())

    # Pass 3: scale a copy of every recording
    def _zscore(df, sensor):
        scaled = df.copy()
        for axis in axes:
            mean, std = stats[sensor][axis]
            if std > 0:
                scaled[axis] = (scaled[axis] - mean) / std
        return scaled

    return {
        action: {
            sensor: [_zscore(df, sensor) for df in recordings]
            for sensor, recordings in sensors.items()
        }
        for action, sensors in data.items()
    }

# Normalized copy of data_raw, same action/sensor/recording layout
data_norm = normalize_data(data_raw)

## Filter / noise reduction

In [6]:
def moving_average(series, window_size: int = 5):
    """
    Centered rolling mean of a pandas Series.

    With min_periods=1 the window simply holds fewer values near the ends, so
    the edges are averaged over the values available there.
    """
    roller = series.rolling(window=window_size, center=True, min_periods=1)
    return roller.mean()

def low_pass_filter(series, cutoff_hz: float, fs: float, order: int = 4):
    """
    Butterworth low-pass filter applied with filtfilt.

    cutoff_hz: cutoff frequency
    fs: sampling rate (Hz)
    order: filter order passed to butter

    Returns a NumPy array (the Series index is not carried over).
    """
    # butter takes the cutoff as a fraction of the Nyquist frequency (fs / 2)
    wn = cutoff_hz / (0.5 * fs)
    coeff_b, coeff_a = butter(order, wn, btype="low", analog=False)
    return filtfilt(coeff_b, coeff_a, series.to_numpy())

def filter_data(data: dict, method: str = "moving_average", fs: float = 50.0) -> dict:
    """
    Apply filtering to the x/y/z columns of every recording.

    Args:
        data: data[action][sensor] -> list of DataFrames with x, y, z columns.
        method: "moving_average" (centered rolling mean, window 5) or
            "low_pass" (Butterworth, 5 Hz cutoff).
        fs: sampling rate in Hz; only used by "low_pass".

    Returns:
        A new dict with the same layout holding filtered copies; the input
        frames are not modified.

    Raises:
        ValueError: if `method` is not one of the supported names. Previously
            an unknown method silently returned unfiltered copies.
    """
    if method == "moving_average":
        # The rolling mean is written out here instead of calling the global
        # `moving_average`: the visualisation cell later rebinds that name to
        # a NumPy helper with a different signature (x, window), so looking
        # the name up at call time breaks once that cell has run.
        def smooth(series):
            return series.rolling(window=5, center=True, min_periods=1).mean()
    elif method == "low_pass":
        def smooth(series):
            return low_pass_filter(series, cutoff_hz=5.0, fs=fs)
    else:
        raise ValueError(
            f"Unknown filter method {method!r}; expected 'moving_average' or 'low_pass'"
        )

    filt_data = {}
    for action, sensors in data.items():
        filt_data[action] = {}
        for sensor, recordings in sensors.items():
            filtered = []
            for df in recordings:
                df_filt = df.copy()
                for axis in ("x", "y", "z"):
                    df_filt[axis] = smooth(df[axis])
                filtered.append(df_filt)
            filt_data[action][sensor] = filtered
    return filt_data

# Filtered (default: moving average) version of data_norm; `data` is what the
# visualiser, the sample builder and the full-data window estimate read.
data = filter_data(data_norm)

## Visualize data

In [7]:
# === Dash visualizer with smoothing =========================================
# Expects: data[action][sensor] -> list[pd.DataFrame] with 'seconds_elapsed' + x/y/z numeric cols

import pandas as pd
import numpy as np
from typing import List
from dash import Dash, dcc, html, Input, Output, State, no_update
import plotly.graph_objs as go

# Optional: SciPy for Butterworth + Savitzky–Golay
# SCIPY_OK gates the "sg", "lp" and "hp" branches of apply_smoothing.
# NOTE(review): the notebook's first cell imports butter/filtfilt from
# scipy.signal without a guard, so SciPy is effectively a hard requirement
# when the notebook is run top to bottom.
try:
    from scipy.signal import butter, filtfilt, savgol_filter
    SCIPY_OK = True
except Exception:
    SCIPY_OK = False

# ---------- Helpers ----------
def list_actions(data: dict) -> List[str]:
    """Return, in sorted order, the top-level keys of `data` whose value is a dict."""
    names = []
    for name, value in data.items():
        if isinstance(value, dict):
            names.append(name)
    names.sort()
    return names

def list_sensors_for_action(data: dict, action: str) -> List[str]:
    """
    Return the sorted names of the sensors of `action` that have at least one
    non-empty DataFrame recording. Unknown actions, or actions whose entry is
    not a dict, yield an empty list.
    """
    per_sensor = data.get(action)
    if not isinstance(per_sensor, dict):
        return []

    def _has_data(frames) -> bool:
        # A missing/None recording list counts as "no data"
        for frame in frames or []:
            if isinstance(frame, pd.DataFrame) and not frame.empty:
                return True
        return False

    return sorted(name for name, frames in per_sensor.items() if _has_data(frames))

def list_recording_indices(data: dict, action: str, sensor: str) -> List[int]:
    """
    Return the list positions of the recordings of `action`/`sensor` that are
    non-empty DataFrames; an unknown action or sensor yields an empty list.
    """
    if action in data and sensor in data[action]:
        usable = []
        for pos, frame in enumerate(data[action][sensor]):
            if isinstance(frame, pd.DataFrame) and not frame.empty:
                usable.append(pos)
        return usable
    return []

def available_axes(df: pd.DataFrame) -> List[str]:
    """
    Return the plottable columns of `df`: whichever of x, y, z exist (in that
    order), or, if none of them exist, every numeric column except
    'seconds_elapsed'.
    """
    named = [col for col in "xyz" if col in df.columns]
    if named:
        return named

    numeric_cols = df.select_dtypes("number").columns
    fallback = []
    for col in numeric_cols:
        if col != "seconds_elapsed":
            fallback.append(col)
    return fallback

def infer_fs(df: pd.DataFrame) -> float | None:
    """
    Estimate the sampling rate (Hz) of `df` as 1 / median timestep of its
    'seconds_elapsed' column.

    Returns None when no rate can be derived: the column is missing, there
    are fewer than two samples, or the median timestep is not a finite
    positive number.
    """
    if "seconds_elapsed" not in df.columns:
        return None
    t = df["seconds_elapsed"].to_numpy()
    if t.size < 2:
        # No interval to measure; the median of an empty diff would be NaN
        # and would emit a RuntimeWarning.
        return None
    dt = np.median(np.diff(t))
    if not np.isfinite(dt) or dt <= 0:
        return None
    return float(1.0 / dt)

def moving_average(x: np.ndarray, window: int) -> np.ndarray:
    """
    Centered moving average of a 1-D array, with the same length as `x`.

    The array is extended at both ends by repeating its edge values, then
    convolved with a uniform kernel of `window` taps. A window of 1 or less,
    or an empty array, returns `x` unchanged.

    For an even window the padding is split as window // 2 on the left and
    window // 2 - 1 on the right, so the output keeps the input length
    (symmetric padding would return one extra sample).

    NOTE: this rebinds the name of the earlier Series-based
    `moving_average(series, window_size)` defined in the filtering cell.
    """
    if window <= 1 or x.size == 0:
        return x
    pad_left = window // 2
    pad_right = window - 1 - pad_left  # equals pad_left for odd windows
    xpad = np.pad(x, (pad_left, pad_right), mode="edge")
    kernel = np.ones(window) / window
    return np.convolve(xpad, kernel, mode="valid")

def apply_smoothing(df: pd.DataFrame, axes: List[str], method: str, params: dict) -> pd.DataFrame:
    """
    Return a copy of `df` in which the selected axes have been smoothed.

    The result is written over the axis column when params["replace"] is
    truthy, otherwise into a new '<axis>_sm' column. Axes that are not
    columns of `df` are skipped.

    Supported methods:
        none - return the copy untouched
        ma   - moving average, params["ma_window"] (forced odd, >= 1)
        sg   - Savitzky-Golay, params["sg_window"] / params["sg_poly"];
               skipped for columns shorter than the window
        lp   - Butterworth low-pass, params["bw_cutoff"] / params["bw_order"]
        hp   - Butterworth high-pass, same parameters
    sg/lp/hp need SciPy (SCIPY_OK); without it, or for an unrecognised
    method, the copy is returned unchanged. lp/hp also return the copy
    unchanged when no sampling rate can be inferred.
    """
    out = df.copy()
    if not axes or method == "none":
        return out

    overwrite = bool(params.get("replace", False))

    def _target(col):
        # Name of the column that receives the smoothed series
        return col if overwrite else f"{col}_sm"

    # Downsampling happens in the caller; this smooths the slice it is given
    if method == "ma":
        width = max(1, int(params.get("ma_window", 5)) | 1)  # force odd
        for col in axes:
            if col in out.columns:
                out[_target(col)] = moving_average(out[col].to_numpy(), width)
        return out

    if method == "sg" and SCIPY_OK:
        requested = int(params.get("sg_window", 7))
        poly = int(params.get("sg_poly", 3))
        # odd, and at least polyorder + 2
        width = max((poly + 2) | 1, requested | 1)
        for col in axes:
            if col in out.columns and len(out[col]) >= width:
                out[_target(col)] = savgol_filter(
                    out[col].to_numpy(), window_length=width, polyorder=poly, mode="interp"
                )
        return out

    if method in ("lp", "hp") and SCIPY_OK:
        fs = infer_fs(out)
        if not fs:
            return out
        default_cutoff = min(fs * 0.45, fs / 2 - 1e-3)
        cutoff = float(params.get("bw_cutoff", default_cutoff))
        order = int(params.get("bw_order", 4))
        nyquist = 0.5 * fs
        # keep the normalized cutoff strictly inside (0, 1)
        wn = np.clip(cutoff / nyquist, 1e-6, 0.999999)
        num, den = butter(order, wn, btype="low" if method == "lp" else "high")
        shortest = order * 3
        for col in axes:
            if col in out.columns and len(out[col]) > shortest:
                out[_target(col)] = filtfilt(num, den, out[col].to_numpy(), method="gust")

    # If SciPy is missing for sg/lp/hp, the data is silently left unchanged
    return out

# ---------- App ----------
# Dash application for browsing the filtered recordings in `data`
app = Dash(__name__)
app.title = "Recording Visualizer"

# Initial dropdown contents: first action, its first sensor, and that
# sensor's first non-empty recording (each falls back to empty/None).
_actions = list_actions(data)
_sensors = list_sensors_for_action(data, _actions[0]) if _actions else []
_recs    = list_recording_indices(data, _actions[0], _sensors[0]) if (_actions and _sensors) else []
_rec_val = _recs[0] if _recs else None

# Page layout: three control rows (dataset selectors, plot options, smoothing
# parameters), a hint line, the graph, and two dcc.Store components.
# The component ids defined here are the ones the callbacks below bind to.
app.layout = html.Div(
    style={"maxWidth": "1180px", "margin": "0 auto", "fontFamily": "Inter, system-ui, sans-serif"},
    children=[
        html.H2("Recordings viewer", style={"marginTop": "16px"}),

        # Row 1: dataset selectors
        html.Div(
            style={"display": "grid", "gridTemplateColumns": "1fr 1fr 1fr", "gap": "12px"},
            children=[
                html.Div([
                    html.Label("Action"),
                    dcc.Dropdown(
                        id="dd-action",
                        options=[{"label": a, "value": a} for a in _actions],
                        value=_actions[0] if _actions else None,
                        clearable=False,
                    )
                ]),
                html.Div([
                    html.Label("Sensor"),
                    dcc.Dropdown(
                        id="dd-sensor",
                        options=[{"label": s, "value": s} for s in _sensors],
                        value=_sensors[0] if _sensors else None,
                        clearable=False,
                    )
                ]),
                html.Div([
                    html.Label("Recording index"),
                    dcc.Dropdown(
                        id="dd-rec",
                        options=[{"label": str(i), "value": i} for i in _recs],
                        value=_rec_val,
                        clearable=False,
                    )
                ]),
            ],
        ),

        # Row 2: plot options
        html.Div(
            style={"display": "grid", "gridTemplateColumns": "1fr 1fr 1fr", "gap": "12px", "marginTop": "12px"},
            children=[
                html.Div([
                    html.Label("Axes to plot"),
                    dcc.Checklist(
                        id="cl-axes",
                        options=[],  # filled dynamically
                        value=[],
                        inputStyle={"marginRight": "6px", "marginLeft": "12px"},
                        inline=True,
                    ),
                ]),
                html.Div([
                    html.Label("Downsample (every Nth point)"),
                    dcc.Slider(
                        id="sl-downsample",
                        min=1, max=20, step=1, value=1,
                        marks={1: "1x", 5: "5x", 10: "10x", 20: "20x"},
                        tooltip={"placement":"bottom"}
                    ),
                ]),
                html.Div([
                    html.Label("Show mode"),
                    dcc.RadioItems(
                        id="ri-showmode",
                        options=[
                            {"label": "Raw only", "value": "raw"},
                            {"label": "Smoothed only", "value": "sm"},
                            {"label": "Overlay", "value": "overlay"},
                        ],
                        value="raw",
                        inline=True,
                    ),
                ]),
            ]
        ),

        # Row 3: smoothing controls
        html.Div(
            style={"display": "grid", "gridTemplateColumns": "1.2fr 1fr 1fr 1fr 1fr", "gap": "12px", "marginTop": "12px"},
            children=[
                html.Div([
                    html.Label("Smoothing method"),
                    dcc.Dropdown(
                        id="dd-smooth",
                        options=[
                            {"label": "None", "value": "none"},
                            {"label": "Moving Average", "value": "ma"},
                            {"label": "Savitzky–Golay", "value": "sg"},
                            {"label": "Butterworth Low-pass", "value": "lp"},
                            {"label": "Butterworth High-pass", "value": "hp"},
                        ],
                        value="none",
                        clearable=False,
                    )
                ]),
                # One shared window input feeds both the MA and the SG window
                html.Div([
                    html.Label("MA / SG window"),
                    dcc.Input(id="in-ma-sg-window", type="number", value=7, min=1, step=2, style={"width":"100%"}),
                ]),
                html.Div([
                    html.Label("SG polyorder"),
                    dcc.Input(id="in-sg-poly", type="number", value=3, min=1, step=1, style={"width":"100%"}),
                ]),
                html.Div([
                    html.Label("BW cutoff (Hz)"),
                    dcc.Input(id="in-bw-cutoff", type="number", value=2.0, min=0.01, step=0.1, style={"width":"100%"}),
                ]),
                html.Div([
                    html.Label("BW order"),
                    dcc.Input(id="in-bw-order", type="number", value=4, min=1, step=1, style={"width":"100%"}),
                ]),
            ]
        ),

        html.Div(
            style={"marginTop": "6px", "fontSize": "12px", "opacity": 0.75},
            children=("Tip: Window must be odd. For SG, window ≥ polyorder+2. "
                      "For Butterworth, cutoff must be < Nyquist (fs/2).")
        ),

        dcc.Graph(id="graph", figure=go.Figure(), style={"height": "70vh", "marginTop": "8px"}),

        # Stores for preserving UI state
        # xrange-store: last x-axis range written by remember_zoom
        # current-df-meta: action/sensor/idx of the recording being shown
        dcc.Store(id="xrange-store"),
        dcc.Store(id="current-df-meta"),
    ]
)

# ---------- Callbacks ----------
@app.callback(
    Output("dd-sensor", "options"),
    Output("dd-sensor", "value"),
    Input("dd-action", "value"),
)
def on_action_change(action):
    """
    Refill the sensor dropdown for the chosen action and select its first
    sensor. Without an action, or with no usable sensor, the dropdown is
    emptied and its value set to None.
    """
    sensor_names = list_sensors_for_action(data, action) if action else []
    options = [{"label": name, "value": name} for name in sensor_names]
    selected = sensor_names[0] if sensor_names else None
    return options, selected

@app.callback(
    Output("dd-rec", "options"),
    Output("dd-rec", "value"),
    Input("dd-action", "value"),
    Input("dd-sensor", "value"),
)
def on_sensor_change(action, sensor):
    """
    Refill the recording dropdown with the indices of the non-empty recordings
    of the chosen action/sensor and select the first one. With an incomplete
    selection, or no usable recording, the dropdown is emptied.
    """
    rec_ids = list_recording_indices(data, action, sensor) if (action and sensor) else []
    options = [{"label": str(rec_id), "value": rec_id} for rec_id in rec_ids]
    selected = rec_ids[0] if rec_ids else None
    return options, selected

@app.callback(
    Output("cl-axes", "options"),
    Output("cl-axes", "value"),
    Output("current-df-meta", "data"),
    Input("dd-action", "value"),
    Input("dd-sensor", "value"),
    Input("dd-rec", "value"),
)
def update_axes(action, sensor, idx):
    """
    Populate the axis checklist for the selected recording and publish the
    selection as {"action", "sensor", "idx"} in the 'current-df-meta' store.

    All axes are ticked by default. If the selection is incomplete, does not
    resolve to a recording, or resolves to an empty one, the checklist is
    cleared and the store is set to None.
    """
    cleared = ([], [], None)
    if not action or not sensor or idx is None:
        return cleared

    # Only the lookup is guarded: a selection that does not (or no longer)
    # match `data` is an expected situation, whereas any other failure is a
    # real error and should not be hidden by a blanket except.
    try:
        df = data[action][sensor][idx]
    except (KeyError, IndexError, TypeError):
        return cleared

    if not isinstance(df, pd.DataFrame) or df.empty:
        return cleared

    axes = available_axes(df)
    opts = [{"label": a.upper(), "value": a} for a in axes]
    return opts, axes, {"action": action, "sensor": sensor, "idx": idx}

# Remember zoom/pan; keep when changing controls
@app.callback(
    Output("xrange-store", "data"),
    Input("graph", "relayoutData"),
    State("xrange-store", "data"),
    prevent_initial_call=True,
)
def remember_zoom(relayout, stored):
    """
    Store the graph's current x-axis range as {"x0": float, "x1": float}.

    An autorange event clears the stored range (None). Events that carry no
    usable x range leave the store untouched (no_update). The range is read
    from the 'xaxis.range[0]' / 'xaxis.range[1]' keys, falling back to a
    two-element 'xaxis.range' list. `stored` is received but not used.
    """
    if not relayout:
        return no_update
    if relayout.get("xaxis.autorange"):
        return None

    pair = relayout.get("xaxis.range")
    if not isinstance(pair, (list, tuple)) or len(pair) != 2:
        pair = (None, None)

    # Compare against None rather than using `or`: an endpoint of exactly 0
    # is a valid range limit and must not be treated as missing.
    x0 = relayout.get("xaxis.range[0]")
    if x0 is None:
        x0 = pair[0]
    x1 = relayout.get("xaxis.range[1]")
    if x1 is None:
        x1 = pair[1]

    if x0 is None or x1 is None:
        return no_update
    try:
        return {"x0": float(x0), "x1": float(x1)}
    except (TypeError, ValueError):
        # Endpoint that cannot be converted to a number: keep what is stored
        return no_update

@app.callback(
    Output("graph", "figure"),
    Input("current-df-meta", "data"),
    Input("cl-axes", "value"),
    Input("sl-downsample", "value"),
    Input("ri-showmode", "value"),
    Input("dd-smooth", "value"),
    Input("in-ma-sg-window", "value"),
    Input("in-sg-poly", "value"),
    Input("in-bw-cutoff", "value"),
    Input("in-bw-order", "value"),
    State("xrange-store", "data"),
)
def update_graph(meta, axes_selected, ds, showmode,
                 sm_method, ma_sg_window, sg_poly, bw_cutoff, bw_order,
                 xr):
    """
    Build the figure for the recording described by `meta`.

    The recording is sorted by time, stripped of rows with NaN, downsampled
    (every `ds`-th row) and then smoothed, so smoothing runs on exactly the
    points that are plotted. Depending on `showmode`, raw traces, smoothed
    traces or both are drawn per selected axis. A stored x range `xr` is
    re-applied at the end. An empty figure is returned when there is no
    selection or the recording is unusable.
    """
    if not meta:
        return go.Figure()

    action, sensor, idx = meta["action"], meta["sensor"], meta["idx"]
    df = data[action][sensor][idx]
    if df is None or df.empty or "seconds_elapsed" not in df.columns:
        return go.Figure()

    df = df.sort_values("seconds_elapsed", kind="stable").dropna()

    # Downsample first (so smoothing runs on what you plot)
    if isinstance(ds, int) and ds > 1:
        df = df.iloc[::ds, :]

    # Nothing ticked means "plot every available axis"
    if not axes_selected:
        axes_selected = available_axes(df)

    # Empty inputs fall back to the defaults after `or`
    params = {
        "replace": (showmode == "sm"),
        "ma_window": int(ma_sg_window or 5),
        "sg_window": int(ma_sg_window or 7),
        "sg_poly": int(sg_poly or 3),
        "bw_cutoff": float(bw_cutoff or 2.0),
        "bw_order": int(bw_order or 4),
    }

    # Adds '<axis>_sm' columns, or overwrites the axes when replacing
    df_sm = apply_smoothing(df, axes_selected, sm_method or "none", params)

    fig = go.Figure()
    t = df_sm["seconds_elapsed"]

    def _add_line(column, label, **style):
        fig.add_trace(go.Scatter(x=t, y=df_sm[column], mode="lines", name=label, **style))

    want_raw = showmode in ("raw", "overlay")
    want_smoothed = (showmode in ("sm", "overlay")) and (sm_method != "none")

    for axis in axes_selected:
        if want_raw and axis in df_sm.columns:
            _add_line(
                axis,
                axis.upper()+" (raw)",
                line=dict(dash="dot"),
                hovertemplate="t=%{x:.3f}s<br>%{y:.5f}<extra>"+axis.upper()+" raw</extra>",
            )
        if want_smoothed:
            sm_col = axis if params["replace"] else f"{axis}_sm"
            if sm_col in df_sm.columns:
                _add_line(
                    sm_col,
                    axis.upper()+" (smoothed)",
                    hovertemplate="t=%{x:.3f}s<br>%{y:.5f}<extra>"+axis.upper()+" sm</extra>",
                )

    # "Smoothed only" with smoothing 'none': fall back to plain raw traces
    if showmode == "sm" and sm_method == "none":
        for axis in axes_selected:
            if axis in df_sm.columns:
                _add_line(axis, axis.upper())

    fig.update_layout(
        xaxis_title="Time (s)",
        yaxis_title="Value",
        legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "left", "x": 0},
        margin={"l": 40, "r": 20, "t": 30, "b": 40},
        uirevision="keep-zoom",  # preserve zoom/pan across updates
    )
    fig.update_xaxes(rangeslider={"visible": True})

    # Re-apply stored range if present
    if xr and all(key in xr for key in ("x0", "x1")):
        fig.update_xaxes(range=[xr["x0"], xr["x1"]])

    return fig

# ---------- Launch (works in Jupyter) ----------
# Serve the viewer on localhost only, with Dash debug mode off
PORT = 8050
app.run(host="127.0.0.1", port=PORT, debug=False)


## Create Samples

In [8]:
def _split_into_samples(data: pd.DataFrame, window_s: float = 5.0, reset_zero: bool = True) -> List[pd.DataFrame]:
    """
    Cut one recording into consecutive, non-overlapping windows of `window_s`
    seconds.

    Window k covers k*window_s <= seconds_elapsed < (k+1)*window_s. Only
    windows that fit completely before the last timestamp are produced, and
    windows that contain no rows are skipped.

    Args:
        data: recording with a 'seconds_elapsed' column; the value in its last
            row is taken as the end time.
        window_s: window length in seconds; must be positive.
        reset_zero: shift 'seconds_elapsed' of every window to start at 0.

    Returns:
        List of window DataFrames (copies, index reset); empty for an empty
        recording.

    Raises:
        ValueError: if `window_s` is not positive (it would never advance).
    """
    if not window_s > 0:
        raise ValueError(f"window_s must be positive, got {window_s!r}")
    if data.empty:
        return []

    elapsed = data["seconds_elapsed"]
    t_end = float(elapsed.iloc[-1])

    out: List[pd.DataFrame] = []
    k = 0
    # Edges are computed as k * window_s instead of by repeated addition, so
    # rounding error does not build up over long recordings.
    while (k + 1) * window_s <= t_end + 1e-9:
        start, end = k * window_s, (k + 1) * window_s
        chunk = data.loc[(elapsed >= start) & (elapsed < end)].copy()
        if not chunk.empty:
            if reset_zero:
                chunk["seconds_elapsed"] = chunk["seconds_elapsed"] - chunk["seconds_elapsed"].iloc[0]
            out.append(chunk.reset_index(drop=True))
        k += 1
    return out


def build_samples(data: Dict[str, Dict[str, List[pd.DataFrame]]]) -> Dict[str, Dict[str, List[pd.DataFrame]]]:
    """
    Turn whole recordings into fixed-length samples:
        samples[action][sensor] -> list of 5s DataFrames

    Every recording is cut with `_split_into_samples` (default window). The
    combined 'sittingDown+StandingUp' action is split into two output actions:
    windows 0, 2, 4, ... of each recording go to 'sittingDown' and windows
    1, 3, 5, ... go to 'standingUp'.
    NOTE(review): this presumably relies on each such recording alternating
    sit/stand every window, starting with sitting down; confirm against the
    recording protocol.
    """
    combined = 'sittingDown+StandingUp'
    samples: Dict[str, Dict[str, List[pd.DataFrame]]] = {}

    for action, sensors in data.items():
        is_combined = action == combined
        if is_combined:
            samples['sittingDown'] = {}
            samples['standingUp'] = {}
        else:
            samples[action] = {}

        for sensor, recordings in sensors.items():
            # One list of windows per recording, in recording order
            windows_per_rec = [_split_into_samples(rec_df) for rec_df in recordings]
            if is_combined:
                samples['sittingDown'][sensor] = [w for ws in windows_per_rec for w in ws[0::2]]
                samples['standingUp'][sensor] = [w for ws in windows_per_rec for w in ws[1::2]]
            else:
                samples[action][sensor] = [w for ws in windows_per_rec for w in ws]
    return samples

# samples[action][sensor] -> list of window DataFrames built from the filtered data
samples = build_samples(data)


## Find a good sample window length based on the samples

In [9]:
def fft(df, signal_col, time_col="seconds_elapsed"):
    """
    One-sided magnitude spectrum of one column of `df`.

    The sampling rate is taken as 1 / median timestep of `time_col`, the mean
    of the signal is removed, and the real FFT magnitudes are divided by the
    number of samples.

    Returns (frequencies in Hz, magnitudes), two arrays of equal length.
    """
    timestamps = df[time_col].to_numpy()
    values = df[signal_col].to_numpy()

    # Sampling rate (Hz) from the median timestep
    fs = 1.0 / np.median(np.diff(timestamps))

    # Remove the DC offset before transforming
    centered = values - np.mean(values)

    n = len(centered)
    spectrum = np.fft.rfft(centered)
    freqs = np.fft.rfftfreq(n, d=1/fs)
    return freqs, np.abs(spectrum) / n

def periods(freqs, mag, t_max=8, step=0.1):
    """
    Histogram of spectral power over period length.

    Components with frequency > 0 are converted to periods (1 / f) and their
    power (magnitude squared) is summed into bins of width `step` spanning
    the edges np.arange(0, t_max + step, step). Periods outside that span do
    not contribute.

    Returns a DataFrame with 'period_s' (left bin edge) and 'power' (summed
    power in the bin).
    """
    positive = freqs > 0
    period_values = 1.0 / freqs[positive]

    # Weight by power (energy) rather than counting components
    energy = mag[positive] ** 2

    bin_edges = np.arange(0, t_max + step, step)
    binned, edges = np.histogram(period_values, bins=bin_edges, weights=energy)

    return pd.DataFrame({"period_s": edges[:-1], "power": binned})

# Sum the period/power histograms of every axis of every sample, over all
# actions and sensors, into one combined histogram.
combined_hist = None
for action in samples.keys():
    for sensor in samples[action].keys():
        for sample in samples[action][sensor]:
            for axis in ['x','y','z']:
                f, mag = fft(sample, axis)
                hist = periods(f, mag)
                if combined_hist is None:
                    combined_hist = hist.copy()
                else:
                    combined_hist["power"] += hist["power"].to_numpy()

# Cumulative share of the total power, accumulated from the shortest-period
# bin upward
total = combined_hist["power"].sum()
combined_hist["cum_power"] = combined_hist["power"].cumsum()
combined_hist["cum_frac"]  = combined_hist["cum_power"] / total

# Left edge of the first bin at which the cumulative share reaches `target`
target = 0.95
cut_period = combined_hist.loc[combined_hist["cum_frac"] >= target, "period_s"].iloc[0]
print(f"{int(target*100)}% at or below period ≈ {cut_period:.2f} s")

# Suggested window: `cycles` times that period
cycles = 2
window_seconds = cycles * cut_period
print(f"Suggested window ≈ {window_seconds:.2f} s (≈ {cycles} cycles)")

95% at or below period ≈ 2.50 s
Suggested window ≈ 5.00 s (≈ 2 cycles)


## Find a good sample window length based on the full recordings

In [10]:
# Same estimate as the previous cell, but computed on the full filtered
# recordings in `data` instead of on the windows in `samples`.
# Note that this overwrites combined_hist, cut_period and window_seconds.
combined_hist = None
for action in data.keys():
    for sensor in data[action].keys():
        for sample in data[action][sensor]:
                for axis in ['x','y','z']:
                    f, mag = fft(sample, axis)
                    hist = periods(f, mag)
                    if combined_hist is None:
                        combined_hist = hist.copy()
                    else:
                        combined_hist["power"] += hist["power"].to_numpy()

# Cumulative share of the total power, accumulated from the shortest-period
# bin upward
total = combined_hist["power"].sum()
combined_hist["cum_power"] = combined_hist["power"].cumsum()
combined_hist["cum_frac"]  = combined_hist["cum_power"] / total

# Left edge of the first bin at which the cumulative share reaches `target`
target = 0.95
cut_period = combined_hist.loc[combined_hist["cum_frac"] >= target, "period_s"].iloc[0]
print(f"{int(target*100)}% at or below period ≈ {cut_period:.2f} s")

# Suggested window: `cycles` times that period
cycles = 2
window_seconds = cycles * cut_period
print(f"Suggested window ≈ {window_seconds:.2f} s (≈ {cycles} cycles)")

95% at or below period ≈ 2.20 s
Suggested window ≈ 4.40 s (≈ 2 cycles)


## Split Data

In [None]:
def split_samples(
    samples: Dict[str, Dict[str, List[pd.DataFrame]]],
    train_ratio: float = 0.70,
    val_ratio: float = 0.15,
    test_ratio: float = 0.15,
    seed: int = 42,
    shuffle: bool = True,
) -> Tuple[Dict[str, Dict[str, List[pd.DataFrame]]],
           Dict[str, Dict[str, List[pd.DataFrame]]],
           Dict[str, Dict[str, List[pd.DataFrame]]]]:
    """
    Split the samples of every action into train / validation / test sets.

    Within an action, every sensor with the same number of samples is split
    with the SAME permutation, so sample i of one sensor and sample i of
    another always land in the same set. (Shuffling each sensor separately
    would scatter one time window across different sets.)

    Args:
        samples: samples[action][sensor] -> list of samples.
        train_ratio, val_ratio, test_ratio: fractions; must be non-negative
            and sum to 1. Train and validation sizes are floor(n * ratio);
            the test set receives the remainder.
        seed: seed of the private random generator used for shuffling.
        shuffle: when False, the original order is kept and the sets are
            consecutive slices.

    Returns:
        (train, val, test), each with the same action/sensor layout as
        `samples`.

    Raises:
        ValueError: if a ratio is negative or the ratios do not sum to 1.
    """
    if min(train_ratio, val_ratio, test_ratio) < 0:
        raise ValueError("train_ratio, val_ratio and test_ratio must be non-negative")
    if not math.isclose(train_ratio + val_ratio + test_ratio, 1.0, abs_tol=1e-9):
        raise ValueError("train_ratio, val_ratio and test_ratio must sum to 1")

    rng = random.Random(seed)

    train: Dict[str, Dict[str, List[pd.DataFrame]]] = {}
    val: Dict[str, Dict[str, List[pd.DataFrame]]] = {}
    test: Dict[str, Dict[str, List[pd.DataFrame]]] = {}

    for action, sensors in samples.items():
        train[action], val[action], test[action] = {}, {}, {}

        # One permutation per sample count, shared by all sensors of this
        # action that have that many samples.
        orders: Dict[int, List[int]] = {}

        for sensor, sample_list in sensors.items():
            n = len(sample_list)
            if n not in orders:
                order = list(range(n))
                if shuffle:
                    rng.shuffle(order)
                orders[n] = order
            order = orders[n]

            # Train/val sizes are floored; the test set takes whatever is
            # left, so the three sizes always sum to n.
            n_train = int(n * train_ratio)
            n_val = int(n * val_ratio)

            train[action][sensor] = [sample_list[i] for i in order[:n_train]]
            val[action][sensor] = [sample_list[i] for i in order[n_train:n_train + n_val]]
            test[action][sensor] = [sample_list[i] for i in order[n_train + n_val:]]

    return train, val, test

# 70 / 15 / 15 split with a fixed seed, so re-running gives the same sets
train_samples, val_samples, test_samples = split_samples(samples, 0.70, 0.15, 0.15, seed=42)

## Visualize Samples

In [12]:
# app_samples_viewer.py
import os
from typing import Dict, List
import pandas as pd
from dash import Dash, dcc, html, Input, Output, State, ctx, no_update
import plotly.graph_objs as go

# ---------- IMPORTANT ----------
# Expect these to be available in the global scope if you've split already:
#   train_samples, val_samples, test_samples
# If not present, the app will fall back to `samples`.
# Make sure the name `samples` exists: merely referencing it raises
# NameError when no earlier cell has defined it, in which case a placeholder
# with one action, one sensor and no samples is installed.
try:
    samples  # type: ignore # noqa: F821
except NameError:
    # Dummy fallback to avoid NameError if you run this file standalone.
    # Replace this with your real data before running.
    samples = {"walking": {"Accelerometer": []}}

# Helper: return the dict for a chosen dataset name
def get_dataset_dict(dataset_name: str) -> Dict[str, Dict[str, List[pd.DataFrame]]]:
    """
    Return the sample dictionary behind a dataset name.

    "train", "val" and "test" map to the module-level `train_samples`,
    `val_samples` and `test_samples` when those exist. Any other name, or a
    split that has not been created, falls back to `samples`.
    """
    split_globals = (
        ("train", "train_samples"),
        ("val", "val_samples"),
        ("test", "test_samples"),
    )
    for label, var_name in split_globals:
        if dataset_name == label and var_name in globals():
            return globals()[var_name]
    # Fallback (treat whole thing as "all")
    return samples

def get_actions(dataset_name: str) -> List[str]:
    """Return the action names of the chosen dataset, in dictionary order."""
    return list(get_dataset_dict(dataset_name))

def get_sensors(dataset_name: str, action: str) -> List[str]:
    """Return the sensor names stored for `action` in the chosen dataset (empty if the action is unknown)."""
    per_action = get_dataset_dict(dataset_name).get(action, {})
    return list(per_action)

def get_sample_count(dataset_name: str, action: str, sensor: str) -> int:
    """Return how many samples the chosen dataset holds for `action`/`sensor` (0 if either is unknown)."""
    per_action = get_dataset_dict(dataset_name).get(action, {})
    return len(per_action.get(sensor, []))

def make_sample_options(dataset_name: str, action: str, sensor: str, max_options: int = 10000):
    """Build the dropdown options for the samples of ``action``/``sensor``.

    Each option is a ``{"label": "Sample <i>", "value": <i>}`` dict, where
    ``<i>`` is the sample's position in its list. At most ``max_options``
    entries are produced.
    """
    available = get_sample_count(dataset_name, action, sensor)
    shown = min(available, max_options)
    options = []
    for position in range(shown):
        options.append({"label": f"Sample {position}", "value": position})
    return options

app = Dash(__name__)
server = app.server


def _labelled_control(label_text, control, width):
    """Wrap ``control`` and its label in a Div with the given CSS width."""
    return html.Div([html.Label(label_text), control], style={"width": width})


# Start on the train split when it exists, otherwise on the full collection.
_initial_dataset = "train" if "train_samples" in globals() else "all"

_dataset_dropdown = dcc.Dropdown(
    id="ddl-dataset",
    options=[
        {"label": text, "value": key}
        for text, key in (
            ("Train", "train"),
            ("Validation", "val"),
            ("Test", "test"),
            ("All (fallback to samples)", "all"),
        )
    ],
    value=_initial_dataset,
    clearable=False,
)

# Action and sensor options start empty and are filled in by the callbacks.
_action_dropdown = dcc.Dropdown(id="ddl-action", options=[], value=None, clearable=False)
_sensor_dropdown = dcc.Dropdown(id="ddl-sensor", options=[], value=None, clearable=False)

_samples_dropdown = dcc.Dropdown(
    id="ddl-samples",
    options=[],
    value=[],
    multi=True,
    placeholder="Select one or more samples",
)

_mode_selector = dcc.RadioItems(
    id="rad-mode",
    options=[
        {"label": "Overlay", "value": "overlay"},
        {"label": "Separate", "value": "separate"},
    ],
    value="overlay",
    inline=True,
)

# Top row: one labelled control per selection step.
_selector_row = html.Div(
    [
        _labelled_control("Dataset", _dataset_dropdown, "18%"),
        _labelled_control("Action", _action_dropdown, "22%"),
        _labelled_control("Sensor", _sensor_dropdown, "22%"),
        _labelled_control("Samples", _samples_dropdown, "28%"),
        _labelled_control("View Mode", _mode_selector, "10%"),
    ],
    style={"display": "flex", "gap": "1rem", "flexWrap": "wrap", "alignItems": "end"},
)

# Second row: checkboxes selecting which of the x/y/z columns are drawn.
_axes_row = html.Div(
    [
        dcc.Checklist(
            id="chk-axes",
            options=[{"label": axis, "value": axis} for axis in ("x", "y", "z")],
            value=["x", "y", "z"],
            inline=True,
        ),
        html.Span("  (toggle axes)", style={"marginLeft": "0.5rem", "color": "#666"}),
    ],
    style={"margin": "0.5rem 0 1rem 0"},
)

# The graph starts out as an empty figure inside a loading wrapper.
_graph_panel = dcc.Loading(
    dcc.Graph(id="graph", figure=go.Figure()),
    type="default",
)

app.layout = html.Div(
    [
        html.H2("Sample Browser"),
        _selector_row,
        _axes_row,
        _graph_panel,
    ],
    style={"maxWidth": "1200px", "margin": "1.5rem auto", "fontFamily": "sans-serif"},
)

# --- Callbacks ---

# Refill the action dropdown whenever the dataset changes; the previously
# chosen action is kept when the new dataset also contains it.
@app.callback(
    Output("ddl-action", "options"),
    Output("ddl-action", "value"),
    Input("ddl-dataset", "value"),
    State("ddl-action", "value"),
)
def on_dataset_change(dataset_name, current_action):
    """Return ``(options, value)`` for the action dropdown.

    An empty dataset name is treated as "all". When the dataset has no
    actions the dropdown is emptied; otherwise the current action is kept if
    still present, else the first action is selected.
    """
    available = get_actions(dataset_name or "all")
    if not available:
        return [], None
    if current_action in available:
        chosen = current_action
    else:
        chosen = available[0]
    options = [{"label": name, "value": name} for name in available]
    return options, chosen

# Refill the sensor dropdown whenever the dataset or action changes; the
# previously chosen sensor is kept when it is still available.
@app.callback(
    Output("ddl-sensor", "options"),
    Output("ddl-sensor", "value"),
    Input("ddl-dataset", "value"),
    Input("ddl-action", "value"),
    State("ddl-sensor", "value"),
)
def on_action_change(dataset_name, action, current_sensor):
    """Return ``(options, value)`` for the sensor dropdown.

    The dropdown is emptied when no dataset or action is selected, or when
    the action has no sensors. Otherwise the current sensor is kept if still
    present, else the first sensor is selected.
    """
    if not (dataset_name and action):
        return [], None
    available = get_sensors(dataset_name, action)
    if not available:
        return [], None
    if current_sensor in available:
        chosen = current_sensor
    else:
        chosen = available[0]
    options = [{"label": name, "value": name} for name in available]
    return options, chosen

# Refill the samples dropdown whenever dataset, action or sensor changes;
# previously selected samples that still exist stay selected.
@app.callback(
    Output("ddl-samples", "options"),
    Output("ddl-samples", "value"),
    Input("ddl-dataset", "value"),
    Input("ddl-action", "value"),
    Input("ddl-sensor", "value"),
    State("ddl-samples", "value"),
)
def on_sensor_change(dataset_name, action, sensor, current_samples):
    """Return ``(options, selected values)`` for the samples dropdown.

    With an incomplete selection both are empty lists. Otherwise the current
    selection is filtered down to values that are still offered, and the
    first option is picked automatically when nothing remains selected.
    """
    if not (dataset_name and action and sensor):
        return [], []
    options = make_sample_options(dataset_name, action, sensor)
    offered = {option["value"] for option in options}
    previous = current_samples or []
    still_selected = [value for value in previous if value in offered]
    if options and not still_selected:
        # Nothing usable was selected: default to the first sample.
        still_selected = [options[0]["value"]]
    return options, still_selected

def _panel_domains(rows, gap=0.04):
    """Return ``(bottom, top)`` paper-coordinate spans for stacked panels.

    The first entry is the top panel. ``gap`` is the vertical space between
    neighbouring panels; it is shrunk when there are so many rows that the
    gaps alone would use more than half of the figure height, so the panel
    height always stays positive. Every bound is clamped to [0, 1] because
    floating-point rounding can otherwise push the lowest bound slightly
    below zero.
    """
    if rows <= 0:
        return []
    if rows == 1:
        return [(0.0, 1.0)]
    gap = min(gap, 0.5 / (rows - 1))
    panel = (1.0 - (rows - 1) * gap) / rows
    domains = []
    for r in range(rows):
        top = 1.0 - r * (panel + gap)
        bottom = top - panel
        domains.append((max(0.0, bottom), min(1.0, top)))
    return domains


@app.callback(
    Output("graph", "figure"),
    Input("ddl-dataset", "value"),
    Input("ddl-action", "value"),
    Input("ddl-sensor", "value"),
    Input("ddl-samples", "value"),
    Input("rad-mode", "value"),
    Input("chk-axes", "value"),
)
def update_plot(dataset_name, action, sensor, selected_samples, mode, axes_on):
    """Build the figure for the current selection.

    Args:
        dataset_name: Value of the dataset dropdown ("train", "val", "test"
            or "all").
        action: Selected action key.
        sensor: Selected sensor key.
        selected_samples: List of selected sample positions.
        mode: "overlay" draws all samples on shared axes; any other value
            draws each sample in its own vertically stacked panel.
        axes_on: Names of the data columns ("x", "y", "z") to draw.

    Returns:
        A ``go.Figure``. With an incomplete selection it is an empty figure
        whose title asks for a selection. Sample positions that are out of
        range for the chosen action/sensor are ignored.
    """
    fig = go.Figure()

    if not dataset_name or not action or not sensor or not selected_samples:
        fig.update_layout(
            title="Select dataset, action, sensor, and samples",
            xaxis_title="Time (s)",
            yaxis_title="Value",
            template="plotly_white",
            height=550,
        )
        return fig

    # A missing checklist value means "no axes", not an error.
    axes_on = axes_on or []

    ds = get_dataset_dict(dataset_name)
    rec_samples = ds.get(action, {}).get(sensor, [])

    # The selection can briefly refer to a previous dataset/action/sensor, so
    # drop stale positions up front. This keeps panel count, titles and axis
    # numbering in sync with what is actually drawn.
    valid_samples = [
        idx for idx in selected_samples
        if isinstance(idx, int) and 0 <= idx < len(rec_samples)
    ]

    def traces_for(df, prefix):
        """Return one line trace per enabled axis column present in ``df``."""
        traces = []
        for axis in ["x", "y", "z"]:
            if axis in axes_on and axis in df.columns:
                traces.append(go.Scatter(
                    x=df["seconds_elapsed"],
                    y=df[axis],
                    mode="lines",
                    name=f"{prefix}{axis}",
                    hovertemplate="t=%{x:.3f}s<br>"+f"{axis}=%{{y:.3f}}<extra></extra>"
                ))
        return traces

    if mode == "overlay":
        for idx in valid_samples:
            fig.add_traces(traces_for(rec_samples[idx], prefix=f"#{idx} "))
        fig.update_layout(
            title=f"[{dataset_name}] {action} — {sensor} — overlay {len(valid_samples)} sample(s)",
            xaxis_title="Time (s)",
            yaxis_title="Value",
            template="plotly_white",
            height=650,
            legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "x": 0.01},
        )
    else:
        rows = len(valid_samples)
        domains = _panel_domains(rows)

        for r, idx in enumerate(valid_samples):
            # Plotly names the first y-axis "y"/"yaxis" and later ones
            # "y2"/"yaxis2", "y3"/"yaxis3", ...
            suffix = "" if r == 0 else str(r + 1)
            for tr in traces_for(rec_samples[idx], prefix=f"#{idx} "):
                tr.update(yaxis=f"y{suffix}")
                fig.add_trace(tr)
            fig.update_layout(**{
                f"yaxis{suffix}": dict(domain=list(domains[r]), title=f"Sample #{idx}"),
            })

        # All panels share one x-axis. Anchor it to the bottom panel's y-axis
        # so the time axis sits at the bottom of the stack instead of under
        # the first (top) panel.
        bottom_axis = "y" if rows <= 1 else f"y{rows}"

        fig.update_layout(
            title=f"[{dataset_name}] {action} — {sensor} — {rows} separate panel(s)",
            xaxis=dict(title="Time (s)", anchor=bottom_axis),
            template="plotly_white",
            height=max(350, 280 * rows),
            legend={"orientation": "h", "yanchor": "bottom", "y": 1.02, "x": 0.01},
            margin=dict(t=60, r=30, l=60, b=40)
        )

    return fig

if __name__ == "__main__":
    # Bind address and port come from the HOST / PORT environment variables,
    # defaulting to a local-only server on port 8050.
    # Per the original author's note: on Dash >= 2.0 use app.run rather than
    # app.run_server.
    app.run(
        host=os.getenv("HOST", "127.0.0.1"),
        port=int(os.getenv("PORT", "8050")),
        debug=False,
    )
