<a href="https://colab.research.google.com/github/dianekolodny/DataScience/blob/main/Riboswitches.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Colab Pro, Premium GPU (A100), High RAM

## Faster GPUs

Users who have purchased one of Colab's paid plans have access to premium GPUs. You can upgrade your notebook's GPU settings in `Runtime > Change runtime type` in the menu to enable Premium accelerator. Subject to availability, selecting a premium GPU may grant you access to a V100 or A100 Nvidia GPU.

The free of charge version of Colab grants access to Nvidia's T4 GPUs subject to quota restrictions and availability.

You can see what GPU you've been assigned at any time by executing the following cell. If the execution result of running the code cell below is "Not connected to a GPU", you can change the runtime by going to `Runtime > Change runtime type` in the menu to enable a GPU accelerator, and then re-execute the code cell.

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
import torch
import torchvision

print(f"PyTorch version: {torch.__version__}")
print(f"TorchVision version: {torchvision.__version__}")

## Data Preparation

The first step is to prepare the RNA sequence and 3D structure data for training and evaluation. We will use the Rfam database to obtain RNA sequences belonging to known Riboswitch classes. We can store this data in a CSV file or other format that is suitable for loading into a PyTorch DataLoader.

In [None]:
!pip install pandas torch requests bs4 transformers pytorch-ranger ranger #json re gzip io csv matplotlib scikit-learn

### Create a CSV for all RFAM IDs

In [None]:
import csv

# Define the data as a list of tuples
riboswitch_data = [
    ('Glutamine riboswitch', 'RF01739'),
    ('Glutamine-II riboswitch (downstream peptide RNA)', 'RF01704'),
    ('raiA RNA', 'RF03072'),
    ('SAM riboswitch (S box leader)', 'RF00162'),
    ('Cobalamin riboswitch', 'RF00174'),
    ('SAM-I/IV variant riboswitch', 'RF01725'),
    ('Fluoride riboswitch (crcB)', 'RF01734'),
    ('nhaA-I RNA', 'RF03057'),
    ('ZMP/ZTP riboswitch', 'RF01750'),
    ('Moco (molybdenum cofactor) riboswitch', 'RF01055'),
    ('M-box riboswitch (ykoK leader)', 'RF00380'),
    ('FMN riboswitch (RFN element)', 'RF00050'),
    ('Na+ riboswitch (DUF1646 RNA)', 'RF03071'),
    ('AdoCbl variant RNA', 'RF01689'),
    ('Purine riboswitch', 'RF00167'),
    ('TPP riboswitch (THI element)', 'RF00059'),
    ('ydaO/yuaA leader', 'RF00379'),
    ('THF riboswitch', 'RF01831'),
    ('Guanidine-I riboswitch', 'RF00442'),
    ('NiCo riboswitch', 'RF02683'),
    ('sul1 RNA', 'RF03058'),
    ('SAM/SAH riboswitch', 'RF01727'),
    ('Cyclic di-GMP-II riboswitch', 'RF01786'),
    ('S-adenosyl-L-homocysteine riboswitch', 'RF01057'),
    ('Lysine riboswitch', 'RF00168'),
    ('Glycine riboswitch', 'RF00504'),
    ('S-adenosyl methionine (SAM) riboswitch', 'RF00634'),
    ('SAM riboswitch (alpha-proteobacteria)', 'RF00521'),
    ('PreQ1 riboswitch', 'RF00522'),
    ('PreQ1-III riboswitch', 'RF02680'),
    ('yybP-ykoY manganese riboswitch', 'RF00080'),
    ('SMK box translational riboswitch (SAM-III)', 'RF01767'),
    ('glmS glucosamine-6-phosphate activated ribozyme', 'RF00234'),
    ('preQ1-II (pre queuosine) riboswitch', 'RF01054'),
    ('SAM-VI riboswitch', 'RF02885'),
    ('SAM-V riboswitch', 'RF01826'),
    ('AdoCbl riboswitch', 'RF01482'),
    ('Magnesium Sensor', 'RF01056'),
    ('putative aminoglycoside riboswitch / attI site', 'RF02912'),
    ('M. florum riboswitch', 'RF01510')
]

