**CNN** Net for DNA classification

In [1]:
"""
Main libraries
"""

import torch as th
import torch.nn as nn
import numpy as np
import pandas as pd
import tensorflow as tf

import random

In [2]:
"""
Only for the metrics analysis
"""
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

Functions for Onehot Encoding

In [3]:
"""
Functions for the onehot encoding
"""

def onehot_encoder(dataset):
    """
    Function that encodes a DNA dataset into a onehot encoding dataset.
    """
    onehot_dataset = [dna_onehot_encoder(dna_string) for dna_string in dataset]
    onehot_dataset_numpy = np.array(onehot_dataset)

    return onehot_dataset_numpy


def dna_onehot_encoder(dna_sequence):
    """
    Function that encodes a single DNA string into a onehot encoding string.
    """
    onehot_dict = {
        'A' : [1, 0, 0, 0],
        'C' : [0, 1, 0, 0],
        'G' : [0, 0, 1, 0],
        'T' : [0, 0, 0, 1]
    }
    encoder = [onehot_dict[nuc] for nuc in dna_sequence]

    return encoder

The CNN Model

In [4]:
"""
!- CNN Model
"""

class ConvNet(nn.Module):
    # We can use a differnet pool for each layer
    def __init__(self):
        super(ConvNet, self).__init__()

        self.layer1 = nn.Sequential(
            nn.Conv1d(300, 200, kernel_size=2, padding=1),
            nn.BatchNorm1d(200),
            nn.PReLU(),
            nn.AvgPool1d(2),
            nn.Dropout1d(0.45)
        )

        # I remove random connection to help the convergency
        self.drop_out = nn.Dropout()

        self.layer2 = nn.Sequential(
            nn.Conv1d(200, 100, kernel_size=2, padding=1),
            nn.BatchNorm1d(100),
            nn.PReLU(),
            nn.AvgPool1d(2),
            nn.Dropout1d(0.45)
        )

        # I remove random connection to help the convergency
        self.drop_out = nn.Dropout()

        self.layer3 = nn.Sequential(
            nn.Conv1d(100, 75, kernel_size=2, padding=1),
            nn.BatchNorm1d(75),
            nn.PReLU(),
            nn.MaxPool1d(2),
            nn.Dropout1d(0.3)
        )

        self.layer4 = nn.Sequential(
            nn.Conv1d(75, 50, kernel_size = 2, padding = 1),
            nn.BatchNorm1d(50),
            nn.PReLU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.3)
        )


        self.layer5 = nn.Sequential(
            nn.Conv1d(50, 32, kernel_size=2, padding=1),
            nn.BatchNorm1d(32),
            nn.PReLU(),
            nn.MaxPool1d(2),
            nn.Dropout1d(0.3)
        )

        self.linear1 = nn.Linear(32, 128)
        self.linear2 = nn.Linear(128, 2)

    def forward(self, x):
        x = self.layer1(x)
        # print("Layer 1: ", x.shape)
        x = self.layer2(x)
        # print("Layer 2: ", x.shape)
        x = self.layer3(x)
        # print("Layer 3: ", x.shape)
        x = self.layer4(x)
        # print("Layer 4: ", x.shape)
        x = self.layer5(x)
        # print("Layer 5: ", x.shape)
        # Flatten the output for the linear layer
        x = x.view(x.size(0), -1)
        x = self.linear1(x)
        x = self.linear2(x)
        # print("Linear: ", x.shape)
        return x

Conversion of the data

In [5]:
"""
!- MAIN
"""

# Set the device to be used (GPU or CPU)
device = th.device("cuda" if th.cuda.is_available() else "cpu")
print("Device: ", device)

# Read the input from the cvc file
rel_path_train = '/content/fullset_train.csv'
rel_path_val = '/content/fullset_validation.csv'
rel_path_test = '/content/fullset_test.csv'

# Training Set

# Read the input from the csv file
train_csv = pd.read_csv(rel_path_train, sep=",")
# Drop the NaN values
train_csv = train_csv.dropna()
# Describe the data
print(train_csv.describe())

# Get the data from the csv file
train_data = train_csv.values
# m = number of input samples
m = train_data.shape[0]

X_train = train_data[:m,1]
Y_train = train_data[:m,2].astype(np.int32)

# OneHot encoding for the training data
print("Start onehot encoding for the training data")
X_train = onehot_encoder(X_train)

# Convert the data to a tensor
X_train = th.from_numpy(X_train).to(device)
Y_train = th.tensor(Y_train).to(device)

print("X_train shape: ", X_train.shape)
print("Y_train shape: ", Y_train.shape)

# Free memory
del train_csv, train_data, m

# Validation Set

# Read the input from the csv file
val_csv = pd.read_csv(rel_path_val, sep=",")
# Drop the NaN values
val_csv = val_csv.dropna()
# Describe the data
print(val_csv.describe())

val_data = val_csv.values
# m = number of input samples
m = val_data.shape[0]

X_val = val_data[:m,1]
Y_val = val_data[:m,2].astype(np.int32)

