In [None]:
# ANN Training
import sys
import glob
import gzip
import json
import math
import os
import argparse
import numpy as np
import pandas as pd
import random
import copy
import time
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, BatchNormalization
from tensorflow.keras.layers import Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras import layers, models

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.metrics import precision_recall_curve

from Bio.Seq import Seq
from Bio import SeqIO

In [None]:
WINDOW_SIZE= 8192 # 8192
MODEL_SIZE="7b"
SUBSET_METHOD="all" # "random, top, bottom, balanced, all"
REGION = "BRCA1_DATA" # BRCA1_DATA, RovHer_BRCA1 or RovHer_LDLR, "both"
LAYER="blocks.28.mlp.l3"
COMBO="delta"

# Directories
INPUT_DIR = Path("/mnt/nfs/rigenenfs/shared_resources/biobanks/UKBIOBANK/pangk/evo2/BRCA1_LDLR")
INPUT_DIR.mkdir(parents=True, exist_ok=True)

OUTPUT_DIR = Path(f"/mnt/nfs/rigenenfs/shared_resources/biobanks/UKBIOBANK/pangk/evo2/NN/BRCA1_LDLR_{COMBO}")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Input data: 
delta_file = f"{INPUT_DIR}/{REGION}_{LAYER}_delta.csv"
delta_rev_file = f"{INPUT_DIR}/{REGION}_{LAYER}_delta_rev.csv"
ref_file = f"{INPUT_DIR}/{REGION}_{LAYER}_ref.csv"
var_file = f"{INPUT_DIR}/{REGION}_{LAYER}_var.csv"
ref_rev_file = f"{INPUT_DIR}/{REGION}_{LAYER}_ref_rev.csv"
var_rev_file = f"{INPUT_DIR}/{REGION}_{LAYER}_var_rev.csv"

if REGION == "BRCA1_DATA":
    file = "/mnt/nfs/rigenenfs/workspace/pangk/Softwares/evo2/data/BRCA1_DATA.xlsx" # training variants + labels
else:
    DIR="/mnt/nfs/rigenenfs/shared_resources/biobanks/UKBIOBANK/pangk"
    label_file1 = f"{DIR}/RARity_monogenic_benchmark/BRCAexchange/BRCA1_clinvar_cleaned.txt" 
    label_file2 = f"{DIR}/RARity_monogenic_benchmark/LOVD_LDLR/LDLR_clinvar_curated.txt" # British heart foundation-classified variants on LOVD

if REGION == "both":
    REGION = "RovHer_BRCA1" 
    ref_file1 = f"{INPUT_DIR}/{REGION}_{LAYER}_ref.csv"
    var_file1 = f"{INPUT_DIR}/{REGION}_{LAYER}_var.csv"
    ref_rev_file1 = f"{INPUT_DIR}/{REGION}_{LAYER}_ref_rev.csv"
    var_rev_file1 = f"{INPUT_DIR}/{REGION}_{LAYER}_var_rev.csv"
    REGION = "RovHer_LDLR" 
    ref_file2 = f"{INPUT_DIR}/{REGION}_{LAYER}_ref.csv"
    var_file2 = f"{INPUT_DIR}/{REGION}_{LAYER}_var.csv"
    ref_rev_file2 = f"{INPUT_DIR}/{REGION}_{LAYER}_ref_rev.csv"
    var_rev_file2 = f"{INPUT_DIR}/{REGION}_{LAYER}_var_rev.csv"
    


In [None]:
from keras.losses import binary_crossentropy

def sample_data(df, sample_frac=1.0, balanced=True, disable=True, random_state=42):
    """Sample dataframe, optionally with balanced classes.
    """
    if disable:
        return df
    if balanced: # Get the number of rows in the dataframe
        num_rows_minor_class = math.ceil(len(df[df["class"] == "LOF"]) * sample_frac)
        return (
            pd.concat(
                [
                    df[df["class"] == "LOF"].sample(n=num_rows_minor_class, random_state=random_state),
                    df[df["class"] == "FUNC/INT"].sample(n=num_rows_minor_class, random_state=random_state),
                ]
            )
            .sample(frac=1.0, random_state=random_state)
            .reset_index(drop=True)
        )
    else: # Calculate the number of rows to sample
        return df.sample(frac=sample_frac, random_state=random_state).reset_index(drop=True)

def subset_dataframe(df, seq):
    """
    Randomly subsets the dataframe to SEQ_LENGTH number of rows.
    Returns: pandas.DataFrame - A subset of the dataframe with SEQ_LENGTH rows.
    """
    print("Number of rows to extract:", seq) 
    if seq > len(df):
        raise ValueError(f"SEQ_LENGTH ({seq}) is greater than the number of rows in the DataFrame ({len(df)}).")
    subset_df = df.sample(n=seq, random_state=42)
    print("New subset:", subset_df.shape) 
    return subset_df

