# Imports

In [None]:
import os
import numpy as np
import pandas as pd
import pickle

import matplotlib.pyplot as plt

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import StratifiedKFold

from sklearn.model_selection import cross_val_score

from imblearn.ensemble import BalancedRandomForestClassifier

import warnings

from scipy.signal import resample
import scipy.stats as stats
import scipy.signal as signal

# Functions
## map_labels

In [None]:
def map_labels(data, label_dict, label_mapping):
    """
    Attach a 'label' column to `data` by looking up every annotation in a label mapping.

    Args:
        data (pd.DataFrame): Frame with an 'annotation' column holding the keys to look up.
        label_dict (pd.DataFrame or dict): Collection of label mappings. Indexing it with
            `label_mapping` must yield a pandas object (e.g. a Series) whose index holds
            the annotation keys and whose values are the labels.
        label_mapping (str): Key in `label_dict` that selects the mapping to apply.

    Returns:
        pd.DataFrame: The same `data` object, modified in place, with a 'label' column
        added or overwritten. Annotations that are absent from the mapping receive a
        missing value.
    """
    mapping = label_dict[label_mapping]

    # Re-order the mapping so that it follows the rows of `data`. to_numpy()
    # discards the annotation index, which makes the assignment positional
    # instead of index-aligned.
    labels_in_row_order = mapping.reindex(data['annotation'])
    data['label'] = labels_in_row_order.to_numpy()

    return data

## normalize

In [None]:
def normalize(data, feature_cols):
    """
    Min-Max scale the selected columns of a DataFrame to the range [0, 1].

    Args:
        data (pd.DataFrame): Frame that contains the columns to scale.
        feature_cols (list of str): Names of the columns in `data` to scale.

    Returns:
        pd.DataFrame: The same `data` object. The selected columns are overwritten
        in place with their scaled values; no copy is made.

    Notes:
        - A fresh MinMaxScaler is fitted on every call, so the minimum and maximum
          used for scaling are those of the frame that is passed in.
        - Each column is scaled independently of the others.
    """
    selected = data[feature_cols]
    scaled = MinMaxScaler().fit_transform(selected)
    data[feature_cols] = scaled
    return data

## extract_windows

In [None]:
def extract_windows(data, winsize=90, input_frequency=100, output_frequency=64):
    """
    Cut a recording into consecutive, non-overlapping windows and label each one.

    Args:
        data (pd.DataFrame): Recording with accelerometer columns 'x', 'y', 'z' and a
                             'label' column. Rows are addressed by position only; they are
                             assumed to be evenly sampled at `input_frequency` (not checked).
        winsize (int): Window length in seconds.
        input_frequency (int): Sampling frequency of `data` in Hz.
        output_frequency (int): Sampling frequency of the returned windows in Hz. Every
                                window is resampled with scipy.signal.resample to
                                `winsize * output_frequency` samples.

    Returns:
        X (np.ndarray): Shape (n_windows, winsize * output_frequency, 3), resampled windows.
        Y (np.ndarray): Shape (n_windows,), most frequent label of each window.

    Notes:
        - A window is dropped when any of its cells (in any column, 'label' included)
          is missing.
        - Trailing rows that do not fill a complete window are ignored.
        - When several labels are equally frequent, the first entry returned by
          `Series.mode()` is used.
    """
    samples_in = winsize * input_frequency    # rows per window in the input
    samples_out = winsize * output_frequency  # rows per window after resampling

    windows, labels = [], []

    last_start = len(data) - samples_in
    for begin in range(0, last_start + 1, samples_in):
        chunk = data.iloc[begin:begin + samples_in]

        # Keep only complete windows without any missing value
        usable = len(chunk) == samples_in and not chunk.isna().any().any()
        if not usable:
            continue

        accel = chunk[['x', 'y', 'z']].to_numpy()
        resampled = resample(accel, samples_out)
        majority_label = chunk['label'].mode().iloc[0]

        windows.append(resampled)
        labels.append(majority_label)

    # Return correctly shaped empty arrays when no window survived
    if not windows:
        return np.empty((0, samples_out, 3)), np.empty((0,))

    return np.stack(windows), np.array(labels)

## extract_features

