In [None]:
!unzip real_data.zip

In [None]:
import os
import torch

def load_dna_sequences(folder):
    sequences = []
    for filename in sorted(os.listdir(folder)):
        if filename.endswith('.dna'):
            with open(os.path.join(folder, filename), 'r') as file:
                # Read the file content
                content = file.read()
                # Remove whitespaces, newlines, and special characters
                cleaned_content = ''.join(filter(str.isalpha, content))
                sequences.append(cleaned_content.upper())  # Convert to upper case if needed
    return sequences

In [None]:
# Load training, validation, and test sets
train_data = load_dna_sequences('train_data')
train_labels = load_dna_sequences('train_labels')
val_data = load_dna_sequences('val_data')
val_labels = load_dna_sequences('val_labels')
test_data = load_dna_sequences('test_data')
test_labels = load_dna_sequences('test_labels')

In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder

# Custom Dataset Class
class DNASequenceDataset(Dataset):
    def __init__(self, sequences, labels, max_seq_len):
        self.sequences = sequences
        self.labels = labels
        self.max_seq_len = max_seq_len
        self.label_encoder = LabelEncoder()
        self.label_encoder.fit(['A', 'C', 'G', 'T', 'N'])  # N for padding

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        sequence = self.sequences[idx]
        label = self.labels[idx]

        # Encode sequence and label
        encoded_seq = self.label_encoder.transform(list(sequence + 'N' * (self.max_seq_len - len(sequence))))
        encoded_label = self.label_encoder.transform(list(label + 'N' * (self.max_seq_len - len(label))))

        return torch.tensor(encoded_seq, dtype=torch.long), torch.tensor(encoded_label, dtype=torch.long)

# Determine the maximum sequence length for padding
max_seq_len = max(max(len(seq) for seq in train_data), max(len(seq) for seq in train_labels))

# Create datasets
train_dataset = DNASequenceDataset(train_data, train_labels, max_seq_len)
val_dataset = DNASequenceDataset(val_data, val_labels, max_seq_len)
test_dataset = DNASequenceDataset(test_data, test_labels, max_seq_len)

# Create dataloaders
batch_size = 32  # You can adjust this based on your GPU capacity
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import torch.nn as nn

class DNALSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, bidirectional=False, dropout=0.5):
        super(DNALSTM, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional

        # LSTM Layer
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, bidirectional=bidirectional, dropout=dropout if num_layers > 1 else 0)

        # Linear layer that maps from hidden state space to tag space
        # Adjust output dimension if bidirectional
        lstm_output_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.hidden2tag = nn.Linear(lstm_output_dim, output_dim)

    def forward(self, sequence):
        lstm_out, _ = self.lstm(sequence)
        tag_space = self.hidden2tag(lstm_out)
        tag_scores = torch.log_softmax(tag_space, dim=2)
        return tag_scores

# Parameters for the model
input_dim = 5  # 4 nucleotides + 1 for padding
hidden_dim = 64  # Can be adjusted
output_dim = 5  # 4 nucleotides + 1 for padding
num_layers = 2  # Adjusted
bidirectional = True  # Using bidirectional LSTM

model = DNALSTM(input_dim, hidden_dim, output_dim, num_layers, bidirectional)

In [None]:
import torch.optim as optim

# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move the model to the specified device
model.to(device)


# Loss and optimizer
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Function to convert sequences to one-hot encodings
def sequence_to_one_hot(sequence, sequence_length):
    one_hot = torch.zeros(sequence_length, 5)  # 5 for A, C, G, T, N
    one_hot[torch.arange(sequence_length), sequence] = 1
    return one_hot

# Training loop
num_epochs = 20  # Can be adjusted
for epoch in range(num_epochs):
    running_loss = 0.0
    for sequences, labels in train_loader:
        # Convert sequences to one hot encoding and move to GPU
        sequences_one_hot = torch.stack([sequence_to_one_hot(seq, max_seq_len) for seq in sequences]).to(device)

        # Move labels to the same device as model
        labels = labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward
        outputs = model(sequences_one_hot)

        # Compute loss; assuming labels are class indices, not one-hot encoded
        loss = criterion(outputs.view(-1, 5), labels.view(-1))

        # Backward + optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

Epoch 1, Loss: 1.4433101680543687
Epoch 2, Loss: 1.034171728624238
Epoch 3, Loss: 0.5470337892572085
Epoch 4, Loss: 0.4975723723570506
Epoch 5, Loss: 0.4920103872815768
Epoch 6, Loss: 0.48343312326404786
Epoch 7, Loss: 0.47942885508139926
Epoch 8, Loss: 0.4818102220694224
Epoch 9, Loss: 0.47446943033072686
Epoch 10, Loss: 0.47342990669939256
Epoch 11, Loss: 0.4703857832484775
Epoch 12, Loss: 0.46630774438381195
Epoch 13, Loss: 0.4709229874942038
Epoch 14, Loss: 0.4609362888667319
Epoch 15, Loss: 0.46639643526739544
Epoch 16, Loss: 0.4578609632121192
Epoch 17, Loss: 0.459771738284164
Epoch 18, Loss: 0.4560243984063466
Epoch 19, Loss: 0.4559146004418532
Epoch 20, Loss: 0.4572299275961187


