## Exercise 1: DNA RNN Model

In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np

# ------------------ Load and preprocess the genome ------------------
# Load file
with open("Ecoli_GCF_003018035.1_ASM301803v1_genomic.fna", "r") as f:
    lines = f.readlines()

# Clean sequence (remove FASTA headers and newlines)
sequence = ''.join([line.strip() for line in lines if not line.startswith('>')])

# Vocabulary mapping
vocab = ['A', 'C', 'G', 'T']
char_to_idx = {ch: idx for idx, ch in enumerate(vocab)}
idx_to_char = {idx: ch for ch, idx in char_to_idx.items()}

# Encode the sequence
encoded_seq = np.array([char_to_idx[ch] for ch in sequence if ch in char_to_idx], dtype=np.int64)

# Train-test split
split_idx = int(len(encoded_seq) * 0.8)
train_data = encoded_seq[:split_idx]
test_data = encoded_seq[split_idx:]

# ------------------ PyTorch Dataset ------------------
class DNADataset(Dataset):
    def __init__(self, data, seq_length):
        self.data = data
        self.seq_length = seq_length

    def __len__(self):
        return len(self.data) - self.seq_length

    def __getitem__(self, idx):
        x = self.data[idx:idx + self.seq_length]
        y = self.data[idx + 1:idx + self.seq_length + 1]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# Parameters
seq_length = 100
batch_size = 64

# DataLoaders
train_ds = DNADataset(train_data, seq_length)
test_ds = DNADataset(test_data, seq_length)
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

# ------------------ RNN Model ------------------
class RNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn_hidden_size = rnn_hidden_size
        self.rnn = nn.LSTM(embed_dim, rnn_hidden_size, batch_first=True)
        self.fc = nn.Linear(rnn_hidden_size, vocab_size)

    def forward(self, x, hidden, cell):
        out = self.embedding(x)  # shape: (batch, seq_len, embed_dim)
        out, (hidden, cell) = self.rnn(out, (hidden, cell))
        out = self.fc(out)
        return out, hidden, cell

    def init_hidden(self, batch_size, device):
        hidden = torch.zeros(1, batch_size, self.rnn_hidden_size).to(device)
        cell = torch.zeros(1, batch_size, self.rnn_hidden_size).to(device)
        return hidden, cell

# ------------------ Training ------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

vocab_size = len(vocab)
embed_dim = 256
rnn_hidden_size = 512

model = RNN(vocab_size, embed_dim, rnn_hidden_size).to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

num_epochs = 4000

for epoch in range(num_epochs):
    model.train()
    hidden, cell = model.init_hidden(batch_size, device)
    seq_batch, target_batch = next(iter(train_dl))
    seq_batch = seq_batch.to(device)
    target_batch = target_batch.to(device)
    optimizer.zero_grad()
    loss = 0

    for c in range(seq_length):
        pred, hidden, cell = model(seq_batch[:, c].unsqueeze(1), hidden, cell)
        pred = pred.squeeze(1)
        loss += loss_fn(pred, target_batch[:, c])

    loss.backward()
    optimizer.step()
    loss = loss.item() / seq_length

    if epoch % 500 == 0:
        print(f"Epoch {epoch}, Loss: {loss:.4f}")


Epoch 0, Loss: 1.3872
Epoch 500, Loss: 1.3531
Epoch 1000, Loss: 1.3394
Epoch 1500, Loss: 1.3391
Epoch 2000, Loss: 1.3326
Epoch 2500, Loss: 1.3307
Epoch 3000, Loss: 1.3230
Epoch 3500, Loss: 1.3147


In [2]:
# ------------------ Testing ------------------
model.eval()