def parse_sequences(pos, ref, alt, refseq, window_size=WINDOW_SIZE):
    """Parse reference and variant sequences from the reference genome sequence.
    Returns:  tuple (reference_sequence, variant_sequence)
    """
    p = pos - 1  # Convert to 0-indexed position
    full_seq = refseq
    ref_seq_start = max(0, p - window_size // 2)
    ref_seq_end = min(len(full_seq), p + window_size // 2)
    ref_seq = refseq[ref_seq_start:ref_seq_end]
    snv_pos_in_ref = min(window_size // 2, p)
    var_seq = ref_seq[:snv_pos_in_ref] + alt + ref_seq[snv_pos_in_ref + 1 :]
    # Sanity checks
    assert len(var_seq) == len(ref_seq)
    assert ref_seq[snv_pos_in_ref] == ref
    assert var_seq[snv_pos_in_ref] == alt
    return ref_seq, var_seq

def generate_fasta_files(df, refseq, output_dir=FASTA_DIR, window_size=WINDOW_SIZE):
    """Generate FASTA files for ref and var sequences.
    Returns:pandas.DataFrame with added columns for FASTA names
    """
    # Create output directory
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Paths for output files
    ref_fasta_path = output_dir / "reference_sequences.fasta"
    var_fasta_path = output_dir / "variant_sequences.fasta"
    # Track unique sequences
    ref_sequences = set()
    var_sequences = set()
    ref_seq_to_name = {}
    # Store unique sequences with metadata for writing
    ref_entries = []
    var_entries = []
    ref_names = []
    var_names = []
    # Collect unique reference and variant sequences
    for idx, row in df.iterrows():
        ref_seq, var_seq = parse_sequences(row["pos"], row["ref"], row["alt"], refseq, window_size)
        # Add to sets to ensure uniqueness
        if ref_seq not in ref_sequences:
            ref_sequences.add(ref_seq)
            ref_name = f"ref_pos_{row['pos']}_{row['ref']}"
            ref_entries.append(f">{ref_name}\n{ref_seq}\n")
            ref_names.append(ref_name)
            ref_seq_to_name[ref_seq] = ref_name
        else:
            ref_name = ref_seq_to_name[ref_seq]
            ref_names.append(ref_name)
        if var_seq not in var_sequences:
            var_sequences.add(var_seq)
            var_name = f"var_pos_{row['pos']}_{row['ref']}to{row['alt']}"
            var_entries.append(f">{var_name}\n{var_seq}\n")
            var_names.append(var_name)
        else:
            assert False, "Duplicate variant sequence"
    # Write unique sequences to FASTA files
    with open(ref_fasta_path, "w") as f:
        f.writelines(ref_entries)
    with open(var_fasta_path, "w") as f:
        f.writelines(var_entries)
    # Add FASTA names to dataframe
    df_with_names = df.copy()
    df_with_names["ref_fasta_name"] = ref_names
    df_with_names["var_fasta_name"] = var_names
    print(f"Unique reference sequences: {len(ref_sequences)}")
    print(f"Unique variant sequences: {len(var_sequences)}")
    return df_with_names

# Compute Binary Cross-Entropy using NumPy
def binary_cross_entropy_np(y_true, y_pred):
    """
    Calculates Binary Cross-Entropy loss for multiple samples using NumPy.
    y_true: NumPy array of actual labels (0s and 1s)
    y_pred: NumPy array of predicted probabilities (between 0 and 1)
    """
    epsilon = 1e-15  # Small value to prevent log(0)
    y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  # Clip probabilities
    loss = -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
    return loss

# Compute Binary Cross-Entropy using own calculaton for checking
def binary_cross_entropy_check(y_true, y_pred):
    """
    Calculates Binary Cross-Entropy loss for multiple samples using NumPy.
    y_true: NumPy array of actual labels (0s and 1s)
    y_pred: NumPy array of predicted probabilities (between 0 and 1)
    """
    epsilon = 1e-15  # Small value to prevent log(0)
    y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  # Clip probabilities
    N=len(y_true)
    BCE_sum=0
    for i in range(N):
        BCE_sum = BCE_sum + y_true[i]*np.log(y_pred[i]) + (1-y_true[i])*np.log((1-y_pred[i])) 
    BCE = (-1*BCE_sum)/N
    return BCE

## Load training labels 

The *BRCA1* SNV dataset was obtained from [Findlay et al. (2018)](https://www.nature.com/articles/s41586-018-0461-z), which contains 3,893 SNVs. Among them, 631 SNVs have ClinVar classification, which we recoded as numerical labels [0, 0.25, 0.5, 0.75, 1] for model training.

In [None]:
def recode_clinvar(value):
    mapping = {
        "P": 1,
        "B": 0,
        "LB": 0.25,
        "LP": 0.75,
        "LP,P": 0.75,
        "B/LB": 0.25
    }
    return mapping.get(value, 0.5)

recode_map = {
    "Pathogenic": 1,
    'Pathogenic/Likely pathogenic': 1,
    'Likely pathogenic': 0.75,
    "Uncertain significance": 0.5,
    "Likely benign": 0.25,
    "Benign": 0,
    "absent": "NA",
    'Conflicting interpretations of pathogenicity': "NA",
}

# 1. Variant data + ClinVar labels 
if REGION == "BRCA1_DATA":
    data = pd.read_excel(file, header=2)
    data = data[['chromosome', 'position (hg19)', 'reference', 'alt', 'function.score.mean', 'func.class', 'clinvar',]]
    data.rename(columns={
            'chromosome': 'chrom','position (hg19)': 'pos',
            'reference': 'ref','alt': 'alt',
            'function.score.mean': 'score','func.class': 'class', 'clinvar': 'clinvar',
        }, inplace=True)
    # Re-code values 
    data['class'] = data['class'].replace(['FUNC', 'INT'], 'FUNC/INT')
    # Create new column 
    data['PLINK_SNP_NAME'] = data.apply(
            lambda row: f"{row['chrom']}:{row['pos']}:{row['ref']}:{row['alt']}", axis=1
    )
    # Recode `clinvar` column
    unique_clinvar_values = data['clinvar'].unique()
    print("Unique values in clinvar column:", unique_clinvar_values)
    data['clinvar'] = data['clinvar'].replace(recode_map)
    lof_count = data[data["class"] == "LOF"].shape[0]
    print(f"Test 'LOF': {lof_count}\n")
    other_count = data[data["class"] == "FUNC/INT"].shape[0]
    print(f"Test 'FUNC/INT': {other_count}\n")
else:
    # BRCA1
    ACMG_col1 = pd.read_csv(label_file1, sep="\t", usecols=["PLINK_SNP_NAME", "ACMG_final"])
    ACMG_col1 = ACMG_col1.rename(columns={"ACMG_final": "clinvar"})
    # LDLR
    ACMG_col2 = pd.read_csv(label_file2, sep="\t", usecols=["PLINK_SNP_NAME", "clinvar_clnsig"])
    ACMG_col2 = ACMG_col2.rename(columns={"clinvar_clnsig": "clinvar"})
    # Combine 
    data = pd.concat([ACMG_col1, ACMG_col2], ignore_index=True)
    print(f"BRCA1 and LDLR merged: {data.shape}")
    # (883, 2)
    data = data[~data["clinvar"].isin(["", "NA", "CCP"])]
    data["clinvar"] = data["clinvar"].apply(recode_clinvar)
    print(data["clinvar"].value_counts(dropna=False))

# Remove rows with missing clinvar anno
data = data[data['clinvar'] != "NA"]
print("After removing NA in clinvar:", data.shape)


## Load Evo2 7B model emeddings data

In [None]:
if COMBO == "delta":
    # Variant + reverse complement embeddings 
    delta = pd.read_csv(delta_file)
    delta_reverse = pd.read_csv(delta_rev_file)

if COMBO == "refvar":
    if REGION == "BRCA1_DATA" or REGION == "RovHer_BRCA1" or REGION == "RovHer_LDLR":
        # 1. Variant + reverse complement 
        var = pd.read_csv(var_file)
        var_reverse = pd.read_csv(var_rev_file)
        # 2. Reference + reverse complement
        ref = pd.read_csv(ref_file)
        ref_reverse = pd.read_csv(ref_rev_file)

    if REGION == "both":
        var1 = pd.read_csv(var_file1)
        var_reverse1 = pd.read_csv(var_rev_file1)
        ref1 = pd.read_csv(ref_file1)
        ref_reverse1 = pd.read_csv(ref_rev_file1)
        var2 = pd.read_csv(var_file2)
        var_reverse2 = pd.read_csv(var_rev_file2)
        ref2 = pd.read_csv(ref_file2)
        ref_reverse2 = pd.read_csv(ref_rev_file2)

        var = pd.concat([var1, var2], ignore_index=True)
        var_reverse = pd.concat([var_reverse1, var_reverse2], ignore_index=True)
        ref = pd.concat([ref1, ref2], ignore_index=True)
        ref_reverse = pd.concat([ref_reverse1, ref_reverse2], ignore_index=True)

### Subset rows from training data

In [None]:
# Check for duplicate rows based on the PLINK_SNP_NAME column
data = data[~data['PLINK_SNP_NAME'].duplicated(keep='first')]

if COMBO == "delta":
    # Step 1: Compute the strict intersection of PLINK_SNP_NAME across all dfs
    final_common_snp_names = list(
        set(data['PLINK_SNP_NAME'])
        .intersection(delta['PLINK_SNP_NAME'])
        .intersection(delta_reverse['PLINK_SNP_NAME'])
    )
    # Step 2: Filter all dfs simultaneously based on the common SNP names
    data = data[data['PLINK_SNP_NAME'].isin(final_common_snp_names)].reset_index(drop=True)
    delta = delta[delta['PLINK_SNP_NAME'].isin(final_common_snp_names)].reset_index(drop=True)
    delta_reverse = delta_reverse[delta_reverse['PLINK_SNP_NAME'].isin(final_common_snp_names)].reset_index(drop=True)
    # Tallies
    print("Filtered labels file (data):", data.shape)
    print("delta:", delta.shape, "delta_reverse:", delta_reverse.shape)
    # Check if the number of rows match
    if not (delta.shape[0] == data.shape[0] and
            delta_reverse.shape[0] == data.shape[0]):
        raise ValueError("Number of rows in embeddings do not match number of rows in data.")


if COMBO == "refvar":
    # Step 1: Compute the strict intersection of PLINK_SNP_NAME across all dfs
    final_common_snp_names = list(
        set(data['PLINK_SNP_NAME'])
        .intersection(var['PLINK_SNP_NAME'])
        .intersection(var_reverse['PLINK_SNP_NAME'])
        .intersection(ref['PLINK_SNP_NAME'])
        .intersection(ref_reverse['PLINK_SNP_NAME'])
    )
    # Step 2: Filter all dfs based on the common SNP names
    data = data[data['PLINK_SNP_NAME'].isin(final_common_snp_names)].reset_index(drop=True)
    var = var[var['PLINK_SNP_NAME'].isin(final_common_snp_names)].reset_index(drop=True)
    var_reverse = var_reverse[var_reverse['PLINK_SNP_NAME'].isin(final_common_snp_names)].reset_index(drop=True)
    ref = ref[ref['PLINK_SNP_NAME'].isin(final_common_snp_names)].reset_index(drop=True)
    ref_reverse = ref_reverse[ref_reverse['PLINK_SNP_NAME'].isin(final_common_snp_names)].reset_index(drop=True)

    # Tallies
    print("Filtered labels file (data):", data.shape)
    print("var:", var.shape, "var_reverse:", var_reverse.shape, "ref:", ref.shape, "ref_reverse:", ref_reverse.shape)

    # Check if the number of rows match
    if not (var.shape[0] == data.shape[0] and
            var_reverse.shape[0] == data.shape[0] and
            ref.shape[0] == data.shape[0] and
            ref_reverse.shape[0] == data.shape[0]):
        raise ValueError("Number of rows in embeddings do not match number of rows in data.")

print("Counts of unique values in clinvar column:")
print(data['clinvar'].value_counts())
numeric_rows = data['clinvar'].apply(lambda x: isinstance(x, (int, float))).sum()
print("Number of clinvar annotations:", numeric_rows, "of", data.shape[0], "rows") #  631 of 3893 rows

### Subset columns from training data

In [None]:
# Drop the 'input_file' and 'layer' columns

if COMBO == "delta":
    delta = delta.drop(columns=['PLINK_SNP_NAME','input_file', 'layer'])
    delta_reverse = delta_reverse.drop(columns=['PLINK_SNP_NAME','input_file', 'layer'])
    print(f"Variant embeddings: {delta.shape}")
    print(f"Variant reverse comp. embeddings: {delta_reverse.shape}")

if COMBO == "refvar":
    var = var.drop(columns=['PLINK_SNP_NAME','input_file', 'layer'])
    var_reverse = var_reverse.drop(columns=['PLINK_SNP_NAME','input_file', 'layer'])
    ref = ref.drop(columns=['PLINK_SNP_NAME','input_file', 'layer'])
    ref_reverse = ref_reverse.drop(columns=['PLINK_SNP_NAME','input_file', 'layer'])
    print(f"Variant embeddings: {var.shape}") # (631, 4096)
    print(f"Variant reverse comp. embeddings: {var_reverse.shape}") # (631, 4096)
    print(f"Reference embeddings: {ref.shape}") # (631, 4096)
    print(f"Reference reverse comp. embeddings: {ref_reverse.shape}") # (631, 4096)

### Build feature vector by concatenation
vector = [reference + reference reverse complement + variant + variant reverse complement]

In [None]:
if COMBO == "delta":
    # feature vector for each SNV  (631, 8192)
    feature_vec = np.hstack([
        delta.values,         # delta embeddings
        delta_reverse.values, # Reverse complement
    ])
    
if COMBO == "refvar":
    # feature vector for each SNV (3893, 16384) | 16384 features per SNV
    feature_vec = np.hstack([
        ref.values,         # Reference embeddings
        ref_reverse.values, # Reverse complement of reference
        var.values,         # Variant embeddings
        var_reverse.values  # Reverse complement of variant
    ])

print(f"feature_vec embeddings: {feature_vec.shape}")

# Extract labels
train_y = data['clinvar'].values

### Split dataset
1. Training Set: The remaining 80% of the training data.
2. Test Set: 20% of the data (withheld entirely from training).
3. Validation Set: 20% of the remaining training data.

In [None]:
# Use all of X_train for training (more training data), w/ internal validation_split during training
internal_validation_split="no"

from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np

if internal_validation_split == "yes":
    # Split dataset into test (20%) and remaining training data (80%)
    X_train, X_test, y_train, y_test = train_test_split(
        feature_vec, train_y, test_size=0.2, random_state=42, stratify=train_y
    )
    print(f"Training set size: {X_train.shape}")
else:
    # Check if REGION requires filtering of classes
    if REGION == "RovHer_LDLR":
        # Check class distribution and remove classes with fewer than 2 instances
        class_counts = pd.Series(train_y).value_counts()
        print("Class distribution before filtering:")
        print(class_counts)
        # Keep only classes with at least 2 instances
        classes_to_keep = class_counts[class_counts >= 2].index
        mask = np.isin(train_y, classes_to_keep)
        feature_vec = feature_vec[mask]
        train_y = train_y[mask]
        # Check class distribution again after filtering
        print("Class distribution after filtering:")
        print(pd.Series(train_y).value_counts())
    # MANUALLY split dataset into test (20%) and remaining training data (80%)
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        feature_vec, train_y, test_size=0.2, random_state=42, stratify=train_y
    )
    # Ensure no class has fewer than 2 instances in the remaining training data
    class_counts_val = pd.Series(y_train_val).value_counts()
    if (class_counts_val < 2).any():
        print("Warning: Some classes in y_train_val have fewer than 2 instances. Adjusting...")
        # Filter out classes with fewer than 2 instances in y_train_val
        classes_to_keep_val = class_counts_val[class_counts_val >= 2].index
        mask_val = np.isin(y_train_val, classes_to_keep_val)
        X_train_val = X_train_val[mask_val]
        y_train_val = y_train_val[mask_val]
    # Split the remaining training data into train (80%) and validation (20%)
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=0.2, random_state=42, stratify=y_train_val
    )
    # Convert validation and training data to float32
    X_val = X_val.astype('float32')
    y_val = y_val.astype('float32')
    X_train_val = X_train_val.astype('float32')
    y_train_val = y_train_val.astype('float32')
    print(f"Validation set size: {X_val.shape}")

# if internal_validation_split == "yes":
#     # Split dataset into test (20%) and remaining training data (80%)
#     X_train, X_test, y_train, y_test = train_test_split(
#         feature_vec, train_y, test_size=0.2, random_state=42, stratify=train_y
#     )
#     print(f"Training set size: {X_train.shape}") 
# else: 
#     if REGION == "RovHer_LDLR":
#         # Check class distribution, Remove those with fewer than 2 instances
#         class_counts = pd.Series(train_y).value_counts()
#         classes_to_keep = class_counts[class_counts >= 2].index
#         mask = np.isin(train_y, classes_to_keep)
#         feature_vec = feature_vec[mask]
#         train_y = train_y[mask]
#     # MANUALLY split into test (20%) and remaining training data (80%)
#     X_train_val, X_test, y_train_val, y_test = train_test_split(
#         feature_vec, train_y, test_size=0.2, random_state=42, stratify=train_y
#     )
#     # Split remaining training data into train (80%) and validation (20%)
#     X_train, X_val, y_train, y_val = train_test_split(
#         X_train_val, y_train_val, test_size=0.2, random_state=42, stratify=y_train_val
#     )
#     X_val = X_val.astype('float32')
#     y_val = y_val.astype('float32')
#     X_train_val = X_train_val.astype('float32')
#     y_train_val = y_train_val.astype('float32')
#     print(f"Validation set size: {X_val.shape}")

X_train = X_train.astype('float32')
y_train = y_train.astype('float32')
X_test = X_test.astype('float32')
y_test = y_test.astype('float32')

print(f"Training set size: {X_train.shape}") # (403, 16384)
print(f"Test set size: {X_test.shape}") # (127, 16384)

### Train ANN
* Input Layer = 32,768 features.
* Hidden Layers: 512 → 128 → 32 neurons.
* Output Layer: Binary classification (pathogenic probability).
* Activation: ReLU for hidden layers, Sigmoid for the output layer.
* Batch Normalization and Dropout (𝑝=0.3) after each hidden layer.

In [None]:
# Create ANN model, with output layer for binary classification
input_dim = feature_vec.shape[1]
def build_model():
    model = Sequential()
    model.add(Dense(512, activation='relu',input_dim=input_dim))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(128, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(32, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['AUC'])
    return model

ANN_model = build_model()
ANN_model.summary()

In [None]:
start_time = time.time()

# reserves 15% of the training data (X_train and y_train) for validation during training
if internal_validation_split == "yes":
    history = ANN_model.fit(
        X_train, y_train, 
        epochs=100, 
        batch_size=64, 
        validation_split=0.15, 
    )
    print("Internal validation split enabled during training\n")
else:
    history = ANN_model.fit(
        X_train, y_train, 
        epochs=100, 
        batch_size=64, 
        verbose=2
    )
    print("No internal validation split occured.\n")

end_time = time.time()
exe_time = end_time - start_time
print("Training time: ", exe_time)

### Evaluate on test and validation sets
* validation set- performance on unseen data, monitor overfitting

In [None]:
test_loss, test_auc = ANN_model.evaluate(X_test, y_test, verbose=2)
print(f"Test Loss: {test_loss:.4f} | AUC: {test_auc:.4f}\n") # Test Loss: 0.7120 | AUC: 0.8740

# Evaluate on the validation set
if internal_validation_split == "no":  
    val_loss, val_auc = ANN_model.evaluate(X_val, y_val, verbose=2)
    print(f"Validation Loss: {val_loss:.4f} | AUC: {val_auc:.4f}")

## Plot training loss & AUC
* Loss vs Epochs and AUC vs Epochs (Training)
* ROC curve (Test and Validation set)

In [None]:
# Plot training and validation loss
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# Plot training and validation AUC
plt.subplot(1, 2, 2)
plt.plot(history.history['auc'], label='Training AUC')
plt.plot(history.history['val_auc'], label='Validation AUC')
plt.title('Training AUC')
plt.xlabel('Epochs')
plt.ylabel('AUC')
plt.legend()
plt.tight_layout()
plt.show()
plt.savefig(f"{OUTPUT_DIR}/{REGION}_{LAYER}_train_AUC_loss.png")
print("Loss plot:", f"{OUTPUT_DIR}/{REGION}_{LAYER}_train_AUC_loss.png")

## Plot ROC curve 

In [None]:
# Binary thresholding for AUC calculation
threshold = 0.5

# Set up a single figure for side-by-side subplots
plt.figure(figsize=(12, 5))

# Plot Test Set ROC Curve
plt.subplot(1, 2, 1)  # First subplot (1 row, 2 columns, 1st plot)
y_test_binary = np.where(y_test > threshold, 1, 0)
y_test_pred_prob = ANN_model.predict(X_test).ravel()  # Flatten predictions
fpr, tpr, thresholds = roc_curve(y_test_binary, y_test_pred_prob)
auc = roc_auc_score(y_test_binary, y_test_pred_prob)
print(f"AUC (binary thresholding - Test Set): {auc:.4f}")
plt.plot(fpr, tpr, label=f"AUC = {auc:.4f}", color="blue")
plt.plot([0, 1], [0, 1], 'k--', label="Random Guess", color="gray")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Test Set ROC Curve (Binary-thresholded AUC)")
plt.legend(loc="lower right")
plt.grid(alpha=0.3)

# Plot Validation Set ROC Curve
plt.subplot(1, 2, 2)  # Second subplot (1 row, 2 columns, 2nd plot)
y_val_binary = np.where(y_val > threshold, 1, 0)
y_val_pred_prob = ANN_model.predict(X_val).ravel()  # Flatten predictions
fpr_val, tpr_val, thresholds_val = roc_curve(y_val_binary, y_val_pred_prob)
auc_val = roc_auc_score(y_val_binary, y_val_pred_prob)
print(f"AUC (binary thresholding - Validation Set): {auc_val:.4f}")
plt.plot(fpr_val, tpr_val, label=f"AUC = {auc_val:.4f}", color="green")
plt.plot([0, 1], [0, 1], 'k--', label="Random Guess", color="gray")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Validation Set ROC Curve (Binary-thresholded AUC)")
plt.legend(loc="lower right")
plt.grid(alpha=0.3)

plt.tight_layout()
plt.savefig(f"{OUTPUT_DIR}/{REGION}_{LAYER}_ROC.png")
plt.show()
print(f"AUC (Test): {auc:.4f}  (Validation): {auc_val:.4f}")

In [371]:
# input is 1x2000 which is  [ ref vector  variant vector ]

# The training dataset (balanced) is as follows:
#  [one_ref_vector  variant_vector_1 (8% or 80 locations modified)]    label 1 : 10 cases
#  [one_ref_vector  variant_vector_2 (2% or 20 locations modified)]    label 0 : 5 cases
#  [one_ref_vector  variant_vector_3 (similar to one_ref_vector)  ]    label 0 : 5 cases
#
# Based on the search, the benign variants is designed to have changes of 2% of elements.(label 0)
# The pathogenic variants have a higher frequency of alterations of 8% of elements. (label 1)
#
# generate_dataset_new2() is the main function

In [None]:
# To create the dataset
# Ref vector is 1x1000; variant vector is 1x1000
def generate_dataset_new2(num_items,size_ref_vector,label_1_matrix,label_0_matrix,label_1_sign,label_0_sign):
    train_x = np.empty((0,2*size_ref_vector))
    train_y = np.empty((0,1))
    lowerB = 0.99
    upperB = 1.01
    lowerR = 1.2
    upperR = 1.5
    for i in range(0,num_items):
        ref_vector1 = np.array(np.random.uniform(low=0.01, high=0.6, size=size_ref_vector))
        for icase in range(0,10):
            pos = np.array(label_1_matrix[icase,:])
            kk_sign = np.array(label_1_sign[icase,:])
            counter = 0
            k_adjust = random.uniform(lowerB,upperB)
            var_vector1 = copy.deepcopy(ref_vector1)*k_adjust
            for j in pos:
                kk = random.uniform(lowerR,upperR)  
                var_vector1[j] = ref_vector1[j]*kk*kk_sign[counter]
                counter = counter + 1
            zzz_x = np.concatenate((ref_vector1,var_vector1))   
            zzz_y = np.array([1])
            train_x = np.append(train_x,[zzz_x],axis=0)
            train_y = np.append(train_y,[zzz_y],axis=0)
        for icase in range(0,5):
            pos = np.array(label_0_matrix[icase,:])
            kk_sign = np.array(label_0_sign[icase,:])
            counter = 0
            k_adjust = random.uniform(lowerB,upperB)
            var_vector1 = copy.deepcopy(ref_vector1)*k_adjust
            for j in pos:
                kk = random.uniform(lowerR,upperR)
                var_vector1[j] = ref_vector1[j]*kk*kk_sign[counter]
                counter = counter + 1
            zzz_x = np.concatenate((ref_vector1,var_vector1))   
            zzz_y = np.array([0])
            train_x = np.append(train_x,[zzz_x],axis=0)
            train_y = np.append(train_y,[zzz_y],axis=0)
        for icase in range(0,5):
            k_adjust = random.uniform(lowerB,upperB)
            var_vector1 = copy.deepcopy(ref_vector1)*k_adjust
            zzz_x = np.concatenate((ref_vector1,var_vector1))   
            zzz_y = np.array([0])
            train_x = np.append(train_x,[zzz_x],axis=0)
            train_y = np.append(train_y,[zzz_y],axis=0)
    return train_x, train_y

In [None]:
size_ref_vector = 1000

ref_elements_to_change = int(size_ref_vector*0.08) # 8%
var_elements_to_change = int(size_ref_vector*0.02) # 2%

label_1_matrix = np.random.randint(0,size_ref_vector-1, size=(10,ref_elements_to_change) )
label_0_matrix = np.random.randint(0,size_ref_vector-1, size=(10,var_elements_to_change) )

label_1_sign = np.random.choice([-1,1], size=(10,ref_elements_to_change) )
label_0_sign = np.random.choice([-1,1], size=(10,var_elements_to_change) )

print('label_1_matrix.shape: ' , label_1_matrix.shape)
print('label_0_matrix.shape: ' , label_0_matrix.shape)
print('label_1_sign.shape: ' , label_1_sign.shape)
print('label_0_sign.shape: ' , label_0_sign.shape)

Nset = 100
print('Generating the training dataset ...')
train_x, train_y=generate_dataset_new2(Nset,size_ref_vector,label_1_matrix,label_0_matrix,label_1_sign,label_0_sign)
print(train_x.shape) # (2000, 2000)
print(train_y.shape) # (2000,)

# array([[1.],
#        [1.],
#        [1.],
#        ...,
#        [0.],
#        [0.],
#        [0.]])

# >>> train_x
# array([[ 0.28908019,  0.59879076,  0.17052536, ...,  0.58650623,
#         -0.24925832,  0.54755763],
#        [ 0.28908019,  0.59879076,  0.17052536, ...,  0.58685261,
#          0.20274144,  0.54788101],
#        [ 0.28908019,  0.59879076,  0.17052536, ...,  0.58446367,
#          0.20191613,  0.54565071],
#        ...,
#        [ 0.32940272,  0.03123094,  0.22965965, ...,  0.22620247,
#          0.0247488 ,  0.29854916],
#        [ 0.32940272,  0.03123094,  0.22965965, ...,  0.22705594,
#          0.02484218,  0.29967561],
#        [ 0.32940272,  0.03123094,  0.22965965, ...,  0.22548236,
#          0.02467001,  0.29759874]])




label_1_matrix.shape:  (10, 80)
label_0_matrix.shape:  (10, 20)
label_1_sign.shape:  (10, 80)
label_0_sign.shape:  (10, 20)
Generating the training dataset ...
(2000, 2000)
(2000, 1)


In [397]:
# Each set of dataset is Nset x 80
print('Generating the testing dataset ...')
test_x, test_y=generate_dataset_new2(100,size_ref_vector,label_1_matrix,label_0_matrix,label_1_sign,label_0_sign)
print(test_x.shape)
print(test_y.shape)

Generating the testing dataset ...
(2000, 2000)
(2000, 1)


In [None]:
# Create ANN model

def baseline_model():
    model = Sequential()
    model.add(Dense(512, activation='relu', input_dim = 2000))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(128, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(32, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dense(1, activation='sigmoid')) # Output layer for binary classification
    model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])
    return model

def baseline_model4():
    model = Sequential()
    model.add(Dense(256, activation='relu', input_dim = 1000))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(64, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(16, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dense(1, activation='sigmoid')) # Output layer for binary classification
    model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])
    return model

ANN_model = baseline_model()
ANN_model.summary()

start_time = time.time()
history = ANN_model.fit(train_x, train_y, epochs=50, batch_size=64,validation_split=0.15,verbose = 2)
end_time = time.time()
exe_time = end_time - start_time
print("Execution time: ", exe_time)
scores = ANN_model.evaluate(test_x,test_y,verbose = 2)
print("Testing Accuracy = ", scores)

Epoch 1/50
27/27 - 0s - loss: 0.2007 - accuracy: 0.9529 - val_loss: 0.0563 - val_accuracy: 0.9633
Epoch 2/50
27/27 - 0s - loss: 0.0159 - accuracy: 0.9953 - val_loss: 0.0386 - val_accuracy: 0.9833
Epoch 3/50
27/27 - 0s - loss: 0.0042 - accuracy: 0.9994 - val_loss: 0.0170 - val_accuracy: 0.9900
Epoch 4/50
27/27 - 0s - loss: 0.0031 - accuracy: 0.9994 - val_loss: 0.0041 - val_accuracy: 1.0000
Epoch 5/50
27/27 - 0s - loss: 0.0016 - accuracy: 1.0000 - val_loss: 0.0030 - val_accuracy: 1.0000
Epoch 6/50
27/27 - 0s - loss: 0.0015 - accuracy: 1.0000 - val_loss: 0.0022 - val_accuracy: 1.0000
Epoch 7/50
27/27 - 0s - loss: 0.0016 - accuracy: 1.0000 - val_loss: 0.0019 - val_accuracy: 1.0000
Epoch 8/50
27/27 - 0s - loss: 0.0010 - accuracy: 1.0000 - val_loss: 0.0018 - val_accuracy: 1.0000
Epoch 9/50
27/27 - 0s - loss: 8.9974e-04 - accuracy: 1.0000 - val_loss: 0.0017 - val_accuracy: 1.0000
Epoch 10/50
27/27 - 0s - loss: 9.2048e-04 - accuracy: 1.0000 - val_loss: 0.0017 - val_accuracy: 1.0000
Epoch 11/50

In [None]:
# To get the results of the ANN using test dataset
y_pred = ANN_model.predict(test_x)

epsilon = 1e-15  # Small value to prevent log(0)
y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  # Clip probabilities
print(test_x.shape)
print(test_y.shape)
print(y_pred.shape)
total_loss = binary_cross_entropy_check(test_y, y_pred)
print(f"** CHECK Average BCE Loss for multiple samples: {total_loss}")

# To check on accuracy
# first, convert the elements in y_pred so that negative becones epsilon, largest is 1 - epsilon
epsilon = 1e-15  # Small value to prevent log(0)
y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  # Clip probabilities
N=len(test_y)
threshold = 0.5
y_pred_binary = (y_pred >= threshold).astype(int)

accuracy = accuracy_score(test_y,y_pred_binary)
print("Accuracy Score:",accuracy)

#Count error cases:
test_y_binary = test_y
error0to1 = 0  # test_y is 0
error1to0 = 0  # test_y is 1
for i in range(N):
    if (test_y_binary[i] == 0 and y_pred_binary[i] == 1):
        error0to1 += 1
    if (test_y_binary[i] == 1 and y_pred_binary[i] == 0):
        error1to0 += 1
print('error0to1 = ',error0to1, '; label is 0')
print('error1to0 = ',error1to0, '; label is 1')
print('Testing total error = ',error0to1+error1to0,'percentError = ', 100*(error0to1+error1to0)/N)
print('2000 test cases: 1000 label 0; 1000 label 1')

(2000, 2000)
(2000, 1)
(2000, 1)
** CHECK Average BCE Loss for multiple samples: [nan]
Accuracy Score: 0.9975
error0to1 =  4 ; label is 0
error1to0 =  1 ; label is 1
Testing total error =  5 percentError =  0.25
2000 test cases: 1000 label 0; 1000 label 1




In [None]:
print('test_x.shape ', test_x.shape)
print('test_y.shape ',test_y.shape)
print(y_pred_binary.shape)
print('Compare between test_y and y_pred_binary')

N=len(test_y)
errorcase=[0]*20
for i in range(0,len(test_y)):
    index = (i % 20)
    if (test_y[i] != y_pred_binary[i]): 
        errorcase[index] = errorcase[index]+1
print(errorcase)
print('Sum of errors = ' ,np.sum(errorcase), '  Percent Error', 100*np.sum(errorcase)/N)

test_x.shape  (2000, 2000)
test_y.shape  (2000, 1)
(2000, 1)
Compare between test_y and y_pred_binary
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 0]
Sum of errors =  5   Percent Error 0.25


