<a href="https://colab.research.google.com/github/AtishayJain-AJ/DeepWAG/blob/master/Siamese_Network_and_MLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# This code has been modified from the original source code which can be found at - https://github.com/adambielski/siamese-triplet

In [None]:
from sklearn.metrics import roc_auc_score
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import auc
from sklearn.metrics import f1_score
from sklearn.neighbors import KNeighborsClassifier


import torch
from torch.optim import lr_scheduler
import torch.optim as optim
from torch.autograd import Variable

import numpy as np
import pickle
cuda = torch.cuda.is_available()

%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

from torch.utils.data import Dataset
from torch.utils.data.sampler import BatchSampler

import torch.nn as nn
import torch.nn.functional as F

In [None]:
class TripletLoss(nn.Module):
    """
    Triplet loss
    Takes embeddings of an anchor sample, a positive sample and a negative sample
    """

    def __init__(self, margin):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative, size_average=True):
        distance_positive = (anchor - positive).pow(2).sum(1).pow(.5)
        distance_negative = (anchor - negative).pow(2).sum(1).pow(.5)
        losses = F.relu(distance_positive - distance_negative + self.margin)
        return losses.mean() if size_average else losses.sum()



def fit(train_loader, val_loader, model, loss_fn, optimizer, scheduler, n_epochs, cuda, log_interval, metrics=[],
        start_epoch=0):
    """
    Loaders, model, loss function and metrics should work together for a given task,
    i.e. The model should be able to process data output of loaders,
    loss function should process target output of loaders and outputs from the model
    Examples: Classification: batch loader, classification model, NLL loss, accuracy metric
    Siamese network: Siamese loader, siamese model, contrastive loss
    Online triplet learning: batch loader, embedding model, online triplet loss
    """
    for epoch in range(0, start_epoch):
        scheduler.step()

    for epoch in range(start_epoch, n_epochs):
        scheduler.step()

        # Train stage
        train_loss, metrics = train_epoch(train_loader, model, loss_fn, optimizer, cuda, log_interval, metrics)

        message = 'Epoch: {}/{}. Train set: Average loss: {:.4f}'.format(epoch + 1, n_epochs, train_loss)
        for metric in metrics:
            message += '\t{}: {}'.format(metric.name(), metric.value())

        val_loss, metrics = test_epoch(val_loader, model, loss_fn, cuda, metrics)
        val_loss /= len(val_loader)

        message += '\nEpoch: {}/{}. Validation set: Average loss: {:.4f}'.format(epoch + 1, n_epochs,
                                                                                 val_loss)
        for metric in metrics:
            message += '\t{}: {}'.format(metric.name(), metric.value())

        print(message)


def train_epoch(train_loader, model, loss_fn, optimizer, cuda, log_interval, metrics):
    for metric in metrics:
        metric.reset()

    model.train()
    losses = []
    total_loss = 0

    for batch_idx, (data, target) in enumerate(train_loader):
        target = target.long() if len(target) > 0 else None
        if not type(data) in (tuple, list):
            data = (data,)
        if cuda:
            data = tuple(d.cuda() for d in data)
            if target is not None:
                target = target.cuda()


        optimizer.zero_grad()
        outputs = model(*data)


        if type(outputs) not in (tuple, list):
            outputs = (outputs,)

        loss_inputs = outputs
        if target is not None:
            target = (target,)
            loss_inputs += target


        loss_outputs = loss_fn(*loss_inputs)
        loss = loss_outputs[0] if type(loss_outputs) in (tuple, list) else loss_outputs
        regularization_loss = 0
        for param in model.parameters():
            regularization_loss = torch.sum(torch.abs(param))
            break

        loss = loss + 0.00001 * regularization_loss
        losses.append(loss.item())
        total_loss += loss.item()
        loss.backward()
        optimizer.step()

        for metric in metrics:
            metric(outputs, target, loss_outputs)

        if batch_idx % log_interval == 0:
            message = 'Train: [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                batch_idx * len(data[0]), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), np.mean(losses))
            for metric in metrics:
                message += '\t{}: {}'.format(metric.name(), metric.value())

            print(message)
            losses = []

    total_loss /= (batch_idx + 1)
    return total_loss, metrics