In [None]:
def extract_features(xyz, sample_rate=64):
    """Compute a dict of commonly used HAR time-series features for one window.

    Args:
        xyz (np.ndarray): Accelerometer window of shape (N, 3) with columns x, y, z.
        sample_rate (int): Sampling frequency of the window in Hz.

    Returns:
        dict: Feature name -> value. Insertion order is: per-axis quantiles,
        pairwise axis correlations, quantiles of the vector magnitude, the
        one-second lag autocorrelation of the magnitude, then the angular,
        spectral and peak features.
    """
    quantile_probs = (0, .25, .5, .75, 1)
    quantile_names = ('min', 'q25', 'med', 'q75', 'max')

    feats = {}

    x, y, z = xyz.T

    # Minimum, quartiles and maximum of every axis (keys 'xmin' ... 'zmax')
    for axis_name, axis_values in zip('xyz', (x, y, z)):
        axis_quantiles = np.quantile(axis_values, quantile_probs)
        for stat_name, stat_value in zip(quantile_names, axis_quantiles):
            feats[axis_name + stat_name] = stat_value

    # xy, yz, zx correlation; an undefined (NaN) correlation is reported as 0
    axis_pairs = (('xycorr', x, y), ('yzcorr', y, z), ('zxcorr', z, x))
    with np.errstate(divide='ignore', invalid='ignore'):  # ignore div by 0 warnings
        for corr_name, first, second in axis_pairs:
            feats[corr_name] = np.nan_to_num(np.corrcoef(first, second)[0, 1])

    # Euclidean norm of every sample
    v = np.linalg.norm(xyz, axis=1)

    # Minimum, quartiles and maximum of the magnitude
    feats.update(zip(quantile_names, np.quantile(v, quantile_probs)))

    # 1s autocorrelation: the magnitude against itself shifted by `sample_rate` samples
    with np.errstate(divide='ignore', invalid='ignore'):  # ignore div by 0 warnings
        lag_corr = np.nan_to_num(np.corrcoef(v[:-sample_rate], v[sample_rate:]))
        feats['corr1s'] = lag_corr[0, 1]

    # Angular, spectral and peak features, appended in that order
    feats.update(angular_features(xyz, sample_rate))
    feats.update(spectral_features(v, sample_rate))
    feats.update(peak_features(v, sample_rate))

    return feats


def spectral_features(v, sample_rate):
    """Spectral entropy plus the two dominant frequencies and their powers.

    Args:
        v (np.ndarray): 1-D signal (the callers in this file pass the acceleration magnitude).
        sample_rate (int): Sampling frequency of `v` in Hz.

    Returns:
        dict: Keys 'pentropy', 'f1', 'f2', 'p1', 'p2' (in that order). 'f1'/'p1'
        describe the strongest spectral peak and 'f2'/'p2' the second strongest.
        With a single peak both pairs are equal; without any peak all four are 0.
    """
    # Settings shared by both Welch estimates: 3 s segments that overlap by
    # 2 s, combined with the median across segments.
    welch_opts = dict(
        fs=sample_rate,
        nperseg=3 * sample_rate,
        noverlap=2 * sample_rate,
        average='median',
    )

    feats = {}

    # The entropy is taken from the spectrum without detrending, i.e. with the
    # constant component still present. The small offset keeps every bin positive.
    _, raw_powers = signal.welch(v, detrend=False, **welch_opts)
    with np.errstate(divide='ignore', invalid='ignore'):  # ignore div by 0 warnings
        feats['pentropy'] = np.nan_to_num(stats.entropy(raw_powers + 1e-16))

    # The dominant frequencies are taken from the spectrum of the mean-removed
    # segments so that the constant component cannot dominate.
    freqs, powers = signal.welch(v, detrend='constant', **welch_opts)

    # Indices of the local maxima of the spectrum, strongest first
    peak_idx, _ = signal.find_peaks(powers)
    strongest_first = peak_idx[np.argsort(powers[peak_idx])[::-1]]

    if strongest_first.size == 0:
        feats['f1'] = feats['f2'] = 0
        feats['p1'] = feats['p2'] = 0
        return feats

    top = strongest_first[0]
    # Fall back to the only peak when there is no second one
    runner_up = strongest_first[1] if strongest_first.size > 1 else top

    feats['f1'] = freqs[top]
    feats['f2'] = freqs[runner_up]
    feats['p1'] = powers[top]
    feats['p2'] = powers[runner_up]

    return feats


