In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.losses import MeanSquaredError, MeanAbsoluteError
from tensorflow.keras.optimizers import Adam



from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
import numpy as np


2025-07-22 23:25:06.898763: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-07-22 23:25:06.907917: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-07-22 23:25:06.930994: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1753226706.969528   10464 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1753226706.980328   10464 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1753226707.011719   10464 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linkin

### Data Preprocessing 

In [12]:
from sklearn.model_selection import train_test_split

ecg_data = np.load("/home/psyche/Documents/ProJect64/ProJect64 System Architect/Data/Model_data/ecgModel_data.npy")
pcg_data = np.load("/home/psyche/Documents/ProJect64/ProJect64 System Architect/Data/Model_data/pcgModel_data.npy")

ecgTrain, ecgTemp = train_test_split(ecg_data, test_size=0.4, random_state=42, shuffle=True)
pcgTrain, pcgTemp = train_test_split(pcg_data, test_size=0.4, random_state=42, shuffle=True)

ecgVal,ecgTest = train_test_split(ecgTemp, test_size=0.5, random_state=42, shuffle=True)
pcgVal,pcgTest = train_test_split(pcgTemp, test_size=0.5, random_state=42, shuffle=True)



In [13]:
np.save("ecgTrain.npy", ecgTrain)
np.save("pcgTrain.npy", pcgTrain)
np.save("ecgVal.npy", ecgVal)
np.save("pcgVal.npy", pcgVal)
np.save("ecgTest.npy", ecgTest)
np.save("pcgTest.npy", pcgTest)

### Encoders
This stream of code is an encoder for Variational Autoencoder model