def test_epoch(val_loader, model, loss_fn, cuda, metrics):
    with torch.no_grad():
        for metric in metrics:
            metric.reset()
        model.eval()
        val_loss = 0
        for batch_idx, (data, target) in enumerate(val_loader):
            target = target.long() if len(target) > 0 else None
            if not type(data) in (tuple, list):
                data = (data,)
            if cuda:
                data = tuple(d.cuda() for d in data)
                if target is not None:
                    target = target.cuda()

            outputs = model(*data)

            if type(outputs) not in (tuple, list):
                outputs = (outputs,)
            loss_inputs = outputs
            if target is not None:
                target = (target,)
                loss_inputs += target

            loss_outputs = loss_fn(*loss_inputs)
            loss = loss_outputs[0] if type(loss_outputs) in (tuple, list) else loss_outputs
            val_loss += loss.item()

            for metric in metrics:
                metric(outputs, target, loss_outputs)

    return val_loss, metrics


# Triplet generation function
class Triplet(Dataset):
    """
    Train: For each sample (anchor) randomly chooses a positive and negative samples
    Test: Creates fixed triplets for testing
    """

    def __init__(self, dataset, train):
        self.dataset =  dataset
        self.train = train

        if self.train:
            self.train_labels = self.dataset[:,0]
            self.train_data = self.dataset[:,1:]
            self.labels_set = set(self.train_labels)
            self.label_to_indices = {label: np.where(self.train_labels == label)[0]
                                     for label in self.labels_set}

        else:
            self.test_labels = self.dataset[:,0]
            self.test_data = self.dataset[:,1:]
            # generate fixed triplets for testing
            self.labels_set = set(self.test_labels)
            self.label_to_indices = {label: np.where(self.test_labels == label)[0]
                                     for label in self.labels_set}

            random_state = np.random.RandomState(29)

            triplets = [[i,
                         random_state.choice(self.label_to_indices[self.test_labels[i].item()]),
                         random_state.choice(self.label_to_indices[
                                                 np.random.choice(
                                                     list(self.labels_set - set([self.test_labels[i].item()]))
                                                 )
                                             ])
                         ]
                        for i in range(len(self.test_data))]
            self.test_triplets = triplets

    def __getitem__(self, index):
        if self.train:
            sample1, label1 = self.train_data[index], self.train_labels[index].item()
            positive_index = index
            while positive_index == index:
                positive_index = np.random.choice(self.label_to_indices[label1])
            negative_label = np.random.choice(list(self.labels_set - set([label1])))
            negative_index = np.random.choice(self.label_to_indices[negative_label])
            sample2 = self.train_data[positive_index]
            sample3 = self.train_data[negative_index]
        else:
            sample1 = self.test_data[self.test_triplets[index][0]]
            sample2 = self.test_data[self.test_triplets[index][1]]
            sample3 = self.test_data[self.test_triplets[index][2]]

        return (sample1, sample2, sample3), []

    def __len__(self):
        return len(self.dataset)

In [None]:
## MODELS USED FOR AMIKACIN

# Backbone network used for both Siamese network and MLP
class EmbeddingNet(nn.Module):
    def __init__(self):
        super(EmbeddingNet, self).__init__()
        self.fc = nn.Sequential(nn.Linear(284449, 512),
                                nn.Sigmoid(),
                                nn.Dropout(p = 0.4),
                                nn.Linear(512, 256),
                                nn.PReLU(),
                                nn.Dropout(p = 0.4),
                                nn.Linear(256, 64),
                                nn.PReLU(),
                                nn.Linear(64, 16)
                                )

    def forward(self, x):
        x = x.float()
        output = self.fc(x)
        output = output.float()
        return output

    def get_embedding(self, x):
        return self.forward(x)

# Siamese Network
class TripletNet(nn.Module):
    def __init__(self, embedding_net):
      
        super(TripletNet, self).__init__()
        self.embedding_net = embedding_net

    def forward(self, x1, x2, x3):
        output1 = self.embedding_net(x1).float()
        output2 = self.embedding_net(x2).float()
        output3 = self.embedding_net(x3).float()
        return output1, output2, output3

    def get_embedding(self, x):
        return self.embedding_net(x)
    
# MLP
class ClassificationNet(nn.Module):
    def __init__(self, embedding_net, n_classes):
        super(ClassificationNet, self).__init__()
        self.embedding_net = embedding_net
        self.n_classes = n_classes
        self.nonlinear = nn.PReLU()
        self.fc1 = nn.Linear(16, n_classes)

    def forward(self, x):
        output = self.embedding_net(x)
        output = self.nonlinear(output)
        scores = F.log_softmax(self.fc1(output), dim=-1)
        return scores

    def get_embedding(self, x):
        output = self.embedding_net(x)
        output = self.nonlinear(output)
        scores = F.log_softmax(self.fc1(output), dim=-1)
        return scores