test_loss = 0
with torch.no_grad():
    for seq_batch, target_batch in test_dl:
        seq_batch, target_batch = seq_batch.to(device), target_batch.to(device)
        hidden, cell = model.init_hidden(seq_batch.size(0), device)
        output, _, _ = model(seq_batch, hidden, cell)
        
        # Reshape for loss
        output = output.view(-1, vocab_size)
        target_batch = target_batch.view(-1)
        
        # Compute loss
        loss = loss_fn(output, target_batch)
        test_loss += loss.item()

avg_test_loss = test_loss / len(test_dl)
print(f"Test Loss: {avg_test_loss:.4f}")


Test Loss: 1.3236


## Exercise 2: DNA (k-mer) with RNN Model

In [5]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from collections import Counter

# ------------------ Load and preprocess the genome ------------------
# Load file
with open("Ecoli_GCF_003018035.1_ASM301803v1_genomic.fna", "r") as f:
    lines = f.readlines()

# Clean sequence (remove FASTA headers and newlines)
sequence = ''.join([line.strip() for line in lines if not line.startswith('>')])

# Function to generate k-mers
def generate_kmers(sequence, k=4):
    kmers = [sequence[i:i+k] for i in range(len(sequence) - k + 1)]
    return kmers

# Generate k-mers
k = 4  # You can change this to 3
kmers = generate_kmers(sequence, k)

# Create vocabulary of k-mers
kmer_counts = Counter(kmers)
vocab = list(kmer_counts.keys())
char_to_idx = {kmer: idx for idx, kmer in enumerate(vocab)}
idx_to_char = {idx: kmer for kmer, idx in char_to_idx.items()}

# Encode the sequence of k-mers
encoded_seq = np.array([char_to_idx[kmer] for kmer in kmers], dtype=np.int64)

# Train-test split
split_idx = int(len(encoded_seq) * 0.8)
train_data = encoded_seq[:split_idx]
test_data = encoded_seq[split_idx:]

# ------------------ PyTorch Dataset ------------------
class KmerDataset(Dataset):
    def __init__(self, data, seq_length):
        self.data = data
        self.seq_length = seq_length

    def __len__(self):
        return len(self.data) - self.seq_length

    def __getitem__(self, idx):
        x = self.data[idx:idx + self.seq_length]
        y = self.data[idx + 1:idx + self.seq_length + 1]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# Parameters
seq_length = 50  # Number of k-mers in a sequence
batch_size = 64

# DataLoaders
train_ds = KmerDataset(train_data, seq_length)
test_ds = KmerDataset(test_data, seq_length)
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

# ------------------ RNN Model ------------------
class KmerRNN(nn.Module):
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn_hidden_size = rnn_hidden_size
        self.rnn = nn.LSTM(embed_dim, rnn_hidden_size, batch_first=True)
        self.fc = nn.Linear(rnn_hidden_size, vocab_size)

    def forward(self, x, hidden, cell):
        out = self.embedding(x)  # shape: (batch, seq_len, embed_dim)
        out, (hidden, cell) = self.rnn(out, (hidden, cell))
        out = self.fc(out)
        return out, hidden, cell

    def init_hidden(self, batch_size, device):
        hidden = torch.zeros(1, batch_size, self.rnn_hidden_size).to(device)
        cell = torch.zeros(1, batch_size, self.rnn_hidden_size).to(device)
        return hidden, cell

# ------------------ Training ------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

vocab_size = len(vocab)
embed_dim = 256
rnn_hidden_size = 512

model = KmerRNN(vocab_size, embed_dim, rnn_hidden_size).to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 1000

for epoch in range(num_epochs):
    model.train()
    hidden, cell = model.init_hidden(batch_size, device)
    seq_batch, target_batch = next(iter(train_dl))
    seq_batch = seq_batch.to(device)
    target_batch = target_batch.to(device)
    optimizer.zero_grad()
    loss = 0

    for c in range(seq_length):
        pred, hidden, cell = model(seq_batch[:, c].unsqueeze(1), hidden, cell)
        pred = pred.squeeze(1)
        loss += loss_fn(pred, target_batch[:, c])

    loss.backward()
    optimizer.step()
    loss = loss.item() / seq_length

    if epoch % 100 == 0:
        print(f"Epoch {epoch}, Loss: {loss:.4f}")

