In [1]:
import ast
import re

import numpy as np
import pandas as pd
import torch
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader

from model import *

In [None]:
# Load the pickled dataset from disk and preview its first rows.
# NOTE(review): unpickling can execute arbitrary code; only load files from a trusted source.
data_path = 'data/model_data_window_300_space_50.pkl'
df = pd.read_pickle(data_path)
df.head()

In [3]:
# Maps alternative spellings of an activity label to the spelling used for modelling.
# Labels that are not listed here are left untouched.
# NOTE(review): the unique() output further down still shows pairs such as
# 'bicep_curls'/'bicepcurls', 'dumbbell_rows'/'dumbbellrows', 'burpees'/'burpee';
# confirm whether those should be merged as well.
name_fix_dict = {
    'non-e': 'rest',
    'nonexercise': 'rest',
    'staticstretch(atyourownpace)': 'staticstretch',
    'two-armdumbbellcurl(botharms,notalternating)': 'bicepcurls',
    'wallballs': 'wallball',
    'dumbbell_shoulder_press': 'dumbbellshoulderpress',
    'lateral_shoulder_raises': 'lateralshoulderraises',
    'sit-up(handspositionedbehindhead)': 'situps',
}


def _canonical_activity_name(raw_name):
    """Return the mapped label for ``raw_name``, or ``raw_name`` itself when unmapped."""
    return name_fix_dict.get(raw_name, raw_name)


df['activity_name'] = df['activity_name'].apply(_canonical_activity_name)

In [4]:
# Show every distinct activity label currently present in the frame.
activity_labels = df['activity_name']
activity_labels.unique()

