In [90]:
import numpy as np
import pandas as pd
import os
import subprocess
import tempfile
import pickle
from line_profiler import LineProfiler
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio import SeqIO
import warnings
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, matthews_corrcoef, roc_auc_score, confusion_matrix, recall_score, f1_score
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.utils import shuffle
from scipy import stats
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
warnings.filterwarnings("ignore")

### Required Physicochemical Properties

Most sources can be found on AAindex

pKa source: D.R. Lide, Handbook of Chemistry and Physics, 72nd Edition, CRC Press, Boca Raton, FL, 1991 (as listed on the Sigma-Aldrich website).

EIIP: Electron-ion interaction potential (Veljkovic et al., 1985)

LEP (lone electron pairs): no citation available; the values are treated as implicit (NOT VERIFIED!)

Wiener Index: source unknown (TODO: add citation)

Molecular Mass: Wikipedia, implicit

In [91]:
# One-letter amino acid code -> position (0-19). The property arrays below each
# hold 20 values; they are presumably listed in this same order — TODO confirm
# against the cited sources.
AMINO_ACID_INDICES = {'A': 0, 'R': 1, 'N': 2, 'D': 3, 'C': 4, 'Q': 5, 'E': 6, 'G': 7, 'H': 8, 'I': 9, 
                      'L': 10, 'K': 11, 'M': 12, 'F': 13, 'P': 14, 'S': 15, 'T': 16, 'W': 17, 'Y': 18, 'V': 19}

# pKa of the amino group, one value per amino acid.
PKA_AMINO_GROUP = np.array([9.69, 9.04, 8.80, 9.60, 10.28, 9.13, 9.67, 9.60, 9.17, 9.60,
                            9.60, 8.95, 9.21, 9.13, 10.60, 9.15, 9.10, 9.39, 9.11, 9.62])
# pKa of the carboxyl group, one value per amino acid.
PKA_CARBOXYL_GROUP = np.array([2.34, 2.17, 2.02, 1.88, 1.96, 2.17, 2.19, 2.34, 1.82, 2.36,
                               2.36, 2.18, 2.28, 1.83, 1.99, 2.21, 2.09, 2.83, 2.20, 2.32])
# Electron-ion interaction potential (EIIP), one value per amino acid.
EIIP = np.array([0.03731, 0.09593, 0.00359, 0.12630, 0.08292, 0.07606, 0.00580, 0.00499, 0.02415, 0.0000, 
                 0.0000, 0.03710, 0.08226, 0.09460, 0.01979, 0.08292, 0.09408, 0.05481, 0.05159, 0.00569])
# Number of lone electron pairs (integer-valued), one value per amino acid.
LONE_ELECTRON_PAIRS = np.array([0, 0, 1, 2, 1, 1, 2, 0, 1, 0, 
                                0, 0, 0, 0, 0, 1, 1, 0, 1, 0])
# Wiener index values; all lie in [0, 1], so they look pre-scaled — source not
# recorded, TODO confirm.
WIENER_INDEX = np.array([0.3466, 0.1156, 0.3856, 0.2274, 0.0501, 0.6379, 0.1938, 0.1038, 0.2013,
                       0.2863, 0.1071, 0.7767, 0.7052, 0.3419, 0.0957, 0.4375, 0.9320, 0.1000, 0.1969, 0.9000])
# Molecular mass per amino acid; the magnitudes (e.g. 89.094 for 'A') suggest
# free amino acids in g/mol rather than residue masses — TODO confirm.
MOLECULAR_MASS = np.array([89.094, 174.203, 132.119, 133.104, 121.154, 146.146, 147.131, 75.067, 155.156, 131.175,
                           131.175, 146.189, 149.208, 165.192, 115.132, 105.093, 119.119, 204.228, 181.191, 117.148])

# pKa_amino_group = np.array([9.87, 8.99, 8.72, 9.90, 10.70, 9.13, 9.47, 9.78,
#                            9.33, 9.76, 9.74, 9.06, 9.28, 9.31, 10.64, 9.21, 9.10, 9.41, 9.21, 9.74])
# pKa_carboxyl_group = np.array([2.35, 1.82, 2.14, 1.99, 1.92, 2.17, 2.10, 2.35,
#                               1.80, 2.32, 2.33, 2.16, 2.13, 2.20, 1.95, 2.19, 2.09, 2.46, 2.20, 2.29])
# eiip = np.array([0.0373, 0.0959, 0.0036, 0.1263, 0.0829, 0.0761, 0.0057, 0.0050, 0.0242,
#                 0.0000, 0.0000, 0.0371, 0.0823, 0.0946, 0.0198, 0.0829, 0.0941, 0.0548, 0.0516, 0.0058])
# lone_electron_pairs = np.array(
#     [0, 0, 1, 2, 1, 1, 2, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0])
# winer_index = np.array([0.3466, 0.1156, 0.3856, 0.2274, 0.0501, 0.6379, 0.1938, 0.1038, 0.2013,
#                        0.2863, 0.1071, 0.7767, 0.7052, 0.3419, 0.0957, 0.4375, 0.9320, 0.1000, 0.1969, 0.9000])
# molecular_mass = np.array([71.078, 156.186, 114.103, 115.087, 103.143, 128.129, 129.114, 57.051, 137.139,
#                           113.158, 113.158, 128.172, 131.196, 147.174, 97.115, 87.077, 101.104, 186.210, 163.173, 99.131])

# Physicochemical properties in the order in which they become rows of the
# property matrix (create_pp_matrix enumerates this list row by row).
PP_LIST = [PKA_AMINO_GROUP, PKA_CARBOXYL_GROUP, EIIP, LONE_ELECTRON_PAIRS, WIENER_INDEX, MOLECULAR_MASS]
# PP_LIST = [pKa_amino_group, pKa_carboxyl_group, eiip, lone_electron_pairs, winer_index, molecular_mass]

In [92]:
# Amino Acid Composition (AAC) groups - Polarity / Charge
# The four classes, in the order they are stored in AAC_C_LIST:
#   C1: nonpolar amino acids
#   C2: noncharged polar amino acids
#   C3: polar amino acids with negative charge
#   C4: polar amino acids with positive charge