In [None]:
print('train_x.shape ', train_x.shape)
print('train_y.shape ',train_y.shape)

# To get the results of the ANN using test dataset
y_pred = ANN_model.predict(train_x)

epsilon = 1e-15  # Small value to prevent log(0)
y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  # Clip probabilities
N=len(train_y)
threshold = 0.5
y_pred_binary = (y_pred >= threshold).astype(int)
print(y_pred_binary.shape)
print('Compare between train_y and y_pred_binary')

errorcase=[0]*20
for i in range(0,len(train_y)):
    index = (i % 20)
    if (train_y[i] != y_pred_binary[i]): 
        errorcase[index] = errorcase[index]+1
print(errorcase)
print('len(error) '  ,len(errorcase))

print('Sum of errors = ' ,np.sum(errorcase), '  Percent Error', 100*np.sum(errorcase)/N)

train_x.shape  (2000, 2000)
train_y.shape  (2000, 1)
(2000, 1)
Compare between train_y and y_pred_binary
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
len(error)  20
Sum of errors =  0   Percent Error 0.0


In [None]:
# Part B:
# Difference vector
# Getting the difference vector of the training cases for training
# input is now 1 x 1000; binary output
#
ref_vector = train_x[:,0:1000]
var_vector = train_x[:,1000:2000]
diff_vector = np.abs(ref_vector - var_vector)
print(ref_vector.shape,var_vector.shape,diff_vector.shape  )