# OneHot encoding for the validation data
print("Start onehot encoding for the validation data")
X_val = onehot_encoder(X_val)

X_val = th.from_numpy(X_val).to(device)
Y_val = th.tensor(Y_val).to(device)

print("X_val shape", X_val.shape)
print("Y_val shape", Y_val.shape)

# Free memory
del val_csv, val_data, m

# Test

# Read the input from the csv file
test_csv = pd.read_csv(rel_path_test, sep=",")
# Drop the NaN values
test_csv = test_csv.dropna()
# Describe the data
print(test_csv.describe())

test_data = test_csv.values
# m = number of input samples
m = test_data.shape[0]

X_test = test_data[:m,1]
Y_test = test_data[:m,2].astype(np.int32)

# OneHot encoding for the test data
print("Start onehot encoding for the test data")
X_test = onehot_encoder(X_test)

X_test = th.from_numpy(X_test).to(device)
Y_test = th.tensor(Y_test).to(device)

print("X_test shape", X_test.shape)
print("Y_test shape", Y_test.shape)

# Free memory
del test_csv, test_data, m

Device:  cuda
                   0
count  211238.000000
mean        0.021142
std         0.143858
min         0.000000
25%         0.000000
50%         0.000000
75%         0.000000
max         1.000000
Start onehot encoding for the training data
X_train shape:  torch.Size([211238, 300, 4])
Y_train shape:  torch.Size([211238])
                  0
count  26404.000000
mean       0.020224
std        0.140769
min        0.000000
25%        0.000000
50%        0.000000
75%        0.000000
max        1.000000
Start onehot encoding for the validation data
X_val shape torch.Size([26404, 300, 4])
Y_val shape torch.Size([26404])
                  0
count  26404.000000
mean       0.020868
std        0.142945
min        0.000000
25%        0.000000
50%        0.000000
75%        0.000000
max        1.000000
Start onehot encoding for the test data
X_test shape torch.Size([26404, 300, 4])
Y_test shape torch.Size([26404])


Creation of **Training set** (211238), **Validation Set** (26404) and **Test Set** (26404) with random seed

In [6]:
"""
I merge the three tensor array to a big one and then, after a shuffle, I split the data into:
  - Training: 211238 data
  - Validation: 26404 data
  - Test: 26404 data

  Random seed: 2121346 (Chemello's ID number)

  EXAMPLE:

  a = X_train[:15].to(device)
  b = X_test[:15].to(device)

  print("Shape of a: ", a.shape)
  print("Shape of b: ", b.shape)

  c = th.concat((a, b), axis=0)

  print("Shape of c: ", c.shape)
"""

# Merge the arrays

print("Shape of X_train: ", X_train.shape)
print("Shape of X_val: ", X_val.shape)
print("Shape of X_test: ", X_test.shape)

X_data = th.concat((X_train, X_val), axis = 0).to(device)
X_data = th.concat((X_data, X_test), axis = 0).to(device)

print("Shape of data: ", X_data.shape)

del X_train, X_val, X_test

print("Shape of Y_train: ", Y_train.shape)
print("Shape of Y_val: ", Y_val.shape)
print("Shape of Y_test: ", Y_test.shape)

Y_data = th.concat((Y_train, Y_val), axis = 0).to(device)
Y_data = th.concat((Y_data, Y_test), axis = 0).to(device)

print("Shape of data: ", Y_data.shape)

del Y_train, Y_val, Y_test

# Random Seed
random_seed = 1234567
th.manual_seed(random_seed)

X_index_shuffle = th.randperm(X_data.size(0))
X_data_shuffled = X_data[X_index_shuffle]

Y_index_shuffle = th.randperm(Y_data.size(0))
Y_data_shuffled = Y_data[Y_index_shuffle]

print("X_data_shuffled shape: ", X_data_shuffled.shape)
print("Y_data_shuffled shape: ", Y_data_shuffled.shape)

# Split data into Training, Validation and Test

X_train = X_data_shuffled[:211238].to(device)
Y_train = Y_data_shuffled[:211238].to(device)
print("X_train shape: ", X_train.shape)
print("Y_train shape: ", Y_train.shape)

X_val = X_data_shuffled[211238:237642]
Y_val = Y_data_shuffled[211238:237642]
print("X_val shape: ", X_val.shape)
print("Y_val shape: ", Y_val.shape)

X_test = X_data_shuffled[237642:].to(device)
Y_test = Y_data_shuffled[237642:].to(device)
print("X_test shape: ", X_test.shape)
print("Y_test shape: ", Y_test.shape)

Shape of X_train:  torch.Size([211238, 300, 4])
Shape of X_val:  torch.Size([26404, 300, 4])
Shape of X_test:  torch.Size([26404, 300, 4])
Shape of data:  torch.Size([264046, 300, 4])
Shape of Y_train:  torch.Size([211238])
Shape of Y_val:  torch.Size([26404])
Shape of Y_test:  torch.Size([26404])
Shape of data:  torch.Size([264046])
X_data_shuffled shape:  torch.Size([264046, 300, 4])
Y_data_shuffled shape:  torch.Size([264046])
X_train shape:  torch.Size([211238, 300, 4])
Y_train shape:  torch.Size([211238])
X_val shape:  torch.Size([26404, 300, 4])
Y_val shape:  torch.Size([26404])
X_test shape:  torch.Size([26404, 300, 4])
Y_test shape:  torch.Size([26404])