AAC_C1 = ['G', 'A', 'V', 'L', 'I', 'F', 'W', 'M', 'P']
AAC_C2 = ['S', 'T', 'C', 'Y', 'N', 'Q']
AAC_C3 = ['D', 'E']
AAC_C4 = ['R', 'K', 'H']

AAC_C_LIST = [AAC_C1, AAC_C2, AAC_C3, AAC_C4]

# Amino Acid Composition (AAC) groups - Hydrophobicity
# H1: strongly hydrophobic, H2: weakly hydrophobic,
# H3: strongly hydrophilic, H4: weakly hydrophilic.
# The grouping is based on the Kyte and Doolittle (1982) scale:
# values between -2.0 and +2.0 are considered weak, values > 2.0 strongly
# hydrophobic, and values < -2.0 strongly hydrophilic.


AAC_H1 = ['I', 'V', 'L', 'F', 'C']
AAC_H2 = ['M', 'A']
AAC_H3 = ['H', 'Q', 'N', 'E', 'D', 'K', 'R']
AAC_H4 = ['G', 'T', 'S', 'W', 'Y', 'P']

AAC_H_LIST = [AAC_H1, AAC_H2, AAC_H3, AAC_H4]


#### PP Matrix stored as a constant

In [93]:
def create_pp_matrix() -> np.ndarray:
    """Build the min-max normalised physicochemical property matrix.

    Each entry of ``PP_LIST`` becomes one row, rescaled to ``[0, 1]`` with
    ``(value - min) / (max - min)``; columns correspond to the amino acids.

    Returns:
        Float array of shape ``(len(PP_LIST), len(AMINO_ACID_INDICES))``.
    """
    n_properties, n_amino_acids = len(PP_LIST), len(AMINO_ACID_INDICES)
    normalised = np.empty((n_properties, n_amino_acids), dtype=float)

    for row, values in enumerate(PP_LIST):
        lowest = np.min(values)
        value_range = np.max(values) - lowest
        normalised[row] = (values - lowest) / value_range

    return normalised

# Computed once and reused as a constant by the feature extraction code.
PP_MATRIX = create_pp_matrix()

### OBV

Source: Shen, Juwen, et al. "Predicting protein–protein interactions based only on sequences information." Proceedings of the National Academy of Sciences 104.11 (2007): 4337-4341. (Supp. information)

Note: We use 7 classes here instead of 6. It was not mentioned why they used only 6 classes, given that the source states that amino acids are grouped into 7 classes.

In [94]:
# Amino acid -> class index (0-6) used for the one-hot (OBV) encoding.
obv_classes = {
    'A' : 0, 'G' : 0, 'V' : 0,
    'I': 1, 'L': 1, 'F': 1, 'P': 1,
    'Y': 2, 'M': 2, 'T': 2, 'S': 2,
    'H': 3, 'N': 3, 'Q': 3, 'W': 3,
    'R': 4, 'K': 4,
    'D': 5, 'E': 5,
    'C': 6
}

def generate_obv(amino_acid):
    """Return the 7-dimensional one-hot class vector for one amino acid.

    Args:
        amino_acid: One-letter amino acid code.

    Returns:
        Float array of length 7 with a single 1 at the residue's class index.
        Residues that are not in ``obv_classes`` (e.g. 'X') yield an all-zero
        vector.
    """
    one_hot = np.zeros(7)
    class_index = obv_classes.get(amino_acid)
    # Guard against unknown residues: indexing with None is numpy's "newaxis",
    # so `one_hot[None] = 1` would silently set every entry to 1.
    if class_index is not None:
        one_hot[class_index] = 1
    return one_hot

### Get Window Instance from sequence

In [95]:
# takes in a string, then 
# extract list of instances by sliding a window through the sequence
def get_instances_from_seq(seq : str, window_size : int = 9) -> list :
    """Slide a fixed-size window over ``seq`` and collect every full window.

    Args:
        seq: Input sequence.
        window_size: Number of residues per window.

    Returns:
        List of ``len(seq) - window_size + 1`` substrings, in order of their
        start position; empty when the sequence is shorter than the window.
    """
    last_start = len(seq) - window_size
    return [seq[start:start + window_size] for start in range(last_start + 1)]
    

## Generate PSSM-PP

