<a href="https://colab.research.google.com/github/BenxiaHu/DeepSilencer/blob/master/CNN_Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import math
import os
import torch
from torch import nn, Tensor
import torch.optim as optim
import numpy as np
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

# Define a toy dataset (replace with your own dataset)
# For this example, we'll use random sequences and labels
num_sequences = 2000
sequence_length = 200
num_classes = 2
sequences = np.random.choice(['A', 'C', 'G', 'T'], size=(num_sequences, sequence_length))
labels = np.random.randint(0, num_classes, size=num_sequences)

# Convert DNA sequences to one-hot encoding
def one_hot_encoding(sequences):
    mapping = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    one_hot_sequences = np.zeros((len(sequences), 4, len(sequences[0])))
    for i, seq in enumerate(sequences):
        for j, base in enumerate(seq):
            one_hot_sequences[i, mapping[base], j] = 1
    return one_hot_sequences

one_hot_sequences = one_hot_encoding(sequences)

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(one_hot_sequences, labels, test_size=0.3, random_state=42)

class Residual(nn.Module):
    '''Residual Block'''
    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, inputs):
        return self.module(inputs) + inputs

class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, max_len: int = 5000):
        """
        Inputs
            d_model - Hidden dimensionality of the input.
            max_len - Maximum length of a sequence to expect.
        """
        super().__init__()

        # Create matrix of [SeqLen, HiddenDim] representing the positional encoding for max_len inputs
        position = torch.arange(0,max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)

        # register_buffer => Tensor which is not a parameter, but should be part of the modules state.
        # Used for tensors that need to be on the same device as the module.
        # persistent=False tells PyTorch to not add the buffer to the state dict (e.g. when we save the model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return x


# CNN Block with Residual Connection
class CNNBlock(nn.Module):
    def __init__(self, input_size):
        super(CNNBlock, self).__init__()
        self.conv1 = nn.Conv1d(input_size,64, kernel_size=1,padding=2)
        self.BP1 = nn.BatchNorm1d(num_features=64)
        self.relu = Residual(nn.ReLU())
        self.maxpool1 = nn.MaxPool1d(2)

        self.conv2 = nn.Conv1d(64, 128, kernel_size=1,padding=2)
        self.BP2 = nn.BatchNorm1d(num_features=128)
        self.maxpool2 = nn.MaxPool1d(2)

        self.conv2 = nn.Conv1d(128, 256, kernel_size=1,padding=2)
        self.BP3 = nn.BatchNorm1d(num_features=256)
        self.maxpool3 = nn.MaxPool1d(2)

    def forward(self, x):
        out1 = self.conv1(x)
        out1 = self.BP1(out1)
        out1 = self.relu(out1)
        out1 = self.maxpool1(out1)

        out2 = self.relu(out1)
        out2 = self.BP1(out2)
        out2 = self.relu(out2)
        out2 = self.maxpool2(out2)

        out3 = self.relu(out2)
        out3 = self.BP1(out3)
        out3 = self.relu(out3)
        out3 = self.maxpool3(out3)
        return out3

# Define a CNN-Transformer model
class CNNTransformer(nn.Module):
    def __init__(self,input_size: int,ntoken: int,d_model: int, num_encoder_layers, num_classes, nhead=8,dropout: float = 0.5):#
        super().__init__()
        self.model_type = 'CNNTransformer'
        self.cnn_block = CNNBlock(input_size)
        self.pos_encoder = PositionalEncoding(d_model, 64)
        # TransformerEncoderLayer and nn.Embedding
        encoder_layers = nn.TransformerEncoderLayer(d_model, nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_encoder_layers)
        self.d_model = d_model
        self.fc = nn.Sequential(
            nn.Linear(1600, 256),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(256,64),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(64,32),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(32, num_classes)
        )

    def forward(self, src: Tensor, src_mask: Tensor = None) -> Tensor:
        cnn_output = self.cnn_block(src)
        #print(cnn_output.shape)           # torch.Size([32, 64, 50])
        cnn_output = cnn_output.transpose(1,2) # reshape input to: (batch, seq, feature)
        #print(cnn_output.shape)          # torch.Size([32, 50, 64])
        cnn_output = self.pos_encoder(cnn_output)

        # Transformer encoding
        trans_out = self.transformer_encoder(cnn_output)
        trans_out = trans_out.transpose(1,2) # onvert to (batch, seq, channel)
        #print(trans_out.shape)           # torch.Size([32, 64, 25])
        trans_out = trans_out.reshape(trans_out.size(0), -1)
        #avg_pooled = avg_pooled.transpose(0,1)
        #print(trans_out.shape)           # torch.Size([32, 1600])
        output = self.fc(trans_out)
        #print(output.shape)              # torch.Size([64, 32])
        return output


# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)

# Create DataLoader for training and testing
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Initialize and train the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

ntoken = 4  # size of vocabulary
num_encoder_layers = 6  # number of ``nn.TransformerEncoderLayer`` in ``nn.TransformerEncoder``
nhead = 8  # number of heads in ``nn.MultiheadAttention``
dropout = 0.4  # dropout probability

model = CNNTransformer(input_size=4,ntoken=4,d_model=64, num_encoder_layers=8, num_classes=2, nhead=8,dropout = 0.5).to(device)

lr = 0.001  # learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
#scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
criterion = nn.CrossEntropyLoss()

num_epochs = 1000


model.train()
    #total_loss = 0
    #log_interval = 32
for epoch in range(num_epochs):
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()

    if epoch % 50 ==0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


# Test the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}%')


Epoch [1/1000], Loss: 0.6643
Epoch [51/1000], Loss: 0.6886
Epoch [101/1000], Loss: 0.6972
Epoch [151/1000], Loss: 0.6977
