<a href="https://colab.research.google.com/github/AnoushkaKareCode111/AnoushkaKareCode111/blob/main/IMU_LSTM_pipeline_zenodo_(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# IMU-based HAR & Payload Estimation
This notebook reproduces the experiments from the paper.


In [None]:
!pip install -q scikit-learn scipy nbformat
import tensorflow as tf
print("TensorFlow version:", tf.__version__)


import os, glob, re, math, random, json, time
import numpy as np, pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, f1_score, precision_score, recall_score, accuracy_score
from tensorflow.keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks
from scipy.signal import butter, filtfilt
import seaborn as sns
# Apply seaborn's default styling to all matplotlib figures.
sns.set()

# Seed NumPy, Python's `random` and TensorFlow with one shared value.
# NOTE: the "Reproducibility seeds" cell further down re-seeds the same three
# generators with SEED = 42, which supersedes this value when cells run in order.
seed = 1234
np.random.seed(seed)
random.seed(seed)
tf.random.set_seed(seed)


TensorFlow version: 2.19.0


In [None]:
# ==== Reproducibility seeds ====
import os, random, numpy as np, tensorflow as tf

SEED = 42

# NOTE(review): PYTHONHASHSEED is normally read when the interpreter starts, so
# assigning it here presumably only affects child processes - confirm if hash
# ordering matters for this experiment.
os.environ['PYTHONHASHSEED'] = str(SEED)

# Seed Python's `random`, NumPy and TensorFlow (in that order) with the same value.
for _seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_fn(SEED)

print("Random seed fixed to", SEED)
# ===============================


Random seed fixed to 42


In [None]:
# Mount Google Drive when running inside Colab. Outside Colab the
# `google.colab` package is not importable, so the mount is skipped and
# BASE_PATH is simply treated as a regular folder path.
try:
    from google.colab import drive
except ImportError:
    drive = None
else:
    drive.mount('/content/drive')

# EDIT: set this to the folder where you put the Zenodo CSV files
BASE_PATH = "/content/drive/MyDrive/imu_project/"  # <<-- change if needed

# Fail early with an actionable message instead of a bare os.listdir traceback.
if not os.path.isdir(BASE_PATH):
    raise FileNotFoundError(
        f"BASE_PATH is not an existing folder: {BASE_PATH!r}. "
        "Mount Drive and/or edit BASE_PATH so it points at the Zenodo CSV files."
    )

files = os.listdir(BASE_PATH)
# A subject ID is the 'U<digits>' prefix before the first underscore of a CSV
# file name (the training loop reads files named like 'U001_train_intention.csv').
# Other files that merely start with 'U' are ignored.
subjects = sorted({
    m.group(1)
    for m in (re.match(r'(U\d+)_.*\.csv$', fname) for fname in files)
    if m
})
print('Detected subjects (files present):', subjects)
expected = [f"U{str(i).zfill(3)}" for i in range(1,13)]
print('Expected subjects:', expected)

# Make gaps visible: the per-subject loop silently skips subjects without files.
missing_subjects = [s for s in expected if s not in subjects]
if missing_subjects:
    print('WARNING - expected subjects with no CSV files:', missing_subjects)


MessageError: Error: credential propagation was unsuccessful

In [None]:
def detect_label_columns(df):
    """Guess the label, payload and timestamp columns of `df` from their names.

    Column names are matched case-insensitively against three keyword
    patterns; for each role the first matching column (in column order) wins.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose column names are inspected. Names that are not strings
        (e.g. integer column labels) are matched on their ``str()`` form.

    Returns
    -------
    dict
        ``{'label_col': ..., 'payload_col': ..., 'timestamp_col': ...}`` where
        each value is the original column label, or ``None`` when no column
        name matches that role.
    """
    label_pat = re.compile(r'(label|action|intent|intention|class)', re.I)
    payload_pat = re.compile(r'(payload|weight|mass)', re.I)
    # 'ts' only counts when it is not embedded in a longer word, so names such
    # as 'counts' or 'weights' are not mistaken for a timestamp, while 'ts',
    # 'ts_ms' or 'imu_ts' still match.
    timestamp_pat = re.compile(r'(time|timestamp|(?<![A-Za-z])ts(?![A-Za-z]))', re.I)

    def first_match(pattern):
        # str() keeps non-string column labels from raising inside re.
        return next((c for c in df.columns if pattern.search(str(c))), None)

    return {
        'label_col': first_match(label_pat),
        'payload_col': first_match(payload_pat),
        'timestamp_col': first_match(timestamp_pat),
    }