# Define the headers for the CSV file
csv_headers = ['family_name', 'rfam_id', 'rfam_id_number_only']

# Define a function to extract the non-character integers from the rfam_id
def extract_integers(string):
    return ''.join(c for c in string if c.isdigit())

# Create a list of tuples with the family_name, rfam_id, and rfam_id_number_only columns
processed_data = [(family_name, rfam_id, extract_integers(rfam_id)) for family_name, rfam_id in riboswitch_data]

# Write the data to a CSV file
with open('riboswitch_data.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(csv_headers)
    writer.writerows(processed_data)

print('CSV file created successfully!')

We will take the starting file and download all possible sequences.

In [None]:
import csv
import pandas as pd
import requests
from io import StringIO

# Load the riboswitch_data.csv file
riboswitch_data = pd.read_csv('riboswitch_data.csv')

# Initialize an empty dataframe for the riboswitch dataset
riboswitch_dataset = pd.DataFrame(columns=['Accession ID', 'Start-End', 'Description', 'Sequence', 'rfam_id', 'rfam_id_number_only'])

# Define a function to parse and process the downloaded sequences
def process_sequence_data(sequence_data, rfam_id, rfam_id_number_only):
    # Initialize a list to store processed sequences
    sequences = []

    # Initialize variables for storing data
    accession_id = ''
    start_end = ''
    description = ''
    sequence = ''

    # Loop over lines in the sequence_data
    for line in sequence_data.split('\n'):
        line = line.strip()

        # Check if line starts with '>'
        if line.startswith('>'):
            # If not the first sequence, append the previous data to the list
            if accession_id:
                sequences.append([accession_id, start_end, description, sequence, rfam_id, rfam_id_number_only])

            # Extract accession ID, start-end, and description from line
            accession_id = line[1:].split('/')[0]
            start_end = line[1:].split('/')[1].split(' ')[0]
            description = ' '.join(line[1:].split('/')[1].split(' ')[1:])

            # Reset sequence variable
            sequence = ''
        else:
            # Append sequence to variable
            sequence += line

    # Append the last sequence to the list
    sequences.append([accession_id, start_end, description, sequence, rfam_id, rfam_id_number_only])

    return sequences

# Loop through all RFAM IDs in the riboswitch_data.csv file
for index, row in riboswitch_data.iterrows():
    rfam_id = row['rfam_id']
    rfam_id_number_only = row['rfam_id_number_only']

    # Download the sequences
    url = f'https://rfam.org/family/{rfam_id}/alignment?acc={rfam_id}&format=fastau&download=0'
    
    try:
        response = requests.get(url)

        # Check if the request was successful
        response.raise_for_status()
        
        # Unzip the downloaded data
        sequence_data = response.content.decode('utf-8')

        # Process the sequence data and append it to the riboswitch_dataset dataframe
        processed_data = process_sequence_data(sequence_data, rfam_id, rfam_id_number_only)
        riboswitch_dataset = pd.concat([riboswitch_dataset, pd.DataFrame(processed_data, columns=riboswitch_dataset.columns)], ignore_index=True)

    except requests.exceptions.RequestException as e:
        print(f"Failed to download sequences for {rfam_id}: {e}")
        continue

# Save the riboswitch dataset to a CSV file
riboswitch_dataset.to_csv('riboswitch_dataset.csv', index=False)

print('Riboswitch dataset created successfully!')
# Print top 6 rows of riboswitch_dataset.csv
print(riboswitch_dataset.head(6))

print('Original Riboswitch data sample below!')
# Print top 6 rows of riboswitch_data.csv
print(riboswitch_data.head(6))


We will take the resulting file and validate the records in our CSV.

In [None]:
import pandas as pd

# Read the riboswitch dataset file
riboswitch_dataset = pd.read_csv('riboswitch_dataset.csv')

# Group the data by rfam_id and count the unique Accession IDs or Sequences per rfam_id
#accession_counts = riboswitch_dataset.groupby('rfam_id')['Accession ID'].nunique()
unique_sequence_counts = riboswitch_dataset.groupby('rfam_id')['Sequence'].nunique()
all_sequence_counts = riboswitch_dataset.groupby('rfam_id')['Sequence']
# Print the number of unique Accession IDs per rfam_id
print(all_sequence_counts)


In [None]:
#next we prepare the dataset
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torch.nn as nn

class RiboswitchDataset(Dataset):
    def __init__(self, df, max_seq_len):
        self.df = df
        self.max_seq_len = max_seq_len
        self.label_to_int = {label: i for i, label in enumerate(sorted(self.df['rfam_id'].unique()))}

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        rna_seq = self.df.loc[idx, 'Sequence']
        one_hot_rna = torch.zeros(self.max_seq_len, 4)

        for i, base in enumerate(rna_seq):
            if i >= self.max_seq_len:
                break
            if base == 'A':
                one_hot_rna[i, 0] = 1
            elif base == 'C':
                one_hot_rna[i, 1] = 1
            elif base == 'G':
                one_hot_rna[i, 2] = 1
            elif base == 'U':
                one_hot_rna[i, 3] = 1

        label = self.label_to_int[self.df.loc[idx, 'rfam_id']]
        return one_hot_rna, label

# Load Riboswitch dataset from CSV file with padding
max_seq_len = 200  # You can set this value according to your dataset
riboswitch_df = pd.read_csv('riboswitch_dataset.csv')
riboswitch_dataset = RiboswitchDataset(riboswitch_df, max_seq_len=max_seq_len)
val_size = int(len(riboswitch_dataset) * 0.2)
train_size = len(riboswitch_dataset) - val_size

# Randomly split dataset into training and validation subsets
train_dataset, val_dataset = random_split(riboswitch_dataset, [train_size, val_size])

# Create DataLoader for training and validation
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)

