## Predicting topology of transmembrane proteins

This notebook serves as the final hand in for this project. Using a five-fold crossvalidation, the notebook will train a bidirectional neural network, hyperparameter optimise for the hidden number of units for the LSTM layer as well as number of layers via a validation set. Finally, the best model for each fold will be evaluated against a test set and the model performing best in the most folds will be chosen to visualise its results. The results include a loss curve, accuracy curves and a heat map.

In [1]:
# load modules
from itertools import cycle, islice
from cycler import cycler
import os
import torch
from torch import nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import numpy as np
import json
from torch.nn.utils.rnn import pad_sequence
from typing import List, Union, Dict
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns

## Loading of helper functions

The following cells will load functions that are needed for the analysis itself. When the analysis took place, these functions were scattered across multiple .py files. They have now been gathered here.

### Early stopping
Helper class for the early stopping implemented in the training loop:

In [14]:
class EarlyStopping:
    """This code is from: https://github.com/Bjarten/early-stopping-pytorch/blob/master/MNIST_Early_Stopping_example.ipynb
    However, the code has been adjusted to work for max accuracy as opposed to min loss
    
    Early stops the training if validation topology accuracy doesn't improve after some given patience."""
    
    def __init__(self, patience = 7, verbose = False, delta = 0, path = 'checkpoint.pt', trace_func = print):
        """
        args:
            patience (int): how long to wait after last time improved validation accuracy.Default: 7
            verbose (bool): if True, prints a message for each validation accuracy improvement. Default: False
            delta (float): minimum change in monitored quantity to qualify as improvement. Default: 0
            path (str): path for checkpoint save location. Default: 'checkpoint.pt'
            trace_func (function): trace print function. Default: print            
        """
        
        self.patience = patience
        self.counter = 0
        
        self.path = path
        self.trace_func = trace_func
        self.verbose = verbose
        
        self.best_score = None
        self.delta = delta
        self.early_stop = False
        self.val_acc_max = -np.Inf
        
    def __call__(self, val_acc, model):
        score = -val_acc

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_acc, model)
        
        # changing the inequality such that the accuracy keeps getting better
        elif score >= self.best_score + self.delta:
            self.counter += 1

            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_acc, model)
            self.counter = 0

    def save_checkpoint(self, val_acc, model):
        '''Saves model when validation accuracy increases.'''
        if self.verbose:
            self.trace_func(f'Validation accuracy increased ({self.val_acc_max:.6f} --> {val_acc:.6f}).  Saving model ...')
            
        torch.save(model.state_dict(), self.path)
        self.val_acc_max = val_acc

## BRNN model
Create NN class with hyperparameters as variables

In [3]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes, num_layer):
        super(RNN, self).__init__()
        
        self.dropout = nn.Dropout(p = 0.2)
        self.hidden_size = hidden_size
        self.num_layers = num_layer
        # rnn
        self.lstm = nn.LSTM(input_size = input_size, hidden_size = hidden_size, 
                            batch_first = True, bidirectional = True, num_layers = num_layer)

        # define fully connected layers
        self.linear = nn.Linear(hidden_size*2, num_classes, bias = False)

    def forward(self, x):
        # return output and last hidden state
        x, (h, c) = self.lstm(self.dropout(x))
        
        # fully-connected output layer
        x = self.linear(x)
        
        return x

## Accuracy computation function

All functions necessary for calculating accuracy in accordance with DeepTMHMM. Code is provided by DeepTMHMM but have been slightly modified to suit our analysis setup: https://www.biorxiv.org/content/10.1101/2022.04.08.487609v1.full

In [4]:
def update_nested_dict(d, keys, value):
    for idx, key in enumerate(keys[:-1]):
        if key not in d:
            d[key] = {}
        d = d[key]
    d[keys[-1]] = value

def custom_encoder(obj):
    if 'confusion_matrix' in obj:
        return json.dumps(obj, separators=(',', ':'), indent=4)
    else:
        return None