def detect_sensor_columns(df, exclude_cols):
    """Return the numeric column names of `df` that are not in `exclude_cols`.

    The result is a list that keeps the frame's column order.
    """
    numeric_names = df.select_dtypes(include=[np.number]).columns
    kept = []
    for name in numeric_names:
        if name in exclude_cols:
            continue
        kept.append(name)
    return kept

from scipy.signal import butter, filtfilt

def lowpass_filtfilt(df, cols, fs=100, cutoff=5.0, order=4):
    """Apply a forward-backward Butterworth low-pass filter to selected columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; it is not modified.
    cols : iterable
        Names of the columns to filter. Each is converted to float first.
    fs : float, default 100
        Sampling rate; the cutoff is normalised by the Nyquist rate ``fs / 2``.
    cutoff : float, default 5.0
        Cutoff frequency, in the same unit as `fs`.
    order : int, default 4
        Butterworth filter order.

    Returns
    -------
    pandas.DataFrame
        Copy of `df` in which every column in `cols` is replaced by its
        filtered version. If filtering a column fails (for example the signal
        is not longer than the edge padding), that column is kept as the
        unfiltered float values and a ``RuntimeWarning`` is emitted, so the
        fallback is no longer silent.
    """
    import warnings  # local import: `warnings` is not imported at file level

    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    # Edge padding used by filtfilt; loop-invariant, so computed once.
    padlen = 3 * (max(len(a), len(b)) - 1)
    out = df.copy()
    for c in cols:
        values = df[c].astype(float).to_numpy()
        try:
            out[c] = filtfilt(b, a, values, padlen=padlen)
        except Exception as exc:
            # Deliberate best-effort: keep the raw signal, but tell the user.
            warnings.warn(
                f"lowpass_filtfilt: column {c!r} left unfiltered ({exc})",
                RuntimeWarning,
                stacklevel=2,
            )
            out[c] = values
    return out


def sliding_windows(X, y, window_size, step):
    """Cut a sequence into fixed-length windows with one majority label each.

    Parameters
    ----------
    X : array-like
        Samples along the first axis (lists are accepted and converted).
    y : array-like
        One label per sample, aligned with the first axis of `X`.
    window_size : int
        Number of consecutive samples per window; must be positive.
    step : int
        Offset between the starts of consecutive windows; must be positive.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Windows stacked along a new first axis, and the most frequent label of
        each window (ties resolve to the smallest label in ``np.unique``
        order). When the sequence is shorter than `window_size` the arrays are
        empty but keep a consistent shape: ``(0, window_size, ...)`` and
        ``(0,)``.

    Raises
    ------
    ValueError
        If `window_size` or `step` is not positive.
    """
    if window_size <= 0 or step <= 0:
        raise ValueError("window_size and step must be positive integers")

    X = np.asarray(X)
    y = np.asarray(y)

    Xs, ys = [], []
    T = X.shape[0]
    for start in range(0, T - window_size + 1, step):
        end = start + window_size
        vals, counts = np.unique(y[start:end], return_counts=True)
        Xs.append(X[start:end])
        ys.append(vals[np.argmax(counts)])

    if not Xs:
        # Keep rank and dtype so callers can still inspect .shape[1:].
        empty_X = np.empty((0, window_size) + X.shape[1:], dtype=X.dtype)
        return empty_X, np.empty((0,), dtype=y.dtype)
    return np.array(Xs), np.array(ys)