## Model Architecture (Transformer)

We will implement the modified transformer model that incorporates a graph neural network layer, as described in the prior framework you provided. We will use PyTorch to define the architecture of the model, including the input encoding layers, graph neural network layer, multi-head self-attention layer, positional encoding, feedforward layer, and classification layer.

In [None]:
#Define your model architecture
import torch
import torch.nn as nn
from transformers import BertModel, BertConfig

class RiboswitchClassifier(nn.Module):
    def __init__(self, num_classes, max_seq_len):
        super(RiboswitchClassifier, self).__init__()

        self.encoder = nn.Embedding(4, 16)
        self.rnn = nn.GRU(input_size=16, hidden_size=32, num_layers=1, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(32 * 2, num_classes)

    def forward(self, x):
        x = self.encoder(x.argmax(dim=-1))
        x, _ = self.rnn(x)
        x = self.dropout(x[:, -1, :])
        x = self.fc(x)
        return x

# Initialize the model
num_classes = len(train_dataset.dataset.label_to_int)
model = RiboswitchClassifier(num_classes, max_seq_len)

In [None]:
# Train the model
import torch.optim as optim
from tqdm import tqdm

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Add data augmentations
def random_reverse(sequence):
    if torch.rand(1) < 0.5:
        return sequence.flip(dims=[1])
    return sequence

def random_shuffle(sequence):
    if torch.rand(1) < 0.5:
        perm = torch.randperm(sequence.size(1))
        return sequence[:, perm]
    return sequence

def random_complement(sequence):
    if torch.rand(1) < 0.5:
        complement_map = {0: 3, 1: 2, 2: 1, 3: 0}
        return sequence.flip(dims=[2]).apply_(lambda x: complement_map[int(x)])
    return sequence

def apply_augmentations(sequence):
    sequence = random_reverse(sequence)
    sequence = random_shuffle(sequence)
    sequence = random_complement(sequence)
    return sequence

# Training parameters
num_epochs = 100
learning_rate = 0.001
weight_decay = 1e-5  # for L2 regularization

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()

# Use RAdam optimizer
from pytorch_ranger import Ranger
optimizer = Ranger(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# Training loop
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data
        inputs = apply_augmentations(inputs)  # Apply data augmentations
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Add current batch loss
        running_loss += loss.item()

    # Calculate average loss for the epoch
    epoch_loss = running_loss / (i + 1)

    # Validation loop
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    val_loss = 0.0
    with torch.no_grad():
        for j, data in enumerate(val_loader, 0):
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)

            # Calculate validation loss
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_accuracy = 100 * correct / total
    val_loss = val_loss / (j + 1)  # Calculate average validation loss for the epoch

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

print('Finished Training')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import AutoTokenizer, AutoModel

class TransformerModel(nn.Module):
    def __init__(self, num_classes):
        super(TransformerModel, self).__init__()
        self.bert = AutoModel.from_pretrained("bert-base-cased")
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_classes)
    
    def forward(self, input_ids):
        input_shape = input_ids.size()
        if len(input_shape) > 2:
            input_ids = input_ids.squeeze(1)
        bert_output = self.bert(input_ids)
        pooled_output = bert_output[1]
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return logits    