In [14]:
def build_encoder(input_dim, latent_dim=32, hidden_dims=[256, 128], name_prefix="enc"):
    inputs = Input(shape=(input_dim,), name=f"{name_prefix}_input")
    x = inputs
    for i, dim in enumerate(hidden_dims):
        x = layers.Dense(dim, activation="relu", name=f"{name_prefix}_dense_{i}")(x)
    z_mean = layers.Dense(latent_dim, name=f"{name_prefix}_z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name=f"{name_prefix}_z_log_var")(x)
    return Model(inputs, [z_mean, z_log_var], name=f"{name_prefix}_encoder")


### Decoders
This stream of code for the decoder for the model.

In [3]:
def build_decoder(output_dim, latent_dim=32, hidden_dims=[128, 256], name_prefix="dec"):
    latent_inputs = Input(shape=(latent_dim,), name=f"{name_prefix}_input")
    x = latent_inputs
    for i, dim in enumerate(hidden_dims):
        x = layers.Dense(dim, activation="relu", name=f"{name_prefix}_dense_{i}")(x)
    outputs = layers.Dense(output_dim, activation="tanh", name=f"{name_prefix}_output")(x)
    return Model(latent_inputs, outputs, name=f"{name_prefix}_decoder")



In [4]:
def sampling(z_mean, z_log_var):
    epsilon = tf.random.normal(shape=tf.shape(z_mean))
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

### Model 

In [5]:
class MultimodalVAE(Model):
    def __init__(self, ecg_encoder, pcg_encoder, ecg_decoder, pcg_decoder, latent_dim=32, **kwargs):
        super(MultimodalVAE, self).__init__(**kwargs)
        self.ecg_encoder = ecg_encoder
        self.pcg_encoder = pcg_encoder
        self.ecg_decoder = ecg_decoder
        self.pcg_decoder = pcg_decoder
        self.latent_dim = latent_dim
        self.loss_fn = MeanSquaredError()

    def compile(self, optimizer):
        super().compile()
        self.optimizer = optimizer
        self.total_loss_tracker = tf.keras.metrics.Mean(name="loss")
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")
        self.recon_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")

    @property
    def metrics(self):
        return [self.total_loss_tracker, self.recon_loss_tracker, self.kl_loss_tracker]

    def train_step(self, data):
        ecg = data["ecg_input"]
        pcg = data["pcg_input"]

        with tf.GradientTape() as tape:
            z_mean_ecg, z_log_var_ecg = self.ecg_encoder(ecg)
            z_mean_pcg, z_log_var_pcg = self.pcg_encoder(pcg)

            # Concatenate latent mean and logvar
            z_mean = tf.concat([z_mean_ecg, z_mean_pcg], axis=1)
            z_log_var = tf.concat([z_log_var_ecg, z_log_var_pcg], axis=1)

            z = sampling(z_mean, z_log_var)

            ecg_recon = self.ecg_decoder(z)
            pcg_recon = self.pcg_decoder(z)

            ecg_loss = self.loss_fn(ecg, ecg_recon)
            pcg_loss = self.loss_fn(pcg, pcg_recon)
            recon_loss = ecg_loss + pcg_loss

            kl_loss = -0.5 * tf.reduce_mean(
                tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1)
            )

            total_loss = recon_loss + kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.recon_loss_tracker.update_state(recon_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {
            "loss": self.total_loss_tracker.result(),
            "recon_loss": self.recon_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }


In [6]:
# Assuming both are shaped (num_samples, 500)
ecg_data = np.load("/home/psyche/Documents/ProJect64/ProJect64 System Architect/Data/Model_data/ecgModel_data.npy")
pcg_data = np.load("/home/psyche/Documents/ProJect64/ProJect64 System Architect/Data/Model_data/pcgModel_data.npy")

ecg_encoder = build_encoder(5000, latent_dim=18, hidden_dims=[512, 256], name_prefix="ecg")
pcg_encoder = build_encoder(5000, latent_dim=18, hidden_dims=[512, 256], name_prefix="pcg")

ecg_decoder = build_decoder(5000, latent_dim=36, hidden_dims=[256, 512], name_prefix="ecg")
pcg_decoder = build_decoder(5000, latent_dim=36, hidden_dims=[256, 512], name_prefix="pcg")

vae = MultimodalVAE(ecg_encoder, pcg_encoder, ecg_decoder, pcg_decoder, latent_dim=36)
vae.compile(optimizer=Adam(1e-3))
vae.summary()
vae.fit(
    {"ecg_input": ecg_data, "pcg_input": pcg_data},
    epochs=30,
    batch_size=32
)


2025-07-22 23:25:23.411693: E external/local_xla/xla/stream_executor/cuda/cuda_platform.cc:51] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: UNKNOWN ERROR (303)