def peak_features(v, sample_rate):
    """Count and characterise the peaks of the signal (a proxy for step counts).

    Args:
        v (np.ndarray): 1-D signal (the callers in this file pass the acceleration magnitude).
        sample_rate (int): Sampling frequency of `v` in Hz.

    Returns:
        dict: 'numPeaks' is the number of detected peaks and 'peakPromin' the
        median prominence of those peaks (0 when no peak was found).
    """
    # Restrict the signal to the 0.6-5 Hz band before looking for peaks
    filtered = butterfilt(v, (.6, 5), fs=sample_rate)

    # Peaks must be at least 0.2 s apart and have a prominence of at least 0.25
    peak_idx, props = signal.find_peaks(
        filtered,
        distance=0.2 * sample_rate,
        prominence=0.25,
    )
    prominences = props['prominences']

    return {
        'numPeaks': len(peak_idx),
        'peakPromin': np.median(prominences) if len(prominences) > 0 else 0,
    }


def angular_features(xyz, sample_rate):
    """Roll, pitch and yaw statistics of an accelerometer window.

    Reference: Hip and Wrist Accelerometer Algorithms for Free-Living Behavior
    Classification, Ellis et al.

    Args:
        xyz (np.ndarray): Window with one column per axis (x, y, z).
        sample_rate (int): Sampling frequency of the window in Hz.

    Returns:
        dict: Mean ('avgroll', 'avgpitch', 'avgyaw') and standard deviation
        ('sdroll', 'sdpitch', 'sdyaw') of the angles of the raw signal, followed
        by the mean angles of the 0.5 Hz low-pass filtered signal
        ('rollg', 'pitchg', 'yawg').
    """

    def _angles(samples):
        # roll = atan2(y, z), pitch = atan2(x, z), yaw = atan2(y, x)
        ax, ay, az = samples.T
        return np.arctan2(ay, az), np.arctan2(ax, az), np.arctan2(ay, ax)

    # Raw angles
    roll, pitch, yaw = _angles(xyz)
    feats = {
        'avgroll': np.mean(roll),
        'avgpitch': np.mean(pitch),
        'avgyaw': np.mean(yaw),
        'sdroll': np.std(roll),
        'sdpitch': np.std(pitch),
        'sdyaw': np.std(yaw),
    }

    # Gravity angles: the same angles after low-pass filtering at 0.5 Hz
    gravity = butterfilt(xyz, 0.5, fs=sample_rate)
    roll_g, pitch_g, yaw_g = _angles(gravity)
    feats['rollg'] = np.mean(roll_g)
    feats['pitchg'] = np.mean(pitch_g)
    feats['yawg'] = np.mean(yaw_g)

    return feats


def butterfilt(x, cutoffs, fs, order=10, axis=0):
    """Apply a Butterworth filter forwards and backwards (scipy's sosfiltfilt).

    Args:
        x (np.ndarray): Signal to filter.
        cutoffs (float or tuple): A single number is used as a low-pass cutoff in Hz.
            A tuple is read as (lower edge, upper edge) in Hz: with a positive lower
            edge the filter is a band-pass, otherwise a low-pass at the upper edge.
        fs (float): Sampling frequency of `x` in Hz.
        order (int): Order passed to scipy.signal.butter.
        axis (int): Axis of `x` along which to filter.

    Returns:
        np.ndarray: Filtered signal with the same shape as `x`.
    """
    nyquist = 0.5 * fs

    # Low-pass unless a tuple with a positive lower edge asks for a band-pass.
    # Critical frequencies are expressed as fractions of the Nyquist frequency.
    band_type = 'low'
    if isinstance(cutoffs, tuple):
        lower_edge, upper_edge = cutoffs
        if lower_edge > 0:
            band_type = 'bandpass'
            critical = (lower_edge / nyquist, upper_edge / nyquist)
        else:
            critical = upper_edge / nyquist
    else:
        critical = cutoffs / nyquist

    # Second-order sections are used for the filter coefficients
    sections = signal.butter(order, critical, btype=band_type, analog=False, output='sos')
    return signal.sosfiltfilt(sections, x, axis=axis)


