In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# Function to load and preprocess the genome sequence from a FASTA file
def load_genome(file_path):
    """Load a FASTA file and encode its nucleotides as integers.

    Every header line (starting with ``>``) and every legacy comment line
    (starting with ``;``) is skipped, so the letters of a record header are
    never mistaken for bases, even in multi-record files. Bases are
    upper-cased before encoding so lower-case (soft-masked) bases are kept.
    Any symbol other than A, C, G or T (for example ``N``) is dropped.

    Args:
        file_path: Path to the FASTA file to read.

    Returns:
        A list of ints (A=0, C=1, G=2, T=3) with all records concatenated in
        file order. An empty file yields an empty list.
    """
    nucleotide_map = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    encoded = []
    with open(file_path, 'r') as file:
        for line in file:
            # Headers can appear anywhere in a multi-record FASTA file, not
            # only on the first line.
            if line.startswith(('>', ';')):
                continue
            encoded.extend(
                nucleotide_map[nuc]
                for nuc in line.strip().upper()
                if nuc in nucleotide_map
            )
    return encoded

class NucleotideDataset(Dataset):
    """Next-nucleotide prediction dataset over integer-encoded windows.

    Each element of ``sequences`` is one window of class indices in
    ``0..3``. For a window ``w`` the item is ``(one_hot(w[:-1]), w[1:])``,
    i.e. the input at each position is paired with the nucleotide that
    follows it.

    Items are returned on the CPU. Device placement is left to the consumer
    so the dataset works on machines without CUDA.
    """

    def __init__(self, sequences):
        """Store ``sequences``, an indexable collection of integer windows."""
        self.sequences = sequences

    def __len__(self):
        """Return the number of windows."""
        return len(self.sequences)

    def __getitem__(self, index):
        """Return ``(inputs, target)`` for the window at ``index``.

        ``inputs`` is a float tensor of shape ``(len(window) - 1, 4)`` holding
        the one-hot encoding of every position but the last; ``target`` is a
        long tensor of shape ``(len(window) - 1,)`` holding every position
        but the first.
        """
        # Convert the window once and slice the tensor instead of converting
        # two separate list slices.
        window = torch.as_tensor(self.sequences[index], dtype=torch.long)
        inputs = torch.nn.functional.one_hot(window[:-1], num_classes=4).float()
        target = window[1:]
        return inputs, target

class RNNModel(nn.Module):
    """Multi-layer ``nn.RNN`` followed by a per-time-step linear projection.

    Args:
        input_size: Number of features per time step.
        output_size: Number of output values produced per time step.
        hidden_dim: Size of the recurrent hidden state.
        n_layers: Number of stacked recurrent layers.
    """

    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super().__init__()
        # batch_first=True: inputs and outputs are (batch, seq, features).
        self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x, hidden):
        """Run the RNN over ``x`` and project every time step.

        Args:
            x: Input of shape ``(batch, seq, input_size)``.
            hidden: Initial hidden state of shape
                ``(n_layers, batch, hidden_dim)``.

        Returns:
            A tuple ``(output, hidden)`` where ``output`` has shape
            ``(batch, seq, output_size)`` and ``hidden`` is the final hidden
            state returned by the RNN.
        """
        x, hidden = self.rnn(x, hidden)
        x = self.fc(x)
        return x, hidden

    def init_hidden(self, batch_size):
        """Return a zero hidden state of shape ``(n_layers, batch_size, hidden_dim)``.

        The tensor is allocated with the same device and dtype as the model's
        parameters, so it can be fed to ``forward`` without a further move or
        cast.
        """
        reference = self.fc.weight
        return reference.new_zeros(
            self.rnn.num_layers, batch_size, self.rnn.hidden_size
        )


def train_model(model, train_loader, device, optimizer, loss_fn, num_epochs):
    """Train ``model`` for ``num_epochs`` passes over ``train_loader``.

    A tqdm progress bar shows the current batch loss and the running
    accuracy; a summary line is printed at the end of every epoch.

    Args:
        model: Module exposing ``init_hidden(batch_size)`` and returning
            ``(output, hidden)`` when called with ``(inputs, hidden)``.
        train_loader: Iterable yielding ``(seq_batch, target_batch)`` pairs.
        device: Device the batches and the hidden state are moved to.
        optimizer: Optimizer stepping the model's parameters.
        loss_fn: Callable taking ``(output.transpose(1, 2), target_batch)``.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        A list with one ``(average_loss, accuracy_percent)`` tuple per epoch,
        where ``average_loss`` is the mean of the per-batch losses.

    Raises:
        ValueError: If ``train_loader`` yields no batches in an epoch.
    """
    model.train()
    history = []
    for epoch in range(num_epochs):
        total_loss = 0.0
        total_correct = 0
        total_samples = 0
        num_batches = 0
        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}')
        for seq_batch, target_batch in progress_bar:
            seq_batch, target_batch = seq_batch.to(device), target_batch.to(device)
            optimizer.zero_grad()
            # A fresh zero state per batch: batches are treated as independent.
            hidden = model.init_hidden(seq_batch.size(0)).to(device)
            output, _ = model(seq_batch, hidden)
            # Move the last (per-class) axis of the output to position 1
            # before handing it to the loss function.
            loss = loss_fn(output.transpose(1, 2), target_batch)
            loss.backward()
            optimizer.step()

            # Read the scalar once; every .item() call copies it to the host.
            batch_loss = loss.item()
            total_loss += batch_loss
            num_batches += 1

            # Running accuracy over all positions seen so far this epoch.
            predicted = output.argmax(dim=2)
            total_correct += (predicted == target_batch).sum().item()
            total_samples += target_batch.numel()
            accuracy = 100 * total_correct / total_samples
            progress_bar.set_postfix(loss=f'{batch_loss:.4f}', accuracy=f'{accuracy:.2f}%')

        if num_batches == 0:
            raise ValueError('train_loader yielded no batches')
        # Count batches instead of calling len() so loaders without __len__
        # are supported as well.
        average_loss = total_loss / num_batches
        print(f'Epoch {epoch + 1} Loss: {average_loss:.4f}, Accuracy: {accuracy:.2f}%')
        history.append((average_loss, accuracy))
    return history