#model = TransformerModel(num_classes=2)
# input_ids = torch.ones((2, 5), dtype=torch.long)  # Example input tensor of shape (batch_size, sequence_length)
# logits = model(input_ids)
# print(logits.shape)  # Should print (2, 2) for binary classification with batch_size=2

# Use the RiboswitchDataset and DataLoader code you provided earlier

# Define training and validation loop
def train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, scheduler, device, num_epochs, label_names):
    y_true = []
    y_pred = []
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0

        for batch in train_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels.squeeze())
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch in val_loader:
                inputs, labels = batch
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels.squeeze())
                val_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels.squeeze()).sum().item()

                # Store true labels and predicted labels
                if epoch == num_epochs - 1:
                    y_true.extend(labels.squeeze().tolist())
                    y_pred.extend(predicted.tolist())

        val_loss /= len(val_loader)
        val_acc = correct / total

        print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        scheduler.step()
    
    return y_true, y_pred, label_names

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model, criterion, and optimizer
num_classes = len(riboswitch_df['rfam_id'].unique())
model = TransformerModel(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

# Add a learning rate scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

# Train and evaluate the model
num_epochs = 100
label_names = riboswitch_df["rfam_id"].unique()
y_true, y_pred, label_names = train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, scheduler, device, num_epochs, label_names)


### pause, everthing below in this section is wrong

In [None]:
import torch
import torch.nn as nn

class RiboswitchTransformer(nn.Module):
    def __init__(self, num_classes):
        super(RiboswitchTransformer, self).__init__()
        
        # Input Encoding Layers
        self.embedding = nn.Embedding(num_embeddings=4, embedding_dim=128)
        
        # Graph Neural Network Layer
        # Graph Construction
        # Node and Edge Features
        
        # Graph Neural Network Architecture
        self.gnn_layer = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU()
        )
        
        # Output Features
        # Multi-Head Self-Attention Layer
        self.self_attn_layer = nn.MultiheadAttention(embed_dim=32, num_heads=4)
        
        # Positional Encoding
        self.pos_encoding = nn.Parameter(torch.zeros(100, 1, 32))
        
        # Feedforward Layer
        self.feedforward_layer = nn.Sequential(
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU()
        )
        
        # Classification Layer
        self.classification_layer = nn.Linear(32, num_classes)
        
    def forward(self, x):
        # Input Encoding Layers
        x = self.embedding(x)
        
        # Graph Neural Network Layer
        # Graph Construction
        # Node and Edge Features
        
        # Graph Neural Network Architecture
        x = self.gnn_layer(x)
        
        # Output Features
        # Multi-Head Self-Attention Layer
        x = x.transpose(0, 1)
        x, _ = self.self_attn_layer(x, x, x)
        x = x.transpose(0, 1)
        
        # Positional Encoding
        x = x + self.pos_encoding[:x.size(0)]
        
        # Feedforward Layer
        x = self.feedforward_layer(x)
        
        # Classification Layer
        x = self.classification_layer(x[:, -1, :])
        
        return x

In [None]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.dataset import Subset
import torch.nn as nn
import torch.optim as optim

class RiboswitchDataset(Dataset):
    def __init__(self, csv_path):
        self.df = pd.read_csv(csv_path)
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        # Load RNA sequence and label from dataframe
        rna_seq = self.df.loc[idx, 'Sequence']
        rfam_id = self.df.loc[idx, 'rfam_id']
        
        # Convert RNA sequence to tensor
        rna_seq_tensor = torch.tensor([int(c == x) for x in "ATCG" for c in rna_seq])
        
        # Convert class label to tensor
        class_label_tensor = torch.tensor(int(rfam_id))
        
        return rna_seq_tensor, class_label_tensor