In [None]:
def decode_sequence(output, label_encoder):
    # Convert output to predicted class indices
    _, predicted_indices = torch.max(output, 2)

    # Ensure the tensor is on the CPU before converting to numpy
    predicted_indices = predicted_indices.cpu()

    # Convert indices to nucleotides
    predicted_sequences = ["".join(label_encoder.inverse_transform(indices.numpy())) for indices in predicted_indices]
    return predicted_sequences

def predict(model, data_loader, label_encoder):
    model.eval()
    all_predictions = []
    with torch.no_grad():
        for sequences, _ in data_loader:
            sequences_one_hot = torch.stack([sequence_to_one_hot(seq, max_seq_len) for seq in sequences]).to(device)
            outputs = model(sequences_one_hot)
            predictions = decode_sequence(outputs, label_encoder)
            all_predictions.extend(predictions)

    return all_predictions

In [None]:
import editdistance

def calculate_normalized_edit_distance(predictions, true_labels):
    total_distance = 0
    total_length = 0

    for pred, true in zip(predictions, true_labels):
        # Compute the edit distance for each pair of sequences
        distance = editdistance.eval(pred, true)
        total_distance += distance
        total_length += len(true)

    # Normalizing the total edit distance by the total length of all sequences
    avg_normalized_distance = total_distance / total_length
    return avg_normalized_distance

# Generating predictions on the test set
test_predictions = predict(model, test_loader, train_dataset.label_encoder)

# Truncate the predictions and true labels to their original lengths (remove padding)
truncated_predictions = [pred[:len(true)] for pred, true in zip(test_predictions, test_labels)]
truncated_test_labels = [true.rstrip('N') for true in test_labels]  # Assuming 'N' is used for padding

print(len(truncated_predictions))

# Calculate the average normalized edit distance
avg_normalized_edit_distance = calculate_normalized_edit_distance(truncated_predictions, truncated_test_labels)
print(f"Average Normalized Edit Distance on Test Set: {avg_normalized_edit_distance}")

379
Average Normalized Edit Distance on Test Set: 0.02169185454345253


In [None]:
def print_first_5_predictions(model, data_loader, label_encoder, test_inputs, test_labels):
    model.eval()
    count = 0
    with torch.no_grad():
        for i, (sequences, _) in enumerate(data_loader):
            if count >= 5:
                break

            # Move sequences to GPU
            sequences_one_hot = torch.stack([sequence_to_one_hot(seq, max_seq_len) for seq in sequences]).to(device)

            outputs = model(sequences_one_hot)
            predictions = decode_sequence(outputs, label_encoder)

            for j in range(len(predictions)):
                if count >= 5:
                    break

                input_seq = test_inputs[i * len(predictions) + j]
                pred = predictions[j][:len(test_labels[i * len(predictions) + j])]
                actual = test_labels[i * len(predictions) + j].rstrip('N')  # Remove padding
                input_distance = editdistance.eval(input_seq, actual)
                pred_distance = editdistance.eval(pred, actual)

                print(f"Input:          {input_seq}")
                print(f"Prediction:     {pred}")
                print(f"Actual:         {actual}")
                print(f"Edit Distance (Input to Actual): {input_distance}")
                print(f"Edit Distance (Prediction to Actual): {pred_distance}\n")

                count += 1

# Print first 5 predictions with inputs and distances
print_first_5_predictions(model, test_loader, train_dataset.label_encoder, test_data, test_labels)

Input:          ATGTAGGTATAGCACACGAGCTCTGAGTGCACGCCCGCCCGAATCTTCGATGGCAAGAGCGAACGAGCGACGCTTAGAGTGAGTGACCGAGAGAGTGCGAGGATGGGAGCTTGCTGGCTGGCGAGCCAACAAAAAAACAGATATCCGACTGAGAAACTTTCTTGCTCACGGGCGCCCGGCCTACCGTCCTAACTCTCGTCCGGCCTTCCTTTCTGGGAATCTTACTGCGAATGACAGACCGACCCTGTCTCTCTGGACGCACAAATACCCATCGTGGAACGAAGCTGCCTACCTAACTAACGGGCGGCGAAACTTAGAAGCTTTCTGGCTATCTGAGAAGGAATGAGTCTTTCTTGCTTTGAAGGATGGAAGCTCGCTGACGTCAGACAGCGCCCTCGGGCTATCTCCCTAGCGGCCGGTCTGCCTTCCGTCCTACGCAGGACCCTTTCTCGCTCCCTTTCTGGGACAGAGCGAGAGAATCTTAGAAGGACAGACTGAAGCTTCCTGCCCTGATAGATGTCGCGCTTTCTTACTGCCTGACTAGCGGTCTTCGAACCGGGCGTACTTACTGCCGTACGAGCGTGCTCCCTGAGACGGATAGAATGATCGACTCTTTCTGCCTCTCGCTCCCTCAGTCAATCACTCCCACGTGGAACGAGAGAATGAACCTCTCGTACTGGCTGTCGGCCGGTCGTTCGTACGAGCCTGCGACCCATCCGACGCGCGACCCGACTCGGATAGAGACTGACGGTCCGACAATAGATATAACGACCGTTCTGTGAAGGAGAGACACTGGCTGCCTTTGAGGCTTACTGACGGTCCAACACACAGGCGCCCGGCCAACATGGATTTCATGGAGACTGGGAGTGCCTGAACCGTACCTTCGAACCGCCAAGCTATCTCTGAAGGATAGACCCTTTGAGGGAGCGAATGACCGACGCGTACAGAATATAGGCCAGAGTCCTCACCTTCCACACAAACCAG