In [None]:
## MODELS USED FOR OFLOXACIN

# Backbone network used for both Siamese network and MLP
class EmbeddingNet(nn.Module):
    def __init__(self):
        super(EmbeddingNet, self).__init__()
        self.fc = nn.Sequential(nn.Linear(103954, 2048),
                                nn.Sigmoid(),
                                nn.Dropout(p = 0.4),
                                nn.Linear(2048, 1024),
                                nn.Sigmoid(),
                                nn.Dropout(p = 0.4),
                                nn.Linear(1024, 512),
                                nn.PReLU(),
                                nn.Dropout(p = 0.4),
                                nn.Linear(512, 256),
                                nn.PReLU(),
                                nn.Linear(256, 16)
                                )

    def forward(self, x):
        x = x.float()
        output = self.fc(x)
        output = output.float()
        return output

    def get_embedding(self, x):
        return self.forward(x)

# Siamese Network
class TripletNet(nn.Module):
    def __init__(self, embedding_net):
      
        super(TripletNet, self).__init__()
        self.embedding_net = embedding_net

    def forward(self, x1, x2, x3):
        output1 = self.embedding_net(x1).float()
        output2 = self.embedding_net(x2).float()
        output3 = self.embedding_net(x3).float()
        return output1, output2, output3

    def get_embedding(self, x):
        return self.embedding_net(x)

# MLP
class ClassificationNet(nn.Module):
    def __init__(self, embedding_net, n_classes):
        super(ClassificationNet, self).__init__()
        self.embedding_net = embedding_net
        self.n_classes = n_classes
        self.nonlinear = nn.PReLU()
        self.fc1 = nn.Linear(16, n_classes)

    def forward(self, x):
        output = self.embedding_net(x)
        output = self.nonlinear(output)
        scores = F.log_softmax(self.fc1(output), dim=-1)
        return scores

    def get_embedding(self, x):
        output = self.embedding_net(x)
        output = self.nonlinear(output)
        scores = F.log_softmax(self.fc1(output), dim=-1)
        return scores


In [None]:
## MODELS USED FOR ETHIONAMIDE

# Backbone network used for both Siamese network and MLP
class EmbeddingNet(nn.Module):
    def __init__(self):
        super(EmbeddingNet, self).__init__()

        self.fc = nn.Sequential(nn.Linear(1503, 512),
                                nn.PReLU(),
                                nn.Dropout(p = 0.4),
                                nn.Linear(512, 256),
                                nn.PReLU(),
                                nn.Linear(256, 16)
                                )

    def forward(self, x):
        x = x.float()
        output = self.fc(x)
        output = output.float()
        return output

    def get_embedding(self, x):
        return self.forward(x)

# Siamese Network
class TripletNet(nn.Module):
    def __init__(self, embedding_net):
      
        super(TripletNet, self).__init__()
        self.embedding_net = embedding_net

    def forward(self, x1, x2, x3):
        output1 = self.embedding_net(x1).float()
        output2 = self.embedding_net(x2).float()
        output3 = self.embedding_net(x3).float()
        return output1, output2, output3

    def get_embedding(self, x):
        return self.embedding_net(x)

# MLP
class ClassificationNet(nn.Module):
    def __init__(self, embedding_net, n_classes):
        super(ClassificationNet, self).__init__()
        self.embedding_net = embedding_net
        self.n_classes = n_classes
        self.nonlinear = nn.PReLU()
        self.fc1 = nn.Linear(16, n_classes)

    def forward(self, x):
        output = self.embedding_net(x)
        output = self.nonlinear(output)
        scores = F.log_softmax(self.fc1(output), dim=-1)
        return scores

    def get_embedding(self, x):
        output = self.embedding_net(x)
        output = self.nonlinear(output)
        scores = F.log_softmax(self.fc1(output), dim=-1)
        return scores

Training MLP



In [None]:
path = "/content/drive/MyDrive/bacteria data/ofloxacin/" #Folder containing X and Y (X is the feature matrix and Y has the target labels)
X = np.load(path+"X.npy")
Y = np.load(path+"Y.npy")

In [None]:
Y = (Y+1)/2 #the labels are saved as -1 and 1, this line converts them to 0 and 1

In [None]:
data = np.c_[Y,X]

In [None]:
np.random.shuffle(data)