# ------------------ Testing ------------------
model.eval()

test_loss = 0
with torch.no_grad():
    for seq_batch, target_batch in test_dl:
        seq_batch, target_batch = seq_batch.to(device), target_batch.to(device)
        hidden, cell = model.init_hidden(seq_batch.size(0), device)
        output, _, _ = model(seq_batch, hidden, cell)
        
        # Reshape for loss
        output = output.view(-1, vocab_size)
        target_batch = target_batch.view(-1)
        
        # Compute loss
        loss = loss_fn(output, target_batch)
        test_loss += loss.item()

avg_test_loss = test_loss / len(test_dl)
print(f"Test Loss: {avg_test_loss:.4f}")

# ------------------ Prediction Example ------------------
def predict_next_kmer(model, initial_sequence, k=4, num_predictions=5):
    model.eval()
    with torch.no_grad():
        # Convert initial sequence to k-mers
        initial_kmers = generate_kmers(initial_sequence, k)
        # Convert to indices
        input_indices = [char_to_idx[kmer] for kmer in initial_kmers if kmer in char_to_idx]
        
        if not input_indices:
            return "No valid k-mers found in initial sequence"
            
        # Get the true next k-mers from the sequence
        true_next_kmers = generate_kmers(initial_sequence[k:], k)[:num_predictions]
        
        # Convert to tensor
        input_tensor = torch.tensor(input_indices, dtype=torch.long).unsqueeze(0).to(device)
        hidden, cell = model.init_hidden(1, device)
        
        predictions = []
        for _ in range(num_predictions):
            output, hidden, cell = model(input_tensor, hidden, cell)
            predicted_idx = output[:, -1].argmax(1).item()
            predictions.append(idx_to_char[predicted_idx])
            input_tensor = torch.cat([input_tensor, torch.tensor([[predicted_idx]], dtype=torch.long).to(device)], dim=1)
            
    return predictions, true_next_kmers

# Example usage
test_sequence = sequence[:20]  # Take first 20 nucleotides
predictions, true_next = predict_next_kmer(model, test_sequence)
print("\nInitial sequence:", test_sequence)
print("Predicted next k-mers:", predictions)
print("True next k-mers:", true_next)
print("\nComparison:")
for i, (pred, true) in enumerate(zip(predictions, true_next)):
    print(f"Position {i+1}: Predicted: {pred}, True: {true}, Match: {pred == true}")

Epoch 0, Loss: 5.5505
Epoch 100, Loss: 1.3877
Epoch 200, Loss: 1.3642
Epoch 300, Loss: 1.3702
Epoch 400, Loss: 1.3568
Epoch 500, Loss: 1.3513
Epoch 600, Loss: 1.3565
Epoch 700, Loss: 1.3562
Epoch 800, Loss: 1.3688
Epoch 900, Loss: 1.3418
Test Loss: 1.3606

Initial sequence: TGCTCTCTTTACCGCTGTTT
Predicted next k-mers: ['TTTT', 'TTTT', 'TTTC', 'TTCA', 'TCAG']


## Exercise 3: DNA (K-mers) with RNN and Attention Mechanism Model

In [10]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from collections import Counter

# ------------------ Load and preprocess the genome ------------------
# Load file
with open("Ecoli_GCF_003018035.1_ASM301803v1_genomic.fna", "r") as f:
    lines = f.readlines()

# Clean sequence (remove FASTA headers and newlines)
sequence = ''.join([line.strip() for line in lines if not line.startswith('>')])

# Function to generate k-mers
def generate_kmers(sequence, k=4):
    kmers = [sequence[i:i+k] for i in range(len(sequence) - k + 1)]
    return kmers