Train and Validation

In [7]:
print("Start training the model")

model_CNN = ConvNet().to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = th.optim.AdamW(model_CNN.parameters(), lr=0.001)
scheduler = th.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, verbose=True)

# Early stopping parameters
patience = 3  # CHECK THE VALUE!!
best_val_loss = float('inf')
counter = 0

# Training the model
num_epochs = 100  # Hope to reach convergence before 100 epoches
train_loss_CNN, val_loss_CNN = [], []

for epoch in range(num_epochs):
    model_CNN.train()
    X_train = X_train.float()
    Y_train = Y_train.long()
    outputs = model_CNN(X_train.to(device))
    loss = criterion(outputs, Y_train.to(device))
    train_loss_CNN.append(loss.item())

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Validation the model
    model_CNN.eval()
    with th.no_grad():
        X_val = X_val.float()
        Y_val = Y_val.long()
        val_outputs = model_CNN(X_val.to(device))
        val_loss = criterion(val_outputs, Y_val.to(device))
        val_loss_CNN.append(val_loss.item())
        scheduler.step(val_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {loss.item():.4f}, Val Loss: {val_loss.item():.4f}')

    # Check for early stopping
    if val_loss <= best_val_loss:
        best_val_loss = val_loss
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

print("End training the model")

Start training the model


  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


Epoch [1/100], Train Loss: 0.7806, Val Loss: 0.7529
Epoch [2/100], Train Loss: 0.6438, Val Loss: 0.7393
Epoch [3/100], Train Loss: 0.5263, Val Loss: 0.7206
Epoch [4/100], Train Loss: 0.4288, Val Loss: 0.6985
Epoch [5/100], Train Loss: 0.3494, Val Loss: 0.6723
Epoch [6/100], Train Loss: 0.2857, Val Loss: 0.6424
Epoch [7/100], Train Loss: 0.2359, Val Loss: 0.6079
Epoch [8/100], Train Loss: 0.1974, Val Loss: 0.5694
Epoch [9/100], Train Loss: 0.1686, Val Loss: 0.5268
Epoch [10/100], Train Loss: 0.1475, Val Loss: 0.4796
Epoch [11/100], Train Loss: 0.1328, Val Loss: 0.4291
Epoch [12/100], Train Loss: 0.1218, Val Loss: 0.3763
Epoch [13/100], Train Loss: 0.1151, Val Loss: 0.3223
Epoch [14/100], Train Loss: 0.1107, Val Loss: 0.2721
Epoch [15/100], Train Loss: 0.1085, Val Loss: 0.2276
Epoch [16/100], Train Loss: 0.1072, Val Loss: 0.1917
Epoch [17/100], Train Loss: 0.1077, Val Loss: 0.1643
Epoch [18/100], Train Loss: 0.1075, Val Loss: 0.1440
Epoch [19/100], Train Loss: 0.1088, Val Loss: 0.1297
Ep

In [8]:
print("Start testing the model")

# Check if the model is loaded
print(model_CNN)

X_test = X_test.to(device).float()

# Model ready for the evaluation
model_CNN.eval()

# Testing the model
with th.no_grad():
    outputs = model_CNN(X_test)
    _, predicted = th.max(outputs, 1)

# Convert the tensor to use scikit learn metrics
y_true = Y_test.cpu().numpy()
y_pred = predicted.cpu().numpy()

# Metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted')
f1 = f1_score(y_true, y_pred, average='weighted')

print(f'Accuracy: {accuracy:.6f}')
print(f'Precision: {precision:.6f}')
print(f'Recall: {recall:.6f}')
print(f'F1-score: {f1:.6f}')

print("End testing the model")


Start testing the model
ConvNet(
  (layer1): Sequential(
    (0): Conv1d(300, 200, kernel_size=(2,), stride=(1,), padding=(1,))
    (1): BatchNorm1d(200, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): PReLU(num_parameters=1)
    (3): AvgPool1d(kernel_size=(2,), stride=(2,), padding=(0,))
    (4): Dropout1d(p=0.45, inplace=False)
  )
  (drop_out): Dropout(p=0.5, inplace=False)
  (layer2): Sequential(
    (0): Conv1d(200, 100, kernel_size=(2,), stride=(1,), padding=(1,))
    (1): BatchNorm1d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): PReLU(num_parameters=1)
    (3): AvgPool1d(kernel_size=(2,), stride=(2,), padding=(0,))
    (4): Dropout1d(p=0.45, inplace=False)
  )
  (layer3): Sequential(
    (0): Conv1d(100, 75, kernel_size=(2,), stride=(1,), padding=(1,))
    (1): BatchNorm1d(75, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): PReLU(num_parameters=1)
    (3): MaxPool1d(kernel_size=2, stride=2, padding