# Loading Packages

In [3]:
import re 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sns
from sklearn.model_selection import train_test_split
import random 
import math
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.utils as torch_utils
from torch.utils.data import TensorDataset, DataLoader, Dataset
from tqdm import tqdm_notebook
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import torch.nn.functional as F
import d2l
import time
import traceback
import fastprogress
from torchmetrics.classification import BinaryAccuracy, Accuracy 
import torch.nn.init as init
import torch.optim.lr_scheduler as lr_scheduler
from itertools import repeat

In [4]:
seed = 100
torch.manual_seed(seed)
np.random.seed(seed)

In [5]:
# torch.cuda.is_available() checks and returns a Boolean True if a GPU is available, else it'll return False
is_cuda = torch.cuda.is_available()

# If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
if is_cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Loading Data (with scaling)

In [6]:
# Custom Dataset class for loading data
class MyDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        # Implement data retrieval for each index
        input_data = self.X[idx]
        target_data = self.y[idx]
        input_data = input_data.unsqueeze(0)
        
        # Convert data to torch tensors if required
        input_tensor = torch.Tensor(input_data)
        target_tensor = torch.Tensor(target_data)
        
        return input_tensor, target_tensor

In [7]:
#with scaling
df = pd.read_csv("C:/Users/kacpe/Desktop/study/research lab/data_model_v2.csv")
# List of column names to drop
columns_to_drop = ['lKnee_x','lKnee_y','lKnee_z','lAnkle_x','lAnkle_y','lAnkle_z','rKnee_x','rKnee_y','rKnee_z','rAnkle_x','rAnkle_y','rAnkle_z']
#columns_to_keep =  ['id', 'trial','lShoulder_x', 'lShoulder_y', 'lShoulder_z', 'lElbow_x', 'lElbow_y', 'lElbow_z', 'lWrist_x', 'lWrist_y', 'lWrist_z']
#df = df.drop(columns=columns_to_drop)
#df = df[columns_to_keep]
# Step 1: Separate 'id' and 'trial' columns from the rest of the data
data_to_scale = df.drop(columns=['id', 'trial'])

# Step 2: Apply MinMaxScaler to the remaining columns
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(data_to_scale)

# Convert the scaled data back to a DataFrame
scaled_df = pd.DataFrame(scaled_data, columns=data_to_scale.columns)

# Step 3: Merge 'id' and 'trial' columns with the scaled data
scaled_df[['id', 'trial']] = df[['id', 'trial']]

# Step 4: Split the data into training and test sets based on the 'trial' column
train_set = scaled_df[scaled_df['trial'].isin(range(1, 15))].drop(columns=['id', 'trial'])
test_set = scaled_df[scaled_df['trial']==15].drop(columns=['id', 'trial'])
val_set = scaled_df[scaled_df['trial']==16].drop(columns=['id', 'trial'])
full_set = scaled_df.drop(columns=['id','trial'])

# split data into x and y 
X_train, y_train = train_set.iloc[:,:-4], train_set.iloc[:,-4:]
X_test, y_test = test_set.iloc[:,:-4], test_set.iloc[:,-4:]
X_val, y_val = val_set.iloc[:,:-4], val_set.iloc[:,-4:]
X, y = full_set.iloc[:,:-4], full_set.iloc[:,-4:]

# Create custom datasets for training, validation, and testing
full_dataset = MyDataset(torch.tensor(X.values), torch.tensor(y.values))
train_dataset = MyDataset(torch.tensor(X_train.values), torch.tensor(y_train.values))
val_dataset = MyDataset(torch.tensor(X_val.values), torch.tensor(y_val.values))
test_dataset = MyDataset(torch.tensor(X_test.values), torch.tensor(y_test.values))

# Create a DataLoader
#batch_size = 5561#67  # Set your desired batch size
#shuffle = False  # Set to False to preserve the order of your data
#fullset_b_size = X.shape[0]/2
fullset_dataloader = DataLoader(full_dataset, batch_size=X.shape[0], shuffle=False)
train_dataloader = DataLoader(train_dataset, batch_size=X_train.shape[0], shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=X_test.shape[0], shuffle=False)
val_dataloader = DataLoader(val_dataset, batch_size=X_val.shape[0], shuffle=False)

# Model Architecture