def derive_action_and_interaction(labels):
    """Map raw labels to an (action, interaction) pair per label.

    Each label is lower-cased via ``str()`` and checked against the keyword
    rules below in order; the first rule with a matching keyword decides the
    pair. Labels that match no rule are mapped to ``('stand', 'none')``.

    Returns two NumPy arrays: the actions and the interactions.
    """
    # (keywords, (action, interaction)) - order matters, first hit wins.
    rules = (
        (('walk',), ('walk', 'none')),
        (('stand', 'idle'), ('stand', 'none')),
        (('lift',), ('interact', 'lift')),
        (('lower', 'put'), ('interact', 'lower')),
    )
    fallback = ('stand', 'none')

    pairs = []
    for raw in labels:
        text = str(raw).lower()
        pairs.append(next(
            (outcome for keywords, outcome in rules
             if any(word in text for word in keywords)),
            fallback,
        ))

    actions = [pair[0] for pair in pairs]
    inters = [pair[1] for pair in pairs]
    return np.array(actions), np.array(inters)


In [None]:
from tensorflow.keras import layers, models

def build_har_model(input_shape, n_action=3, n_inter=2):
    """Build and compile the two-output LSTM classifier.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input sample, passed to ``layers.Input``.
    n_action : int, default 3
        Number of units of the softmax head named ``'action'``.
    n_inter : int, default 2
        Number of units of the softmax head named ``'interaction'``.

    Returns
    -------
    A compiled Keras model with outputs ``[action, interaction]``, trained with
    Adam and categorical cross-entropy, tracking accuracy on each head.
    """
    head_names = ('action', 'interaction')

    seq_in = layers.Input(shape=input_shape)
    # Shared trunk: LSTM(100, full sequence) -> LSTM(50) -> Dense(20, ReLU).
    features = layers.LSTM(100, return_sequences=True)(seq_in)
    features = layers.LSTM(50)(features)
    features = layers.Dense(20, activation='relu')(features)

    # Two classification heads on top of the shared features.
    heads = [
        layers.Dense(n_action, activation='softmax', name='action')(features),
        layers.Dense(n_inter, activation='softmax', name='interaction')(features),
    ]

    net = models.Model(seq_in, heads)
    net.compile(
        optimizer='adam',
        loss={head: 'categorical_crossentropy' for head in head_names},
        metrics={head: 'accuracy' for head in head_names},
    )
    return net

def build_payload_model(input_shape, n_classes=3):
    """Build and compile the single-output LSTM payload classifier.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input sample, passed to ``layers.Input``.
    n_classes : int, default 3
        Number of units of the softmax output named ``'payload'``.

    Returns
    -------
    A compiled Keras model (Adam, categorical cross-entropy, accuracy metric).
    """
    seq_in = layers.Input(shape=input_shape)
    # LSTM(100, full sequence) -> LSTM(50) -> Dense(20, ReLU) -> softmax.
    hidden = layers.LSTM(100, return_sequences=True)(seq_in)
    hidden = layers.LSTM(50)(hidden)
    hidden = layers.Dense(20, activation='relu')(hidden)
    payload_head = layers.Dense(n_classes, activation='softmax', name='payload')(hidden)

    net = models.Model(seq_in, payload_head)
    net.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return net


In [None]:
# --- Signal / windowing configuration ---
FS = 100                 # sampling rate handed to the low-pass filter
CUTOFF = 5.0             # low-pass cutoff, same unit as FS
WIN_SEC = 1.0            # window length in seconds
WIN = int(FS * WIN_SEC)  # window length in samples
STEP = WIN // 2          # hop between windows: half a window (50% overlap)

# One dict of metrics per processed subject is appended here.
all_results = []

FAST_MODE = True   # change to False to run ALL 12 subjects

# Fast mode runs a single subject; otherwise U001 .. U012 (full experiment).
subj_list = ["U001"] if FAST_MODE else [f"U{i:03d}" for i in range(1, 13)]


