In [1]:
import numpy as np
import pandas as pd
import torch
import scipy.stats as stats
from scipy.signal import welch
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
import joblib
import os

In [2]:
data_path = '/Users/pranavxiyer/Documents/northwestern/classes/first/spring/machine learning and sensing/project/data'

Label Information

In [3]:
sampling_frequency = 100

In [4]:
label_list = ['rotate_next', 'rotate_prev', 'like', 'stop', 'other']

In [5]:
label_to_idx = {
    'rotate_next': 0,
    'rotate_prev': 1,
    'like': 2,
    'stop': 3,
    'other': 4
}

In [6]:
idx_to_label = {
    0: 'rotate_next',
    1: 'rotate_prev',
    2: 'like',
    3: 'stop',
    4: 'other'
}

Functions for Feature Extraction

In [7]:
feature_list = ['mean', 'std', 'energy', 'max_val', 'min_val', 'min_max_range', 
                'median_above_mean', 'iqr', 'skewness', 
                'mean_psd', 'std_psd', 'max_psd', 'median_psd', 'min_psd', 'entropy_psd']

In [8]:
def mean(signal):
    return np.mean(signal)

In [9]:
def std(signal):
    return np.std(signal)

In [10]:
def energy(signal):
    return np.sum(np.square(signal)) / len(signal)

In [11]:
def max_val(signal):
    return np.max(signal)

In [12]:
def min_max_range(signal):
    return np.max(signal) - np.min(signal)

In [13]:
def min_val(signal):
    return np.min(signal)

In [14]:
def median_above_mean(signal):
    mean_val = np.mean(signal)
    above_mean = signal[signal > mean_val]
    if len(above_mean) > 0:
        return np.median(above_mean)
    else:
        return 0

In [15]:
def interquartile_range(signal):
    return np.percentile(signal, 75) - np.percentile(signal, 25)

In [16]:
def skewness(signal):
    return stats.skew(signal)

In [17]:
def psd_features(signal, fs):  # fs: sampling frequency
    freqs, psd = welch(signal, fs=fs, nperseg=min(len(signal), 256))
    mean_psd = np.mean(psd)
    std_psd = np.std(psd)
    max_psd = np.max(psd)
    min_psd = np.min(psd)
    median_psd = np.median(psd)
    
    # Normalize for entropy calculation
    psd_norm = psd / np.sum(psd) if np.sum(psd) > 0 else np.ones_like(psd) / len(psd)
    entropy = -np.sum(psd_norm * np.log2(psd_norm + 1e-12))  # avoid log(0)
    
    return {
        'mean_psd': mean_psd,
        'std_psd': std_psd,
        'max_psd': max_psd,
        'min_psd': min_psd,
        'median_psd': median_psd,
        'entropy_psd': entropy
    }

In [18]:
def extract_features_column(signal, column_name, fs):
    features = {}
    features[column_name + '_mean'] = mean(signal)
    features[column_name + '_std'] = std(signal)
    features[column_name + '_energy'] = energy(signal)
    features[column_name + '_max_val'] = max_val(signal)
    features[column_name + '_min_val'] = min_val(signal)
    features[column_name + '_min_max_range'] = min_max_range(signal)
    features[column_name + '_median_above_mean'] = median_above_mean(signal)
    features[column_name + '_iqr'] = interquartile_range(signal)
    features[column_name + '_skewness'] = skewness(signal)

    signal_psd_features = psd_features(signal, fs)
    for key, value in signal_psd_features.items():
        features[column_name + '_' + key] = value
    
    return features

In [19]:
def extract_features(df, fs):
    combined_features = {}
    for column_name in df.columns:
        # print(f"column name: {column_name}")
        # print(df[column_name])
        column_data = df[column_name].to_numpy()
        # print(column_data.shape)
        column_features = extract_features_column(column_data, column_name, fs)
        combined_features.update(column_features)
    return combined_features


In [20]:
X = []
y = []

total_files = 0

for label in label_list:
    label_path = os.path.join(data_path, label)
    csv_folder = os.path.join(label_path, 'csv')
    # print(csv_folder)
    for file in os.listdir(csv_folder):
        csv_path = os.path.join(csv_folder, file)
        # print(csv_path)
        total_files += 1
        df = pd.read_csv(csv_path)
        df = df.rename(columns={' pitch': 'pitch'})
        df = df[['accel_x', 'accel_y', 'accel_z', 'rotation_x', 'rotation_y', 'rotation_z', 'pitch', 'roll', 'yaw']]
        numpy_df = df.to_numpy()
        # print(numpy_df.shape)
        # print(df.head())
        features = extract_features(df, sampling_frequency)
        # print(len(features))
        X.append(np.array(list(features.values())))
        y.append(label_to_idx[label])

In [21]:
X = np.array(X)
y = np.array(y)

In [22]:
print(X.shape)
print(y.shape)


