In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn

from utils import *

In [2]:
torch.cuda.is_available()

True

In [3]:
from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.data import WeightedRandomSampler
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split

class CustomDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y
        
    def __len__(self):
        return self.X.shape[0]
    
    def __getitem__(self, idx):
        x = self.X[idx, :]
        tensor = torch.from_numpy(x).float()
        class_id = self.y[idx]
        class_id = torch.tensor([class_id]).float()
        return tensor, class_id
        
    def getY(self):
        return self.y
    
class Data_Loaders():
    def __init__(self, batch_size, test_size=0.2, scaling=True, weighting=True):
        self.weighting=weighting
        self.scaling=scaling
        
        X = np.loadtxt("data/protein_train.data")
        y = np.loadtxt("data/protein_train.solution")

        #self.X_test__ = np.loadtxt("data/protein_test.data")
        #self.X_valid__ = np.loadtxt("data/protein_valid.data")
        print("Data Loaded")
        
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, stratify=y, random_state=113)

        if scaling:
            self.scaler=StandardScaler()
            X_train = self.scaler.fit_transform(X_train)
            X_test = self.scaler.transform(X_test)
            #X_test__ = self.scaler.transform(X_test__)
            #X_valid__ = self.scaler.transform(X_valid__)
            print("Data standardized")
        
        self.train_set = CustomDataset(X_train, y_train)
        self.test_set = CustomDataset(X_test, y_test)
        
        
        # Weighting
        if weighting:
            target_list = y_train
            _, counts = np.unique(target_list, return_counts=True)
            class_weights = [1 - (x / sum(counts)) for x in counts]
            class_weights = torch.tensor(class_weights).float().to(device)
            class_weights_all = class_weights[target_list]

            weighted_sampler = WeightedRandomSampler(
                weights=class_weights_all,
                num_samples=len(class_weights_all),
                replacement=True
            )
            
            self.train_loader = DataLoader(self.train_set, batch_size=batch_size, sampler=weighted_sampler)
        else:
            self.train_loader = DataLoader(self.train_set, batch_size=batch_size)
            
        self.test_loader = DataLoader(self.test_set, batch_size=1)


In [4]:
dataset = Data_Loaders(20, scaling=True, weighting=False)

Data Loaded
Data standardized


In [5]:
from sklearn.metrics import balanced_accuracy_score


class BinaryMLP(nn.Module):
    def __init__(self, **kwargs):
        super().__init__()
        self.hidden_layer_1 = nn.Linear(
            in_features=kwargs["input_shape"], out_features=256
        )
        #self.batchnorm1 = nn.BatchNorm1d(256)
        
        self.hidden_layer_2 = nn.Linear(
            in_features=256, out_features=128
        )
        
        #self.batchnorm2 = nn.BatchNorm1d(128)
        
        self.out_layer = nn.Linear(
            in_features=128, out_features=1
        )

    def forward(self, features):
        x = self.hidden_layer_1(features)
        #x = self.batchnorm1(x)
        x = torch.relu(x)
        x = self.hidden_layer_2(x)
        #x = self.batchnorm2(x)
        x = torch.relu(x)
        out = torch.sigmoid(self.out_layer(x))
        return out
    

def binary_acc(y_pred, y_test):
    y_pred_tag = torch.round(y_pred)
    y_pred_tag = y_pred_tag.detach().cpu().numpy().flatten()
    y_test = y_test.cpu().numpy().flatten()
    
    #print("y_pred_tag")
    #print(y_pred_tag)
    #print("y_test")
    #print(y_test)
    
    acc = balanced_accuracy_score(y_pred_tag, y_test)
    
    acc = np.round(acc * 100, 2)
    
    return acc

"""y_pred_tag = torch.round(y_pred)

correct_results_sum = (y_pred_tag == y_test).sum().float()
acc = correct_results_sum/y_test.shape[0]
acc = torch.round(acc * 100)

return acc"""


base_size = 952
    
#  use gpu if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# create a model from `AE` autoencoder class
# load it to the specified device, either gpu or cpu
model = BinaryMLP(input_shape=base_size).to(device)

# create an optimizer object
# Adam optimizer with learning rate 1e-3
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# mean-squared error loss
criterion = nn.BCELoss()

In [6]:
epochs=10
for epoch in range(epochs):
    loss = 0
    epoch_acc = 0
    for batch_features, batch_labels in dataset.train_loader:
        # reshape mini-batch data to [N, 941] matrix
        # load it to the active device
        batch_features = batch_features.view(-1, base_size).to(device)
        batch_labels = batch_labels.view(-1, 1).to(device)
        
        # reset the gradients back to zero
        # PyTorch accumulates gradients on subsequent backward passes
        optimizer.zero_grad()
        
        # compute reconstructions
        outputs = model(batch_features)
        
        # compute training reconstruction loss
        train_loss = criterion(outputs, batch_labels)
        acc = binary_acc(outputs, batch_labels)
        
        # compute accumulated gradients
        train_loss.backward()
        
        # perform parameter update based on current gradients
        optimizer.step()
        
        # add the mini-batch training loss to epoch loss
        loss += train_loss.item()
        epoch_acc += acc.item()
    
    # compute the epoch training loss
    epoch_loss = loss / len(dataset.train_loader)
    epoch_acc = epoch_acc / len(dataset.train_loader)
    
    # display the epoch training loss
    print(f'Epoch {epoch+0:03}: | Loss: {epoch_loss:.5f} | Acc: {epoch_acc:.3f}')





Epoch 000: | Loss: 0.29729 | Acc: 84.715




Epoch 001: | Loss: 0.17229 | Acc: 92.192




Epoch 002: | Loss: 0.10659 | Acc: 95.318




Epoch 003: | Loss: 0.07249 | Acc: 96.697




Epoch 004: | Loss: 0.05669 | Acc: 97.389




Epoch 005: | Loss: 0.04481 | Acc: 98.010




Epoch 006: | Loss: 0.03797 | Acc: 98.297




Epoch 007: | Loss: 0.03724 | Acc: 98.327
Epoch 008: | Loss: 0.03030 | Acc: 98.735
Epoch 009: | Loss: 0.02414 | Acc: 98.934


In [7]:
from sklearn.metrics import confusion_matrix

y_pred_list = []
y_test = []
model.eval()
with torch.no_grad():
    for X_batch, y_batch in dataset.test_loader:
        X_batch = X_batch.to(device)
        y_test.append(y_batch.cpu().numpy())
        y_test_pred = model(X_batch)
        y_pred_tag = torch.round(y_test_pred)
        y_pred_list.append(y_pred_tag.cpu().numpy())
    y_test = np.asarray(y_test).flatten().tolist()
y_pred_list = [a.squeeze().tolist() for a in y_pred_list]

confusion_matrix(y_test, y_pred_list)
print(f"Balanced score is {balanced_accuracy_score(y_test, y_pred_list)}")

Balanced score is 0.9175128330947508