# Per-subject pipeline: load the subject's CSVs, train/evaluate the HAR model,
# then the payload model, and record the test metrics in `all_results`.
#
# Evaluation protocol:
#   * models are fitted on the *train* file only;
#   * early stopping / best-weight restoration uses the *val* file (or, when no
#     usable val file exists, the last 20% of the training windows);
#   * the *test* file is used only for the reported metrics.
# Using the test set for early stopping would select the model on the very
# data it is scored on and make the reported F1/accuracy optimistic.


def _prepare_splits(train_df, val_df, test_df, label_col, exclude_cols):
    """Filter, scale and window the train/val/test recordings of one task.

    Each recording is filtered and windowed on its own, so no window spans the
    junction of two different files. The scaler is fitted on the filtered
    training data only and then applied to every split.

    Returns a dict with keys 'train', 'val', 'test'; each value is the
    ``(windows, window_labels)`` pair from ``sliding_windows`` or ``None`` when
    that recording was not provided.
    """
    sensor_cols = detect_sensor_columns(train_df, exclude_cols)
    scaler = MinMaxScaler(feature_range=(-1, 1))
    windows = {}
    # 'train' must come first: the scaler is fitted on it before it is used.
    for split_name, frame in (('train', train_df), ('val', val_df), ('test', test_df)):
        if frame is None:
            windows[split_name] = None
            continue
        filt = lowpass_filtfilt(frame, sensor_cols, fs=FS, cutoff=CUTOFF, order=4)
        if split_name == 'train':
            scaler.fit(filt[sensor_cols])
        X = scaler.transform(filt[sensor_cols])
        windows[split_name] = sliding_windows(X, filt[label_col].values, WIN, STEP)
    return windows


def _validation_kwargs(X_val, Y_val):
    """Return the fit() keyword arguments that select the early-stopping data."""
    if X_val is not None and len(X_val) > 0:
        return {'validation_data': (X_val, Y_val)}
    # No usable validation recording: hold out the tail of the training windows.
    return {'validation_split': 0.2}


