## Load required packages

In [None]:
import tensorflow as tf
import keras
import keras.layers as kl
from keras.models import Sequential, Model, load_model
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, History, ModelCheckpoint

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import gdown

In [None]:
## mount on googledrive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# data is already stored 
import joblib
data = joblib.load('/content/drive/MyDrive/DeepSTARR_tutorial/data')

X_train = joblib.load('/content/drive/MyDrive/DeepSTARR_tutorial/X_train')
Y_train = joblib.load('/content/drive/MyDrive/DeepSTARR_tutorial/Y_train')

X_valid = joblib.load('/content/drive/MyDrive/DeepSTARR_tutorial/X_valid')
Y_valid = joblib.load('/content/drive/MyDrive/DeepSTARR_tutorial/Y_valid')

X_test = joblib.load('/content/drive/MyDrive/DeepSTARR_tutorial/X_test')
Y_test = joblib.load('/content/drive/MyDrive/DeepSTARR_tutorial/Y_test')


## My implementation (CNN + Transformer)

In [None]:
import tensorflow as tf
from tensorflow.keras import layers as kl
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

params_smaller_data = {
    'batch_size': 64,
    'epochs': 1,
    'early_stop': 20,
    'lr': 0.001,
    'n_conv_layer': 3,
    'num_filters1': 128,
    'num_filters2': 60,
    'num_filters3': 60,
    'kernel_size1': 7,
    'kernel_size2': 3,
    'kernel_size3': 5,
    'n_dense_layer': 1,
    'dense_neurons1': 64,
    'dropout_conv': 'yes',
    'dropout_prob': 0.4,
    'pad': 'same',
    'num_heads': 4,  # number of attention heads
    'dff': 128,  # depth of the feed-forward network
    'sequence_length': 249  # adjust based on your data
}


def positional_encoding(positions, d_model):
    positions = tf.range(positions, dtype=tf.float32)[:, tf.newaxis]  # Explicitly cast to float32
    i = tf.range(d_model, dtype=tf.float32)[tf.newaxis, :]  # Explicitly cast to float32
    angle_rates = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
    angle_rads = positions * angle_rates
    sines = tf.sin(angle_rads[:, 0::2])
    cosines = tf.cos(angle_rads[:, 1::2])
    pos_encoding = tf.concat([sines, cosines], axis=-1)
    return pos_encoding

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0, position=None):
    effective_length = inputs.shape[1]
    pos_encoding = positional_encoding(effective_length, inputs.shape[-1])
    pos_encoding = tf.expand_dims(pos_encoding, 0)
    x = kl.Add()([inputs,pos_encoding])  # Add positional encoding to the transformer input

    # First MultiHeadAttention Layer
    x = kl.LayerNormalization(epsilon=1e-6)(inputs)
    x = inputs
    x = kl.MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(x, x)
    x = kl.Dropout(dropout)(x)
    x = kl.Add()([x,inputs])  # Skip Connection

    # Second MultiHeadAttention Layer
    x = kl.LayerNormalization(epsilon=1e-6)(x)
    x = kl.MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)(x, x)
    x = kl.Dropout(dropout)(x)
    x = kl.Add()([x,inputs])  # Skip Connection again


    # Feed Forward Part
    x = kl.LayerNormalization(epsilon=1e-6)(x)
    x = kl.Dense(ff_dim, activation="relu")(x)
    x = kl.Dropout(dropout)(x)
    x = kl.Dense(inputs.shape[-1])(x)
    return kl.Add()([x,inputs])  # Skip Connection