def type_from_labels(label):
    """
    Function that determines the protein type from labels

    Dimension of each label:
    (len_of_longenst_protein_in_batch)

    # Residue class
    0 = inside cell/cytosol (I)
    1 = Outside cell/lumen of ER/Golgi/lysosomes (O)
    2 = beta membrane (B)
    3 = signal peptide (S)
    4 = alpha membrane (M)
    5 = periplasm (P)

    B in the label sequence -> beta
    I only -> globular
    Both S and M -> SP + alpha(TM)
    M -> alpha(TM)
    S -> signal peptide

    # Protein type class
    0 = TM
    1 = SP + TM
    2 = SP
    3 = GLOBULAR
    4 = BETA
    """

    if 2 in label:
        ptype = 4

    elif all(element == 0 for element in label):
        ptype = 3

    elif 3 in label and 4 in label:
        ptype = 1

    elif 3 in label:
       ptype = 2

    elif 4 in label:
        ptype = 0

    elif all(x == 0 or x == -1 for x in label):
        ptype = 3

    else:
        ptype = None

    return ptype

def label_list_to_topology(labels: Union[List[int], torch.Tensor]) -> List[torch.Tensor]:
    """
    Converts a list of per-position labels to a topology representation.
    This maps every sequence to list of where each new symbol start (the topology), e.g. AAABBBBCCC -> [(0,A),(3, B)(7,C)]

    Parameters
    ----------
    labels : list or torch.Tensor of ints
        List of labels.

    Returns
    -------
    list of torch.Tensor
        List of tensors that represents the topology.
    """

    if isinstance(labels, list):
        labels = torch.LongTensor(labels)

    if isinstance(labels, torch.Tensor):
        zero_tensor = torch.LongTensor([0])
        if labels.is_cuda:
            zero_tensor = zero_tensor.cuda()

        unique, count = torch.unique_consecutive(labels, return_counts=True)
        top_list = [torch.cat((zero_tensor, labels[0:1]))]
        prev_count = 0
        i = 0
        for _ in unique.split(1):
            if i == 0:
                i += 1
                continue
            prev_count += count[i - 1]
            top_list.append(torch.cat((prev_count.view(1), unique[i].view(1))))
            i += 1
        return top_list


def is_topologies_equal(topology_a, topology_b, minimum_seqment_overlap=5):
    """
    Checks whether two topologies are equal.
    E.g. [(0,A),(3, B)(7,C)]  is the same as [(0,A),(4, B)(7,C)]
    But not the same as [(0,A),(3, C)(7,B)]

    Parameters
    ----------
    topology_a : list of torch.Tensor
        First topology. See label_list_to_topology.
    topology_b : list of torch.Tensor
        Second topology. See label_list_to_topology.
    minimum_seqment_overlap : int
        Minimum overlap between two segments to be considered equal.

    Returns
    -------
    bool
        True if topologies are equal, False otherwise.
    """

    if isinstance(topology_a[0], torch.Tensor):
        topology_a = list([a.cpu().numpy() for a in topology_a])
    if isinstance(topology_b[0], torch.Tensor):
        topology_b = list([b.cpu().numpy() for b in topology_b])
    if len(topology_a) != len(topology_b):
        return False
    for idx, (_position_a, label_a) in enumerate(topology_a):
        if label_a != topology_b[idx][1]:
            if (label_a in (1,2) and topology_b[idx][1] in (1,2)): # assume O == P
                continue
            else:
                return False
        if label_a in (3, 4, 5):
            if idx == (len(topology_a) - 1): # it's impossible to end in 3, 4 or 5
                return False
            else:
                overlap_segment_start = max(topology_a[idx][0], topology_b[idx][0])
                overlap_segment_end = min(topology_a[idx + 1][0], topology_b[idx + 1][0])
                    
            if label_a == 5:
                # Set minimum segment overlap to 3 for Beta regions
                minimum_seqment_overlap = 3
            if overlap_segment_end - overlap_segment_start < minimum_seqment_overlap:
                return False
    return True

def calculate_acc(correct, total):
    total = total.float()
    correct = correct.float()
    if total == 0.0:
        return torch.tensor(1)
    return correct / total

The following function calls all helper function for accuracy computations above