In [6]:
class GRUNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, n_layers, drop_prob=0.2, bidirectional=False):
        super(GRUNet, self).__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        
        self.gru = nn.GRU(input_dim, hidden_dim, n_layers, batch_first=True, dropout=drop_prob)
        self.gru = nn.GRUCell(input_dim, hidden_dim, bias=True)
        #Xavier initialization for GRU weights
        #for name, param in self.gru.named_parameters():
        #    if 'weight' in name:
        #        init.xavier_uniform_(param.data)
        #    elif 'bias' in name:
        #        init.constant_(param.data, 0.0)
                
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sotfplus = nn.Softplus()
        self.relu = nn.ReLU()

        
        
    def forward(self, x, h):
        out, h = self.gru(x, h)
        out = self.fc(self.sotfplus(out[:,-1]))
        #out = self.fc(self.relu(out[:,-1]))
        out = F.softmax(out, dim=1)
        return out, h
    
    #def init_hidden(self, batch_size):
        #weight = next(self.parameters()).data
        #hidden = weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().to(device)
        #return hidden
    def init_hidden(self, batch_size):
        if batch_size > 1:
            weight = next(self.parameters()).data
            hidden = weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().to(device)
        else:
            weight = next(self.parameters()).data
            hidden = weight.new(self.n_layers, self.hidden_dim).zero_().to(device)
        return hidden

In [15]:
class GRUCellNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.gru = nn.GRUCell(input_dim, hidden_dim, bias=True)                
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.softplus = nn.Softplus()
        self.relu = nn.ReLU()

        self.gru_cells = nn.ModuleList([
            nn.GRUCell(input_dim, hidden_dim) if i == 0 else nn.GRUCell(hidden_dim, hidden_dim)
            for i in range(num_layers)
        ])
        self.batch_norm = nn.BatchNorm1d(hidden_dim)  # Add BatchNorm outside GRU cells

        
        
    def forward(self, x, h=None):
        if h is None:
            h = [torch.zeros(x.size(0), self.hidden_dim) for _ in range(self.num_layers)]
        
        hidden_states = []
        
        for t in range(x.size(1)):
            input_t = x[:, t, :]
            new_hidden_states = []
            for layer_idx, gru_cell in enumerate(self.gru_cells):
                h[layer_idx] = gru_cell(input_t, h[layer_idx])
                new_hidden_states.append(h[layer_idx])
                input_t = h[layer_idx]  # Update input_t with the new hidden state for the next layer
            hidden_states.append(new_hidden_states)
        
        last_hidden_states = [layer_states[-1] for layer_states in hidden_states]
        # Apply BatchNorm to the last hidden state
        last_hidden_states[-1] = self.batch_norm(last_hidden_states[-1])
        #all_hidden_states = torch.cat(hidden_states, dim=1)  # Stack all hidden states across time steps
        
        
        out = self.fc(self.softplus(last_hidden_states[-1]))
        #out = self.fc(self.softplus(all_hidden_states[:, -1]))  # Use the last hidden state for prediction
        #probs = torch.sigmoid(out)  # Apply sigmoid activation to get probabilities
        #preds = torch.round(probs)  # Convert probabilities to binary predictions
        #out = F.sigmoid(out, dim=1)
        
        return out, last_hidden_states 
    
    
    
    #def init_hidden(self, batch_size):
     #   return nn.Parameter(torch.zeros([batch_size, self.gru_cells[0].hidden_size]))

# Training functions

In [9]:
def accuracy(correct, total):
    return float(correct)/total

In [10]:
def train(dataloader, model, optimizer, loss_fn, scheduler=None, device=None):
    epoch_loss = []
    epoch_correct, epoch_total = 0, 0
    model = model.to(device)
    model.train()
    predicted_probs = []
    predicted_labels = []
    #hidden_states = []
    for x, y in dataloader:
        x = x.to(device).float()
        y = y.to(device).float()
        
        optimizer.zero_grad()
        
        out, last_hidden_states = model(x)
        y_prob = torch.sigmoid(out)
        # Append the predicted probabilities to the list
        predicted_probs.append(y_prob.cpu().detach().numpy())
        loss = loss_fn(out, y)
        epoch_loss.append(loss.item())

        #hidden_states.append(hidden)
        
        y_pred = torch.round(y_prob)
        epoch_correct += sum((y == y_pred).flatten()).item()
        epoch_total += y.numel()
        
        loss.backward()
        torch_utils.clip_grad_norm_(model.parameters(), max_norm=10)
        optimizer.step()

        predicted_labels.extend(zip(y_pred.cpu().detach().cpu().numpy(), y.cpu().numpy()))
        if scheduler:
            scheduler.step()
    
    return np.mean(epoch_loss), accuracy(epoch_correct, epoch_total), predicted_labels, predicted_probs 

