In [2]:
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from sklearn import metrics
from sklearn.model_selection import train_test_split
from tqdm import tqdm

## Model definition

In [3]:
class CNN(nn.Module):
    """1-D convolutional binary classifier over fixed-length feature vectors.

    Layer stack: ``Conv1d(1 -> 32, kernel 7, no padding)`` -> ``BatchNorm1d``
    -> ``ReLU`` -> ``Flatten`` -> ``Linear(-> 512)`` -> ``Linear(512 -> 1)``
    -> ``Sigmoid``. The output is therefore one probability per sample.

    Args:
        device: Device that ``train_model`` and ``test`` move each batch to.
            The module itself is NOT moved here; the caller is expected to
            call ``.to(device)`` on the instance.
        input_length: Number of features per sample (length of the 1-D
            signal). The default of 1024 yields ``32 * (1024 - 7 + 1) = 32576``
            flattened features, i.e. exactly the previously hard-coded size,
            so existing checkpoints keep loading.

    Raises:
        ValueError: If ``input_length`` is shorter than the convolution kernel.
    """

    def __init__(self, device, input_length=1024):
        super(CNN, self).__init__()

        self.device = device

        out_channels = 32
        kernel_size = 7
        # With stride 1 and no padding the convolution shortens the signal
        # by (kernel_size - 1) positions.
        conv_length = input_length - kernel_size + 1
        if conv_length < 1:
            raise ValueError(
                f"input_length must be at least {kernel_size}, got {input_length}"
            )

        self.conv1 = nn.Conv1d(in_channels=1, out_channels=out_channels,
                               kernel_size=kernel_size, padding=0)
        self.batchnorm = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()
        self.flat = nn.Flatten()
        self.lin1 = nn.Linear(out_channels * conv_length, 512)
        self.lin2 = nn.Linear(512, 1)
        self.sigmoid = nn.Sigmoid()

        # The network already ends in a sigmoid, so plain BCE on
        # probabilities is used.
        self.loss = nn.functional.binary_cross_entropy

    def forward(self, X):
        """Return per-sample probabilities for a float batch.

        Args:
            X: Float tensor with a single channel, i.e. shaped
                ``(batch, 1, input_length)`` as required by ``conv1``.

        Returns:
            Tensor of shape ``(batch, 1)`` with values in ``[0, 1]``.
        """
        X = self.conv1(X)
        X = self.batchnorm(X)
        X = self.relu(X)
        X = self.flat(X)
        X = self.lin1(X)
        X = self.lin2(X)
        X = self.sigmoid(X)
        return X

    def train_model(self, dataset, epochs):
        """Train this network with Adam (default hyper-parameters) and BCE loss.

        Args:
            dataset: Iterable of ``(inputs, targets)`` tensor batches, e.g. a
                ``DataLoader``. Targets must have the same shape as the
                network output so that loss and accuracy line up.
            epochs: Number of full passes over ``dataset``.
        """
        # Switch THIS instance to training mode. The previous code called
        # `model.train()`, which only worked when a global named `model`
        # happened to exist and to be this very object.
        self.train()
        optimizer = torch.optim.Adam(self.parameters())

        for epoch in range(epochs):
            with tqdm(dataset, unit="batch") as tepoch:
                # The label is constant for the whole epoch; set it once.
                tepoch.set_description(f"Epoch {epoch + 1}")
                for inputs, targets in tepoch:
                    inputs, targets = inputs.to(self.device), targets.to(self.device)

                    # clear the gradients
                    optimizer.zero_grad()
                    # compute the model output
                    yhat = self(inputs.float())
                    # batch accuracy: probabilities rounded to hard 0/1 labels
                    correct = (torch.round(yhat) == targets).sum().item()
                    accuracy = correct / len(inputs)
                    # calculate loss
                    loss = self.loss(yhat, targets.float())
                    # credit assignment
                    loss.backward()
                    # update model weights
                    optimizer.step()

                    # progress bar shows the metrics of the latest batch only
                    tepoch.set_postfix(loss=loss.item(), accuracy=100. * accuracy)

    def test(self, dataloader):
        """Evaluate on ``dataloader`` and print/return summary metrics.

        Args:
            dataloader: Iterable of ``(inputs, targets)`` tensor batches;
                targets are read with ``.numpy()`` and so must live on the CPU.

        Returns:
            Tuple ``(accuracy, f1, auprc)`` where accuracy and F1 use
            predictions rounded to 0/1 and AUPRC is the area under the
            precision-recall curve of the raw probabilities.
        """
        # Evaluation mode for THIS instance (was `model.eval()` on a global),
        # so BatchNorm uses its running statistics.
        self.eval()
        predictions, actuals = list(), list()

        with torch.no_grad():
            for inputs, targets in dataloader:
                inputs = inputs.to(self.device)

                # evaluate the model on the test set
                yhat = self(inputs.float())
                yhat = yhat.detach().cpu().numpy()
                actual = targets.numpy()
                # reshape to column vectors for stacking
                actual = actual.reshape((len(actual), 1))
                yhat = yhat.reshape((len(yhat), 1))
                # store
                predictions.append(yhat)
                actuals.append(actual)
        predictions, actuals = np.vstack(predictions), np.vstack(actuals)
        print("Predictions: ", predictions[:10])
        print("Real labels: ", actuals[:10])
        # threshold-based metrics use the rounded probabilities
        pred_label = np.round(predictions)
        acc = metrics.accuracy_score(actuals, pred_label)
        f1 = metrics.f1_score(actuals, pred_label, average='binary', zero_division=0)
        # ranking metric uses the raw probabilities
        precision, recall, thresholds = metrics.precision_recall_curve(actuals, predictions)
        auprc = metrics.auc(recall, precision)
        print(f"Test metrics: \n Accuracy: {float(acc):>6f}, F1 score: {float(f1):>6f}, AUPRC: {float(auprc):>6f}\n")
        return acc, f1, auprc