In [5]:
def test_predictions(model, loader, loss_function, cv, experiment_file_path, device, condition = "test", epoch = 0):
    # either loads an empty dict or the dict of the previous fold 
    experiment_json = json.loads(open(experiment_file_path, 'r').read())

    confusion_matrix = torch.zeros((5, 7), dtype=torch.int64)
    protein_label_actual = []
    protein_label_prediction = []
    
    with torch.no_grad():
        model.eval()
        test_batch_loss = []
        
        for _, batch in enumerate(loader):
            inputs, labels, lengths, types = batch['data'], batch['labels'], batch['lengths'], batch["type"]
            inputs, labels, lengths, types = inputs.to(device), labels.to(device), lengths.to(device), types.to(device)
            
            output = model(inputs)
            
            predict_label = []
            predict_prot_type = []
            ground_truth_label = []
            
            # invalid_indices = np.stack(torch.where(labels == -1), axis = 1)
            
            loss = 0
            for l in range(output.shape[0]):
                # masking the zero-padded outputs
                batch_output = output[l][:lengths[l]]
                batch_labels = labels[l][:lengths[l]]
                
                # compute cross-entropy loss
                loss += loss_function(batch_output, batch_labels) / output.shape[0]
                
                # predict labels and type for the masked outputs
                predictions_batch_mask = batch_output.max(-1)[1]
                predict_prot_type_batch = type_from_labels(predictions_batch_mask)
                
                predict_label.append(predictions_batch_mask)
                ground_truth_label.append(batch_labels)
                predict_prot_type.append(predict_prot_type_batch)
                
                # used later for the accuracy computation
                protein_label_actual.extend(ground_truth_label)
            
            # go over each protein and compute accuracy
            for idx, actual_type in enumerate(types):
                predicted_type = predict_prot_type[idx]
                predicted_topology = label_list_to_topology(predict_label[idx])
                predicted_labels_for_protein = predict_label[idx]
                
                # convert output and label to topology sequences    
                ground_truth_topology = label_list_to_topology(ground_truth_label[idx])
                
                prediction_topology_match = is_topologies_equal(ground_truth_topology, predicted_topology, 5)
                
                if actual_type == predicted_type:
                    # if we guessed the type right for SP+GLOB or GLOB,
                    # count the topology as correct
                    if actual_type == 2 or actual_type == 3 or prediction_topology_match:
                        confusion_matrix[actual_type][5] += 1
                    else:
                        confusion_matrix[actual_type][predicted_type] += 1
                elif predicted_type == None:
                    # add to invalid column
                    confusion_matrix[actual_type][6] += 1
                else:
                    confusion_matrix[actual_type][predicted_type] += 1
                
                protein_label_prediction.append(predicted_labels_for_protein)
                
                
            # receive loss from current batch
            test_batch_loss.append(loss.item())
        
    model.train()
    
    type_correct_ratio = \
    calculate_acc(confusion_matrix[0][0] + confusion_matrix[0][5], confusion_matrix[0].sum()) + \
    calculate_acc(confusion_matrix[1][1] + confusion_matrix[1][5], confusion_matrix[1].sum()) + \
    calculate_acc(confusion_matrix[2][2] + confusion_matrix[2][5], confusion_matrix[2].sum()) + \
    calculate_acc(confusion_matrix[3][3] + confusion_matrix[3][5], confusion_matrix[3].sum()) + \
    calculate_acc(confusion_matrix[4][4] + confusion_matrix[4][5], confusion_matrix[4].sum())
    type_accuracy = float((type_correct_ratio / 5).detach())

    tm_accuracy = float(calculate_acc(confusion_matrix[0][5], confusion_matrix[0].sum()).detach())
    sptm_accuracy = float(calculate_acc(confusion_matrix[1][5], confusion_matrix[1].sum()).detach())
    sp_accuracy = float(calculate_acc(confusion_matrix[2][5], confusion_matrix[2].sum()).detach())
    glob_accuracy = float(calculate_acc(confusion_matrix[3][5], confusion_matrix[3].sum()).detach())
    beta_accuracy = float(calculate_acc(confusion_matrix[4][5], confusion_matrix[4].sum()).detach())
    
    topology_accuracy = sum([tm_accuracy, sptm_accuracy, sp_accuracy, glob_accuracy, beta_accuracy]) / 5
    
    tm_type_acc = float(calculate_acc(confusion_matrix[0][0] + confusion_matrix[0][5], confusion_matrix[0].sum()).detach())
    tm_sp_type_acc = float(calculate_acc(confusion_matrix[1][1] + confusion_matrix[1][5], confusion_matrix[1].sum()).detach())
    sp_type_acc = float(calculate_acc(confusion_matrix[2][2] + confusion_matrix[2][5], confusion_matrix[2].sum()).detach())
    glob_type_acc = float(calculate_acc(confusion_matrix[3][3] + confusion_matrix[3][5], confusion_matrix[3].sum()).detach())
    beta_type_acc = float(calculate_acc(confusion_matrix[4][4] + confusion_matrix[4][5], confusion_matrix[4].sum()).detach())
    
    # add data to dictionary
    key_list = [condition, cv, epoch, 'confusion_matrix']
    update_nested_dict(experiment_json, key_list, confusion_matrix.tolist())
    
    experiment_json[condition][cv][epoch].update({
        'type': type_accuracy
    })
    
    experiment_json[condition][cv][epoch].update({
        'topology': topology_accuracy
    })
    
    # Topology 
    experiment_json[condition][cv][epoch].update({
        'tm': {
            'type': tm_type_acc,
            'topology': tm_accuracy
        }
    })
    
    experiment_json[condition][cv][epoch].update({
        'sptm': {
            'type': tm_sp_type_acc,
            'topology': sptm_accuracy
        }
    })
    
    experiment_json[condition][cv][epoch].update({
        'sp': {
            'type': sp_type_acc,
            'topology': sp_accuracy
        }
    })
    
    experiment_json[condition][cv][epoch].update({
        'glob': {
            'type': glob_type_acc,
            'topology': glob_accuracy
        }
    })
    
    experiment_json[condition][cv][epoch].update({
        'beta': {
            'type': beta_type_acc,
            'topology': beta_accuracy
        }
    })
    
    experiment_json[condition][cv][epoch].update({
        'loss': np.mean(test_batch_loss)
    })
    
    if condition == "test":
        experiment_json[condition][cv].update({
            "hyperparameter": (model.hidden_size, model.num_layers)
        })
    
    open(experiment_file_path, 'w').write(json.dumps(experiment_json, indent = 4, default = custom_encoder))
      
    return np.mean(test_batch_loss), topology_accuracy