In [None]:
n_samples = data.shape[0]
num_train = round(n_samples * 0.6)
num_val = round(n_samples * 0.2)
train_dataset = data[:num_train]
val_dataset = data[num_train:num_train+num_val]
test_dataset = data[num_val+num_train:]

In [None]:
print(train_dataset.shape, val_dataset.shape, test_dataset.shape)

In [None]:
mlp_train_dataset = []
for i in train_dataset:
  mlp_train_dataset.append((i[1:],i[0]))

mlp_val_dataset = []
for i in val_dataset:
  mlp_val_dataset.append((i[1:],i[0]))

mlp_test_dataset = []
for i in test_dataset:
  mlp_test_dataset.append((i[1:],i[0]))


In [None]:
batch_size = 700
kwargs = {'num_workers': 1, 'pin_memory': True} if cuda else {}
train_loader = torch.utils.data.DataLoader(mlp_train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
val_loader = torch.utils.data.DataLoader(mlp_val_dataset, batch_size=batch_size, shuffle=False, **kwargs)
test_loader = torch.utils.data.DataLoader(mlp_test_dataset, batch_size=batch_size, shuffle=False, **kwargs)

# Set up the network and training parameters


embedding_net = EmbeddingNet()
model = ClassificationNet(embedding_net, n_classes=2)
if cuda:
    model.cuda()
loss_fn = torch.nn.NLLLoss()
lr = 0.001
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = lr_scheduler.StepLR(optimizer, 8, gamma=0.1, last_epoch=-1)
n_epochs = 100
log_interval = 50

In [None]:
fit(train_loader, test_loader, model, loss_fn, optimizer, scheduler, n_epochs, cuda, log_interval, [])

In [None]:
classes = ['Susceptible', 'Resistant']
class_no = [0,1]
colors = ['#1f77b4', '#ff7f0e']


# Can be used to plot the first 2 dimensions of the embeddings
def plot_embeddings(embeddings, targets, xlim=None, ylim=None):
    plt.figure(figsize=(10,10))
    for i in range(10):
        inds = np.where(targets==i)[0]
        plt.scatter(embeddings[inds,0], embeddings[inds,1], alpha=0.5, color=colors[i])
    if xlim:
        plt.xlim(xlim[0], xlim[1])
    if ylim:
        plt.ylim(ylim[0], ylim[1])
    plt.legend(classes)

def extract_embeddings(dataloader, model):
    with torch.no_grad():
        model.eval()
        embeddings = np.zeros((len(dataloader.dataset), 2))
        labels = np.zeros(len(dataloader.dataset))
        k = 0
        for images, target in dataloader:
            if cuda:
                images = images.cuda()
            embeddings[k:k+len(images)] = model.get_embedding(images).data.cpu().numpy()
            labels[k:k+len(images)] = target.numpy()
            k += len(images)
    return embeddings, labels

In [None]:
train_embeddings_tl, train_labels_tl = extract_embeddings(train_loader, model)
val_embeddings_tl, val_labels_tl = extract_embeddings(val_loader, model)
test_embeddings_tl, test_labels_tl = extract_embeddings(test_loader, model)


In [None]:
# Obtain the validation and test metrics (auroc, auprc, f1 score)

val_y = np.argmax(val_embeddings_tl, axis = 1)
val_auroc = roc_auc_score(val_labels_tl, val_embeddings_tl[:, 1])
precision, recall, threshold = precision_recall_curve(val_labels_tl, val_embeddings_tl[:, 1])
val_auprc = auc(recall, precision)
val_f1 = f1_score(val_labels_tl, val_y)

test_y = np.argmax(test_embeddings_tl, axis = 1)
test_auroc = roc_auc_score(test_labels_tl, test_embeddings_tl[:,1])
precision, recall, threshold = precision_recall_curve(test_labels_tl, test_embeddings_tl[:,1])
test_auprc = auc(recall, precision)
test_f1 = f1_score(test_labels_tl, test_y)

In [None]:
print("Validation AUROC -", val_auroc)
print("Validation AUPRC -", val_auprc)
print("Validation F1 Score -", val_f1)

print("Test AUROC -", test_auroc)
print("Test AUPRC -", test_auprc)
print("Test F1 Score -", test_f1)

Training Siamese Network

In [None]:
path = "/content/drive/MyDrive/bacteria data/ofloxacin/" #Folder containing X and Y (X is the feature matrix and Y has the target labels)
X = np.load(path+"X.npy")
Y = np.load(path+"Y.npy")

data = np.c_[Y,X]

In [None]:
np.random.shuffle(data)

In [None]:
n_samples = data.shape[0]
num_train = round(n_samples * 0.6)
num_val = round(n_samples * 0.2)
train_dataset = data[:num_train]
val_dataset = data[num_train:num_train+num_val]
test_dataset = data[num_val+num_train:]

In [None]:
print(train_dataset.shape, val_dataset.shape, test_dataset.shape)

In [None]:
triplet_train_dataset = Triplet(train_dataset, True) # Returns triplets of train dataset
triplet_val_dataset = Triplet(val_dataset, False) # Returns triplets of validation dataset
batch_size = 700
kwargs = {'num_workers': 1, 'pin_memory': True} if cuda else {}
triplet_train_loader = torch.utils.data.DataLoader(triplet_train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
triplet_val_loader = torch.utils.data.DataLoader(triplet_val_dataset, batch_size=batch_size, shuffle=False, **kwargs)

# Set up the network and training parameters

## HYPERPARAMETERS
margin = 1.
embedding_net = EmbeddingNet()
model = TripletNet(embedding_net)
model = model.float()
if cuda:
    model.cuda()
loss_fn = TripletLoss(margin)
lr = 0.005
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = lr_scheduler.StepLR(optimizer, 8, gamma=0.1, last_epoch=-1)
n_epochs = 100
log_interval = 100

In [None]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False, **kwargs)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, **kwargs)