## Dataset preparation

In [5]:
class EmbeddingDataset(Dataset):
    """Map-style dataset built from a DataFrame with a ``label`` column.

    Every column other than ``label`` is treated as a feature. Features are
    stored with an extra axis at position 1 (``(rows, 1, n_features)``) and
    labels as a column vector (``(rows, 1)``).
    """

    def __init__(self, df):
        """Convert ``df`` into the feature and label arrays."""
        feature_frame = df.drop(columns=['label'])
        label_values = df['label'].to_numpy()
        # Insert the singleton axis right after the row axis.
        self.X = np.asanyarray(feature_frame)[:, np.newaxis, :]
        self.y = label_values[:, np.newaxis]
        self.len = len(df)

    def __len__(self):
        """Number of rows in the source DataFrame."""
        return self.len

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at position ``idx``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

def prepare_dataset(path, test_size=0.2, random_state=42):
    """Load the embeddings CSV and split it into train and test DataFrames.

    Args:
        path: Location of the CSV file; it must contain a ``seq`` column,
            which is dropped before splitting.
        test_size: Passed to ``train_test_split``; the default of 0.2 keeps
            the original 80/20 split.
        random_state: Seed for the shuffle performed by ``train_test_split``;
            the default of 42 reproduces the original split.

    Returns:
        Tuple ``(train_df, test_df)``.

    Raises:
        KeyError: If the CSV has no ``seq`` column (raised by ``drop``).
    """
    df = pd.read_csv(path)
    # Only the non-`seq` columns are kept for modelling.
    df = df.drop(columns=['seq'])
    train_df, test_df = train_test_split(
        df, test_size=test_size, random_state=random_state
    )
    return train_df, test_df

In [5]:
# Read "embeddings.csv" and split it into train/test frames via prepare_dataset.
train_df, test_df = prepare_dataset("embeddings.csv")

In [33]:
# Sanity check: report the size of each split and preview the first five
# training rows.
print(f"Train dataset size: {len(train_df)}")
print(f"Test dataset size: {len(test_df)}\n")
print(train_df.head(5))

Train dataset size: 198755
Test dataset size: 49689

        label        f0        f1        f2        f3        f4        f5  \