## Create dataset structure

Create dataset to use for the dataloader

In [6]:
class CustomDataset(Dataset):
    def __init__(self, folder_path, split):
        self.folder_path = folder_path
        self.split = split
        self.data = self.load_data()
        
        self.label_encoding = {'I': 0, 'O': 1, 'P': 2, 'S': 3, 'M': 4, 'B': 5}

    def load_data(self):
        file_list = [f for f in os.listdir(self.folder_path) if f.endswith('.npy')]
        file_list.sort()  # Make sure the order is consistent

        data = []
        for file_name in file_list:
            file_path = os.path.join(self.folder_path, file_name)
            data.append(np.load(file_path, allow_pickle=True).item())

        return data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        sample = self.data[index]
        inputs = torch.tensor(sample['data'])
        labels_str = sample['labels']

        labels_list = [self.label_encoding[label] for label in labels_str]
        labels_tensor = torch.tensor(labels_list, dtype=torch.long)
        
        p_type = type_from_labels(labels_tensor)

        return {'data': inputs, 'labels': labels_tensor, 'type': p_type}

def collate_fn(batch):
    # sort the batch by sequence length in descending order
    batch = sorted(batch, key=lambda x: len(x['data']), reverse=True)
    
    # pad sequences for data
    data = [torch.tensor(sample['data']) for sample in batch]
    padded_data = pad_sequence(data, batch_first=True, padding_value=-1)

    # pad sequences for labels
    labels = [torch.tensor(sample['labels']) for sample in batch]
    padded_labels = pad_sequence(labels, batch_first=True, padding_value=-1)
    
    # pack the padded sequences for data
    lengths = torch.tensor([len(seq) for seq in data])

    types = torch.tensor([sample["type"] for sample in batch])
    
    return {'data': padded_data, 'labels': padded_labels, "lengths": lengths, "type": types} 