In [None]:
fit(triplet_train_loader, triplet_val_loader, model, loss_fn, optimizer, scheduler, n_epochs, cuda, log_interval)

In [None]:
classes = ['Susceptible', 'Resistant']
class_no = [-1,1]
colors = ['#1f77b4', '#ff7f0e']

# Can be used to plot the first 2 dimensions of the embeddings
def plot_embeddings(embeddings, targets, xlim=None, ylim=None):
    plt.figure(figsize=(10,10))
    for i in range(2):
        inds = np.where(targets==class_no[i])[0]
        plt.scatter(embeddings[inds,0], embeddings[inds,1], alpha=0.5, color=colors[i])
    if xlim:
        plt.xlim(xlim[0], xlim[1])
    if ylim:
        plt.ylim(ylim[0], ylim[1])
    plt.legend(classes)

def extract_embeddings(dataloader, model):
    with torch.no_grad():
        model.eval()
        embeddings = np.zeros((len(dataloader.dataset), 16))
        labels = np.zeros(len(dataloader.dataset))
        k = 0
        for i in dataloader:
            
            target = i[:, 0]
            images = i[:, 1:]
            if cuda:
                images = images.cuda()
            embeddings[k:k+len(images)] = model.get_embedding(images).data.cpu().numpy()
            labels[k:k+len(images)] = target.numpy()
            k += len(images)
    return embeddings, labels

In [None]:
train_embeddings_tl, train_labels_tl = extract_embeddings(train_loader, model)
val_embeddings_tl, val_labels_tl = extract_embeddings(val_loader, model)
test_embeddings_tl, test_labels_tl = extract_embeddings(test_loader, model)


In [None]:
# KNN algorithm to perform classification on the siamese network embeddings

neigh = KNeighborsClassifier(n_neighbors=15, weights = 'distance', algorithm='auto')
neigh.fit(train_embeddings_tl, train_labels_tl)

In [None]:
val_proba = neigh.predict_proba(val_embeddings_tl)
test_proba = neigh.predict_proba(test_embeddings_tl)
train_proba = neigh.predict_proba(train_embeddings_tl)
val_y = neigh.predict(val_embeddings_tl)
test_y = neigh.predict(test_embeddings_tl)
train_y = neigh.predict(train_embeddings_tl)

In [None]:
# Obtain the validation and test metrics (auroc, auprc, f1 score)

val_auroc = roc_auc_score(val_labels_tl, val_proba[:, 1])
precision, recall, threshold = precision_recall_curve(val_labels_tl, val_proba[:, 1])
val_auprc = auc(recall, precision)
val_f1 = f1_score(val_labels_tl, val_y)

test_auroc = roc_auc_score(test_labels_tl, test_proba[:, 1])
precision, recall, threshold = precision_recall_curve(test_labels_tl, test_proba[:, 1])
test_auprc = auc(recall, precision)
test_f1 = f1_score(test_labels_tl, test_y)

In [None]:
print("Validation AUROC -", val_auroc)
print("Validation AUPRC -", val_auprc)
print("Validation F1 Score -", val_f1)

print("Test AUROC -", test_auroc)
print("Test AUPRC -", test_auprc)
print("Test F1 Score -", test_f1)