def get_feature_names():
    """Return the feature names in the order `extract_features` emits them.

    Hacky: the names are discovered by running `extract_features` on an
    all-zero dummy window of 1000 samples with a sample rate of 100 and
    reading the keys of the resulting dict.
    """
    dummy_window = np.zeros((1000, 3))
    return list(extract_features(dummy_window, 100))

# Print input files

In [None]:
# Directory that holds the Capture-24 files on Kaggle (defined once and reused below)
dataset_dir = '/kaggle/input/capture-24-human-activity-recognition/capture24'

# Dictionary of labels: indexed by the raw annotation string; each column is
# looked up by name as one label mapping (see `label_mapping` below).
label_dict = pd.read_csv(os.path.join(dataset_dir, 'annotation-label-dictionary.csv'),
                         index_col='annotation', dtype='string')
print("Annotation-Label Dictionary")
print(label_dict)

# Chosen label mapping
label_mapping = "label:Willetts2018"
print(label_dict[label_mapping])
print()

# Print files
print('Content of data/')
print(sorted(os.listdir(dataset_dir)))
print()

# All of the labels of the chosen mapping. They are sorted because iterating a
# set of strings yields a different order from one interpreter session to the
# next; missing entries are dropped because they cannot be ordered.
all_labels = sorted(set(label_dict[label_mapping].dropna()))
print('All of the possible labels:')
print(all_labels)

# Training the model
## Training parameters

In [None]:
# Path of the dataset
path = '/kaggle/input/capture-24-human-activity-recognition/capture24/'

# The participant entries in the dataset (named P*.csv), in sorted order
file_list = sorted(f for f in os.listdir(path) if f.endswith('.csv') and f.startswith('P'))

# Data and window parameters
# NOTE: the loading cells call extract_windows() with its default arguments, so
# `winsize` and `frequency` only take effect if they are passed on explicitly.
winsize = 90
num_train_files = 100
num_test_files = 51
frequency = 64

# Validate against the number of files that are actually present (instead of a
# hard-coded total) so that the train and test selections can never overlap.
if num_train_files < 0 or num_test_files < 0:
    raise ValueError("Number of training and testing files must be non-negative.")
if num_train_files + num_test_files > len(file_list):
    raise ValueError("Number of training and testing files exceeds the total number of files.")

# Choose test files: taken from the end of the sorted list. The start index is
# computed explicitly because file_list[-0:] would select every file when
# num_test_files is 0.
test_files = file_list[len(file_list) - num_test_files:]

# Choose train files: taken from the front of the sorted list
train_files = file_list[:num_train_files]

num_train_files = len(train_files)
num_test_files = len(test_files)

print("Number of files used for training:", num_train_files)
print("Number of files used for testing:", num_test_files)

## Loading the data

In [None]:
# Each recording is reduced to its feature table before the next one is read,
# so only a single raw recording is held in memory at any time.
data_list = []
counter = 0

feature_frames, label_arrays = [], []

# Options shared by every read of a recording
read_options = dict(index_col='time', parse_dates=['time'],
                    dtype={'x': 'f4', 'y': 'f4', 'z': 'f4', 'annotation': 'string'})

for counter, file in enumerate(train_files, start=1):
    print("Processing File:", counter)

    # Each recording is read from <path>/<file>/<file>
    file_path = os.path.join(path, file, file)

    # Read, map annotations to labels, then scale the accelerometer axes
    recording = pd.read_csv(file_path, **read_options)
    recording = map_labels(recording, label_dict, label_mapping)
    recording = normalize(recording, ['x', 'y', 'z'])

    # Cut into windows, then release the raw recording
    windows, window_labels = extract_windows(recording)
    del recording

    # One row of features per window
    feature_frames.append(pd.DataFrame([extract_features(window) for window in windows]))
    label_arrays.append(window_labels)
    del windows, window_labels

# Concatenate the per-file results into one feature table and one label vector
X_feats_train = pd.concat(feature_frames, ignore_index=True)
Y_train = np.concatenate(label_arrays, axis=0)

del feature_frames, label_arrays, read_options

print('Data preprocessing done.\n')

## Training the model

In [None]:
# Silence FutureWarnings that originate from sklearn modules
warnings.filterwarnings("ignore", category=FutureWarning, module="sklearn")