ref_vector_test = test_x[:,0:1000]
var_vector_test = test_x[:,1000:2000]
diff_vector_test = np.abs(ref_vector_test - var_vector_test)
print(ref_vector_test.shape,var_vector_test.shape,diff_vector_test.shape  )


(2000, 1000) (2000, 1000) (2000, 1000)
(2000, 1000) (2000, 1000) (2000, 1000)


In [None]:
# ANN model of the 1000 inputs
def baseline_model4():
    model = Sequential()
    model.add(Dense(256, activation='relu', input_dim = 1000))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(64, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(0.3))
    model.add(Dense(16, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dense(1, activation='sigmoid')) # Output layer for binary classification
    
    model.compile(optimizer='adam',loss='binary_crossentropy',metrics=['accuracy'])
    
    return model

In [None]:
ANN_model_diff = baseline_model4()

history = ANN_model_diff.fit(diff_vector, train_y, epochs=30, batch_size=64,validation_split=0.15,verbose = 2)
print('Training completed. ')
scores = ANN_model_diff.evaluate(diff_vector_test,test_y,verbose = 2)
print("Testing Accuracy = ", scores)

Epoch 1/30
27/27 - 1s - loss: 0.2219 - accuracy: 0.9265 - val_loss: 0.4861 - val_accuracy: 0.9067
Epoch 2/30
27/27 - 0s - loss: 0.0441 - accuracy: 1.0000 - val_loss: 0.4150 - val_accuracy: 0.8933
Epoch 3/30
27/27 - 0s - loss: 0.0269 - accuracy: 1.0000 - val_loss: 0.3699 - val_accuracy: 0.9100
Epoch 4/30
27/27 - 0s - loss: 0.0206 - accuracy: 1.0000 - val_loss: 0.3221 - val_accuracy: 0.9833
Epoch 5/30
27/27 - 0s - loss: 0.0180 - accuracy: 1.0000 - val_loss: 0.2650 - val_accuracy: 1.0000
Epoch 6/30
27/27 - 0s - loss: 0.0137 - accuracy: 1.0000 - val_loss: 0.1992 - val_accuracy: 1.0000
Epoch 7/30
27/27 - 0s - loss: 0.0109 - accuracy: 0.9994 - val_loss: 0.1342 - val_accuracy: 1.0000
Epoch 8/30
27/27 - 0s - loss: 0.0095 - accuracy: 1.0000 - val_loss: 0.0907 - val_accuracy: 1.0000
Epoch 9/30
27/27 - 0s - loss: 0.0080 - accuracy: 1.0000 - val_loss: 0.0585 - val_accuracy: 1.0000
Epoch 10/30
27/27 - 0s - loss: 0.0070 - accuracy: 1.0000 - val_loss: 0.0374 - val_accuracy: 1.0000
Epoch 11/30
27/27 -

In [None]:
# Results based on the vector difference of the 2000 test cases
y_pred = ANN_model_diff.predict(diff_vector_test)
loss_diff, acc_diff = ANN_model_diff.evaluate(diff_vector_test,test_y,verbose = 2)
print(loss_diff, acc_diff)

63/63 - 0s - loss: 3.8846e-04 - accuracy: 1.0000
0.0003884606994688511 1.0


In [None]:
print('diff_vector_test.shape ', diff_vector_test.shape)
print('test_y.shape ',test_y.shape)

y_pred = ANN_model_diff.predict(diff_vector_test)
threshold = 0.5
y_pred_binary = (y_pred >= threshold).astype(int)

print(y_pred_binary.shape)

print('Compare between test_y and y_pred_binary')

N=len(test_y)

errorcase=[0]*20
for i in range(0,len(test_y)):
    index = (i % 20)
    if (test_y[i] != y_pred_binary[i]): 
        errorcase[index] = errorcase[index]+1
print(errorcase)
print('Sum of errors = ' ,np.sum(errorcase), '  Percent Error', 100*np.sum(errorcase)/N)

diff_vector_test.shape  (2000, 1000)
test_y.shape  (2000, 1)
(2000, 1)
Compare between test_y and y_pred_binary
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Sum of errors =  0   Percent Error 0.0


In [130]:
import os
DIR = "C:/Users/gpang/iCloudDrive/Naja/20250628-Evo2/code"
out_path = os.path.join(DIR, "trainX_vectors3.csv")
print(out_path)
# DB_train.tofile(out_path,sep=',',fmt='%.5f')
np.savetxt(out_path,DBset,delimiter=',',fmt='%.5f')
print("Finished Export of Training cases ")

C:/Users/gpang/iCloudDrive/Naja/20250628-Evo2/code\trainX_vectors3.csv
Finished Export of Training cases 


In [131]:
out_path = os.path.join(DIR, "trainY_vectors3.csv")
print(out_path)
# DB_test.tofile(out_path,sep=',',fmt='#.5f')
np.savetxt(out_path,DBy,delimiter=',',fmt='%.5f')
print("Finished Export of Testing cases ")

C:/Users/gpang/iCloudDrive/Naja/20250628-Evo2/code\trainY_vectors3.csv
Finished Export of Testing cases 


In [132]:
out_path = os.path.join(DIR, "testX_vectors3.csv")
print(out_path)
# DB_train.tofile(out_path,sep=',',fmt='%.5f')
np.savetxt(out_path,DBset_test,delimiter=',',fmt='%.5f')
print("Finished Export of Training cases ")

C:/Users/gpang/iCloudDrive/Naja/20250628-Evo2/code\testX_vectors3.csv
Finished Export of Training cases 


In [40]:
# NO USE BELOW

#Define ANOTHER ANN model
input_dim = 16384
ANNmodel2 = keras.Sequential([
    layers.Dense(512, activation='relu', input_shape=(1500,input_dim)),
    layers.Dense(128, activation='relu'),
    layers.Dense(32, activation='relu'),
    layers.Dense(1, activation='sigmoid') # Output layer for binary classification
])

# compile with cross entropy
ANNmodel2.compile(optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'])

In [None]:
# training

start_time = time.time()

history = ANNmodel2.fit(x_train, y_train, epochs=30, batch_size=32,verbose = 2)

end_time = time.time()
exe_time = end_time - start_time
print("Execution time: ", exe_time)

In [None]:
scores = ANNmodel2.evaluate(test_x,test_y,verbose = 2)

In [55]:
# test code
import numpy as np
from keras.losses import binary_crossentropy
import keras.backend as K

# Example true labels and predicted probabilities
y_true = np.array([0, 1, 1, 0, 1])
y_pred = np.array([0.1, 0.9, 0.8, 0.2, 0.7])

# Compute Binary Cross-Entropy using NumPy
def binary_cross_entropy(y_true, y_pred):
    bce = -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
    return bce

bce_loss = binary_cross_entropy(y_true, y_pred)
print(f"***Binary Cross-Entropy Loss (function): {bce_loss}")

#===========================================================================
def binary_cross_entropy_np(y_true, y_pred):
    """
    Calculates Binary Cross-Entropy loss for multiple samples using NumPy.
    y_true: NumPy array of actual labels (0s and 1s)
    y_pred: NumPy array of predicted probabilities (between 0 and 1)
    """
    epsilon = 1e-15  # Small value to prevent log(0)
    y_pred = np.clip(y_pred, epsilon, 1 - epsilon)  # Clip probabilities
    loss = -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
    return loss

total_loss = binary_cross_entropy_np(y_true, y_pred)
print(f"--Average BCE Loss for multiple samples: {total_loss}")
#--------------------------------------------------------------------------

# Compute Binary Cross-Entropy using Keras
# DOES NOT WORK
# bce_loss_keras = binary_crossentropy(K.constant(y_true), K.constant(y_pred)).numpy()
# print(f"Binary Cross-Entropy Loss (Keras): {bce_loss_keras}")

***Binary Cross-Entropy Loss (function): 0.20273661557656092
--Average BCE Loss for multiple samples: 0.20273661557656092


In [84]:
aaa = np.array([0, 1, 1, -2.5, 1 , 3.4])
bbb = np.array([0.1, 0.9, 0.8, 0.2, 0.7])

print(max(aaa))
print(min(aaa))

3.4
-2.5


In [None]:
    DBset = np.empty((0,4))
    DBset_tmp = np.array([1,2,3,4])
    DBset = np.append(DBset,[DBset_tmp],axis=0)
    print(DBset)
    
    DBset_tmp = np.array([5,6,7,8])
    DBset = np.append(DBset,[DBset_tmp],axis=0)

    print(DBset)
    print('========================')
    DBset_tmp = np.array([9,10,11,12])
    DBset = np.append(DBset,[DBset_tmp],axis=0)

    print(DBset)

In [None]:
# NO NEED TO LOAD / READIN

# READ IN CSV (takes 30 seconds)
new_train_values = np.loadtxt("train_vectors2.csv",delimiter=",")
print(new_train_values.shape)
new_train = new_train_values.reshape((4000,16385))
print('new_train.shape = ',new_train.shape)


new_test_values = np.loadtxt("test_vectors2.csv",delimiter=",")
print(new_test_values.shape)
new_test = new_test_values.reshape((4000,16385))
print('new_test.shape = ',new_test.shape)

train_x = new_train[:,0:16384] 
train_y = new_train[:,16384]
print(train_x.shape)
print(train_y.shape)
#===================
test_x = new_test[:,0:16384] 
test_y = new_test[:,16384]
print(test_x.shape)
print(test_y.shape)