# Generate k-mers
k = 4
kmers = generate_kmers(sequence, k)

# Create vocabulary of k-mers
kmer_counts = Counter(kmers)
vocab = list(kmer_counts.keys())
char_to_idx = {kmer: idx for idx, kmer in enumerate(vocab)}
idx_to_char = {idx: kmer for kmer, idx in char_to_idx.items()}

# Encode the sequence of k-mers
encoded_seq = np.array([char_to_idx[kmer] for kmer in kmers], dtype=np.int64)

# Train-test split
split_idx = int(len(encoded_seq) * 0.8)
train_data = encoded_seq[:split_idx]
test_data = encoded_seq[split_idx:]

# ------------------ PyTorch Dataset ------------------
class KmerDataset(Dataset):
    def __init__(self, data, seq_length):
        self.data = data
        self.seq_length = seq_length

    def __len__(self):
        return len(self.data) - self.seq_length

    def __getitem__(self, idx):
        x = self.data[idx:idx + self.seq_length]
        y = self.data[idx + 1:idx + self.seq_length + 1]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# Parameters
seq_length = 50
batch_size = 64

# DataLoaders
train_ds = KmerDataset(train_data, seq_length)
test_ds = KmerDataset(test_data, seq_length)
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

# ------------------ Attention RNN Model ------------------
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.attention = nn.Linear(hidden_size * 2, 1)  # Changed to output single attention score
        self.softmax = nn.Softmax(dim=1)

    def forward(self, hidden, encoder_outputs):
        
        # Repeat hidden for each time step
        hidden = hidden.unsqueeze(1).repeat(1, encoder_outputs.size(1), 1)
        
        # Concatenate hidden with encoder outputs
        energy = torch.cat((hidden, encoder_outputs), dim=2)
        
        # Calculate attention scores
        attention_scores = self.attention(energy).squeeze(2)  # shape: (batch_size, seq_length)
        attention_weights = self.softmax(attention_scores)
        
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs)  # shape: (batch_size, 1, hidden_size)
        
        return context.squeeze(1), attention_weights

class AttentionLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, rnn_hidden_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn_hidden_size = rnn_hidden_size
        
        # Encoder LSTM
        self.encoder = nn.LSTM(embed_dim, rnn_hidden_size, batch_first=True)
        
        # Attention mechanism
        self.attention = Attention(rnn_hidden_size)
        
        # Decoder
        self.decoder = nn.LSTM(embed_dim + rnn_hidden_size, rnn_hidden_size, batch_first=True)
        
        # Output layer
        self.fc = nn.Linear(rnn_hidden_size, vocab_size)

    def init_hidden(self, batch_size, device):
        # Initialize hidden state for encoder
        h0 = torch.zeros(1, batch_size, self.rnn_hidden_size).to(device)
        c0 = torch.zeros(1, batch_size, self.rnn_hidden_size).to(device)
        return h0, c0

    def forward(self, x, hidden, cell):
        # Embedding
        embedded = self.embedding(x)  # shape: (batch, seq_len, embed_dim)
        
        # Encoder
        encoder_outputs, (hidden, cell) = self.encoder(embedded, (hidden, cell))
        
        # Initialize decoder hidden state
        decoder_hidden = hidden
        decoder_cell = cell
        
        # Initialize output tensor
        outputs = torch.zeros(x.size(0), x.size(1), self.fc.out_features).to(x.device)
        
        # Process each time step
        for t in range(x.size(1)):
            # Get current input
            current_input = embedded[:, t:t+1]
            
            # Apply attention - ensure decoder_hidden is properly squeezed
            decoder_hidden_squeezed = decoder_hidden.squeeze(0)  # Remove the num_layers dimension
            context, _ = self.attention(decoder_hidden_squeezed, encoder_outputs)
            context = context.unsqueeze(1)  # Add back the sequence length dimension
            
            # Concatenate input with context
            decoder_input = torch.cat((current_input, context), dim=2)
            
            # Decoder step
            decoder_output, (decoder_hidden, decoder_cell) = self.decoder(
                decoder_input, (decoder_hidden, decoder_cell)
            )
            
            # Output layer
            output = self.fc(decoder_output.squeeze(1))
            outputs[:, t] = output
            
        return outputs, hidden, cell