# Load Riboswitch dataset from CSV file
riboswitch_dataset = RiboswitchDataset('riboswitch_dataset.csv')

# Split dataset into training and validation subsets
val_size = int(len(riboswitch_dataset) * 0.2)
train_size = len(riboswitch_dataset) - val_size
train_dataset, val_dataset = torch.utils.data.random_split(riboswitch_dataset, [train_size, val_size])

# Create DataLoader for training and validation
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)

class RiboswitchTransformer(nn.Module):
    def __init__(self, num_classes):
        super(RiboswitchTransformer, self).__init__()
        
        # Input Encoding Layers
        self.embedding = nn.Embedding(num_embeddings=4, embedding_dim=128)
        
        # Graph Neural Network Layer
        # Graph Construction
        # Node and Edge Features
        
        # Graph Neural Network Architecture
        self.gnn_layer = nn.ModuleList([
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU()
        ])
        
        # Output Features
        # Multi-Head Self-Attention Layer
        self.self_attn_layer = nn.MultiheadAttention(embed_dim=32, num_heads=4)
        
        # Positional Encoding
        self.pos_encoding = nn.Parameter(torch.zeros(100, 1, 32))
        
        # Feedforward Layer
        self.feedforward_layer = nn.Sequential(
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU()
        )
        
        # Classification Layer
        self.classification_layer = nn.Linear(32, num_classes)
        
    def forward(self, x):
        # Input Encoding Layers
        x = self.embedding(x)
        
        # Graph Neural Network Layer
        # Graph Construction
        # Node and Edge Features
        
        # Graph Neural Network Architecture
        for layer in self.gnn_layer:
            x = layer(x)
        
        # Output Features
        # Multi-Head Self-Attention Layer
        x = x.transpose(0, 1)
        x, _ = self.self_attn_layer(x, x, x)
        x = x.transpose(0, 1)
        
        # Positional Encoding
        x = x + self.pos




## Training (Transformer)

We will train the model using PyTorch. This will involve defining a loss function, selecting an optimizer, and running multiple epochs of training. We will use a cross-entropy loss function and the Adam optimizer, which is a popular choice for deep learning models.

In [None]:
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split

# Use the RiboswitchDataset and DataLoader code you provided earlier

# Define training and validation loop
def train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, device, num_epochs):
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0

        for batch in train_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels.squeeze())
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch in val_loader:
                inputs, labels = batch
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels.squeeze())
                val_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels.squeeze()).sum().item()

        val_loss /= len(val_loader)
        val_acc = correct / total

        print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model, criterion, and optimizer
num_classes = len(riboswitch_df['rfam_id'].unique())
model = TransformerModel(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
#optimizer = optim.Adam(model.parameters(), lr=0.0005)
weight_decay = 1e-5
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=weight_decay)


# Train and evaluate the model
num_epochs = 50
train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, device, num_epochs)


## Model Architecture (LSTM)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split

class LSTMModel(nn.Module):
    def __init__(self, num_classes, input_size=4, hidden_size=128, num_layers=3, dropout=0.1):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        _, (hidden, _) = self.lstm(x)
        x = self.dropout(hidden[-1])
        x = self.classifier(x)
        return x

# Use the RiboswitchDataset and DataLoader code you provided earlier

# Define training and validation loop
def train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, scheduler, device, num_epochs, label_names):
    y_true = []
    y_pred = []
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0

        for batch in train_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels.squeeze())
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch in val_loader:
                inputs, labels = batch
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels.squeeze())
                val_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels.squeeze()).sum().item()

                # Store true labels and predicted labels
                if epoch == num_epochs - 1:
                    y_true.extend(labels.squeeze().tolist())
                    y_pred.extend(predicted.tolist())

        val_loss /= len(val_loader)
        val_acc = correct / total

        print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        scheduler.step()
    
    return y_true, y_pred, label_names

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model, criterion, and optimizer
num_classes = len(riboswitch_df['rfam_id'].unique())
model = LSTMModel(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)

# Add a learning rate scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

# Train and evaluate the model
num_epochs = 100
label_names = riboswitch_df["rfam_id"].unique()
y_true, y_pred, label_names = train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, scheduler, device, num_epochs, label_names)