In [96]:
"""Generate PSSM using psiblast from a given sequence."""
def generate_pssm(input_seq: str, num_iterations = 3,
                  db_path: str = "./databases/uniprot_sprot.fasta") -> "np.ndarray | None":
    """Generate a PSSM for one sequence by running ``psiblast``.

    The query FASTA file and the ASCII PSSM are written into a private
    temporary directory, so nothing is left behind in the working directory
    (even when parsing fails) and a stale ``output.pssm`` from an earlier run
    can never be picked up by mistake.

    Args:
        input_seq: Protein sequence to search with.
        num_iterations: Number of psiblast iterations.
        db_path: Path of the BLAST database to search.

    Returns:
        Integer array built from columns 2-21 of the ASCII PSSM (the last five
        parsed lines are dropped), or ``None`` when psiblast did not write a
        PSSM file.

    Raises:
        subprocess.CalledProcessError: If psiblast exits with a non-zero status.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        query_path = os.path.join(work_dir, "query.fasta")
        pssm_path = os.path.join(work_dir, "output.pssm")

        with open(query_path, "w") as query_file:
            SeqIO.write([SeqRecord(Seq(input_seq))], query_file, "fasta")

        subprocess.run(["psiblast", "-query", query_path, "-db", db_path,
                        "-out_ascii_pssm", pssm_path, "-num_iterations", str(num_iterations), "-evalue", "0.001"],
                        check=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

        # psiblast can finish successfully without writing a PSSM; callers
        # treat None as "no PSSM available".
        if not os.path.exists(pssm_path):
            print(f"Error: PSSM file not found. Input Sequence: {input_seq}")
            return None

        # sep=r"\s+" is the supported replacement for the deprecated
        # delim_whitespace=True.
        pssm_df = pd.read_csv(pssm_path, sep=r"\s+", skiprows=3, header=None)

    return pssm_df.iloc[:-5, 2:22].to_numpy(dtype=int)
    

def rescale_pssm(input_pssm) -> np.ndarray:
    """Squash PSSM scores into (0, 1) with the logistic sigmoid 1 / (1 + e^-x)."""
    denominator = 1 + np.exp(-input_pssm)
    return 1 / denominator

def get_sliced_pssm(original_pssm : np.ndarray, start_index : int, window_size : int = 9):
    """Return the PSSM rows covered by the window starting at ``start_index``.

    All columns are kept; the row range is
    ``[start_index, start_index + window_size)``.
    """
    window_rows = slice(start_index, start_index + window_size)
    return original_pssm[window_rows, :]

# generate_pssm("KPKNKDKDKKVPEPDNKKKKPKKEEEQKWKWWEEERYPEGIKWKFLEHKGPVFAPPYEPLPENVKFYYDGKVMKLSPKAEEVATFFAKMLDHEYTTKEIFRKNFFKDWRKEMTNEEKNIITNLSKCDFTQMSQYFKAQTEARKQMSKEEKLKIKEENEKLLKEYGFCIMDNHKERIANFKIEPPGLFRGRGNHPKMGMLKRRIMPEDIIINCSKDAKVPSPPPGHKWKEVRHDNKVTWLVSWTENIQGSIKYIMLNPSSRIKGEKDWQKYETARRLKKCVDKIRNQYREDWKSKEMKVRQRAVALYFIDKLALRAGNEKEEGETADTVGCCSLRVEHINLHPELDGQEYVVEFDFLGKDSIRYYNKVPVEKRVFKNLQLFMENKQPEDDLFDRLNTGILNKHLQDLMEGLTAKVFRTYNASITLQQQLKELTAPDENIPAKILSYNRANRAVAILCNHQRAPPKTFEKSMMNLQTKIDAKKEQLADARRDLKSAKADAKVMKDAKTKKVVESKKKAVQRLEEQLMKLEVQATDREENKQIALGTSKLNFLDPRITVAWCKKWGVPIEKIYNKTQREKFAWAIDMADEDYE")

In [97]:
def create_pssm_pp(pssm_matrix : np.ndarray, pp_matrix : np.ndarray) -> np.ndarray:
    """Combine PSSM scores with the property matrix.

    Returns the matrix product ``pp_matrix @ pssm_matrix``; the column count of
    ``pp_matrix`` must therefore equal the row count of ``pssm_matrix``.
    """
    return np.matmul(pp_matrix, pssm_matrix)

### Amino Acid Correlation

In [98]:
def _aac_correlation(seq: str, groups) -> list:
    """Amino acid correlation score of ``seq`` for each residue group.

    Shared implementation behind ``calculate_AAC_PC`` and ``calculate_AAC_H``.
    For every group, and every k in ``0 .. len(seq) - 2``:

    * ``pair_count`` is the number of positions ``j`` where both ``seq[j]`` and
      ``seq[j + k + 1]`` belong to the group;
    * ``first_term  = pair_count / (len(seq) - k) - (n / len(seq))**2`` where
      ``n`` is the number of residues of ``seq`` in the group;
    * ``second_term = first_term**2 / (2 * (n / len(seq))**2)``, taken as 0 when
      the group does not occur in ``seq``.

    The group's score is the sum of ``first_term + second_term`` over all k.

    Args:
        seq: Window of residues (one-letter codes).
        groups: Iterable of residue groups, each a collection of one-letter codes.

    Returns:
        List with one score per group, in the order of ``groups``.

    Raises:
        ValueError: If ``seq`` is empty.
    """
    window_size = len(seq)
    if window_size == 0:
        raise ValueError("seq must contain at least one residue")

    scores = []
    for group in groups:
        # Membership is computed once per group instead of re-testing
        # `residue in group` for every (gap, position) pair.
        in_group = [residue in group for residue in seq]
        n_members = in_group.count(True)
        squared_fraction = n_members**2 / window_size**2

        total = 0
        for k in range(window_size - 1):
            gap = k + 1
            pair_count = sum(
                1 for j in range(window_size - gap)
                if in_group[j] and in_group[j + gap]
            )
            # NOTE(review): there are window_size - k - 1 candidate pairs at this
            # gap, but the divisor is window_size - k. Kept as in the original
            # implementation — confirm against the reference formula.
            first_term = pair_count / (window_size - k) - squared_fraction
            if n_members:
                second_term = (first_term * first_term) / (2 * squared_fraction)
            else:
                # The group is absent from the window: the quotient would be
                # 0/0, which counts as 0.
                second_term = 0
            total += first_term + second_term
        scores.append(total)

    return scores


def calculate_AAC_PC(seq : str):
    """Amino acid correlation over the four polarity/charge groups.

    Args:
        seq: Window of residues (the pipeline uses 9 residues).

    Returns:
        List of 4 values, one per group in ``AAC_C_LIST``.
    """
    return _aac_correlation(seq, AAC_C_LIST)


def calculate_AAC_H(seq : str):
    """Amino acid correlation over the four hydrophobicity groups.

    Args:
        seq: Window of residues (the pipeline uses 9 residues).

    Returns:
        List of 4 values, one per group in ``AAC_H_LIST``.
    """
    return _aac_correlation(seq, AAC_H_LIST)

### OBV

In [99]:
def get_full_obv(seq: str):
    """Concatenate the 7-dimensional OBV encodings of every residue in ``seq``.

    Returns:
        1-D float array of length ``7 * len(seq)``, residue by residue.
    """
    per_residue = [generate_obv(residue) for residue in seq]
    return np.array(per_residue, dtype=float).flatten()

### Pre-generate PSSMs and store them in a pickle file

In [100]:
def pre_generate_pssm(input_df, file_name:str):
    """Generate a PSSM for every sequence and cache the results on disk.

    Args:
        input_df: Table with a ``'seq'`` column holding the sequences.
        file_name: Path of the pickle file to write.

    Returns:
        The list of PSSMs (one ``generate_pssm`` result per row, in order),
        which is also what gets pickled to ``file_name``.
    """
    pssm_list = [generate_pssm(sequence) for sequence in input_df['seq']]

    with open(file_name, 'wb') as pickle_file:
        pickle.dump(pssm_list, pickle_file)

    return pssm_list
    
# list_of_train_pssms = pre_generate_pssm(pd.read_csv("./DRNA_TRAIN.csv"), "generated_pssms_train.pkl")
# print(len(list_of_train_pssms))
# list_of_test_pssms = pre_generate_pssm(pd.read_csv("./DRNA_TEST.csv"), "generated_pssms_test.pkl")
# print(len(list_of_test_pssms))

### Concatenate all features

In [122]:
def get_all_features_for_one_sequence(full_seq: str, dna_label: str, input_pssm : np.ndarray, seq_diso_values, domain_values, input_hhm : np.ndarray, window_size: int = 9) -> list:
    """Build one labelled feature vector per sliding window of a sequence.

    For the window starting at position ``i`` the vector is the concatenation
    of, in this order:

    1. the flattened PSSM-PP block (``PP_MATRIX`` applied to the sigmoid-rescaled
       PSSM rows of the window),
    2. the AAC polarity/charge and hydrophobicity correlation values,
    3. the OBV one-hot encoding of the window,
    4. the disorder values of the window binarised at 0.5,
    5. the domain values of the window.

    The label is the entry of ``dna_label`` at the window centre
    (``i + window_size // 2``); windows whose centre label is ``'2'`` are skipped.

    Args:
        full_seq: Complete protein sequence.
        dna_label: Per-residue labels, indexable by residue position.
        input_pssm: Raw PSSM of the full sequence (rescaled here).
        seq_diso_values: Per-residue disorder values.
        domain_values: Per-residue domain values.
        input_hhm: Currently unused.
        window_size: Number of residues per window.

    Returns:
        List of ``(feature_vector, label)`` tuples.
    """
    windows = get_instances_from_seq(full_seq, window_size=window_size)
    scaled_pssm = rescale_pssm(input_pssm)
    centre_offset = window_size // 2

    labelled_features = []
    for start, window in enumerate(windows):
        centre_label = dna_label[start + centre_offset]
        if centre_label == '2':
            # Centre residue has no usable label.
            continue

        stop = start + window_size
        window_pssm = get_sliced_pssm(scaled_pssm, start, window_size)
        feature_parts = [
            create_pssm_pp(window_pssm.T, PP_MATRIX).flatten(),
            np.append(calculate_AAC_PC(window), calculate_AAC_H(window)),
            get_full_obv(window),
            [0 if value < 0.5 else 1 for value in seq_diso_values[start:stop]],
            domain_values[start:stop],
        ]
        labelled_features.append((np.concatenate(feature_parts), centre_label))

    return labelled_features

def _load_pickled_list(path) -> list:
    """Load a pickled sequence from ``path`` and return it as a list.

    NOTE: unpickling can execute arbitrary code — only load files that were
    produced by this project.
    """
    # The context manager closes the file handle, which a bare
    # pickle.load(open(...)) never does.
    with open(path, 'rb') as pickle_file:
        return list(pickle.load(pickle_file))


def get_all_features_for_dataset(dataset: pd.DataFrame, generated_pssm_file, generated_diso_file, generated_domain_file, generated_hhm_file, window_size : int = 9) -> list:
    """Generate the labelled feature vectors for every sequence of a dataset.

    Args:
        dataset: Table with ``'seq'`` and ``'dna_label'`` columns. Its index
            labels are used as positions into the pickled per-sequence lists,
            so the pickles must be aligned with that index.
        generated_pssm_file: Pickle with one PSSM (or None) per sequence.
        generated_diso_file: Pickle with per-sequence disorder values (or None).
        generated_domain_file: Pickle with per-sequence domain values.
        generated_hhm_file: Pickle with per-sequence HHM data.
        window_size: Sliding window size forwarded to the per-sequence step.

    Returns:
        Flat list of ``(feature_vector, label)`` tuples over all sequences.
        Sequences without a PSSM or without disorder values are skipped with a
        printed message.
    """
    full_pssm = _load_pickled_list(generated_pssm_file)
    full_diso_values = _load_pickled_list(generated_diso_file)
    full_domain_values = _load_pickled_list(generated_domain_file)
    full_hhm_values = _load_pickled_list(generated_hhm_file)

    all_features_list = []
    for index, row in dataset.iterrows():
        if full_pssm[index] is None:
            print(f"Skipping sequence at index {index} due to missing PSSM")
            continue
        if full_diso_values[index] is None:
            print(f"Diso value not available! Index: {index}")
            continue
        try:
            all_features_list.extend(get_all_features_for_one_sequence(full_seq=row['seq'],
                                                                       dna_label=row['dna_label'],
                                                                       input_pssm=full_pssm[index],
                                                                       seq_diso_values=full_diso_values[index],
                                                                       domain_values=full_domain_values[index],
                                                                       input_hhm=full_hhm_values[index],
                                                                       window_size=window_size))
        except FileNotFoundError as e:
            print(f"Error processing sequence at index {index}: {e}")
            continue
    return all_features_list

In [127]:
# Load the labelled training and test sequences.
training_dataset = pd.read_csv("DRNA_TRAIN.csv")
test_dataset = pd.read_csv("DRNA_TEST.csv")

# Build (feature_vector, label) pairs from the pre-generated per-sequence files.
all_training_features = get_all_features_for_dataset(
    training_dataset,
    "generated_pssms_train.pkl",
    "disorder_preds_train.pkl",
    "dom_annotations_train.pkl",
    "generated_hhm_train.pkl",
)
all_test_features = get_all_features_for_dataset(
    test_dataset,
    "generated_pssms_test.pkl",
    "disorder_preds_test.pkl",
    "dom_annotations_test.pkl",
    "generated_hhm_test.pkl",
)

# Split the pairs into feature matrices and label vectors (numpy arrays for
# compatibility with scikit-learn).
X_train = np.array([features for features, _ in all_training_features])
y_train = np.array([label for _, label in all_training_features])
X_test = np.array([features for features, _ in all_test_features])
y_test = np.array([label for _, label in all_test_features])

print("X_train shape:", X_train.shape)
print("y_train shape:", y_train.shape)
print("X_test shape:", X_test.shape)
print("y_test shape:", y_test.shape)

Skipping sequence at index 119 due to missing PSSM
Skipping sequence at index 179 due to missing PSSM
Skipping sequence at index 211 due to missing PSSM
Skipping sequence at index 234 due to missing PSSM
Skipping sequence at index 264 due to missing PSSM
Skipping sequence at index 268 due to missing PSSM
Skipping sequence at index 293 due to missing PSSM
Skipping sequence at index 349 due to missing PSSM
Skipping sequence at index 357 due to missing PSSM
Skipping sequence at index 368 due to missing PSSM
Skipping sequence at index 387 due to missing PSSM
Skipping sequence at index 391 due to missing PSSM
Skipping sequence at index 403 due to missing PSSM
Skipping sequence at index 405 due to missing PSSM
Skipping sequence at index 410 due to missing PSSM
Skipping sequence at index 422 due to missing PSSM
Skipping sequence at index 436 due to missing PSSM
Skipping sequence at index 439 due to missing PSSM
Skipping sequence at index 452 due to missing PSSM
Skipping sequence at index 5 du

# **Compilation of results**

## No balancing

In [128]:
def highlight_max(s):
    """Styler helper: purple background for the entries equal to the maximum.

    Returns one CSS string per element of ``s`` (empty string for non-maxima).
    """
    css_for = {True: 'background-color: purple', False: ''}
    is_max = s == s.max()
    return [css_for[bool(flag)] for flag in is_max]

# # Train the model
# rf_model = RandomForestClassifier(random_state=42)
# rf_model.fit(X_train, y_train)

# # Get prediction probabilities
# predictions = rf_model.predict_proba(X_test)
# results = []

# # Iterate over thresholds from 0.1 to 0.96 with a step of 0.02
# for threshold in np.arange(0.1, 0.96, 0.02):
#     y_pred = ['1' if p[1] >= threshold else '0' for p in predictions]
    
#     mcc = matthews_corrcoef(y_test, y_pred)
    
#     tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=['0', '1']).ravel()
    
#     sensitivity = tp / (tp + fn) if (tp + fn) != 0 else 0
#     specificity = tn / (tn + fp) if (tn + fp) != 0 else 0
    
#     results.append((threshold, mcc, sensitivity, specificity))


# results_df = pd.DataFrame(results, columns=['Threshold', 'MCC', 'Sensitivity', 'Specificity'])
# results_df = results_df.round(2)
# styled_results_df = results_df.style.format("{:.2f}").apply(highlight_max, subset=['MCC'])
# display(styled_results_df)

## Random Undersampling

In [129]:
# Randomly undersample the training set so that it contains equal numbers of
# positive and negative samples.
# The labels in y_train are the strings '1' (positive) and '0' (negative).
# This cell must read y_train: y_train_nn is only created in a later cell (and
# from the already subsampled data), so using it here breaks Restart & Run All.
positive_indices = np.where(y_train == '1')[0]
negative_indices = np.where(y_train == '0')[0]

# Size of the smaller class decides how many samples are drawn per class.
n_samples = min(positive_indices.shape[0], negative_indices.shape[0])

# Seeded generator so that the subsample (and everything trained on it) is
# reproducible from run to run.
undersample_rng = np.random.default_rng(42)

# Draw n_samples indices per class without replacement.
positive_subsample_indices = undersample_rng.choice(positive_indices, n_samples, replace=False)
negative_subsample_indices = undersample_rng.choice(negative_indices, n_samples, replace=False)

# Select the subsampled rows from X_train and y_train.
subsample_indices = np.concatenate([positive_subsample_indices, negative_subsample_indices])
X_train_subsampled = X_train[subsample_indices]
y_train_subsampled = y_train[subsample_indices]
print(X_train_subsampled.shape, y_train_subsampled.shape)

# Shuffle so that positives and negatives are interleaved.
shuffle_indices = undersample_rng.permutation(len(X_train_subsampled))
X_train_subsampled = X_train_subsampled[shuffle_indices]
y_train_subsampled = y_train_subsampled[shuffle_indices]

(14222, 143) (14222,)


In [105]:
# Train the model
# rf_model = RandomForestClassifier()
# rf_model.fit(X_train_subsampled, y_train_subsampled)

# # Get prediction probabilities
# predictions = rf_model.predict_proba(X_test)
# results = []

# # Iterate over thresholds from 0.1 to 0.96 with a step of 0.02
# for threshold in np.arange(0.1, 0.96, 0.02):
#     y_pred = ['1' if p[1] >= threshold else '0' for p in predictions]
    
#     mcc = matthews_corrcoef(y_test, y_pred)
    
#     tn, fp, fn, tp = confusion_matrix(y_test, y_pred, labels=['0', '1']).ravel()
    
#     sensitivity = tp / (tp + fn) if (tp + fn) != 0 else 0
#     specificity = tn / (tn + fp) if (tn + fp) != 0 else 0
    
#     results.append((threshold, mcc, sensitivity, specificity))


# results_df = pd.DataFrame(results, columns=['Threshold', 'MCC', 'Sensitivity', 'Specificity'])
# results_df = results_df.round(2)
# styled_results_df = results_df.style.format("{:.2f}").apply(highlight_max, subset=['MCC'])
# display(styled_results_df)


## Reliability Index

In [106]:
# Train the model
# rf_model = RandomForestClassifier(random_state=42)
# rf_model.fit(X_train_subsampled, y_train_subsampled)

# threshold = 0.5

# # Get prediction probabilities
# predictions = rf_model.predict_proba(X_test)
# y_pred = [1 if p[1] >= threshold else 0 for p in predictions]
# print(y_pred)

# phi_threshold = 0.5
# scaling_const = 50

# d_score = np.abs(predictions[:, 1] - phi_threshold)
# # print(d_score)
# reliability_index = np.clip(scaling_const * (d_score), -1, 10)

# y_pred = np.array(y_pred)

# # get all correctly predicted and incorrectly predicted positive indices
# correct_positive_indices = np.where((y_test == '1') & (y_pred == 1))[0]
# incorrect_positive_indices = np.where((y_test == '1') & (y_pred == 0))[0]
# print(correct_positive_indices.shape, incorrect_positive_indices.shape)

# correct_positive_ri = reliability_index[correct_positive_indices]
# incorrect_positive_ri = reliability_index[incorrect_positive_indices]

# correct_negative_indices = np.where((y_test == '0') & (y_pred == 0))[0]
# incorrect_negative_indices = np.where((y_test == '0') & (y_pred == 1))[0]
# print(correct_negative_indices.shape, incorrect_negative_indices.shape)

# correct_negative_ri = reliability_index[correct_negative_indices]
# incorrect_negative_ri = reliability_index[incorrect_negative_indices]

# # Do a boxplot of the reliability index for correctly predicted and incorrectly predicted positive samples
# plt.figure(figsize=(10, 6))
# plt.boxplot([correct_positive_ri, incorrect_positive_ri, correct_negative_ri, incorrect_negative_ri], labels=['TP', 'FP', 'TN', 'FN'])
# plt.xlabel('Prediction Outcome')
# plt.ylabel('Reliability Index')
# plt.title('Reliability Index')
# plt.show()

In [107]:
#  Plot a graph of accuracy against reliability index
# reliability_index = np.round(np.array(reliability_index))
# accuracies = []
# ri_sample_percentages = []

# for ri in range(0, 11):
#     # get all indices in reliability index with score ri
#     ri_indices = np.where(reliability_index == ri)[0]
#     # get the predictions for these indices
#     ri_predictions = np.array(y_pred[ri_indices]).astype(int)
#     # get the actual labels for these indices
#     ri_actual = np.array(y_test[ri_indices]).astype(int)

#     ri_sample_percentages.append(np.round(ri_indices.shape[0] / y_test.shape[0] * 100, 1))
    
#     # calculate the accuracy for these indices
#     accuracy = accuracy_score(ri_actual, ri_predictions)
#     accuracies.append(accuracy)
    
# accuracies = np.round(np.array(accuracies), 2)
# print(accuracies)
# print(ri_sample_percentages)

# # for ri in range(0, 11):
# #     # get all indices in reliability index with score ri
# #     ri_indices = np.where((reliability_index == ri) & (y_test == '1'))[0]
# #     # get the predictions for these indices
# #     ri_predictions = np.array(y_pred[ri_indices]).astype(int)
# #     # get the actual labels for these indices
# #     ri_actual = np.array(y_test[ri_indices]).astype(int)

# #     ri_sample_percentages.append(np.round(ri_indices.shape[0] / y_test.shape[0] * 100, 1))
    
# #     # calculate the accuracy for these indices
# #     accuracy = accuracy_score(ri_actual, ri_predictions)
# #     accuracies.append(accuracy)
    
# # accuracies = np.round(np.array(accuracies), 2)

# plt.figure(figsize=(10, 6))
# plt.plot(np.arange(0, 11), accuracies, marker = 'o')
# plt.title('Accuracy vs Reliability Index')
# plt.xlabel('Reliability Index')
# plt.ylabel('Accuracy (%)')
# plt.grid(True)
# plt.show()

# add another x-axis for the percentage of samples in each reliability index

## Using Pytorch

In [108]:
# device = torch.device("cpu") # "cuda"
 
# # class SimpleCNN(nn.Module):
# #     def __init__(self):
# #         super(SimpleCNN, self).__init__()

# #         # Branch for the 125D input vector
# #         self.branch_54D = nn.Sequential(
# #             nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
# #             nn.ReLU(),
# #             nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
# #             nn.ReLU(),
# #             nn.MaxPool1d(kernel_size=2, stride=2)
# #         )

# #         # Branch for the 8D input vector
# #         self.branch_8D = nn.Sequential(
# #             nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
# #             nn.ReLU(),
# #             nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
# #             nn.ReLU(),
# #             nn.MaxPool1d(kernel_size=2, stride=2)
# #         )

# #         # Combined branch
# #         combined_input_size = 32 * 27 + 32 * 4
# #         self.combined_branch = nn.Sequential(
# #             nn.Linear(combined_input_size, 128),
# #             nn.ReLU(),
# #             nn.Linear(128, 64),
# #             nn.ReLU(),
# #             nn.Linear(64, 1)  # Adjust output size based on your needs
# #         )

# #     def forward(self, x_125D, x_8D):
# #         # Process 125D input vector
# #         x1 = x_125D.unsqueeze(1)  # Add channel dimension
# #         x1 = self.branch_54D(x1)
# #         x1 = x1.view(x1.size(0), -1)  # Flatten

# #         # Process 8D input vector
# #         x2 = x_8D.unsqueeze(1)  # Add channel dimension
# #         x2 = self.branch_8D(x2)
# #         x2 = x2.view(x2.size(0), -1)  # Flatten

# #         # Combine the two branches
# #         x_combined = torch.cat((x1, x2), dim=1)
# #         output = self.combined_branch(x_combined)

# #         return output

# num_epochs = 10

# class SimpleCNN(nn.Module):
#     def __init__(self):
#         super(SimpleCNN, self).__init__()

#         # Define branches for each vector
#         self.branch_54D = nn.Sequential(
#             nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool1d(kernel_size=2, stride=2),
#             nn.Dropout(0.5)
#         )
        
#         self.branch_8D = nn.Sequential(
#             nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool1d(kernel_size=2, stride=2),
#             nn.Dropout(0.5)
#         )

#         self.branch_63D = nn.Sequential(
#             nn.Conv1d(in_channels=1, out_channels=8, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool1d(kernel_size=2, stride=2),
#             nn.Dropout(0.5)
#         )
        
#         combined_input_size = (32 * 62) + (32 * 4) + (32 * 30)
        
#         self.combined_branch = nn.Sequential(
#             nn.Conv1d(in_channels=1, out_channels=8, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.Conv1d(in_channels=8, out_channels=16, kernel_size=3, padding=1),
#             nn.ReLU(),
#             nn.MaxPool1d(kernel_size=2, stride=2),
#             nn.Dropout(0.5)
#         )
        
#         # Output size must be adjusted according to the actual output size after combined_branch
#         self.linear = nn.Sequential(
#             nn.Linear(15872 , 1024),  # Assuming 64 channels with a length of 30 after pooling and dropout
#             nn.ReLU(),
#             nn.Linear(1024, 512),
#             nn.ReLU(),
#             nn.Linear(512, 128),
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(128, 1)  # Output layer, adjust according to your needs
#         )

#     def forward(self, x_54D, x_8D, x_63D):
#         x1 = self.branch_54D(x_54D.unsqueeze(1))
#         x2 = x_8D.unsqueeze(1)
#         x3 = self.branch_63D(x_63D.unsqueeze(1))

#         # Flatten and concatenate
#         x1 = x1.view(x1.size(0), -1)
#         x2 = x2.view(x2.size(0), -1)
#         x3 = x3.view(x3.size(0), -1)
#         x_combined = torch.cat((x1, x2, x3), dim=1)

#         # Combined convolutional processing
#         x_combined = x_combined.view(x_combined.size(0), 1, -1)  # Reshape for Conv1d input
#         x_combined = self.combined_branch(x_combined)
#         x_combined = x_combined.view(x_combined.size(0), -1)

#         # Final linear layers
#         output = self.linear(x_combined)
        
#         return output
# def train_cnn():
#     for epoch in range(num_epochs):
#         cnn.train()  # Set the model to training mode
#         running_loss = 0.0
        
#         for X_54D_batch, X_8D_batch, X_63D_batch, y_batch in train_loader:
#             # X_54D_batch, y_batch = X_54D_batch.to(device), y_batch.to(device)
#             X_54D_batch, X_8D_batch, X_63D_batch, y_batch = X_54D_batch.to(device), X_8D_batch.to(device), X_63D_batch.to(device), y_batch.to(device)
            
#             optimizer.zero_grad()  # Zero the parameter gradients
            
#             # Forward pass
#             # outputs = cnn(X_54D_batch)
#             outputs = cnn(X_54D_batch, X_8D_batch, X_63D_batch)
#             outputs = outputs.squeeze()  # Remove the extra dimension for loss calculation
            
#             # Compute the loss
#             loss = criterion(outputs, y_batch)
            
#             # Backward pass and optimization
#             loss.backward()
#             optimizer.step()
            
#             # Accumulate loss
#             running_loss += loss.item() * X_54D_batch.size(0)
        
#         epoch_loss = running_loss / len(train_loader.dataset)
#         print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

# X_train_54D = torch.tensor(X_train_subsampled[:, :54], dtype=torch.float32)
# X_train_8D = torch.tensor(X_train_subsampled[:, 54:62], dtype=torch.float32)
# X_train_63D = torch.tensor(X_train_subsampled[:, 62:], dtype=torch.float32)

# # X_train = torch.tensor(X_train_subsampled, dtype=torch.float32)

# y_train = torch.tensor(y_train_subsampled.astype(int), dtype=torch.float32) 
# # train_dataset = TensorDataset(X_train, y_train)
# train_dataset = TensorDataset(X_train_54D, X_train_8D, X_train_63D, y_train)
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# cnn = SimpleCNN()
# cnn.to(device)

# # Define loss function and optimizer
# criterion = nn.BCEWithLogitsLoss()
# optimizer = torch.optim.Adam(cnn.parameters(), lr=0.001)
# train_cnn()

In [130]:
# Cast the subsampled labels to float (they are compared against the strings
# '0'/'1' earlier) so they can later be wrapped in a float32 tensor.
y_train_subsampled = y_train_subsampled.astype(float)

In [138]:
# Define the neural network
class SimpleNN(nn.Module):
    """Fully connected binary classifier that emits one raw logit per sample.

    The stack widens from ``input_dim`` to 1024 units and narrows back down to
    a single output, with a ReLU after every hidden linear layer and two
    dropout layers. No sigmoid is applied here, so the output is a logit:
    pair the model with ``nn.BCEWithLogitsLoss`` or apply ``torch.sigmoid``
    to the result.

    Args:
        input_dim: Number of features per sample after flattening. Defaults
            to 143, the width that used to be hard-coded.
        dropout: Drop probability shared by both dropout layers. Defaults to
            0.5, the value that used to be hard-coded.
    """

    def __init__(self, input_dim=143, dropout=0.5):
        super().__init__()
        self.flatten = nn.Flatten()
        # Layer order is kept exactly as before so previously saved
        # state-dict keys (linear_relu_stack.<index>.*) still line up.
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Linear(512, 1024),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        """Flatten ``x`` from dim 1 onward and return logits of shape ``(batch, 1)``."""
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Hyperparameters
num_epochs = 10
batch_size = 64
learning_rate = 0.001
SEED = 42

# Seed PyTorch before the model and the shuffling loader are created so that
# weight initialisation, dropout masks and batch order are repeatable when the
# notebook is re-run from a fresh kernel.
torch.manual_seed(SEED)

# Create the model, criterion, and optimizer
model = SimpleNN()
criterion = nn.BCEWithLogitsLoss()  # applies the sigmoid internally; the model outputs raw logits
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training tensors built from X_train_subsampled / y_train_subsampled
# (defined in earlier cells).
X_train_nn = torch.tensor(X_train_subsampled, dtype=torch.float32)
y_train_nn = torch.tensor(y_train_subsampled, dtype=torch.float32)
train_dataset = TensorDataset(X_train_nn, y_train_nn)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Test tensors get their own names instead of overwriting X_test / y_test:
# rebinding those in place made this cell non-idempotent and replaced the
# arrays that other cells read under the same names.
X_test_nn = torch.tensor(X_test, dtype=torch.float32)
y_test_nn = torch.tensor(y_test, dtype=torch.float32)
test_dataset = TensorDataset(X_test_nn, y_test_nn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Training function
def train_nn():
    """Train the module-level ``model`` for ``num_epochs`` passes over ``train_loader``.

    Reads the globals ``model``, ``criterion``, ``optimizer``, ``train_loader``
    and ``num_epochs``. After each epoch it prints the per-sample average of
    the batch losses. Returns ``None``.
    """
    for epoch in range(num_epochs):
        model.train()  # Set the model to training mode
        running_loss = 0.0

        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            # Drop only the trailing logit axis. A bare squeeze() would also
            # collapse the batch axis when the final batch holds one sample,
            # and the loss would then reject the shape mismatch with y_batch.
            outputs = model(X_batch).squeeze(-1)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            # Weight each batch loss by its size so the epoch figure is an
            # average over samples rather than over batches.
            running_loss += loss.item() * X_batch.size(0)

        epoch_loss = running_loss / len(train_loader.dataset)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

# Testing function with MCC, Sensitivity, and Specificity
def test_nn():
    """Evaluate the module-level ``model`` on ``test_loader`` and print metrics.

    Reads the globals ``model`` and ``test_loader``. Logits are passed through
    a sigmoid and rounded to obtain 0/1 predictions, then accuracy, MCC,
    sensitivity and specificity are printed. Returns ``None``.
    """
    model.eval()  # Set the model to evaluation mode
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            # squeeze(-1) keeps the batch axis, so a final batch of one sample
            # still yields a list from tolist() instead of a bare float that
            # list.extend() cannot iterate over.
            outputs = model(X_batch).squeeze(-1)
            predicted = torch.round(torch.sigmoid(outputs))  # Apply Sigmoid and round to get binary output
            all_preds.extend(predicted.tolist())
            all_targets.extend(y_batch.tolist())

    # Integer label arrays so they match the explicit labels passed below.
    all_preds = torch.tensor(all_preds, dtype=torch.long).numpy()
    all_targets = torch.tensor(all_targets, dtype=torch.long).numpy()

    # Calculate MCC
    mcc = matthews_corrcoef(all_targets, all_preds)

    # Pin the label set so the matrix is always 2x2; without it the matrix
    # shrinks to 1x1 when only one class occurs and the unpacking fails.
    tn, fp, fn, tp = confusion_matrix(all_targets, all_preds, labels=[0, 1]).ravel()

    # Sensitivity / specificity are undefined when the test set lacks the
    # corresponding class; report NaN explicitly in that case.
    sensitivity = tp / (tp + fn) if (tp + fn) else float('nan')
    specificity = tn / (tn + fp) if (tn + fp) else float('nan')

    accuracy = (tp + tn) / (tp + tn + fp + fn)

    print(f'Accuracy: {accuracy:.4f}')
    print(f'MCC: {mcc:.4f}')
    print(f'Sensitivity: {sensitivity:.4f}')
    print(f'Specificity: {specificity:.4f}')

# Train the model, then print the test-set metrics. Both functions operate on
# the module-level model and data loaders created earlier in this cell.
train_nn()
test_nn()


Epoch 1/10, Loss: 0.6335
Epoch 2/10, Loss: 0.5816
Epoch 3/10, Loss: 0.5748
Epoch 4/10, Loss: 0.5667
Epoch 5/10, Loss: 0.5578
Epoch 6/10, Loss: 0.5525
Epoch 7/10, Loss: 0.5380
Epoch 8/10, Loss: 0.5334
Epoch 9/10, Loss: 0.5210
Epoch 10/10, Loss: 0.5067
Accuracy: 0.6651
MCC: 0.1766
Sensitivity: 0.7222
Specificity: 0.6620


In [111]:
# # test our cnn model
# # X_test = torch.tensor(X_test, dtype=torch.float32)

# X_test_54D = torch.tensor(X_test[:, :54], dtype=torch.float32)
# X_test_8D = torch.tensor(X_test[:, 54:62], dtype=torch.float32)
# X_test_63D = torch.tensor(X_test[:, 62:], dtype=torch.float32)
# y_test = torch.tensor(y_test, dtype=torch.float32)

# test_dataset = TensorDataset(X_test_54D, X_test_8D, X_test_63D, y_test)
# # test_dataset = TensorDataset(X_test, y_test)
# test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# def test_cnn():
#     cnn.eval()  # Set the model to evaluation mode
#     test_loss = 0.0
#     correct = 0
#     total = 0
#     all_predictions = []
#     all_targets = []

#     with torch.no_grad():
#         for X_batch_54D, X_batch_8D, X_batch_63D, y_batch in test_loader:
#             # X_batch, y_batch = X_batch.to(device), y_batch.to(device)
#             X_batch_54D, X_batch_8D, X_batch_63D, y_batch = X_batch_54D.to(device), X_batch_8D.to(device), X_batch_63D.to(device), y_batch.to(device)   

#             outputs = cnn(X_batch_54D, X_batch_8D, X_batch_63D)
#             outputs = outputs.squeeze()

#             predicted = torch.round(torch.sigmoid(outputs))
#             all_predictions.extend(predicted.cpu().numpy())
#             all_targets.extend(y_batch.cpu().numpy())
#             total += y_batch.size(0)
#             correct += (predicted == y_batch).sum().item()

#     test_accuracy = correct / total
#     mcc = matthews_corrcoef(all_targets, all_predictions)

#     tn, fp, fn, tp = confusion_matrix(all_targets, all_predictions).ravel()
#     sensitivity = tp / (tp + fn)
#     specificity = tn / (tn + fp)

#     print(f'Test Accuracy: {test_accuracy:.4f}')
#     print(f'Test MCC: {mcc:.4f}')
#     print(f'Test Sensitivity: {sensitivity:.4f}')
#     print(f'Test Specificity: {specificity:.4f}')

# test_cnn()