## Training and validation of neural network

Helper function for training and validating the neural network

In [7]:
def train_nn(model, trainloader, valloader, loss_function, optimizer, fold, experiment_file_path, device, num_epochs = 50, patience = 15, verbose = False):

    model.train()
    
    early_stopping = EarlyStopping(patience=patience, verbose=True, path = f"best_model_{model.hidden_size}_{model.num_layers}.pt")
    train_loss = []
    valid_loss = []
    top_acc = []
    
    for epoch in range(num_epochs):
        for _, batch in enumerate(trainloader):
            inputs, labels, lengths, types = batch['data'], batch['labels'], batch['lengths'], batch["type"]
            inputs, labels, lengths, types = inputs.to(device), labels.to(device), lengths.to(device), types.to(device)
            
            # receive output from rnn
            output = model(inputs)  
            
            # loss = loss_function(output.permute(0, 2, 1), labels)
            
            loss = 0    
            for l in range(output.shape[0]):
                # masking the zero-padded outputs
                batch_output = output[l][:lengths[l]]
                batch_labels = labels[l][:lengths[l]]
                
                # compute cross-entropy loss
                loss += loss_function(batch_output, batch_labels) / output.shape[0]
            
            # gradient update
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()  
        
        train_loss_epoch, _ = test_predictions(model = model, loader = trainloader, loss_function = loss_function,
                         cv = str(fold), experiment_file_path = experiment_file_path, condition = "train", epoch = str(epoch),
                         device = device)   

        valid_loss_epoch, valid_top_acc = test_predictions(model = model, loader = valloader, loss_function = loss_function,
                        cv = str(fold), experiment_file_path = experiment_file_path, condition = "val", epoch = str(epoch),
                        device = device)
        
        # receive mean loss for this epoch
        train_loss.append(train_loss_epoch)
        valid_loss.append(valid_loss_epoch)
        top_acc.append(valid_top_acc)
        
        # checking val loss for early stopping
        early_stopping(valid_top_acc, model)
        
        if early_stopping.early_stop:
            if verbose:
                print("Early stopping, best loss: ", train_loss[-16], -early_stopping.best_score)
            
            return train_loss[-16], -early_stopping.best_score
        
        if verbose:
            if epoch % 10 == 0:
                print("training loss: ", train_loss[-1], \
                    "\t validation loss: ", valid_loss[-1],
                    "\t early stop score:", -early_stopping.best_score)
    
    return train_loss[-1], top_acc[-1]

## Running the analysis

Configurations to be set before cross-validating

In [8]:
# set device to gpu if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if torch.cuda.is_available():
    torch.set_default_device("cuda:0")

In [9]:
# config
k_folds = 5
num_epochs = 100
batch_size = 32
loss_function = nn.CrossEntropyLoss(ignore_index = -1)
lr = 1e-3
tuning = [256, 256, 512, 512] # was previously [64, 128, 256, 512] but a tie between 256 and 512 was found.
layers = [1, 2, 1, 2]
verbose = False

# run the file create_latent_dataset for full dataset and change path to "encoder_proteins"
encoder_path = "encoder_proteins_test"

experiment_file_list = []
for i, j in zip(tuning, layers):
    experiment_file_list.append(f"stat_data_hs_{i}_l_{j}.json")
    experiment_json = {}
    open(experiment_file_list[-1], 'w').write(json.dumps(experiment_json))

# set fixed random number seed
torch.manual_seed(42)

<torch._C.Generator at 0x7fbb3888c230>

Load the data and concatenate in order to create the full dataset. Serves as the prerequisite to split the data according to DeepTMHMM in the cross-validation

In [10]:
# create cv's corresponding to deeptmhmm's
cvs = list(range(5))
kfolds = []
for idx, split in enumerate(range(5)):
    
    # make cycling list and define train/val/test splits
    idxs = np.asarray(list(islice(cycle(cvs), idx, idx + 5)))
    train_idx, val_idx, test_idx = idxs[:3], idxs[3], idxs[4]
    
    kfolds.append((train_idx, val_idx, test_idx))