# ------------------ Training ------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

vocab_size = len(vocab)
embed_dim = 256
rnn_hidden_size = 512

model = AttentionLSTM(vocab_size, embed_dim, rnn_hidden_size).to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 1000

for epoch in range(num_epochs):
    model.train()
    hidden, cell = model.init_hidden(batch_size, device)
    seq_batch, target_batch = next(iter(train_dl))
    seq_batch = seq_batch.to(device)
    target_batch = target_batch.to(device)
    optimizer.zero_grad()
    
    # Forward pass
    output, hidden, cell = model(seq_batch, hidden, cell)
    
    # Reshape for loss
    output = output.view(-1, vocab_size)
    target_batch = target_batch.view(-1)
    
    # Calculate loss
    loss = loss_fn(output, target_batch)
    
    # Backward pass
    loss.backward()
    optimizer.step()
    
    if epoch % 100 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

# ------------------ Testing ------------------
model.eval()
test_loss = 0

with torch.no_grad():
    for seq_batch, target_batch in test_dl:
        seq_batch, target_batch = seq_batch.to(device), target_batch.to(device)
        hidden, cell = model.init_hidden(seq_batch.size(0), device)
        output, _, _ = model(seq_batch, hidden, cell)
        
        # Reshape for loss
        output = output.view(-1, vocab_size)
        target_batch = target_batch.view(-1)
        
        # Compute loss
        loss = loss_fn(output, target_batch)
        test_loss += loss.item()

avg_test_loss = test_loss / len(test_dl)
print(f"Test Loss: {avg_test_loss:.4f}")

# ------------------ Prediction Example ------------------
def predict_next_kmer(model, initial_sequence, k=4, num_predictions=5):
    model.eval()
    with torch.no_grad():
        # Convert initial sequence to k-mers
        initial_kmers = generate_kmers(initial_sequence, k)
        # Convert to indices
        input_indices = [char_to_idx[kmer] for kmer in initial_kmers if kmer in char_to_idx]
        
        if not input_indices:
            return "No valid k-mers found in initial sequence"
            
        # Convert to tensor
        input_tensor = torch.tensor(input_indices, dtype=torch.long).unsqueeze(0).to(device)
        hidden, cell = model.init_hidden(1, device)
        
        predictions = []
        for _ in range(num_predictions):
            output, hidden, cell = model(input_tensor, hidden, cell)
            predicted_idx = output[:, -1].argmax(1).item()
            predictions.append(idx_to_char[predicted_idx])
            input_tensor = torch.cat([input_tensor, torch.tensor([[predicted_idx]], dtype=torch.long).to(device)], dim=1)
            
    return predictions

# Example usage
test_sequence = sequence[:20]  # Take first 20 nucleotides
predictions = predict_next_kmer(model, test_sequence)
print("\nInitial sequence:", test_sequence)
print("Predicted next k-mers:", predictions)

Epoch 0, Loss: 5.5496
Epoch 100, Loss: 1.3059
Epoch 200, Loss: 1.1240
Epoch 300, Loss: 1.0030
Epoch 400, Loss: 0.8790
Epoch 500, Loss: 0.8197
Epoch 600, Loss: 0.7271
Epoch 700, Loss: 0.6735
Epoch 800, Loss: 0.6122
Epoch 900, Loss: 0.5806
Test Loss: 0.5421

Initial sequence: TGCTCTCTTTACCGCTGTTT
Predicted next k-mers: ['TTTC', 'TTCT', 'TCTC', 'CTCT', 'TCTC']