In [11]:
def validate(dataloader, model, loss_fn, device=None):
    epoch_loss = []
    epoch_correct, epoch_total = 0, 0
    model = model.to(device).float()
    model.eval()
    predicted_probs = []
    predicted_labels = []
    #hidden_states = []
    with torch.no_grad():
        for x, y in dataloader:
            x = x.to(device).float()
            y = y.to(device).float()
            
            out, last_hidden_states = model(x)
            
            loss = loss_fn(out, y)
            epoch_loss.append(loss.item())

            #hidden_states.append(hidden)
            y_pred = torch.sigmoid(out)
            predicted_probs.append(y_pred.cpu().detach().numpy())
            y_pred = torch.round(y_pred)
            epoch_correct += sum((y == y_pred).flatten())
            epoch_total += y.numel()
            predicted_labels.extend(zip(y_pred.cpu().numpy(), y.cpu().numpy()))
    
    return np.mean(epoch_loss), accuracy(epoch_correct, epoch_total), predicted_labels, predicted_probs

In [12]:
def run_training(train_dataloader, val_dataloader, model, optimizer, loss_fn, num_epochs, scheduler=None, device=None, schedule_on_train=True, verbose=True):
    train_losses, train_accs = [], []
    val_losses, val_accs = [], []

    #train_hidden_states, val_hidden_states = [], []
    
    for epoch in range(num_epochs):
        epoch_train_loss, epoch_train_acc, train_preds, train_probs = train(train_dataloader, model, optimizer, loss_fn, scheduler, device)
        
        train_losses.append(epoch_train_loss)
        train_accs.append(epoch_train_acc)

        #train_hidden_states.extend(train_hidden)
        
        if val_dataloader is not None:
            epoch_val_loss, epoch_val_acc, val_preds, val_probs = validate(val_dataloader, model, loss_fn, device)
        
            val_losses.append(epoch_val_loss)
            val_accs.append(epoch_val_acc)

            #val_hidden_states.extend(val_hidden)
        
        #if isinstance(scheduler, ReduceLROnPlateau):
        #    scheduler.step(epoch_train_acc if schedule_on_train or val_dataloader is None else epoch_val_acc)
            
        if epoch % 50 == 0:
            val_str = f", val loss: {epoch_val_loss}, val acc: {epoch_val_acc}" if val_dataloader is not None else ""
            print(f"Epoch {epoch}, train loss: {epoch_train_loss}, train acc: {epoch_train_acc}{val_str}")
        if epoch == num_epochs - 1:  # Store values only for the final epoch
            train_predicted_labels = train_preds
            #val_predicted_labels = val_preds
            train_probs_final = train_probs
            #val_probs_final = val_probs
            if val_dataloader is not None:
                val_predicted_labels = val_preds
                val_probs_final = val_probs

    if val_dataloader is not None:        
        return train_losses, train_accs, val_losses, val_accs, train_predicted_labels, val_predicted_labels, train_probs_final, val_probs_final
    else: 
        return train_losses, train_accs, val_losses, val_accs, train_predicted_labels, train_probs_final

# Testing

In [15]:
X_train.shape

(6107, 54)

In [18]:

# Hyperparameters
input_dim = 54
hidden_dim = 22
output_dim = 4
num_layers = 1
n_epochs =2500
lr = 0.01

# Create an instance of GRUCellNet
model = GRUCellNet(input_dim, hidden_dim, output_dim, num_layers)

# Define loss function and optimizer
class_weights = torch.tensor([1.8]).to(device)
loss_fn = nn.BCEWithLogitsLoss(pos_weight=class_weights)  
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.95)

In [21]:
train_losses, train_accs, val_losses, val_accs, train_predicted, val_predicted, train_probs, val_probs= run_training(
    train_dataloader, val_dataloader=val_dataloader, model=model, optimizer=optimizer, loss_fn=loss_fn, num_epochs=n_epochs, scheduler=scheduler)

Epoch 0, train loss: 0.1825670599937439, train acc: 0.9345423284755199, val loss: 1.117614984512329, val acc: 0.8097014925373134


KeyboardInterrupt: 

In [20]:
# Load the saved weights into the model
#torch.save(model.state_dict(), 'model_weights.pth')
#model = GRUCellNet(input_dim, hidden_dim, output_dim, num_layers)
model.load_state_dict(torch.load('model_weights.pth'))
model.eval()  # Set the model in evaluation mode