(88, 135)
(88,)


Random Forest

In [23]:
train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.2, random_state=42)

In [24]:
scaler = StandardScaler()
train_X = scaler.fit_transform(train_X)
test_X = scaler.transform(test_X)

In [25]:
random_forest = RandomForestClassifier(n_estimators=100, random_state=42)

In [26]:
def train_and_evaluate_model(model, X_train, X_test, y_train, y_test, validation=False):
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)

    return model, accuracy

In [27]:
random_forest, accuracy = train_and_evaluate_model(random_forest, train_X, test_X, train_y, test_y)
print("model: ", random_forest)
print("model accuracy: ", accuracy)

model:  RandomForestClassifier(random_state=42)
model accuracy:  1.0


In [28]:
model_path = '/Users/pranavxiyer/Documents/northwestern/classes/first/spring/machine learning and sensing/project/models'
random_forest_path = os.path.join(model_path, 'random_forest.joblib')
joblib.dump(random_forest, random_forest_path)

['/Users/pranavxiyer/Documents/northwestern/classes/first/spring/machine learning and sensing/project/models/random_forest.joblib']

In [29]:
scaler_path = '/Users/pranavxiyer/Documents/northwestern/classes/first/spring/machine learning and sensing/project/scaler'
standard_scaler_path = os.path.join(scaler_path, 'standard_scaler.joblib')
joblib.dump(scaler, standard_scaler_path)

['/Users/pranavxiyer/Documents/northwestern/classes/first/spring/machine learning and sensing/project/scaler/standard_scaler.joblib']

Neural Network (Linear Layers)

In [668]:
import torch.nn as nn

input_size = 9 * 15 # 9 sensors * 15 features
output_size = 5  # 5 gesture classes

model = nn.Sequential(
    # Layer 1: Input -> Hidden
    nn.Linear(input_size, 256),
    nn.ReLU(),
    nn.BatchNorm1d(256),
    nn.Dropout(0.3),
    
    # Layer 2: Hidden -> Hidden
    nn.Linear(256, 128),
    nn.ReLU(),
    nn.BatchNorm1d(128),
    nn.Dropout(0.3),
    
    # Layer 3: Hidden -> Hidden
    nn.Linear(128, 64),
    nn.ReLU(),
    nn.BatchNorm1d(64),
    nn.Dropout(0.3),
    
    # Layer 4: Hidden -> Hidden
    nn.Linear(64, 32),
    nn.ReLU(),
    nn.BatchNorm1d(32),
    nn.Dropout(0.3),
    
    # Layer 5: Hidden -> Output
    nn.Linear(32, output_size)
)

In [669]:
train_X_tensor = torch.tensor(train_X, dtype=torch.float32)
train_y_tensor = torch.tensor(train_y, dtype=torch.long)
test_X_tensor = torch.tensor(test_X, dtype=torch.float32)
test_y_tensor = torch.tensor(test_y, dtype=torch.long)

In [670]:
learning_rate = 0.001
num_epochs = 25
batch_size = 32

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [671]:
train_dataset = torch.utils.data.TensorDataset(train_X_tensor, train_y_tensor)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataset = torch.utils.data.TensorDataset(test_X_tensor, test_y_tensor)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [672]:
# 5. Training loop
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []

for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    correct = 0
    total = 0
    
    # Training loop
    for inputs, labels in train_loader:
        # Zero the parameter gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        
        # Track statistics
        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    # Calculate epoch statistics
    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = 100 * correct / total
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_accuracy)
    
    # Evaluation on test set
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        test_outputs = model(test_X_tensor)
        test_loss = criterion(test_outputs, test_y_tensor)
        _, predicted = torch.max(test_outputs.data, 1)
        test_accuracy = 100 * ((predicted == test_y_tensor).sum().item() / test_y_tensor.size(0))
        test_losses.append(test_loss.item())
        test_accuracies.append(test_accuracy)
    
    # Print progress
    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Train Loss: {epoch_loss:.4f}, '
              f'Train Acc: {epoch_accuracy:.2f}%, '
              f'Test Loss: {test_loss:.4f}, '
              f'Test Acc: {test_accuracy:.2f}%')

Epoch [5/25], Train Loss: 0.7523, Train Acc: 85.71%, Test Loss: 1.1314, Test Acc: 88.89%
Epoch [10/25], Train Loss: 0.4988, Train Acc: 95.71%, Test Loss: 0.5083, Test Acc: 100.00%
Epoch [15/25], Train Loss: 0.3233, Train Acc: 98.57%, Test Loss: 0.3972, Test Acc: 94.44%
Epoch [20/25], Train Loss: 0.3046, Train Acc: 98.57%, Test Loss: 0.3563, Test Acc: 94.44%
Epoch [25/25], Train Loss: 0.3739, Train Acc: 97.14%, Test Loss: 0.3623, Test Acc: 88.89%
