# Downstream prediction 

Run this notebook on google colab to use a free GPU! 

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/helicalAI/helical/blob/main/examples/notebooks/Hyena-DNA-Inference.ipynb)

In this notebook, a HyenaDNA model is used for various classifications tasks with a given sequence of nucleotides.

A HyenaDNA model (2 layers and width 256) is used to create embeddings of nucleotides.

A neural network is then trained, using the embeddings as inputs, to make a prediction.

This notebook can be used for any of the 18 [nucleotide transformer downstream tasks](https://huggingface.co/datasets/InstaDeepAI/nucleotide_transformer_downstream_tasks).

In [0]:
!pip install helical

In [0]:
from sklearn import svm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import numpy as np
from datasets import DatasetDict
import torch
from torch import nn
from typing import Tuple
import logging, warnings
from torch.nn.functional import one_hot
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

logging.getLogger().setLevel(logging.ERROR)

warnings.filterwarnings("ignore")

### Use the Helical package to get the Hyena model
We use a small HyenaDNA model with 2 layers and width 256.

In [0]:
from helical.models.hyena_dna.model import HyenaDNA
from helical.models.hyena_dna.hyena_dna_config import HyenaDNAConfig  
device = "cuda" if torch.cuda.is_available() else "cpu"
configurer = HyenaDNAConfig(model_name="hyenadna-tiny-1k-seqlen-d256", device=device, batch_size=5)
hyena_model = HyenaDNA(configurer=configurer)

### Download the dataset
Several datasets are available from the [Nucleotide Transformer](https://arxiv.org/abs/2306.15794). Using the `get_dataset_config_names()` function, we get a list of the available the datasets for the downstream tasks.

In [0]:
from datasets import get_dataset_config_names

configs = get_dataset_config_names("InstaDeepAI/nucleotide_transformer_downstream_tasks", trust_remote_code=True)
configs

We can select any of the 18 downstream tasks. Let us take the `promoter_tata` as an example.

In [0]:
from datasets import load_dataset
label = "promoter_tata"
dataset = load_dataset("InstaDeepAI/nucleotide_transformer_downstream_tasks_revised", label, trust_remote_code=True)

To familiarize ourselves with the data, we can print the first seqence and see if it is a splice site acceptor or not:

In [0]:
print("Nucleotide sequence:", dataset["train"]["sequence"][0][:10], "...")
print("Label name:", dataset["train"].config_name, "and value:", dataset["train"]["label"][0])
num_classes = len(set(dataset["train"]["label"]))
print("Number of classes:", num_classes)

Define a function that gets the embeddings for each nucleotide sequence in the training dataset.

According to the HyenaDNA [paper](https://arxiv.org/pdf/2306.15794): "[they] average across the tokens to obtain a single classification token".

In our code below, the Hyena model returns a (302, 256) matrix. We average column wise resulting in a vector of shape (256, ) for each observation.

During the training process, we also found that it is beneficial to normalize the data row-wise.

In [0]:
def get_model_inputs(dataset: DatasetDict) -> Tuple[np.ndarray, np.ndarray]:
    """
    Parameters
    ----------
    dataset : DatasetDict
        The dataset containing the sequences and labels.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        A tuple containing the input features and labels.
    """
    processed_data = hyena_model.process_data(dataset["sequence"])

    embeddings = hyena_model.get_embeddings(processed_data)
    embeddings = embeddings.mean(axis=1)
    means = np.mean(embeddings, axis=1, keepdims=True)
    stds = np.std(embeddings, axis=1, keepdims=True)

    x = (embeddings - means) / stds

    return x, dataset["label"]

It may be beneficial to do this step once and save the output in a `.npy` file.

In [0]:
x, y = get_model_inputs(dataset["train"])
#np.save(f"data/train/x_{label}_norm_256", x)
#np.save(f"data/train/y_{label}_norm_256", y)

Load the data and one-hot-encode the labels.

We split the training set into actual training data and a test set. 

This is optional and the entire dataset can be used for training. We did this to avoid data leakage by not touching the test set during the training process.

In [0]:
#x = np.load(f"data/train/x_{label}_norm_256.npy")
#y = np.load(f"data/train/y_{label}_norm_256.npy")

# # One-hot encode the labels
encoder = LabelEncoder()
y_encoded = encoder.fit_transform(y)
y_encoded = one_hot(torch.tensor(y_encoded),num_classes).float()
y_encoded


In [0]:
X_train, X_test, y_train, y_test = train_test_split(x, y_encoded, test_size=0.1, random_state=42)

In [0]:
input_shape = configurer.config['d_model']

# Define the model architecture
head_model = nn.Sequential(
    nn.Linear(input_shape, 256),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(256, 128),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(128, 128),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(128, 64),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(64, num_classes),
    nn.Softmax(dim=1)
    )

print(head_model)

In [0]:
def train_model(model: nn.Sequential,
                X_train: torch.Tensor,  
                y_train: torch.Tensor,  
                X_val: torch.Tensor, 
                y_val: torch.Tensor, 
                optimizer = optim.Adam, 
                loss_fn = nn.CrossEntropyLoss(),
                num_epochs = 25, 
                batch = 64):    

    # Create DataLoader for batching
    train_dataset = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch, shuffle=True)

    # Validation dataset
    val_dataset = TensorDataset(X_val, y_val)
    val_loader = DataLoader(val_dataset, batch_size=batch, shuffle=False)

    # Ensure model is in training mode
    model.train()

    for epoch in range(num_epochs):
        for batch_X, batch_y in train_loader:
            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(batch_X)
            
            # Compute loss
            loss = loss_fn(outputs, batch_y)
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        
        # Validation phase (optional)
        model.eval()
        with torch.no_grad():
            val_losses = []
            for val_X, val_y in val_loader:
                val_outputs = model(val_X)
                val_loss = loss_fn(val_outputs, val_y)
                val_losses.append(val_loss.item())
            
            print(f"Epoch {epoch+1}, Validation Loss: {sum(val_losses)/len(val_losses)}")
        
        # Set back to training mode for next epoch
        model.train()
        
    model.eval()   
    return model

### Define and train the model

In [0]:
trained_model = train_model(head_model, 
                            torch.from_numpy(X_train), 
                            y_train, 
                            torch.from_numpy(X_test), 
                            y_test,
                            optim.Adam(head_model.parameters(), lr=0.001),
                            nn.CrossEntropyLoss())

In [0]:
X_unseen, y_unseen = get_model_inputs(dataset["test"])
#np.save(f"data/test/x_{label}_norm_256.npy")
#np.save(f"data/test/y_{label}_norm_256.npy")

### Evaluate the model on the test data


In [0]:
#X_unseen = np.load(f"data/test/x_{label}_norm_256.npy")
#y_unseen = np.load(f"data/test/y_{label}_norm_256.npy")

predictions_nn = trained_model(torch.from_numpy(X_unseen))

y_pred = np.array(torch.argmax(predictions_nn, dim=1))
print("Correct predictions: {:.2f}%".format(sum(np.equal(y_pred, y_unseen))*100/len(y_unseen)))


The [Hyena](https://arxiv.org/pdf/2306.15794) and the [Nucleotide transformer](https://www.biorxiv.org/content/10.1101/2023.01.11.523679v1.full.pdf) papers, report accuracies around 95% for this task. Our results underperform in comparison. This is probably due to the much larger models being used for the NT, while the Hyena model was re-trained from scratch for this task. In future work, we want to achieve these accuracies too with either approaches.

## OPTIONAL
For reference, we also trained an SVM model and obtained similar results (to our small NN).

In [0]:
# Train the SVM model
svm_model = svm.SVC(kernel='rbf', degree=3, C=1, decision_function_shape='ovr')  # One-vs-rest strategy
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.1, random_state=42)
svm_model.fit(X_train, y_train)

In [0]:
# Evaluate the model
unseen_predictions_svm = svm_model.predict(X_unseen)

accuracy = accuracy_score(y_unseen, unseen_predictions_svm)
print("Test accuracy: {:.1f}%".format(accuracy*100))


In [0]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# NN
# y_true = np.argmax(y_test, axis=1)
# y_pred_classes = np.argmax(predictions_nn, axis=1)

# SVM
y_true = y_test
y_pred_classes = svm_model.predict(X_test)

# Compute the confusion matrix
cm = confusion_matrix(y_true, y_pred_classes)

# Create a heatmap
plt.figure(figsize=(10,7))
sns.heatmap(cm, annot=True, fmt='d')
plt.xlabel('Predicted')
plt.ylabel('Truth')
plt.show()