## Evaluation (LSTM)

We will evaluate the performance of the trained model using various metrics such as accuracy, precision, recall, and F1-score. We can also visualize the results using confusion matrices and ROC curves.

In [None]:
# Save the model
model_path = "lstm_model_scripted.pt"  # Choose the path and filename you want for your model
torch.save(model.state_dict(), model_path)

In [None]:
# Load the trained model
model_path = "lstm_model_scripted.pt"
model = LSTMModel(num_classes=num_classes) # Replace this with the model class you used for training
model.load_state_dict(torch.load(model_path))
model.eval()

# Move the model to the appropriate device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


In [None]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc
from sklearn.preprocessing import label_binarize

# Compute the accuracy, precision, recall, and F1-score
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1-score: {f1:.2f}")

# Confusion Matrix
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='d', xticklabels=label_names, yticklabels=label_names)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()

# ROC Curve
y_true_binary = label_binarize(y_true, classes=list(range(len(label_names))))
y_pred_binary = label_binarize(y_pred, classes=list(range(len(label_names))))

fpr = dict()
tpr = dict()
roc_auc = dict()

for i in range(len(label_names)):
    fpr[i], tpr[i], _ = roc_curve(y_true_binary[:, i], y_pred_binary[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

plt.figure()

for i in range(len(label_names)):
    plt.plot(fpr[i], tpr[i], label='ROC curve of class {0} (area = {1:0.2f})'.format(label_names[i], roc_auc[i]))

plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc='upper left', bbox_to_anchor=(1.05, 1))
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Define the data
data = {
    'RF00050': 145,
    'RF00059': 113,
    'RF00080': 29,
    'RF00162': 457,
    'RF00167': 133,
    'RF00168': 47,
    'RF00174': 433,
    'RF00234': 18,
    'RF00379': 106,
    'RF00380': 158,
    'RF00442': 100,
    'RF00504': 44,
    'RF00521': 40,
    'RF00522': 36,
    'RF00634': 40,
    'RF01054': 13,
    'RF01055': 102,
    'RF01056': 4,
    'RF01057': 52,
    'RF01482': 7,
    'RF01510': 3,
    'RF01689': 140,
    'RF01704': 359,
    'RF01725': 414,
    'RF01727': 47,
    'RF01734': 233,
    'RF01739': 663,
    'RF01750': 148,
    'RF01767': 25,
    'RF01786': 48,
    'RF01826': 6,
    'RF01831': 100,
    'RF02680': 18,
    'RF02683': 71,
    'RF02885': 6,
    'RF02912': 4,
    'RF03057': 241,
    'RF03058': 45,
    'RF03071': 107,
    'RF03072': 450
}

# Sort the data by frequency
sorted_data = sorted(data.items(), key=lambda x: x[1])

# Extract the x and y values
x_values = [d[0] for d in sorted_data]
y_values = [d[1] for d in sorted_data]

# Create the bar chart
plt.bar(x_values, y_values)

# Set the chart title and axis labels
plt.title('Frequency of Rfam IDs')
plt.xlabel('Rfam ID')
plt.ylabel('Frequency')

# Rotate the x-axis labels for better visibility
plt.xticks(rotation=90)

# Show the chart
plt.show()

In [None]:
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc

# Binarize the output
n_classes = len(np.unique(y_true))
y_true_bin = label_binarize(y_true, classes=range(n_classes))
y_pred_bin = label_binarize(y_pred, classes=range(n_classes))

# Compute ROC curve and ROC area for each class
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_pred_bin[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Print ROC curve data as text
for i in range(n_classes):
    print(f"ROC curve of class {i}:")
    print(f"FPR: {fpr[i]}")
    print(f"TPR: {tpr[i]}")
    print(f"AUC: {roc_auc[i]}")


# Evaluation (Transformer)

# Other Code for later

In [None]:
import requests
import json
import gzip

# Set search parameters
url = 'https://rfam.org/search?q=entry_type:%22Family%22%20AND%20rna_type:%22riboswitch%22'
num_per_page = 50

# Initialize list to hold family details
families = []

# Loop over search results pages
page_num = 0
while True:
    # Get search results page
    search_url = f'{url}&start={page_num*num_per_page}&rows={num_per_page}'
    response = requests.get(search_url)
    search_results = json.loads(response.text)

    # Break if no more pages
    if not search_results['results']:
        break
    

    # Loop over families on page
    for result in search_results['results']:
        # Check if family has RNA type riboswitch
        if 'rna_type' in result and result['rna_type'] == 'riboswitch':
            # Get family details
            family = {
                'rfam_id': result['rfam_acc'],
                'name': result['name']
            }

            # Add family to list
            families.append(family)
    
    # Loop over families on page
    for result in search_results['results']:
        # Check if family has RNA type riboswitch
        if 'rna_type' in result and result['rna_type'] == 'riboswitch':
            # Get family details
            family_id = result['rfam_acc']
            family_name = result['description']
            family_url = f'https://rfam.org/family/{family_id}'
            
            # Download and extract the FASTA file
            fasta_url = f'https://ftp.ebi.ac.uk/pub/databases/Rfam/CURRENT/fasta_files/{family_id}.fa.gz'
            response = requests.get(fasta_url, stream=True)
            with gzip.open(response.raw, 'rt') as f:
                lines = f.readlines()
            sequences = []
            for i in range(0, len(lines), 2):
                label = lines[i].strip()[1:]
                sequence = lines[i+1].strip()
                sequences.append((label, sequence))
                
            # Add family details to list
            families.append((family_id, family_name, family_url, sequences))
    
    # Increment page number
    page_num += 1

# Write family details to CSV file
with open('riboswitch_families.csv', 'w') as f:
    f.write('RFAM ID,Family Name,URL,Label,Sequence\n')
    for family in families:
        family_id = family[0]
        family_name = family[1]
        family_url = family[2]
        sequences = family[3]
        for sequence in sequences:
            label = sequence[0]
            seq = sequence[1]
            f.write(f'{family_id},{family_name},{family_url},{label},{seq}\n')

In [None]:
import requests
import json
import torch

# Set search parameters
#query = 'entry_type:"Family" AND rna_type:"riboswitch"'
sort = 'default'
num_per_page = 90

# Initialize lists to hold sequence data and labels
sequences = []
labels = []

# Loop over search results pages
page_num = 0
while True:
    # Get search results page
    search_url = f'https://rfam.org/search?q=entry_type:%22Family%22%20AND%20rna_type:%22riboswitch%22&sort={sort}&start={page_num*num_per_page}&rows={num_per_page}'
    try:
        response = requests.get(search_url)
        search_results = json.loads(response.text)
    except Exception as e:
        print(f"Error occurred while getting search results page: {e}")
        break
    
    print(response.text)
    # Break if no more pages
    if not search_results['results']:
        break
    
    # Loop over families on page
    for result in search_results['results']:
        # Check if family has RNA type riboswitch
        if 'rna_type' in result and result['rna_type'] == 'riboswitch':
            # Get family description
            family_url = f'http://rfam.org/family/{result["rfam_acc"]}?content-type=application/json'
            try:
                response = requests.get(family_url)
                family_data = response.json()

                # Extract alignment URL from family description
                alignment_url = family_data["rfam"]["alignment"]["url"] + "?content-type=text%2Fplain"

                # Retrieve alignment file
                response = requests.get(alignment_url)
                alignment_data = response.text

                # Extract sequences from alignment file
                for line in alignment_data.splitlines():
                    if line.startswith(">"):
                        label = line.strip(">").split()[0]
                    else:
                        sequence = torch.tensor([int(c == x) for x in "ATCG"] for c in line.strip())
                        sequences.append(sequence)
                        labels.append(label)
            except Exception as e:
                print(f"Error occurred while processing family {result['rfam_acc']}: {e}")
            
    # Increment page number
    page_num += 1

# Pad sequences with zeros to make them the same length
max_len = max([len(seq) for seq in sequences])
padded_sequences = []
for seq in sequences:
    padding = torch.zeros(max_len - len(seq), len(seq[0]))
    padded_seq = torch.cat([seq, padding])
    padded_sequences.append(padded_seq)

# Create dataset
dataset = torch.utils.data.TensorDataset(torch.stack(padded_sequences), labels)

# Save dataset to file
torch.save(dataset, 'riboswitch_dataset.pth')