# Balanced random forest with Gini splits; the seed is fixed for reproducibility
model = BalancedRandomForestClassifier(
    random_state=0,
    n_estimators=3000,
    criterion='gini'
)

# Use Stratified K-Fold Cross-Validation
# NOTE(review): the folds are drawn over individual windows pooled from all
# training files, so windows of the same recording can land in both the
# training and the validation part of a fold. The scores below may therefore be
# optimistic compared with the held-out test files — confirm whether a
# per-recording split is wanted.
cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

# cross_val_score fits clones of `model`; `model` itself stays unfitted here.
# error_score='raise' surfaces a failing fold instead of recording NaN.
cv_scores = cross_val_score(
    estimator=model,
    X=X_feats_train,
    y=Y_train,
    scoring='accuracy',
    cv=cv,
    error_score='raise'
)

# Print the results
print(f'Cross-validation scores: {cv_scores}')
print(f'Mean cross-validation score: {cv_scores.mean()}')
print(f'Standard deviation of cross-validation score: {cv_scores.std()}')

# Fit the final model on the complete training set. fit() derives `classes_`
# from the labels present in Y_train, so the attribute is not assigned by hand
# beforehand: any value set on the unfitted estimator would be overwritten.
model.fit(X_feats_train, Y_train)

# Testing the model

## Loading the data

In [None]:
# Process the test recordings one file at a time: each recording is read,
# labelled, scaled, windowed and reduced to features before the next one is
# loaded, so only a single raw recording is held in memory. Every recording
# contributes its windows exactly once.
counter = 0

test_feature_frames, test_label_arrays = [], []

for counter, file in enumerate(test_files, start=1):
    print("Processing File:", counter)

    # Each recording is read from <path>/<file>/<file>
    file_path = os.path.join(path, file, file)
    data = pd.read_csv(file_path, index_col='time', parse_dates=['time'],
                       dtype={'x': 'f4', 'y': 'f4', 'z': 'f4', 'annotation': 'string'})

    # Map annotations to labels and scale the accelerometer axes (once per file)
    data = map_labels(data, label_dict, label_mapping)
    data = normalize(data, ['x', 'y', 'z'])

    # Extract windows, then release the raw recording
    X_, Y_ = extract_windows(data)
    del data

    # Extract features: one row per window
    test_feature_frames.append(pd.DataFrame([extract_features(x) for x in X_]))
    test_label_arrays.append(Y_)
    del X_, Y_

# Concatenate the per-file results into one feature table and one label vector
X_feats_test = pd.concat(test_feature_frames, ignore_index=True)
Y_test = np.concatenate(test_label_arrays, axis=0)

del test_feature_frames, test_label_arrays

print('Data preprocessing done.\n')

## Testing the model

In [None]:
Y_pred = model.predict(X_feats_test)

# Calculate the important metrics (class-frequency weighted averages).
# The F1 value is stored as `f1` so that the imported f1_score function is not
# overwritten, which would make this cell fail when it is run a second time.
accuracy = accuracy_score(Y_test, Y_pred)
f1 = f1_score(Y_test, Y_pred, average='weighted')
precision = precision_score(Y_test, Y_pred, average='weighted')
recall = recall_score(Y_test, Y_pred, average='weighted')

print(f'Accuracy: {accuracy}')
# Report the computed F1 value; model.score() would return the accuracy again
print(f'F1 score: {f1}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
# The classes the fitted model can predict (the matrix itself is plotted below)
print(f'Model classes: {model.classes_}')

# Subset labels present in the test set
present_labels = np.unique(Y_test)

# Confusion matrix for present labels; normalize='true' makes every row
# (true label) sum to 1 so that the cells can be shown as percentages
cm = confusion_matrix(Y_test, Y_pred, labels=present_labels, normalize='true')

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=present_labels)
disp.plot(cmap='Blues', values_format=".2%")
disp.ax_.set_title("Confusion Matrix (Percentage)")

plt.show()

## Save the model

In [None]:
# Serialize the fitted classifier to 'rf_model.pkl' in the current working
# directory. The file is opened in binary mode and closed by the `with` block.
with open('rf_model.pkl', 'wb') as model_file:
    pickle.dump(model, model_file)