# make on big concatenated dataset of all splits
data_cvs = np.squeeze([CustomDataset(os.path.join(encoder_path, folder), 'train') for folder in ['cv0', 'cv1', 'cv2', 'cv3' , 'cv4']])

Begin the actual analysis. Five-fold cross validation with hyperparameter-tuning and testing. JSON-files will be created. OBS: The code will take some time to run.

In [15]:
# k-fold cross validation
for fold, (train_ids, val_id, test_id) in enumerate(kfolds):    
    if verbose:
        print(f'\nFOLD {fold + 1}')
        print('--------------------------------')
    
    # concatenates the data from the different cv's
    training_data = np.concatenate(data_cvs[train_ids], axis = 0)
    
    # create weighted sampler with replacement to class balance
    y_train = torch.tensor([prot_type["type"] for prot_type in training_data])
    
    weight = torch.zeros(5)
    for t in torch.unique(y_train):
        class_sample_count = torch.tensor([len(torch.where(y_train == t)[0])])
        weight[t] = 1. / class_sample_count
    
    samples_weight = torch.tensor([weight[t] for t in y_train])
    
    sampler = WeightedRandomSampler(samples_weight.type('torch.DoubleTensor'), len(samples_weight), replacement = True, generator = torch.Generator(device = device))
    
    # define data loaders for train/val/test data in this fold (collate 0 pads for same-length)
    trainloader = DataLoader(
                        training_data, batch_size=batch_size, collate_fn=collate_fn, drop_last = False, generator = torch.Generator(device = device), sampler = sampler)

    valloader = DataLoader(data_cvs[val_id], batch_size=batch_size, shuffle=False, collate_fn=collate_fn, drop_last = False)
    testloader = DataLoader(data_cvs[test_id], batch_size=batch_size, shuffle=False, collate_fn=collate_fn, drop_last = False)
    
    val_acc_param = np.zeros((len(tuning)))
    
    # hyperparameter tune
    for idx, (param, layer) in enumerate(zip(tuning, layers)):
        experiment_file_path = experiment_file_list[idx] 
        
        if verbose:
            print(f'\nHIDDEN_SIZE {param}')
            print('--------------------------------')
        
        # define models to be analyzed
        model_rnn = RNN(512, param, 6, num_layer = layer)
        model_rnn.to(device)
        
        optimizer = optim.Adam(model_rnn.parameters(), lr = lr)
        
        # train and validate model
        train_loss, valid_top_acc = train_nn(model = model_rnn, 
                                        trainloader = trainloader, valloader = valloader,
                                        loss_function = loss_function, optimizer = optimizer, 
                                        fold = fold, experiment_file_path = experiment_file_path, 
                                        device = device, num_epochs = num_epochs, verbose = verbose)
        
        # save topology accuracies
        val_acc_param[idx] = valid_top_acc
    
    # test for the best model
    best_param_idx = val_acc_param.argmax()
    
    best_model = RNN(512, tuning[best_param_idx], 6, layers[best_param_idx])
    best_model.load_state_dict(torch.load(f"best_model_{tuning[best_param_idx]}_{layers[best_param_idx]}.pt"))
    best_model.eval()
    
    experiment_file_path = experiment_file_list[best_param_idx]
    
    if verbose:
        print(f"\nbest params for fold {fold + 1}: ", tuning[best_param_idx])  
    
    test_loss, test_top_acc = test_predictions(model = best_model,
                    loader = testloader,
                    loss_function = loss_function,
                    cv = str(fold), experiment_file_path = experiment_file_path,
                    device = device)

    if verbose:
        print(f"test loss for fold {fold + 1}: ", test_loss, \
            f"\ntest topology accuracy for fold {fold + 1}: ", test_top_acc)

  data = [torch.tensor(sample['data']) for sample in batch]
  labels = [torch.tensor(sample['labels']) for sample in batch]


