In [None]:
import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.autograd import Variable

In [None]:
class ClassificationModel(nn.Module):
    """Feed-forward binary classifier.

    The network is a stack of fully connected layers
    ``input_dim -> 512 -> 256 -> 128 -> 64 -> 1`` with a ReLU after every
    hidden layer and a sigmoid on the single output unit, so the forward pass
    yields one value in (0, 1) per input row.

    Args:
        input_dim: Number of features in each input row.
    """

    def __init__(self, input_dim):
        super().__init__()
        # Widths of the successive representations, input first.
        widths = (input_dim, 512, 256, 128, 64)
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.ReLU())
        # Output head: one unit squashed to a probability-like score.
        stack.append(nn.Linear(widths[-1], 1))
        stack.append(nn.Sigmoid())
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the sigmoid outputs."""
        return self.layers(x)

In [None]:
def reset_weights(m):
    """Re-initialise the direct children of ``m`` that support it.

    Every immediate child exposing a ``reset_parameters`` method is announced
    on stdout and then reset. Children without that method are skipped, and
    ``m`` itself is not reset. Intended to avoid weights leaking from one
    training run into the next.
    """
    resettable = (
        child for child in m.children() if hasattr(child, 'reset_parameters')
    )
    for child in resettable:
        print(f'Reset trainable parameters of layer = {child}')
        child.reset_parameters()

In [None]:
def transformAndScale(X, y, train_ids, test_ids):
    """Split one fold, standardise the features and convert everything to tensors.

    The scaler is fitted on the training rows only and then applied to the
    test rows, so no test-set statistics leak into training.

    Args:
        X: Feature matrix; a NumPy array or a pandas DataFrame.
        y: Labels; a pandas Series/DataFrame or a NumPy array.
        train_ids: Positional row indices of the training rows (as produced by
            a scikit-learn splitter).
        test_ids: Positional row indices of the test rows.

    Returns:
        Tuple ``(train_tensor, test_tensor, y_tensor, ytest_tensor)``. The two
        feature tensors are built with ``torch.tensor`` from the scaler output
        (their dtype follows that output; callers cast as needed). The two
        label tensors are float32 column vectors of shape ``(n, 1)``.
    """
    scaler = StandardScaler()

    xtrain, xtest = _take_rows(X, train_ids), _take_rows(X, test_ids)
    ytrain, ytest = _take_rows(y, train_ids), _take_rows(y, test_ids)

    # Fit on the training rows only; the test rows reuse the same statistics.
    X_train = scaler.fit_transform(xtrain)
    X_test = scaler.transform(xtest)
    train_tensor = torch.tensor(X_train)
    test_tensor = torch.tensor(X_test)

    # pandas objects expose the underlying ndarray as `.values`; plain arrays
    # are used as they are.
    ytrain_arr = getattr(ytrain, 'values', ytrain)
    ytest_arr = getattr(ytest, 'values', ytest)
    y_tensor = torch.from_numpy(ytrain_arr.ravel()).float().unsqueeze(1)
    ytest_tensor = torch.from_numpy(ytest_arr.ravel()).float().unsqueeze(1)
    return train_tensor, test_tensor, y_tensor, ytest_tensor


def _take_rows(data, ids):
    """Select rows by position, for pandas objects and plain arrays alike.

    Splitter indices are positional. Plain ``obj[ids]`` on a pandas object is
    label-based (and selects columns on a DataFrame), which picks the wrong
    rows or raises whenever the index is not the default ``0..n-1`` range.
    """
    if hasattr(data, 'iloc'):
        return data.iloc[ids]
    return data[ids]

In [None]:
def predictEncoder(train_tensor, test_tensor, encoder=None):
    """Encode the train and test feature tensors without tracking gradients.

    Args:
        train_tensor: Training features.
        test_tensor: Test features.
        encoder: Callable/module used for encoding. When omitted, the
            module-level ``autoencoder.encoder`` is used (``autoencoder`` must
            already exist in the notebook namespace).

    Returns:
        Tuple ``(encoded_train, encoded_test)`` living on the device the
        inputs were moved to.
    """
    if encoder is None:
        encoder = autoencoder.encoder

    # Run on the encoder's own device so CPU-only sessions work and inputs
    # never end up on a different device than the weights.
    target = _encoder_device(encoder)
    train_tensor = train_tensor.to(target)
    test_tensor = test_tensor.to(target)

    with torch.no_grad():
        encoded_train_balanced = encoder(train_tensor.float())
        encoded_test_balanced = encoder(test_tensor.float())
    return encoded_train_balanced, encoded_test_balanced


def _encoder_device(encoder):
    """Return the device of the encoder's first parameter, else the default device."""
    parameters = getattr(encoder, 'parameters', None)
    first = next(iter(parameters()), None) if callable(parameters) else None
    if first is not None:
        return first.device
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
def accuracy_fn(y_true, y_pred):
    """Return the percentage of positions where ``y_true`` equals ``y_pred``.

    Matches are counted element-wise with ``torch.eq`` and divided by
    ``len(y_pred)`` (the size of its first dimension).
    """
    n_matches = torch.eq(y_true, y_pred).sum().item()
    fraction = n_matches / len(y_pred)
    return fraction * 100

