In [None]:
import os
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
import torch
import torch.nn as nn
import warnings
warnings.filterwarnings('ignore')
from torch.utils.data import Dataset
from sklearn.metrics import confusion_matrix


class get_dataset(Dataset):
    """Map-style dataset over the feature columns of a DataFrame.

    The ``'subject'`` column is discarded, the ``'label'`` column is kept
    aside as the targets, and every remaining column is treated as a feature.
    The DataFrame passed in is left unmodified.
    """

    def __init__(self, dataframe):
        without_subject = dataframe.drop(columns='subject')
        # Grab the targets before removing them from the feature frame.
        self.labels = without_subject['label'].values
        self.dataframe = without_subject.drop(columns='label')

    def __getitem__(self, idx):
        """Return ``(features, label)`` for row ``idx``; features are a ``torch.Tensor``."""
        row = self.dataframe.iloc[idx]
        return torch.Tensor(row.values), self.labels[idx]

    def __len__(self):
        """Return the number of rows (samples)."""
        return self.dataframe.shape[0]

#LOSO(Leave-one-subject-out) Cross Validation
# Column selection: four summary statistics for each signal prefix, followed
# by the remaining stand-alone columns. 'subject' and 'label' come last.
feats = [
    f'{signal}_{stat}'
    for signal in ('BVP', 'EDA_phasic', 'EDA_smna', 'EDA_tonic', 'Resp', 'TEMP')
    for stat in ('mean', 'std', 'min', 'max')
]
feats += ['TEMP_slope', 'BVP_peak_freq', 'age', 'height', 'weight', 'subject', 'label']

# Width of the first layer: every selected column except 'subject' and 'label'.
layer_1_dim = len(feats) - 2
print(layer_1_dim)

# Load Data with Batch Sizes
def data_loader(df, subject_id, train_batch_size=25, test_batch_size=5):
    """Build the train/test loaders for one leave-one-subject-out fold.

    Args:
        df: DataFrame containing a 'subject' column, a 'label' column and the
            feature columns.
        subject_id: Value of 'subject' to hold out. Its rows form the test
            set; all other rows form the training set.
        train_batch_size: Batch size of the (shuffled) training loader.
        test_batch_size: Batch size of the (unshuffled) test loader.

    Returns:
        Tuple ``(train_loader, test_loader)`` of ``torch.utils.data.DataLoader``.
    """
    train_df = df[df['subject'] != subject_id].reset_index(drop=True)
    test_df = df[df['subject'] == subject_id].reset_index(drop=True)

    train_dset = get_dataset(train_df)
    # Was `get-dataset(test_df)`, i.e. the subtraction `get - dataset(...)`,
    # which raised NameError on every call.
    test_dset = get_dataset(test_df)

    train_loader = torch.utils.data.DataLoader(train_dset, batch_size=train_batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dset, batch_size=test_batch_size)

    return train_loader, test_loader

# Neural Net Architecture
class StressNet(nn.Module):
    """Fully connected classifier: input -> 128 -> 256 -> classes.

    The network ends in ``LogSoftmax``, so ``forward`` returns
    log-probabilities.

    NOTE: the script trains this with ``nn.CrossEntropyLoss``, which applies
    log-softmax itself. Log-softmax is idempotent, so the loss is unaffected,
    but ``nn.NLLLoss`` is the conventional pairing for a LogSoftmax output.

    Args:
        input_dim: Number of input features per sample. Defaults to 29, the
            width that was previously hard-coded.
        num_classes: Number of output classes. Defaults to 2.
    """

    def __init__(self, input_dim=29, num_classes=2):
        super().__init__()
        # Layer order and indices are unchanged so existing state_dict keys
        # ("fc.0.weight", "fc.2.weight", "fc.4.weight", ...) stay valid.
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes),
            nn.LogSoftmax(dim=1),
        )

    def forward(self, x):
        """Return per-class log-probabilities for a ``(batch, input_dim)`` tensor."""
        return self.fc(x)


#Model Training
def train(model, optimizer, train_loader, validation_loader):
    """Train ``model`` and record per-epoch metrics.

    Relies on the module-level globals ``num_epochs``, ``device`` and
    ``criterion``.

    Args:
        model: Network to optimise; called on float feature batches.
        optimizer: Optimizer bound to ``model``'s parameters.
        train_loader: Iterable of ``(features, labels)`` training batches.
        validation_loader: Iterable of ``(features, labels)`` batches
            evaluated every 10th epoch (epochs 0, 10, 20, ...).

    Returns:
        dict with keys 'train_loss', 'train_acc', 'valid_loss' and
        'valid_acc', each mapping an epoch index to the metric value. The
        validation entries only exist for the epochs that were evaluated.
    """
    history = {'train_loss': {}, 'train_acc': {}, 'valid_loss': {}, 'valid_acc': {}}

    for epoch in range(num_epochs):
        # test() and the validation pass below put the model in eval mode,
        # so training mode is re-entered explicitly at the start of each epoch.
        model.train()

        seen = 0
        hits = 0
        train_losses = []

        for features, labels in train_loader:
            # Send to GPU (device)
            features, labels = features.to(device), labels.to(device)

            # Forward pass and loss
            outputs = model(features.float())
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_losses.append(loss.item())

            # Running accuracy: the predicted class is the highest-scoring one.
            _, predicted = torch.max(outputs, 1)
            hits += (labels == predicted).sum().item()
            seen += len(labels)

        history['train_loss'][epoch] = np.mean(train_losses)
        history['train_acc'][epoch] = hits / seen

        if epoch % 10 == 0:
            # Evaluate in inference mode and without building autograd graphs.
            model.eval()
            with torch.no_grad():
                val_losses = []
                val_seen = 0
                val_hits = 0

                for features, labels in validation_loader:
                    features, labels = features.to(device), labels.to(device)

                    outputs = model(features.float())
                    val_losses.append(criterion(outputs, labels).item())

                    _, predicted = torch.max(outputs, 1)
                    val_hits += (labels == predicted).sum().item()
                    val_seen += len(labels)

                val_loss = np.mean(val_losses)
                history['valid_acc'][epoch] = np.round(val_hits / val_seen, 3)
                history['valid_loss'][epoch] = val_loss

                print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {val_loss:.4}, Acc: {val_hits / val_seen:.2}')

    return history