Validation accuracy increased (-inf --> 0.600000).  Saving model ...
Validation accuracy increased (-inf --> 0.400000).  Saving model ...
Validation accuracy increased (0.400000 --> 0.600000).  Saving model ...
Validation accuracy increased (-inf --> 0.400000).  Saving model ...
Validation accuracy increased (0.400000 --> 0.600000).  Saving model ...
Validation accuracy increased (0.600000 --> 0.800000).  Saving model ...
Validation accuracy increased (-inf --> 0.600000).  Saving model ...
Validation accuracy increased (0.600000 --> 0.800000).  Saving model ...
Validation accuracy increased (-inf --> 0.600000).  Saving model ...
Validation accuracy increased (0.600000 --> 0.800000).  Saving model ...


KeyboardInterrupt: 

## Visualize results

The following cells include some helper functions for plotting and the plotting itself of the model which performed the best in a majority of the five folds. How to interpret the plots/figures is thoroughly reviewed in the paper.

REMARK: If you run the code using the encoder_proteins_test path, the plots that will be plotted are very bad due to the very very very small dataset. As there are only 9 samples in the training set and 3 in the validation set, and no guarantee that all protein types are present in the dataset (the first three were chosen from the original full dataset). Thus, if you want an actual representation of what the graphs look like, you should either run the code using the full dataset (see README) or have a look at the paper.

Helper functions for plot of training and validation loss

In [16]:
def plot_fn(data, conf_int, early_stops, title, ylabel, labels):
    plt.figure(figsize=(8, 4))
    for fold_data, conf_data, i in zip(data, conf_int, range(len(data))):
        label=labels[i]
        x = np.arange(len(fold_data))
        plt.plot(fold_data, label=label) # label for the best model    
        plt.fill_between(x, (fold_data - conf_data), (fold_data + conf_data), alpha = .1)
    plt.vlines(early_stops, ymin = 0, ymax = np.max(fold_data), linestyles = "dashed", color = "red", label = "early stop", alpha = .3)
    plt.ylabel(ylabel)
    plt.xlabel("Epoch")
    plt.title(title)
    plt.legend()
    plt.show()


def plot_stat(data, stat, conditions):
    condition_loss = []
    condition_conf = []
    early_stops = []
    for _, condition in enumerate(conditions):
        avg_epoch_loss = []
        conf_epoch = []
        
        for epoch in range(100):
            epoch_loss_per_fold = []

            for fold in data[condition]:
                if str(epoch) in data[condition][fold].keys():
                    epoch_loss_per_fold.append(data[condition][fold][str(epoch)][stat.lower()])

            if epoch_loss_per_fold:
                avg_epoch_loss.append(np.mean(epoch_loss_per_fold))
                conf_epoch.append(1.96 * np.std(epoch_loss_per_fold) / np.sqrt(len(epoch_loss_per_fold)))
        
           
        condition_loss.append(avg_epoch_loss)   
        condition_conf.append(conf_epoch) 

    for fold in data["val"]:
        early_stops.append(np.argmax([data["val"][fold][epochs]["topology"] for epochs in data["val"][fold].keys()]))
             
    plot_fn(np.asarray(condition_loss), np.asarray(condition_conf), np.asarray(early_stops), stat, stat, conditions)

Helper function for training and validation accuracy plot