Epoch 1/30
[1m106/106[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 273ms/step - kl_loss: 1.2726 - loss: 1.5538 - recon_loss: 0.2812
Epoch 2/30
[1m106/106[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 253ms/step - kl_loss: 0.0131 - loss: 0.2871 - recon_loss: 0.2740
Epoch 3/30
[1m106/106[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 250ms/step - kl_loss: 7.2569e-04 - loss: 0.2774 - recon_loss: 0.2767
Epoch 4/30
[1m106/106[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 251ms/step - kl_loss: 1.0931e-04 - loss: 0.2703 - recon_loss: 0.2702
Epoch 5/30
[1m106/106[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 252ms/step - kl_loss: 3.3186e-04 - loss: 0.2759 - recon_loss: 0.2756
Epoch 6/30
[1m106/106[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 223ms/step - kl_loss: 3.9242e-04 - loss: 0.2705 - recon_loss: 0.2701
Epoch 7/30
[1m106/106[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 198ms/step - kl_loss: 1.4658e-04 - loss: 0.2724 

<keras.src.callbacks.history.History at 0x73508cf217f0>

## ROC's

In [1]:
from sklearn.metrics import roc_curve, auc

def compute_roc_threshold(y_true, scores):
    fpr, tpr, thresholds = roc_curve(y_true, scores)
    roc_auc = auc(fpr, tpr)

    # Youden's J statistic
    j_scores = tpr - fpr
    best_idx = j_scores.argmax()
    best_threshold = thresholds[best_idx]

    return {
        "fpr": fpr,
        "tpr": tpr,
        "thresholds": thresholds,
        "auc": roc_auc,
        "best_threshold": best_threshold
    }


In [None]:


def evaluate_roc(model, dataset, labels, plot=True):
    """
    Evaluate a VAE's anomaly detection performance using ROC.

    Parameters:
    - model: Trained VAE model with encoder and decoder
    - dataset: Numpy array or batched tf.data.Dataset of input segments (ECG, PCG, etc.)
    - labels: Ground truth binary labels (0 = normal, 1 = anomaly)
    - plot: Whether to plot the ROC curve

    Returns:
    - fpr, tpr, thresholds, roc_auc
    """
    recon_errors = []
    mae_loss = MeanAbsoluteError()

    # If tf.data.Dataset, iterate directly
    for batch in dataset:
        if isinstance(batch, tuple):  # (data, labels) format
            x = batch[0]
        else:
            x = batch

        # Reconstruct
        x_pred = model(x, training=False)
        
        # Compute reconstruction loss (per sample)
        batch_errors = np.mean(np.abs(x.numpy() - x_pred.numpy()), axis=(1,))
        recon_errors.extend(batch_errors)

    recon_errors = np.array(recon_errors)
    labels = np.array(labels)

    # Compute ROC metrics
    fpr, tpr, thresholds = roc_curve(labels, recon_errors)
    roc_auc = auc(fpr, tpr)

    if plot:
        plt.figure(figsize=(6, 5))
        plt.plot(fpr, tpr, label=f"ROC curve (AUC = {roc_auc:.4f})")
        plt.plot([0, 1], [0, 1], 'k--', label="Random guess")
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC Curve for VAE Anomaly Detection')
        plt.legend(loc='lower right')
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    return fpr, tpr, thresholds, roc_auc


In [None]:
def get_reconstruction_errors(model, ecg_data, pcg_data):
    recon_errors = []
    batch_size = 32
    for i in range(0, len(ecg_data), batch_size):
        ecg_batch = tf.convert_to_tensor(ecg_data[i:i+batch_size], dtype=tf.float32)
        pcg_batch = tf.convert_to_tensor(pcg_data[i:i+batch_size], dtype=tf.float32)

        z_mean_ecg, z_log_var_ecg = model.ecg_encoder(ecg_batch)
        z_mean_pcg, z_log_var_pcg = model.pcg_encoder(pcg_batch)
        
        z_mean = tf.concat([z_mean_ecg, z_mean_pcg], axis=1)
        z_log_var = tf.concat([z_log_var_ecg, z_log_var_pcg], axis=1)

        z = sampling(z_mean, z_log_var)

        ecg_recon = model.ecg_decoder(z)
        pcg_recon = model.pcg_decoder(z)

        ecg_loss = tf.reduce_mean(tf.abs(ecg_batch - ecg_recon), axis=1)
        pcg_loss = tf.reduce_mean(tf.abs(pcg_batch - pcg_recon), axis=1)

        total_loss = ecg_loss + pcg_loss
        recon_errors.extend(total_loss.numpy())

    return np.array(recon_errors)

# %%
#y_score = get_reconstruction_errors(vae, X_ecg_test, X_pcg_test)


# ### Compute ROC and AUC

roc_results = compute_roc_threshold(y_true, y_score)
print(f"AUC: {roc_results['auc']:.4f}, Best threshold: {roc_results['best_threshold']:.4f}")


# ### Plot ROC

plt.figure(figsize=(6, 5))
plt.plot(roc_results['fpr'], roc_results['tpr'], label=f"AUC = {roc_results['auc']:.4f}")
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for ECG + PCG VAE')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


NameError: name 'vae' is not defined