for SUBJ in subj_list:
    print('\n===== SUBJECT', SUBJ, '=====')
    patt = os.path.join(BASE_PATH, f"{SUBJ}_*.csv")
    files = sorted(glob.glob(patt))
    if not files:
        print('No files for', SUBJ, '- skipping')
        continue
    # Map file name -> DataFrame for every CSV of this subject.
    data = { os.path.basename(f): pd.read_csv(f, low_memory=False) for f in files }

    # ------------------------------------------------------------------
    # HAR: action + interaction classification
    # ------------------------------------------------------------------
    train_int = data.get(f"{SUBJ}_train_intention.csv")
    val_int   = data.get(f"{SUBJ}_val_intention.csv")
    test_int  = data.get(f"{SUBJ}_test_intention.csv")
    if train_int is None or test_int is None:
        print('Missing intention files for', SUBJ); continue
    det = detect_label_columns(train_int)
    # Fall back to the payload column, then to the last column, if no label-like name exists.
    LABEL_COL_INTENTION = det['label_col'] or det['payload_col'] or train_int.columns[-1]
    TIMESTAMP_COL = det['timestamp_col']
    exclude = [LABEL_COL_INTENTION] + ([TIMESTAMP_COL] if TIMESTAMP_COL else [])
    har_windows = _prepare_splits(train_int, val_int, test_int, LABEL_COL_INTENTION, exclude)
    Xtr_seq, ytr_win = har_windows['train']
    Xte_seq, yte_win = har_windows['test']
    if len(Xtr_seq)==0 or len(Xte_seq)==0:
        print('Not enough windows for', SUBJ); continue
    action_tr, inter_tr = derive_action_and_interaction(ytr_win)
    action_te, inter_te = derive_action_and_interaction(yte_win)

    # Label vocabularies are built from every split so transform() never meets
    # an unseen class name; only the names are shared, not the samples.
    act_pool, int_pool = [action_tr, action_te], [inter_tr, inter_te]
    has_val = har_windows['val'] is not None and len(har_windows['val'][0]) > 0
    Xva_seq, Yva_har = None, None
    if has_val:
        Xva_seq, yva_win = har_windows['val']
        action_va, inter_va = derive_action_and_interaction(yva_win)
        act_pool.append(action_va); int_pool.append(inter_va)
    le_act = LabelEncoder(); le_act.fit(np.concatenate(act_pool))
    le_int = LabelEncoder(); le_int.fit(np.concatenate(int_pool))
    n_act, n_int = len(le_act.classes_), len(le_int.classes_)

    # num_classes is fixed explicitly: otherwise to_categorical infers the width
    # from the largest index present, which can differ between splits and from
    # the size of the model heads.
    Yact_tr = to_categorical(le_act.transform(action_tr), num_classes=n_act)
    Yint_tr = to_categorical(le_int.transform(inter_tr), num_classes=n_int)
    if has_val:
        Yva_har = {'action': to_categorical(le_act.transform(action_va), num_classes=n_act),
                   'interaction': to_categorical(le_int.transform(inter_va), num_classes=n_int)}

    input_shape = Xtr_seq.shape[1:]
    har_model = build_har_model(input_shape, n_action=n_act, n_inter=n_int)
    es = callbacks.EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)
    history = har_model.fit(Xtr_seq, {'action':Yact_tr, 'interaction':Yint_tr},
                            epochs=60, batch_size=64, callbacks=[es], verbose=0,
                            **_validation_kwargs(Xva_seq, Yva_har))
    pred_act, pred_int = har_model.predict(Xte_seq, verbose=0)
    yhat_act = le_act.inverse_transform(np.argmax(pred_act, axis=1))
    yhat_int = le_int.inverse_transform(np.argmax(pred_int, axis=1))
    f1_act = f1_score(action_te, yhat_act, average='weighted')
    f1_int = f1_score(inter_te, yhat_int, average='weighted')
    acc_act = accuracy_score(action_te, yhat_act)
    acc_int = accuracy_score(inter_te, yhat_int)
    print('HAR action F1:', round(f1_act,3), 'interaction F1:', round(f1_int,3))

    # Record the HAR metrics right away so they are kept even if the payload
    # step is skipped below; payload metrics stay NaN until they are computed.
    result = {'subject':SUBJ,
              'har_action_f1':float(f1_act),'har_inter_f1':float(f1_int),'har_action_acc':float(acc_act),'har_inter_acc':float(acc_int),
              'payload_f1':float('nan'),'payload_acc':float('nan')}
    all_results.append(result)

    # ------------------------------------------------------------------
    # Payload classification
    # ------------------------------------------------------------------
    train_pay = data.get(f"{SUBJ}_train_payload.csv")
    val_pay   = data.get(f"{SUBJ}_val_payload.csv")
    test_pay  = data.get(f"{SUBJ}_test_payload.csv")
    if train_pay is None or test_pay is None:
        print('Missing payload files for', SUBJ)
        tf.keras.backend.clear_session()  # release the HAR model before moving on
        continue
    detp = detect_label_columns(train_pay)
    PAY_COL = detp['payload_col'] or train_pay.columns[-1]
    # Exclude the timestamp too, exactly as in the HAR branch, so it is never
    # fed to the network as a sensor feature.
    exclude_pay = [PAY_COL] + ([detp['timestamp_col']] if detp['timestamp_col'] else [])
    pay_windows = _prepare_splits(train_pay, val_pay, test_pay, PAY_COL, exclude_pay)
    Xtr_pay_seq, ytr_pay_win = pay_windows['train']
    Xte_pay_seq, yte_pay_win = pay_windows['test']
    if len(Xtr_pay_seq)==0 or len(Xte_pay_seq)==0:
        print('Not enough payload windows for', SUBJ)
        tf.keras.backend.clear_session()
        continue

    pay_pool = [ytr_pay_win, yte_pay_win]
    has_val_pay = pay_windows['val'] is not None and len(pay_windows['val'][0]) > 0
    Xva_pay_seq, Yva_pay = None, None
    if has_val_pay:
        Xva_pay_seq, yva_pay_win = pay_windows['val']
        pay_pool.append(yva_pay_win)
    le_pay = LabelEncoder(); le_pay.fit(np.concatenate(pay_pool))
    n_pay = len(le_pay.classes_)
    Ytr_pay = to_categorical(le_pay.transform(ytr_pay_win), num_classes=n_pay)
    if has_val_pay:
        Yva_pay = to_categorical(le_pay.transform(yva_pay_win), num_classes=n_pay)

    payload_model = build_payload_model(Xtr_pay_seq.shape[1:], n_classes=n_pay)
    es2 = callbacks.EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)
    payload_model.fit(Xtr_pay_seq, Ytr_pay, epochs=60, batch_size=64, callbacks=[es2], verbose=0,
                      **_validation_kwargs(Xva_pay_seq, Yva_pay))
    pred_pay = payload_model.predict(Xte_pay_seq, verbose=0)
    yhat_pay_idx = np.argmax(pred_pay, axis=1)
    yhat_pay = le_pay.inverse_transform(yhat_pay_idx)
    # Score on the encoded class indices: the metric values are the same as on
    # the raw labels, but this also works when payload labels are non-integer
    # floats, which scikit-learn would otherwise treat as a regression target.
    yte_pay_idx = le_pay.transform(yte_pay_win)
    f1_pay = f1_score(yte_pay_idx, yhat_pay_idx, average='weighted')
    acc_pay = accuracy_score(yte_pay_idx, yhat_pay_idx)
    print('Payload F1:', round(f1_pay,3), 'Acc:', round(acc_pay,3))
    result.update({'payload_f1':float(f1_pay),'payload_acc':float(acc_pay)})

    # Free Keras state between subjects.
    tf.keras.backend.clear_session()
    time.sleep(1)