# Model Testing
def test(model, validation_loader):
    """Evaluate ``model`` on every batch of ``validation_loader``.

    Puts the model in eval mode and relies on the module-level globals
    ``device`` and ``criterion``.

    Returns:
        Tuple ``(cm, test_loss, accuracy)``: the confusion matrix of true vs.
        predicted labels, the mean batch loss, and the accuracy rounded to
        two decimals.
    """
    print('Evaluating model...')
    model.eval()

    n_seen = 0
    n_correct = 0
    batch_losses = []
    y_true = []
    y_pred = []

    with torch.no_grad():
        for features, targets in validation_loader:
            # Send to GPU (device)
            features = features.to(device)
            targets = targets.to(device)

            outputs = model(features.float())
            batch_losses.append(criterion(outputs, targets).item())

            # Predicted class = index of the highest score in each row.
            predicted = torch.max(outputs, 1)[1]
            n_correct += (targets == predicted).sum().item()
            n_seen += len(targets)

            # Collect plain Python ints for the confusion matrix.
            y_true.extend(targets.tolist())
            y_pred.extend(predicted.tolist())

    test_loss = np.mean(batch_losses)
    accuracy = np.round(n_correct / n_seen, 2)
    print(f'Loss: {test_loss:.4}, Acc: {accuracy:.2}')

    # TODO: return y_true and y_pred and build the confusion matrix afterwards
    # (y_true / y_pred can then also feed a classification report).
    cm = confusion_matrix(y_true, y_pred)
    return cm, test_loss, accuracy

# Collapse the three-class label into a binary one (0 and 1 -> 0, anything else -> 1)
def change_label(label):
    """Map a label to binary: 0 and 1 become 0, every other value becomes 1."""
    return 0 if label in (0, 1) else 1

# Call Change Label    
def call_change_label(df):
    """Binarise ``df['label']`` in place via ``change_label`` and return that column."""
    binary_labels = df['label'].apply(change_label)
    df['label'] = binary_labels
    return df['label']

if __name__ == "__main__":
    # Load the data and collect the subject ids; each one is held out once.
    df = pd.read_csv('data/stress.csv', index_col=0)
    subject_id_list = df['subject'].unique()

    # Keep only the selected columns. Copy so the label rewrite below works
    # on an independent frame rather than on a slice of the loaded one.
    df = df[feats].copy()
    df['label'] = call_change_label(df)

    # Batch sizes (forwarded to data_loader below)
    train_batch_size = 25
    test_batch_size = 5

    # Learning Rate
    learning_rate = 5e-3

    # Device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Number of Epochs
    num_epochs = 100

    # Loss (the optimizer is created per fold, together with a fresh model)
    criterion = nn.CrossEntropyLoss()

    histories = []
    confusion_matrices = []
    test_losses = []
    test_accs = []

    # Leave-one-subject-out: train on all other subjects, test on this one.
    for subject_id in subject_id_list:
        print('\nSubject: ', subject_id)
        model = StressNet().to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

        train_loader, test_loader = data_loader(
            df,
            subject_id,
            train_batch_size=train_batch_size,
            test_batch_size=test_batch_size,
        )

        # The held-out subject's loader also serves as the validation set.
        history = train(model, optimizer, train_loader, test_loader)
        histories.append(history)

        cm, test_loss, test_acc = test(model, test_loader)
        test_losses.append(test_loss)
        test_accs.append(test_acc)
        confusion_matrices.append(cm)

    # Mean accuracy and loss across all held-out subjects
    print("Final Accuracy")
    print("Test Accuracy: ", np.mean(test_accs))
    print("Validation Loss: ", np.mean(test_losses))

    # Class balance after binarisation
    print("Count of Stres and Non-stress ")
    print(df['label'].value_counts())

    # Per-subject test accuracy
    plt.figure(figsize=(14, 6))
    plt.title('Testing Accuracy')
    sns.barplot(x=subject_id_list, y=test_accs)

    # Per-subject test loss
    plt.figure(figsize=(14, 3))
    plt.title('Testing Loss')
    sns.barplot(x=subject_id_list, y=test_losses)