def DeepSTARR(params):
    input = kl.Input(shape=(params['sequence_length'], 4))

    # Convolutional layers as before
    x = kl.Conv1D(params['num_filters1'], kernel_size=params['kernel_size1'],
                  padding=params['pad'], name='Conv1D_1')(input)
    x = kl.BatchNormalization()(x)
    x = kl.Activation('relu')(x)
    x = kl.MaxPooling1D(2)(x)

    for i in range(1, params['n_conv_layer']):
        x = kl.Conv1D(params['num_filters' + str(i + 1)],
                      kernel_size=params['kernel_size' + str(i + 1)],
                      padding=params['pad'], name='Conv1D_' + str(i + 1))(x)
        x = kl.BatchNormalization()(x)
        x = kl.Activation('relu')(x)
        x = kl.MaxPooling1D(2)(x)
        if params['dropout_conv'] == 'yes':
            x = kl.Dropout(params['dropout_prob'])(x)

    # Transformer layer with positional encoding
    x = transformer_encoder(x, head_size=64, num_heads=params['num_heads'], ff_dim=params['dff'], dropout=params['dropout_prob'], position=params['sequence_length']//4)

    # Dense layers 
    x = kl.Flatten()(x)
    for i in range(0, params['n_dense_layer']):
        x = kl.Dense(params['dense_neurons' + str(i + 1)], name='Dense_' + str(i + 1))(x)
        x = kl.BatchNormalization()(x)
        x = kl.Activation('relu')(x)
        x = kl.Dropout(params['dropout_prob'])(x)

    bottleneck = x
    tasks = ['Dev', 'Hk']
    outputs = []
    for task in tasks:
        outputs.append(kl.Dense(1, activation='linear', name='Dense_' + task)(bottleneck))

    model = Model([input], outputs)
    model.compile(Adam(learning_rate=params['lr']),
                  loss=['mse', 'mse'],
                  loss_weights=[1, 1])

    return model, params

model, params = DeepSTARR(params_smaller_data)
model.summary()


## Train model

In [None]:
# function to to train model
def train(selected_model, X_train, Y_train, X_valid, Y_valid, params):

    my_history=selected_model.fit(X_train, Y_train,
                                  validation_data=(X_valid, Y_valid), # The function trains the selected_model using the training data and validates it using the validation data.
                                  batch_size=params['batch_size'],
                                  epochs=params['epochs'],
                                  callbacks=[EarlyStopping(patience=params['early_stop'], monitor="val_loss", restore_best_weights=True), # The training will stop early if there is no improvement in validation loss after a certain number of epochs (patience)
                                             History()])

    # save model and history
    return selected_model, my_history

In [None]:
main_model, main_params = DeepSTARR(params_smaller_data) 
main_model, my_history = train(main_model, X_train, Y_train, X_valid, Y_valid, main_params)

## Evaluate the model

### Training history

In [None]:
# Plot training & validation training metrics
for out in ['Dev', 'Hk']:
  # MSE
  plt.plot(my_history.history[str('Dense_' + out + '_loss')])
  plt.plot(my_history.history[str('val_Dense_' + out + '_loss')])
  plt.title(out) # loss is Mean Squared Error (MSE)
  plt.ylabel('Loss MSE')
  plt.xlabel('Epoch')
  plt.legend(['Train', 'Validation'], loc='upper left')

  # Add vertical line at minimum validation loss of combined dev and hk
  min_val_loss = min(my_history.history['val_loss'])
  plt.axvline(x=my_history.history['val_loss'].index(min_val_loss), color='red', linestyle='--')

  plt.show()

### Model performance metrics

Analyse Mean Squared Error (MSE) and Pearson (PCC) and Spearman (SCC) correlation coefficients

In [None]:
from scipy import stats
from sklearn.metrics import mean_squared_error

# create functions
def summary_statistics(X, Y, set, task):
    pred = main_model.predict(X, batch_size=main_params['batch_size']) # predict
    if task =="Dev":
        i=0
    if task =="Hk":
        i=1
    print(set + ' MSE ' + task + ' = ' + str("{0:0.2f}".format(mean_squared_error(Y, pred[i].squeeze()))))
    print(set + ' PCC ' + task + ' = ' + str("{0:0.2f}".format(stats.pearsonr(Y, pred[i].squeeze())[0])))
    # print(set + ' SCC ' + task + ' = ' + str("{0:0.2f}".format(stats.spearmanr(Y, pred[i].squeeze())[0]))) # not giving similar results as training

# predict activity for each set and enhancer type
summary_statistics(X_train, Y_train[0], "train", "Dev")
summary_statistics(X_valid, Y_valid[0], "validation", "Dev")
summary_statistics(X_test, Y_test[0], "test", "Dev")

summary_statistics(X_train, Y_train[1], "train", "Hk")
summary_statistics(X_valid, Y_valid[1], "validation", "Hk")
summary_statistics(X_test, Y_test[1], "test", "Hk")

Although there is some overfitting (i.e. the performance is better in the training set than test set), the performance on the test data is still good. This suggests that the model accurately captures the regulatory information present in the DNA sequences, as you will see in the sections below.

### Scatter plots
The scatter plots compare the observed values with the predicted ones for each set of sequences (train/validation/testing). This allows to compare the global performance of the final model.

In [None]:
def my_scatter(X, Y, set, task):
  pred = main_model.predict(X, batch_size=main_params['batch_size'])
  if task =="Dev":
    i=0
    c="red"
  if task =="Hk":
    i=1
    c="blue"

  g = sns.regplot(x=Y, y=pred[i].squeeze(), ci=None, color=c,
                  fit_reg=False,
                  scatter_kws={'s': 5, 'alpha': 0.2},
                  line_kws={'color': "black"})

  # add expected regression line
  x0, x1 = g.get_xlim()
  y0, y1 = g.get_ylim()
  lims = [max(x0, y0), min(x1, y1)]
  g.plot(lims, lims, 'w', linestyle='dashed', transform=g.transData, color='grey')

  # same axes ranges
  g.set_aspect('equal')
  # g.set(xlim=(min(x0, y0), max(x1, y1)),
  #       ylim=(min(x0, y0), max(x1, y1)))

  PCC = str("{0:0.2f}".format(stats.pearsonr(Y, pred[i].squeeze())[0]))
  plt.xlabel('Measured expression [log2]')
  plt.ylabel('Predicted expression [log2]')
  plt.title(str(task + ' - ' + set + ' set (PCC=' + PCC + ')'))

  plt.show()

# plots
my_scatter(X_test, Y_test[0], "test", "Dev")
my_scatter(X_test, Y_test[1], "test", "Hk")