In [17]:
def plot_acc(data, conditions, stats):
    condition_acc = []
    condition_conf = []
    early_stops = []
    for _, condition in enumerate(conditions):
        avg_epoch_acc = np.zeros((2, 100, 5))
        conf_epoch_acc = np.zeros((2, 100, 5))
        
        for epoch in range(100):
            epoch_acc_type_per_fold = [[], [], [], [], []]
            epoch_acc_top_per_fold = [[], [], [], [], []]

            for fold in data[condition]:
                count = 0
                if str(epoch) in data[condition][fold].keys():
                    for i, stat in enumerate(stats):
                        epoch_acc_type_per_fold[i].append(data[condition][fold][str(epoch)][stat.lower()]["type"])
                        epoch_acc_top_per_fold[i].append(data[condition][fold][str(epoch)][stat.lower()]["topology"])
                        count += 1

            if epoch_acc_type_per_fold:
                avg_epoch_acc[0, epoch, :] = np.mean(epoch_acc_type_per_fold, axis = 1)
                avg_epoch_acc[1, epoch, :] = np.mean(epoch_acc_top_per_fold, axis = 1)
                
                conf_epoch_acc[0, epoch, :] = (1.96 * np.std(epoch_acc_type_per_fold, axis = 1) / np.sqrt(len(epoch_acc_type_per_fold)))
                conf_epoch_acc[1, epoch, :] = (1.96 * np.std(epoch_acc_top_per_fold, axis = 1) / np.sqrt(len(epoch_acc_top_per_fold)))
        
        nans_acc = avg_epoch_acc[~np.isnan(avg_epoch_acc)].shape[0]
        nans_conf = avg_epoch_acc[~np.isnan(avg_epoch_acc)].shape[0]
        
        avg_epoc_acc = avg_epoch_acc[~np.isnan(avg_epoch_acc)].reshape((2, nans_acc//(2*5), 5))
        conf_epoch_acc = conf_epoch_acc[~np.isnan(avg_epoch_acc)].reshape((2, nans_conf//(2*5), 5))
        
        condition_acc.append(avg_epoc_acc)   
        condition_conf.append(conf_epoch_acc) 

    for fold in data[condition]:
        early_stops.append(np.argmax([data["val"][fold][epochs]["topology"] for epochs in data["val"][fold].keys()]))
     
    titles = ["type", "topology"]
    labels = ["train", "val"]
    colors = ["darkred", "darkorange", "darkgreen", "darkviolet", "darkblue"]
    plt.figure(figsize=(12, 7))
    for i, train_val in enumerate(condition_acc):
        for j, types in enumerate(train_val):
            plt.subplot(1, 2, j + 1)
            plt.gca().set_prop_cycle(cycler('color', colors))
            x = np.arange(types.shape[0])
            plt.plot(types, label = [stat + f"_{labels[i]}" for stat in stats], linestyle = "dashed" if labels[i] == "val" else "solid")
            
            for k, line in enumerate(types.T):
                l_bound =  (line - condition_conf[i][j, :, k])
                l_bound[(line - condition_conf[i][j, :, k]) < 0] = 0
                u_bound =  (line + condition_conf[i][j, :, k])
                u_bound[(line + condition_conf[i][j, :, k]) > 1] = 1
                
                plt.fill_between(x, l_bound, u_bound, alpha = .1)
            
            if i == 1:
                plt.ylabel("accuracy")
                plt.xlabel("epoch")
                plt.title(f"{titles[j]} accuracy")
                plt.legend(loc = "lower left")
                plt.vlines(early_stops, ymin = 0, ymax = np.max(np.max(train_val, axis = 0)), linestyles = "dashed", color = "red", label = "early stop", alpha = .3)
    plt.show()

Helper function for plot of heat map (the function does say confusion matrix, but it is not exactly a textbook confusion matrix)

In [18]:
def plot_confusion(data):
    plot_data = np.zeros((5,7))
    for split in data["test"]:
        confusion_matrix = data["test"][split]["0"]["confusion_matrix"]
        plot_data += confusion_matrix
    
    # percentage row-wise
    plot_data = plot_data / (np.sum(plot_data, axis = 1, keepdims = True) + 1e-9)

    row_labels = []
    column_labels = []
    for i in ["tm", "sptm", "sp+glob", "glob", "beta", "topology", "invalid"]: 
        row_labels.append(str(i) if i != "topology" and i != "invalid" else None)
        column_labels.append(str(i))

    fig, ax = plt.subplots(figsize=(10, 6))
    sns.heatmap(plot_data, ax=ax, annot=True, xticklabels=column_labels, yticklabels=row_labels, fmt='.3g', robust = True)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

Plot everything

In [None]:
best = 0
best_model_path = ""
for idx, path in enumerate(experiment_file_list):
    f = open(path)
    file = json.load(f)
    if "test" in file.keys() and len(file["test"].keys()) > best:
        best_model_path = path

f = open(best_model_path)
data = json.load(f)

conditions = ["train", "val"]
#Plot loss average across splits/folds
plot_stat(data, "loss", conditions)

#Plot total and each accuracy avarged across splits/folds
plot_acc(data, conditions, ["tm", "sptm", "sp", "glob", "beta"])

# Plot confusion matrix as table 
plot_confusion(data)