In [None]:
from google.colab import drive
drive.mount('/content/drive')

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.utils import resample

RNA_GE_window_size = 200
RNA_GE_bp_length = 20000    # adjust
RNA_GE_feature_length = RNA_GE_bp_length // RNA_GE_window_size

DNA_feature_bp_length = 4000

RNA_GE_left_columns_to_include = ['L' + str(i) for i in range(100 - RNA_GE_feature_length//2, 100 + RNA_GE_feature_length//2)]
RNA_GE_right_columns_to_include = ['R' + str(i) for i in range(100 - RNA_GE_feature_length//2, 100 + RNA_GE_feature_length//2)]

all_datasets = ["HelaS3", "K562", "IMR90","GM12878"]

def load_dataset(name:str):
  # For fuying dataset
  dataset_paths = {
      "HelaS3": "/content/drive/MyDrive/FYP/data_fuying/fuying_HelaS3_out_fea_DNA.csv",
      "K562": "/content/drive/MyDrive/FYP/data_fuying/fuying_K562_out_fea_DNA.csv",
      "IMR90": "/content/drive/MyDrive/FYP/data_fuying/fuying_IMR90_out_fea_DNA.csv",
      "GM12878": "/content/drive/MyDrive/FYP/data_fuying/fuying_GM12878_out_fea_DNA.csv"
  }

  dataset_RNA_20kbp_paths = {
      "HelaS3": "/content/drive/MyDrive/FYP/data_fuying/fuying_HelaS3_out_fea_20kbp.csv",
      "K562": "/content/drive/MyDrive/FYP/data_fuying/fuying_K562_out_fea_20kbp.csv",
      "IMR90": "/content/drive/MyDrive/FYP/data_fuying/fuying_IMR90_out_fea_20kbp.csv",
      "GM12878": "/content/drive/MyDrive/FYP/data_fuying/fuying_GM12878_out_fea_20kbp.csv"
  }


  if name not in dataset_paths.keys():
    print(f"Dataset {name} not found.")
    return None

  DNA_feature_columns = pd.read_csv(dataset_paths[name])[['left_anchor_sequence', 'right_anchor_sequence']]
  RNA_dataframe = pd.read_csv(dataset_RNA_20kbp_paths[name])
  dataframe = pd.concat([RNA_dataframe, DNA_feature_columns], axis = 1)

  print(f"\n{name} loaded")
  neg, pos = np.bincount(dataframe["label"])
  total = neg + pos
  baseline_auprc = pos / total
  print(f"Baseline auprc: {baseline_auprc}")

  return dataframe

def DNA_sequence_to_indices(sequence):
    try:
      sequence = sequence.upper()
    except:
      print(sequence)

    if len(sequence) < 4000:
        sequence = sequence + "N" * (4000 - len(sequence))
    mapping = {"A": 0, "C": 1, "G": 2, "T": 3}
    indices = [mapping.get(i, 4) for i in sequence]
    return indices

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras import layers
from tensorflow import keras

@keras.saving.register_keras_serializable(package="MyLayers", name = "PositionEmbedding")
class PositionEmbedding(layers.Layer):
    def __init__(self, maxlen, embed_dim):
        super().__init__()
        self.maxlen = maxlen
        self.embed_dim = embed_dim
        self.pos_emb = layers.Embedding(input_dim=self.maxlen, output_dim=self.embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-2]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        return x + positions

    def get_config(self):
        return {
            "maxlen": self.maxlen,
            "embed_dim": self.embed_dim
        }

@keras.saving.register_keras_serializable(package="MyLayers", name = "TransformerEncoderBlock")
class TransformerEncoderBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = layers.MultiHeadAttention(num_heads=self.num_heads, key_dim=self.embed_dim)
        self.ffn = Sequential(
            [layers.Dense(self.ff_dim, activation="relu"), layers.Dense(self.embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(self.rate)
        self.dropout2 = layers.Dropout(self.rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        return {
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate
        }

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve, auc

predictions = {}

for dataset_name in all_datasets:
    data = load_dataset(dataset_name, remove_0_GE_rows = True)

    # Chrom Split
    data_test = data[data["left_anchor"].isin(["chr4", "chr7", "chr8", "chr11"])]
    data_train = data.drop(data_test.index)

    print("Train: " + str(len(data_train)) + " " + str(len(data_train)/len(data)))
    print("Test: " + str(len(data_test)) + " " + str(len(data_test)/len(data)))

    X_RNA_left = np.array(data[RNA_GE_left_columns_to_include])
    X_RNA_right = np.array(data[RNA_GE_right_columns_to_include])
    X_RNA_left_train = np.array(data_train[RNA_GE_left_columns_to_include])
    X_RNA_right_train = np.array(data_train[RNA_GE_right_columns_to_include])
    X_RNA_left_test = np.array(data_test[RNA_GE_left_columns_to_include])
    X_RNA_right_test = np.array(data_test[RNA_GE_right_columns_to_include])

    X_DNA_left = np.array([DNA_sequence_to_indices(sequence) for sequence in data['left_anchor_sequence']])
    X_DNA_right = np.array([DNA_sequence_to_indices(sequence) for sequence in data['right_anchor_sequence']])
    X_DNA_left_train = np.array([DNA_sequence_to_indices(sequence) for sequence in data_train['left_anchor_sequence']])
    X_DNA_right_train = np.array([DNA_sequence_to_indices(sequence) for sequence in data_train['right_anchor_sequence']])
    X_DNA_left_test = np.array([DNA_sequence_to_indices(sequence) for sequence in data_test['left_anchor_sequence']])
    X_DNA_right_test = np.array([DNA_sequence_to_indices(sequence) for sequence in data_test['right_anchor_sequence']])

    y = data['label'].astype(np.float32)
    y_train = data_train['label'].astype(np.float32)
    y_test = data_test['label'].astype(np.float32)

    # Random Split
    # X_RNA_left = np.array(data[RNA_GE_left_columns_to_include])
    # X_RNA_right = np.array(data[RNA_GE_right_columns_to_include])

    # X_DNA_left = np.array([DNA_sequence_to_indices(sequence) for sequence in data['left_anchor_sequence']])
    # X_DNA_right = np.array([DNA_sequence_to_indices(sequence) for sequence in data['right_anchor_sequence']])

    # y = data['label'].astype(np.float32)

    # (X_RNA_left_train,
    #      X_RNA_left_test,
    #      X_RNA_right_train,
    #      X_RNA_right_test,
    #      X_DNA_left_train,
    #      X_DNA_left_test,
    #      X_DNA_right_train,
    #      X_DNA_right_test,
    #      y_train,
    #      y_test) = train_test_split(
    #         X_RNA_left,
    #         X_RNA_right,
    #         X_DNA_left,
    #         X_DNA_right, y, test_size=0.2, random_state=42)

    # print("Train: " + str(len(X_RNA_left_train)) + " " + str(len(X_RNA_right_train)) + " " + str(len(X_DNA_left_train)))
    # print("Test: " + str(len(X_RNA_left_test)) + " " + str(len(X_RNA_right_test)) + " " + str(len(X_DNA_left_test)))

    # model = tf.keras.models.load_model(f"/content/drive/MyDrive/FYP/models/{dataset_name}_model_15_2_layer_64_128_CNN_1_layer_transformer_encoder_4_layer_FFN.hdf5")
    model = tf.keras.models.load_model(f"/content/drive/MyDrive/FYP/models/chromsplit/{dataset_name}_model_15_2_layer_64_128_CNN_1_layer_transformer_encoder_4_layer_FFN.hdf5")

    model.evaluate([X_RNA_left_test, X_RNA_right_test, X_DNA_left_test, X_DNA_right_test], y_test)
    y_pred_prob = model.predict([X_RNA_left_test, X_RNA_right_test, X_DNA_left_test, X_DNA_right_test])

    # calculate scores
    test_auroc = roc_auc_score(y_test, y_pred_prob)
    # calculate roc curves
    test_fpr, test_tpr, _ = roc_curve(y_test, y_pred_prob)

    # calculate pr curves
    test_precision, test_recall, _ = precision_recall_curve(y_test, y_pred_prob)
    test_auprc = auc(test_recall, test_precision)


    predictions[dataset_name] = {
        "test_auroc": test_auroc,
        "test_fpr": test_fpr,
        "test_tpr": test_tpr,

        "test_auprc": test_auprc,
        "test_precision": test_precision,
        "test_recall": test_recall,
    }


HelaS3 loaded
Baseline auprc: 0.14699074074074073
Train: 3441 0.7965277777777777
Test: 879 0.20347222222222222

K562 loaded
Baseline auprc: 0.1474029680365297
Train: 5732 0.8179223744292238
Test: 1276 0.18207762557077625

IMR90 loaded
Baseline auprc: 0.15946843853820597
Train: 3829 0.8480620155038759
Test: 686 0.15193798449612403

GM12878 loaded
Baseline auprc: 0.1566360468799493
Train: 5300 0.8394044979410833
Test: 1014 0.1605955020589167


In [None]:
!pip install -U kaleido



In [None]:
import plotly.graph_objects as go

fig = go.Figure()

fig.add_trace(go.Scatter(x=[0, 0.5, 1.0], y=[0, 0.5, 1.0],
                    mode='lines',
                    name="Baseline: ROC AUC={:.4f}".format(0.500)))
for dataset_name, result in predictions.items():
  fig.add_trace(go.Scatter(x=result["test_fpr"], y=result["test_tpr"],
                    mode='lines+markers',
                    marker_size=3,
                    # mode='lines',
                    name="{}: ROC AUC={:.4f}".format(dataset_name, result["test_auroc"])))
fig.update_layout(
        width=800,
        height=500,
        title=dict(
            text="ROC Curve",
        ),
        xaxis_title=dict(text='False Positive Rate'),
        yaxis_title=dict(text='True Positive Rate'),
        yaxis = dict(dtick=0.2),
        xaxis = dict(dtick=0.2),
    )
fig.show()

fig.write_image(f"roc.png")

In [None]:
import plotly.graph_objects as go

fig = go.Figure()

fig.add_trace(go.Scatter(x=[0, 1], y=[1/6, 1/6],
                    mode='lines',
                    name="Baseline: PRC AUC={:.4f}".format(1/6)))
for dataset_name, result in predictions.items():
  fig.add_trace(go.Scatter(x=result["test_recall"], y=result["test_precision"],
                    mode='lines+markers',
                    marker_size=3,
                    # mode='lines',
                    name="{}: PRC AUC={:.4f}".format(dataset_name, result["test_auprc"])))
fig.update_layout(
        width=800,
        height=500,
        title=dict(
            text="Precision-Recall Curve",
        ),
        xaxis_title=dict(text='Recall'),
        yaxis_title=dict(text='Precision'),
        yaxis = dict(dtick=0.2, rangemode="tozero"),
        xaxis = dict(dtick=0.2),

    )
fig.show()
fig.write_image(f"prc.png")