def evaluate_model(model, test_loader, device, loss_fn):
    """Evaluate ``model`` on ``test_loader`` without tracking gradients.

    Prints the mean per-batch loss and the accuracy over all target
    positions.

    Args:
        model: Module exposing ``init_hidden(batch_size)`` and returning
            ``(output, hidden)`` when called with ``(inputs, hidden)``.
        test_loader: Iterable yielding ``(seq_batch, target_batch)`` pairs.
        device: Device the batches and the hidden state are moved to.
        loss_fn: Callable taking ``(output.transpose(1, 2), target_batch)``.

    Returns:
        The mean of the per-batch losses as a float.

    Raises:
        ValueError: If ``test_loader`` yields no batches.
    """
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_samples = 0
    num_batches = 0
    with torch.no_grad():
        for seq_batch, target_batch in test_loader:
            seq_batch, target_batch = seq_batch.to(device), target_batch.to(device)
            hidden = model.init_hidden(seq_batch.size(0)).to(device)
            output, _ = model(seq_batch, hidden)
            # Move the last (per-class) axis of the output to position 1
            # before handing it to the loss function.
            loss = loss_fn(output.transpose(1, 2), target_batch)
            total_loss += loss.item()
            num_batches += 1
            # Count correctly predicted positions.
            predicted = output.argmax(dim=2)
            total_correct += (predicted == target_batch).sum().item()
            total_samples += target_batch.numel()

    if num_batches == 0 or total_samples == 0:
        raise ValueError('test_loader yielded no samples')
    average_loss = total_loss / num_batches
    accuracy = 100 * total_correct / total_samples
    print(f'Test Loss: {average_loss:.4f}, Accuracy: {accuracy:.2f}%')
    # Hand the loss back so callers can log or compare it.
    return average_loss

def main(file_path='Ecoli_GCF_003018035.1_ASM301803v1_genomic (1).fna'):  # Update this path
    """Train and evaluate the next-nucleotide RNN on a FASTA genome.

    The encoded genome is cut into overlapping windows of 101 positions
    (100 inputs plus the shifted targets), split 80/20 into train and test
    sets, trained for 10 epochs and then evaluated.

    Args:
        file_path: Path to the FASTA file to train on.

    Raises:
        ValueError: If the genome is too short to form a single window.
    """
    window_size = 101  # 100 input positions + 1 for the shifted target

    encoded_sequence = load_genome(file_path)
    if len(encoded_sequence) < window_size:
        raise ValueError(
            f'Genome has {len(encoded_sequence)} usable bases; '
            f'at least {window_size} are required'
        )

    # NOTE(review): windows overlap with stride 1 and are split at random, so
    # nearly every test window shares most of its positions with some
    # training window; the reported test accuracy is therefore optimistic.
    # Materializing every window as a Python list is also very
    # memory-hungry for a multi-megabase genome.
    sequences = [
        encoded_sequence[i:i + window_size]
        for i in range(len(encoded_sequence) - window_size + 1)
    ]
    train_seq, test_seq = train_test_split(sequences, test_size=0.2, random_state=42)
    train_dataset = NucleotideDataset(train_seq)
    train_loader = DataLoader(train_dataset, batch_size=2048, shuffle=True)
    test_dataset = NucleotideDataset(test_seq)
    test_loader = DataLoader(test_dataset, batch_size=2048, shuffle=False)

    # Prefer CUDA, then Apple MPS, and fall back to the CPU so the script
    # runs on any machine.
    mps_backend = getattr(torch.backends, 'mps', None)
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif mps_backend is not None and mps_backend.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    model = RNNModel(input_size=4, output_size=4, hidden_dim=128, n_layers=2)
    model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.005)

    train_model(model, train_loader, device, optimizer, loss_fn, num_epochs=10)
    test_loss = evaluate_model(model, test_loader, device, loss_fn)
    # Only format the loss when a number was actually returned; formatting
    # None with ':.4f' raises TypeError.
    if test_loss is not None:
        print(f'Test Loss: {test_loss:.4f}')


if __name__ == '__main__':
    main()


Epoch 1/10: 100%|██████████| 2306/2306 [18:24<00:00,  2.09it/s, accuracy=35.72%, loss=1.3178]  


Epoch 1 Loss: 1.3249, Accuracy: 35.72%


Epoch 2/10: 100%|██████████| 2306/2306 [19:26<00:00,  1.98it/s, accuracy=36.90%, loss=1.3112]  


Epoch 2 Loss: 1.3115, Accuracy: 36.90%


Epoch 3/10: 100%|██████████| 2306/2306 [22:15<00:00,  1.73it/s, accuracy=37.18%, loss=1.3080]  


Epoch 3 Loss: 1.3084, Accuracy: 37.18%


Epoch 4/10: 100%|██████████| 2306/2306 [24:43<00:00,  1.55it/s, accuracy=37.32%, loss=1.3090]  


Epoch 4 Loss: 1.3070, Accuracy: 37.32%


Epoch 5/10: 100%|█████████▉| 2296/2306 [22:59<00:05,  1.97it/s, accuracy=37.39%, loss=1.3071]  