# Save CSV of results and compute medians
import pandas as pd

if not all_results:
    print('No results to save.')
else:
    # One row per subject; written next to the input data under BASE_PATH.
    df_res = pd.DataFrame(all_results)
    out_csv = os.path.join(BASE_PATH, 'all_subjects_results.csv')
    df_res.to_csv(out_csv, index=False)
    print('\nSaved results to', out_csv)
    # Median of each headline metric across the processed subjects.
    for _caption, _metric in (
        ('Median HAR action F1:', 'har_action_f1'),
        ('Median HAR interaction F1:', 'har_inter_f1'),
        ('Median Payload F1:', 'payload_f1'),
    ):
        print(_caption, df_res[_metric].median())


In [None]:
# Reload the saved per-subject results and show them with their medians.
res_path = os.path.join(BASE_PATH, 'all_subjects_results.csv')
if not os.path.exists(res_path):
    print('Results file not found. Run previous cell first.')
else:
    df_res = pd.read_csv(res_path)
    display(df_res)
    print('Medians:')
    for _caption, _metric in (
        ('HAR action F1 median:', 'har_action_f1'),
        ('HAR interaction F1 median:', 'har_inter_f1'),
        ('Payload F1 median:', 'payload_f1'),
    ):
        print(_caption, df_res[_metric].median())


### Notes & troubleshooting
- If auto-detection of columns fails, set label column names manually in the code before running the loop.
- If you run out of RAM or GPU memory, reduce `batch_size` or run fewer subjects per session (for example by shortening `subj_list`).
- The notebook trains subject-specific models; to reproduce the paper's medians, set `FAST_MODE = False` so that all subjects U001–U012 are processed.
- The window length is 1 second with 50% overlap; change the `WIN_SEC` variable to tune the window length.

To run this notebook, download it, upload it to your Google Drive, and open it in Colab.