GRUCellNet(
  (gru): GRUCell(54, 22)
  (fc): Linear(in_features=22, out_features=4, bias=True)
  (softplus): Softplus(beta=1, threshold=20)
  (relu): ReLU()
  (gru_cells): ModuleList(
    (0): GRUCell(54, 22)
  )
  (batch_norm): BatchNorm1d(22, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)

In [28]:
# Forward pass to get predictions
model.eval()
epoch_loss, accuracy, predicted_labels, predicted_probs = validate(test_dataloader,model,loss_fn)
predicted_labels

TypeError: 'float' object is not callable

In [35]:
model.load_state_dict(torch.load('model_weights.pth'))  # Load saved weights
model.eval()  # Set the model to evaluation mode
# Initialize an empty list to store predictions
all_predictions = []

# Iterate through the test data batches
for inputs, _ in test_dataloader:
    inputs = inputs.float()
    # Forward pass to get predictions
    with torch.no_grad():
        predictions, _ = model(inputs)
        probabilities = torch.sigmoid(predictions)
        preds = torch.round(probabilities)

    # Append predictions to the list
    all_predictions.append(preds)

# Concatenate the predicted batches
all_predictions = torch.cat(all_predictions, dim=0)

In [49]:
predictions_df

Unnamed: 0,s_1,s_2,s_3,s_4
6107,1.0,1.0,1.0,1.0
6108,1.0,1.0,1.0,1.0
6109,1.0,1.0,1.0,1.0
6110,1.0,1.0,1.0,1.0
6111,1.0,1.0,1.0,1.0
...,...,...,...,...
6612,1.0,0.0,0.0,1.0
6613,1.0,0.0,0.0,1.0
6614,1.0,0.0,0.0,1.0
6615,1.0,0.0,0.0,1.0


In [50]:
# Convert the tensor of predictions to a DataFrame
predictions_df = pd.DataFrame(all_predictions.numpy(), columns=y_test.columns, index=y_test.index)
# Calculate accuracy for each output state
accuracies = (predictions_df == y_test).mean()

print("Accuracy for each output state:")
print(np.mean(accuracies))

Accuracy for each output state:
0.7480392156862744


In [None]:
def train(dataloader, model, optimizer, loss_fn, scheduler=None, device=None):
    epoch_loss = []
    epoch_correct, epoch_total = 0, 0
    model = model.to(device)
    model.train()
    predicted_probs = []
    predicted_labels = []
    hidden_states = []
    
    for x, y in dataloader:
        x = x.to(device).float()
        y = y.to(device).float()
        
        optimizer.zero_grad()
        
        y_pred_probs, _, hidden = model(x)  # Get probabilities and hidden states
        
        loss = loss_fn(y_pred_probs, y)
        epoch_loss.append(loss.item())

        hidden_states.append(hidden)
        
        y_pred = torch.round(y_pred_probs)
        epoch_correct += sum((y == y_pred).flatten()).item()
        epoch_total += y.numel()
        
        loss.backward()
        optimizer.step()

        predicted_probs.extend(y_pred_probs.cpu().detach().numpy())
        predicted_labels.extend(zip(y_pred.cpu().detach().numpy(), y.cpu().numpy()))
        
        if scheduler:
            scheduler.step()
    
    #all_hidden_states = torch.cat([torch.stack(layer_states) for layer_states in hidden_states], dim=1)  # Concatenate tensors within each time step
    
    return np.mean(epoch_loss), accuracy(epoch_correct, epoch_total), predicted_labels,predicted_probs #all_hidden_states 


In [None]:
def run_training(train_dataloader, val_dataloader, model, optimizer, loss_fn, num_epochs, scheduler=None, device=None, schedule_on_train=True, verbose=True):
    train_losses, train_accs = [], []
    val_losses, val_accs = [], []
    train_predicted_labels = []  
    val_predicted_labels = []  
    train_hidden_states = []  
    val_hidden_states = []  
    train_predicted_probs = []  
    
    for epoch in range(num_epochs):
        #epoch_train_loss, epoch_train_acc, train_preds, train_hidden, train_probs = train(train_dataloader, model, optimizer, loss_fn, scheduler, device)
        epoch_train_loss, epoch_train_acc, train_preds, train_probs = train(train_dataloader, model, optimizer, loss_fn, scheduler, device)
        
        train_losses.append(epoch_train_loss)
        train_accs.append(epoch_train_acc)
        train_predicted_labels.extend(train_preds)
        train_hidden_states.extend(train_hidden)
        train_predicted_probs.extend(train_probs)
        
        if val_dataloader is not None:
            epoch_val_loss, epoch_val_acc, val_preds, val_hidden, val_probs = validate(val_dataloader, model, loss_fn, device)
        
            val_losses.append(epoch_val_loss)
            val_accs.append(epoch_val_acc)
            val_predicted_labels.extend(val_preds)
            val_hidden_states.extend(val_hidden)
            val_predicted_probs.extend(val_probs)
        
        if epoch % 50 == 0:
            val_str = f", val loss: {epoch_val_loss}, val acc: {epoch_val_acc}" if val_dataloader is not None else ""
            print(f"Epoch {epoch}, train loss: {epoch_train_loss}, train acc: {epoch_train_acc}{val_str}")
            
    return (
        train_losses, train_accs, val_losses, val_accs,
        train_predicted_labels, val_predicted_labels,
        train_predicted_probs, val_predicted_probs )