In [None]:
from sklearn.model_selection import StratifiedKFold
import itertools
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score,recall_score,f1_score,matthews_corrcoef, cohen_kappa_score,roc_curve,RocCurveDisplay
import numpy as np
from time import process_time, perf_counter

# Stratified K-fold cross-validation of ClassificationModel.
#
# For every fold: scale the data (statistics fitted on the training rows only),
# train a fresh network, evaluate it on the held-out rows and report confusion
# matrix, precision, recall, F1, MCC, Cohen's kappa, ROC curve and accuracy.
# Expects `X` and `y` to be defined by an earlier cell.

#task = Task.init(project_name='MusIkA', task_name='5-KFold Classification try-2')
#task = Task.init(project_name='MusIkA', task_name='5-KFold Classification try-whitout-encoding')

# --- Configuration -----------------------------------------------------------
bs = 256                # training mini-batch size
k_folds = 5
num_epochs = 200
learning_rate = 0.0001
seed = 42

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Accuracy (in percent) per fold index.
results = {}

# Fix the seeds: torch drives the weight initialisation, `random_state` drives
# the fold assignment (a shuffling splitter without it yields different folds
# on every run).
torch.manual_seed(seed)
kfold = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=seed)

print('--------------------------------')

# Wall-clock timer: process_time() counts CPU time summed over threads, which
# is not the "elapsed time" reported at the end.
t1 = perf_counter()
for fold, (train_ids, test_ids) in enumerate(kfold.split(X, y)):

    print(f'FOLD {fold}')
    print('--------------------------------')

    train_tensor, test_tensor, y_tensor, ytest_tensor = transformAndScale(X, y, train_ids, test_ids)

    #encodedtrain, encodedtest = predictEncoder(train_tensor,test_tensor)

    train_ds = torch.utils.data.TensorDataset(train_tensor, y_tensor)
    train_loader = torch.utils.data.DataLoader(train_ds, batch_size=bs)
    test_ds = torch.utils.data.TensorDataset(test_tensor, ytest_tensor)
    test_loader = torch.utils.data.DataLoader(test_ds, batch_size=128)

    # Size the input layer from the data instead of hard-coding the width.
    net = ClassificationModel(train_tensor.shape[1]).to(device)
    net.apply(reset_weights)

    loss_function = nn.BCELoss()  # binary cross entropy on sigmoid outputs
    optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

    # ---- Training -----------------------------------------------------------
    net.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        print(f'Starting epoch {epoch+1}')
        for xb, yb in train_loader:
            xb = xb.to(device=device, dtype=torch.float32)
            yb = yb.to(device)
            optimizer.zero_grad()
            y_pred = net(xb)                   # forward pass
            loss = loss_function(y_pred, yb)   # mean loss over the batch
            loss.backward()                    # back-propagation
            optimizer.step()                   # parameter update
            # Weight by the real batch size: the final batch can be shorter
            # than `bs`.
            running_loss += loss.item() * xb.size(0)
        epoch_loss = running_loss / len(train_ds)   # mean loss per sample
        print("Loss in iteration :"+str(epoch)+" is: "+str(epoch_loss))
        #Logger.current_logger().report_scalar(f'loss graph in fold {fold+1}', "train loss", iteration=epoch+1,value=epoch_loss)
        print('Last iteration loss value: '+str(loss.item()))

    # ---- Evaluation ---------------------------------------------------------
    net.eval()
    y_pred_list = []
    y_score_list = []
    with torch.no_grad():
        for xb_test, yb_test in test_loader:
            xb_test = xb_test.to(device=device, dtype=torch.float32)
            y_test_pred = net(xb_test)
            y_score_list.append(y_test_pred.cpu())
            y_pred_list.append(torch.round(y_test_pred).cpu())

    # Concatenate the (batch, 1) outputs directly. Squeezing each batch and
    # chaining the lists breaks when the last batch holds a single sample.
    y_score_pred = torch.cat(y_score_list).float()   # (n_test, 1) sigmoid scores
    ytest_pred = torch.cat(y_pred_list).float()      # (n_test, 1) hard 0/1 labels

    # scikit-learn metrics get flat NumPy arrays with integer class labels.
    y_true_np = ytest_tensor.numpy().ravel().astype(int)
    y_pred_np = ytest_pred.numpy().ravel().astype(int)
    y_score_np = y_score_pred.numpy().ravel()

    # Explicit labels keep the matrix 2x2 so the four-way unpack is safe.
    conf_matrix = confusion_matrix(y_true_np, y_pred_np, labels=[0, 1])
    tn, fp, fn, tp = conf_matrix.ravel()
    print(f'true negatives: {tn} false positives: {fp} false negatives {fn} true positives {tp}')
    print(f'confusion matrix fold {fold}')
    print("-----------")
    print(conf_matrix)
    # Logger.current_logger().report_matrix(
    #   f'confusion matrix of fold {fold+1}',
    #   "ignored",
    #   iteration=fold,
    #   matrix=conf_matrix,
    #   xaxis="True class",
    #   yaxis="Predicted class",
    #   xlabels= ['Negative','Positive'],
    #   ylabels = ['Negative','Positive'],
    #   yaxis_reversed = False
    # )
    precision = precision_score(y_true_np, y_pred_np)
    recall = recall_score(y_true_np, y_pred_np)
    f1_score_value = f1_score(y_true_np, y_pred_np)
    mcc = matthews_corrcoef(y_true_np, y_pred_np)
    kappa = cohen_kappa_score(y_true_np, y_pred_np)
    fpr, tpr, thresholds = roc_curve(y_true_np, y_score_np)
    roc_curve_save = RocCurveDisplay.from_predictions(y_true_np, y_score_np, name=f'Classifier in fold {fold+1}')
    plt.show()
    #task.get_logger().report_plotly(title=f'Roc curve fold {fold+1}', series="", iteration=0, figure=roc_curve_save)
    print("Precision of the MLP :\t"+str(precision))
    print("Recall of the MLP    :\t"+str(recall))
    print("F1 Score of the Model :\t"+str(f1_score_value))
    acc = accuracy_fn(ytest_tensor, ytest_pred)
    print(f'Accuracy of the model: {acc}')
    print(f'Matthews correlation coefficient: {mcc}')
    print(f'cohen kappa score: {kappa}')
    # Logger.current_logger().report_single_value(f'Accuracy in fold {fold+1}',acc)
    # Logger.current_logger().report_single_value(f'Precision in fold {fold+1}',precision)
    # Logger.current_logger().report_single_value(f'Recall in fold {fold+1}',recall)
    # Logger.current_logger().report_single_value(f'F1_score in fold {fold+1}',f1_score_value)
    # Logger.current_logger().report_single_value(f'Cohen kappa score in fold {fold+1}',kappa)
    # Logger.current_logger().report_single_value(f'Matthews correlation coefficient in fold {fold+1}',mcc)
    results[fold] = acc

# ---- Summary ----------------------------------------------------------------
print(f'K-FOLD CROSS VALIDATION RESULTS FOR {k_folds} FOLDS')
print('--------------------------------')
# Accumulate under a dedicated name: binding `sum` would shadow the builtin for
# every later cell in the kernel session.
total_acc = 0.0
for key, value in results.items():
    print(f'Fold {key}: {value} %')
    total_acc += value
avg = total_acc / len(results)
print(f'Average: {avg} %')
t2 = perf_counter()
print("Elapsed time:", t2-t1)
# Logger.current_logger().report_single_value('Average accuracy in 5 folds',avg)
# task.close()