array(['squats', 'rest', 'pushups', 'dumbbellshoulderpress', 'lunges',
       'dumbbell_rows', 'situps', 'tricep_extensions', 'bicep_curls',
       'lateralshoulderraises', 'jumping_jacks', 'kbpress', 'boxjumps',
       'deadlifts', 'crunches', 'kbsquatpress', 'null', 'wallball',
       'burpees', 'pullups', 'dip', 'bicepscurl(band)',
       'ellipticalmachine', 'staticstretch', 'sideplankleftside',
       'burpee', 'bicepcurls', 'tricepextensions',
       'fastalternatingpunches', 'dynamicstretch(atyourownpace)', 'walk',
       'plank', 'v-up', 'dumbbellrows', 'deviceontable',
       'kettlebellswing', 'russiantwist', 'crunch', 'seatedbackfly',
       'butterflysit-up', 'jumprope', 'sideplankrightside', 'note',
       'taprightdevice', 'repetitivestretching', 'jumpingjacks',
       'powerboatpose', 'tapleftdevice', 'unlistedexercise',
       'armbandadjustment', 'running(treadmill)', 'medicineballslam',
       'overheadtricepsextension(labelspansbotharms)', 'bandpull-downrow',
       

In [5]:
# Which source dataset(s) contain rows whose label is the literal string 'null'?
is_null_label = df['activity_name'] == 'null'
df.loc[is_null_label, 'dataset'].unique()

array(['har_data'], dtype=object)

In [6]:
# Keep only the activities that have at least `min_examples` rows.
min_examples = 1000

counts = df['activity_name'].value_counts()
# Boolean filter on the counts Series keeps the value_counts() ordering.
valid_activities = counts[counts >= min_examples].index.tolist()
df = df[df['activity_name'].isin(valid_activities)]

print(f"There are {len(df['activity_name'].unique())} unique activities in the dataset")

There are 34 unique activities in the dataset


In [7]:
# Build the train / test / val partitions by subject id, so every row of a given
# subject lands in exactly one partition.

# Step 1: hold out 20% of the subjects as the test set.
all_users = df['subject_id'].unique()
train_users, test_users = train_test_split(all_users, train_size=.8, random_state=42)

train_data = df.loc[df['subject_id'].isin(train_users)]
test_data = df.loc[df['subject_id'].isin(test_users)]

# Step 2: of the remaining subjects, keep 90% for training and 10% for validation.
remaining_users = train_data['subject_id'].unique()
train_users, val_users = train_test_split(remaining_users, train_size=.9, random_state=42)

# val_data must be taken before train_data is narrowed to the final training subjects.
val_data = train_data.loc[train_data['subject_id'].isin(val_users)]
train_data = train_data.loc[train_data['subject_id'].isin(train_users)]

print("The Sizes of the Train, Test, and Val Sets are:")
print(f"Train Size: {len(train_data)}")
print(f"Test Size: {len(test_data)}")
print(f"Val Size: {len(val_data)}")

The Sizes of the Train, Test, and Val Sets are:
Train Size: 111373
Test Size: 30961
Val Size: 10312


In [8]:
# Separate the X (signal) and y (label) components of each partition.
# Note the existing capitalisation: the validation features are named `X_Val`.
_partitions = (train_data, test_data, val_data)

X_train, X_test, X_Val = (np.array(part['sig_array']) for part in _partitions)
y_train, y_test, y_val = (np.array(part['activity_name']) for part in _partitions)

In [9]:
# Get data in the right format

# Encode labels.
# The encoder is fitted on the labels of all three partitions rather than on the
# training labels alone: with a subject-level split an activity can be absent from
# the training subjects, and LabelEncoder.transform raises on labels it has not
# seen. Label encoding only assigns an index per class name, so this leaks no
# information. Whenever every class does occur in y_train, the resulting indices
# are identical to fitting on y_train only.
label_encoder = LabelEncoder()
label_encoder.fit(np.concatenate([y_train, y_test, y_val]))
y_train_encoded = label_encoder.transform(y_train)
y_test_encoded = label_encoder.transform(y_test)
y_val_encoded = label_encoder.transform(y_val)


# Each X_* is an object array holding one array per row; stack them into a single
# dense float32 array with the sample index as the leading axis.
X_train = np.stack(X_train).astype(np.float32)
X_test = np.stack(X_test).astype(np.float32)
X_Val = np.stack(X_Val).astype(np.float32)

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
X_Val_tensor = torch.tensor(X_Val, dtype=torch.float32)

# Convert y arrays to tensors (integer class indices)
y_train_tensor = torch.tensor(y_train_encoded, dtype=torch.long)
y_test_tensor = torch.tensor(y_test_encoded, dtype=torch.long)
y_val_tensor = torch.tensor(y_val_encoded, dtype=torch.long)

In [10]:
from torch.utils.data import Dataset, DataLoader

class IMUDataset(Dataset):
    """Map-style dataset pairing signal windows with integer class labels.

    Args:
        X: Three-dimensional array-like whose first axis indexes samples. It is
            converted to a float32 tensor and its second and third axes are swapped.
            NOTE(review): presumably (sample, time, channel) in and
            (sample, channel, time) out; confirm against the model's input layout.
        y: Array-like of integer labels, one per sample, converted to a long tensor.

    Raises:
        ValueError: If ``X`` and ``y`` do not hold the same number of samples.
    """

    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32).permute(0, 2, 1)  # Convert to tensor and swap axes
        self.y = torch.tensor(y, dtype=torch.long)  # Ensure labels are integers for classification

        # Fail here rather than with an IndexError in the middle of an epoch (too few
        # labels) or by silently ignoring trailing labels (too many).
        if self.y.ndim == 0 or self.X.shape[0] != self.y.shape[0]:
            raise ValueError(
                f"X and y must contain the same number of samples, "
                f"got X with shape {tuple(self.X.shape)} and y with shape {tuple(self.y.shape)}"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(signal, label)`` pair stored at position ``idx``."""
        return self.X[idx], self.y[idx]

# Mini-batch size shared by all three loaders.
batch_size = 32

# Wrap each partition's features and encoded labels in an IMUDataset.
train_dataset, val_dataset, test_dataset = (
    IMUDataset(features, labels)
    for features, labels in (
        (X_train, y_train_encoded),
        (X_Val, y_val_encoded),
        (X_test, y_test_encoded),
    )
)

# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)


In [11]:
def compute_accuracy(model, dataloader, device):
    """Return the top-1 accuracy of ``model`` over ``dataloader`` as a percentage.

    Args:
        model: Module called as ``model(X_batch)``. Its output is reduced with an
            arg-max over dimension 1 to obtain the predicted class indices.
        dataloader: Iterable yielding ``(X_batch, y_batch)`` tensor pairs.
        device: Device both tensors are moved to before the forward pass.

    Returns:
        float: ``100 * correct / total``; ``0.0`` when the dataloader yields no
        samples (instead of raising ``ZeroDivisionError``).

    Note:
        The model is switched to eval mode and is left in that mode on return.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():  # no gradients are needed for evaluation
        for X_batch, y_batch in dataloader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()
    if total == 0:
        # Empty dataloader: nothing was evaluated, so report 0% rather than divide by zero.
        return 0.0
    return 100 * correct / total

In [12]:
# Re-execute model.py so that edits made to it on disk are picked up without
# restarting the kernel.
# NOTE(review): importlib.reload does not rebind names that were copied into this
# namespace by the earlier `from model import *`; only attributes accessed through
# the `model.` prefix are guaranteed to refer to the reloaded definitions.
import importlib
import model
importlib.reload(model)

<module 'model' from '/Users/jacobgottesman/Public/DS 4440/smartwatch-activity-recognition/model.py'>

In [13]:
# Initialize parameters
input_channels = 6  # Number of input channels in your data
# Number of classes in the classification task. Derived from the fitted encoder so
# it always matches the `class_names` list passed below, instead of a hard-coded
# copy of a previously printed count.
num_classes = len(label_encoder.classes_)
window_length = 300  # Length of your input sequences
# Set device to mps if available
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

# Run hyperparameter tuning
study = model.run_hyperparameter_tuning(
    train_loader=train_loader,
    val_loader=val_loader,
    device=device,
    input_channels=input_channels,
    num_classes=num_classes,
    window_length=window_length,
    n_trials=30,
    class_names=list(label_encoder.classes_)
)



[I 2025-02-18 17:34:09,120] A new study created in memory with name: cnn_optimization_20250218_173409


  0%|          | 0/2 [00:00<?, ?it/s]



Epoch 1/1
[W 2025-02-18 17:34:11,230] Trial 0 failed with parameters: {'conv_layers': 3, 'kernel_size': 15, 'initial_filters': 192, 'dropout_rate': 0.559195090518222, 'learning_rate': 2.9380279387035334e-05, 'weight_decay': 2.9375384576328313e-06, 'n_hidden_layers': 1, 'hidden_layer_0': 448} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/Users/jacobgottesman/Public/DS 4440/smartwatch-activity-recognition/model.py", line 769, in <lambda>
    lambda trial: objective(
                  ^^^^^^^^^^
  File "/Users/jacobgottesman/Public/DS 4440/smartwatch-activity-recognition/model.py", line 722, in objective
    model, history = train_model_with_advanced_logging(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jacobgottesman/Public/DS 4440/smartwatch-

KeyboardInterrupt: 


Best trial:

Best F1 Macro Score: 0.7746

Best hyperparameters:
  conv_layers: 3
  kernel_size: 15
  initial_filters: 192
  dropout_rate: 0.559195090518222
  learning_rate: 2.9380279387035334e-05
  weight_decay: 2.9375384576328313e-06
  n_hidden_layers: 1
  hidden_layer_0: 448

Best validation loss: 0.5193
dict_keys(['conv_layers', 'kernel_size', 'initial_filters', 'dropout_rate', 'learning_rate', 'weight_decay', 'n_hidden_layers', 'hidden_layer_0'])


In [37]:
# Analyze results
# Called through the `model` module, like `model.run_hyperparameter_tuning`, so the
# definition from the most recent importlib.reload(model) is used rather than the
# copy bound earlier by `from model import *`.
best_params = model.analyze_study_results(study)

# Split the tuned parameters: learning_rate / weight_decay are passed to the training
# function, and the per-layer `hidden_layer_{i}` entries are collapsed into a single
# `hidden_layers` list. Everything else is forwarded to the model constructor.
n_hidden_layers = best_params['n_hidden_layers']
hidden_layer_keys = [f'hidden_layer_{i}' for i in range(n_hidden_layers)]

non_model_param_keys = ['learning_rate', 'weight_decay', 'n_hidden_layers'] + hidden_layer_keys
model_params = {key: val for key, val in best_params.items() if key not in non_model_param_keys}
model_params['hidden_layers'] = [best_params[key] for key in hidden_layer_keys]

{'conv_layers': 3, 'kernel_size': 15, 'initial_filters': 192, 'dropout_rate': 0.559195090518222, 'hidden_layers': [448]}


In [None]:


# Train final model with best parameters
# FlexibleCNN and train_model_with_advanced_logging are referenced through the
# `model` module (as the tuning cell does) so that the definitions from the latest
# importlib.reload(model) are used, not the copies bound by `from model import *`.
final_model = model.FlexibleCNN(
    input_channels=input_channels,
    num_classes=num_classes,
    window_length=window_length,
    **model_params
).to(device)

# Train the final model with full epochs
final_model, history = model.train_model_with_advanced_logging(
    model=final_model,
    train_loader=train_loader,
    val_loader=val_loader,
    device=device,
    model_name="final_model",
    learning_rate=best_params['learning_rate'],
    weight_decay=best_params['weight_decay'],
    num_epochs=100,
    class_names=list(label_encoder.classes_)
)

# Evaluate on test set
final_model.eval()
test_predictions = []
test_true = []

with torch.no_grad():  # inference only, no gradients needed
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(device)
        outputs = final_model(X_batch)
        _, predicted = torch.max(outputs, 1)
        # Predictions are moved back to the CPU so they can be collected as numpy values;
        # y_batch is never moved off the CPU.
        test_predictions.extend(predicted.cpu().numpy())
        test_true.extend(y_batch.numpy())

# Calculate final test metrics (accuracy_score / f1_score from sklearn.metrics).
# Macro F1 averages the per-class scores equally; weighted F1 weights them by support.
test_accuracy = accuracy_score(test_true, test_predictions)
test_f1_macro = f1_score(test_true, test_predictions, average='macro')
test_f1_weighted = f1_score(test_true, test_predictions, average='weighted')

print("\nFinal Test Results:")
print(f"Accuracy: {test_accuracy:.4f}")
print(f"Macro F1: {test_f1_macro:.4f}")
print(f"Weighted F1: {test_f1_weighted:.4f}")


The verbose parameter is deprecated. Please use get_last_lr() to access the learning rate.



Epoch 1/100


KeyboardInterrupt: 