195351      1  0.004589 -0.013615 -0.003538  0.005501  0.007457  0.008011   
205311      1  0.004341 -0.001395  0.004811  0.004091  0.004792 -0.000042   
2620        0  0.003129 -0.012673 -0.010728  0.003278  0.029388  0.016654   
96580       0 -0.000483 -0.003341 -0.000792 -0.002914  0.002960  0.005678   
171204      1 -0.005718 -0.009920 -0.006310 -0.001162  0.002309  0.001646   

              f6        f7        f8  ...     f1014     f1015     f1016  \
195351 -0.001625 -0.017335  0.004249  ...  0.007692 -0.022566 -0.005259   
205311 -0.004655 -0.006764 -0.000739  ...  0.003729 -0.010219 -0.008266   
2620    0.016626 -0.019582 -0.001566  ...  0.002420 -0.027691 -0.022426   
96580   0.002523 -0.002995 -0.002277  ...  0.001476 -0.002968 -0.002744   
171204  0.004880 -0.012419  0.001810  ...  0.015332 -0.008871 -0.009826   

           f1017     f1018     f1

In [34]:
# Wrap each split in a Dataset and a DataLoader.
# Training: mini-batches of 32, reshuffled every epoch.
train_dset = EmbeddingDataset(train_df)
train_loader = DataLoader(train_dset, batch_size=32, shuffle=True)
# Evaluation: order is kept. The batch size does not affect evaluation (no
# gradients, BatchNorm runs on its running statistics in eval mode), so use
# a real batch instead of one forward pass per test sample.
test_dset = EmbeddingDataset(test_df)
test_loader = DataLoader(test_dset, batch_size=256, shuffle=False)

In [6]:
# Pick the compute device: CUDA when PyTorch reports it as available,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print('Using {} device'.format(device))

Using cuda device


## Model training

In [12]:
# Seed PyTorch's global RNG before building the network so the weight
# initialisation is repeatable on a fresh kernel (42 matches the seed used
# for the train/test split). GPU kernels may still be non-deterministic.
torch.manual_seed(42)
# Build the network and move its parameters to the selected device.
model = CNN(device=device).to(device)

In [15]:
# Show the layer structure of the network.
print(model)

CNN(
  (conv1): Conv1d(1, 32, kernel_size=(7,), stride=(1,))
  (batchnorm): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
  (flat): Flatten(start_dim=1, end_dim=-1)
  (lin1): Linear(in_features=32576, out_features=512, bias=True)
  (lin2): Linear(in_features=512, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)


In [None]:
# Train for 10 epochs over the shuffled training loader.
model.train_model(train_loader, 10)

Epoch 1: 100%|██████████| 6212/6212 [00:33<00:00, 186.52batch/s, accuracy=100, loss=0.00112] 
Epoch 6: 100%|██████████| 6212/6212 [00:33<00:00, 186.38batch/s, accuracy=100, loss=0.00392] 
Epoch 7: 100%|██████████| 6212/6212 [00:34<00:00, 182.08batch/s, accuracy=100, loss=0.0241]  
Epoch 8: 100%|██████████| 6212/6212 [00:34<00:00, 181.40batch/s, accuracy=100, loss=3.62e-5] 
Epoch 9:   7%|▋         | 424/6212 [00:02<00:33, 171.44batch/s, accuracy=100, loss=8.14e-5] 

In [39]:
# Evaluate on the held-out split; prints sample predictions and returns
# (accuracy, F1, AUPRC).
model.test(test_loader)

Predictions:  [[1.2111097e-03]
 [9.9999428e-01]
 [6.3317369e-09]
 [3.9129428e-04]
 [1.3323086e-05]
 [8.8208942e-09]
 [9.9999416e-01]
 [1.6985595e-09]
 [2.0679631e-06]
 [9.9999595e-01]]
Real labels:  [[0]
 [1]
 [0]
 [0]
 [0]
 [0]
 [1]
 [0]
 [0]
 [1]]
Test metrics: 
 Accuracy: 0.997424, F1 score: 0.997731, AUPRC: 0.999885



(0.9974239771377971, 0.9977305769298961, 0.9998846468275826)

In [40]:
# Persist only the state_dict (not the whole module object), so reloading
# requires constructing a CNN first and calling load_state_dict on it.
torch.save(model.state_dict(), "embedding_CNN.pth")