In [None]:
!pip install wfdb --quiet

In [None]:
import pandas as pd
import wfdb
import matplotlib.pyplot as plt
import numpy as np
import json


# Root directory of the PTB-XL dataset (Kaggle input mount)
data_path = "/kaggle/input/ptb-xl-dataset/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/"

# Load the SCP-ECG statement table.
# The error is reported and then re-raised: every later step needs `scp_df`,
# so swallowing the exception would only resurface as an unrelated NameError.
try:
    scp_df = pd.read_csv(data_path + "scp_statements.csv", encoding='utf-8')
except Exception as e:
    print("Error loading scp_statements.csv:", e)
    raise

# Load the PTB-XL record metadata (same reasoning: fail here, not later on `df`).
try:
    df = pd.read_csv(data_path + "ptbxl_database.csv", encoding='utf-8')
except Exception as e:
    print("Error loading ptbxl_database.csv:", e)
    raise


# Keep only statements that carry a diagnostic_class (needed for classification).
# `.copy()` detaches the result from `scp_df` so the assignment below is safe.
scp_df_clean = scp_df.dropna(subset=["diagnostic_class"]).copy()

# Missing subclass labels become the explicit category "Unknown"
scp_df_clean.loc[:, "diagnostic_subclass"] = scp_df_clean["diagnostic_subclass"].fillna("Unknown")

# Drop columns that are not used further on.
# NOTE(review): the original comment states these are >70% missing — not verified here.
drop_cols = ["AHA code", "aECG REFID", "CDISC Code", "DICOM Code", "form", "rhythm"]
scp_df_clean = scp_df_clean.drop(columns=drop_cols)  # No `inplace=True`

# Display cleaned dataset info.
# DataFrame.info() writes its report to stdout and returns None, so it must be
# called on its own rather than passed to print() (which would print "None").
print("Cleaned SCP-ECG Statements Data:")
scp_df_clean.info()

# Fill missing numerical values with the column median
num_cols = ["age", "height", "weight"]
df[num_cols] = df[num_cols].fillna(df[num_cols].median())

# Fill missing categorical values with "Unknown"
cat_cols = ["sex", "device", "validated_by", "site"]
df[cat_cols] = df[cat_cols].fillna("Unknown")

# Convert recording_date to datetime; unparseable values become NaT
df["recording_date"] = pd.to_datetime(df["recording_date"], errors="coerce")

# Drop every row that still has a missing value in ANY column.
# NOTE(review): no `subset=` is given, so a single sparsely populated column can
# remove most rows — confirm the resulting row count is what is intended.
df_clean = df.dropna()

# Display cleaned dataset info.
# DataFrame.info() prints its own report and returns None, so call it directly
# instead of wrapping it in print().
print("Cleaned PTB-XL Database Data:")
df_clean.info()

# Per-column count of values that are still missing after cleaning
scp_missing = scp_df_clean.isna().sum()
ptbxl_missing = df_clean.isna().sum()
print("Remaining Missing Values in SCP:\n", scp_missing)
print("\nRemaining Missing Values in PTB-XL:\n", ptbxl_missing)

# Persist both cleaned tables to the working directory for later reuse
for frame, out_name in (
    (scp_df_clean, "cleaned_scp_statements.csv"),
    (df_clean, "cleaned_ptbxl_database.csv"),
):
    frame.to_csv(out_name, index=False)

print("Cleaned datasets saved successfully!")

# Build full record paths on `df`: 500 Hz from filename_hr, 100 Hz from filename_lr.
# These columns are added after the CSV export above, so they are not in the saved files.
for path_col, name_col in (("file_path_500", "filename_hr"), ("file_path_100", "filename_lr")):
    df[path_col] = data_path + df[name_col]

# Confirm that the columns are now present
print("Available Columns in DataFrame After Fix:\n", df.columns)

In [None]:
import pandas as pd
import numpy as np
import ast  # To parse scp_codes from string to dictionary
import wfdb
from tqdm import tqdm
import matplotlib.pyplot as plt

# Read the record metadata straight from the dataset's ptbxl_database.csv
# (this is the original file, not the cleaned CSV written earlier).
metadata_csv = "/kaggle/input/ptb-xl-dataset/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/ptbxl_database.csv"
df_metadata = pd.read_csv(metadata_csv)

# scp_codes is stored as the text of a Python dict literal; parse each entry into a dict
df_metadata["scp_codes"] = df_metadata["scp_codes"].map(ast.literal_eval)

# SCP codes that map a record to the "MI" class
mi_labels = [
    'INJAS', 'ILMI', 'INJIL', 'PMI', 'ISCAL', 'ISCLA', 'AMI',
    'LMI', 'INJAL', 'ALMI', 'INJIN', 'IPMI', 'ISC_', 'IMI',
    'ISCAS', 'ISCIN', 'INJLA', 'ISCIL', 'ISCAN', 'IPLMI', 'ASMI',
]
# SCP codes that map a record to the "STTC" class
sttc_labels = ['NST_', 'STE_', 'STD_', 'STACH']


def assign_diagnostic_class(scp_dict):
    """Map a record's SCP-code dict to one coarse class name.

    Only the dict keys are inspected. Precedence is MI, then STTC, then NORM:
    a record with any code in `mi_labels` is "MI" even if it also carries
    STTC or NORM codes.

    Args:
        scp_dict: Mapping whose keys are SCP statement codes.

    Returns:
        "MI", "STTC", "NORM", or "OTHER" when none of the above match.
    """
    codes = set(scp_dict)
    if not codes.isdisjoint(mi_labels):
        return "MI"
    if not codes.isdisjoint(sttc_labels):
        return "STTC"
    return "NORM" if "NORM" in codes else "OTHER"

# Assign a coarse class (MI / STTC / NORM / OTHER) to every record
df_metadata["diagnostic_class"] = df_metadata["scp_codes"].apply(assign_diagnostic_class)

# Drop OTHER records. `.copy()` makes the filtered frame independent of the
# original, so the column assignments below modify a real frame instead of a
# slice (avoids pandas' SettingWithCopyWarning and its ambiguous semantics).
df_metadata = df_metadata[df_metadata["diagnostic_class"] != "OTHER"].copy()

# Collapse MI and STTC into one abnormal class ("ABN") for binary classification
df_metadata["diagnostic_class"] = df_metadata["diagnostic_class"].apply(lambda x: "NORM" if x == "NORM" else "ABN")

# Encode binary labels: NORM = 0, ABN = 1
label_mapping = {"NORM": 0, "ABN": 1}
df_metadata["label"] = df_metadata["diagnostic_class"].map(label_mapping)

# Print updated label distribution
print("\n✅ Final Binary Label Distribution:")
print(df_metadata["diagnostic_class"].value_counts())

# Display sample metadata
print("\n✅ Sample Metadata with Labels:")
print(df_metadata[["scp_codes", "diagnostic_class", "label"]].head())

# Build full record paths (data_path is redefined here so the cell is self-contained)
data_path = "/kaggle/input/ptb-xl-dataset/ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/"
df_metadata["file_path_100"] = data_path + df_metadata["filename_lr"]
df_metadata["file_path_500"] = data_path + df_metadata["filename_hr"]

# Debug: check columns
print("\n✅ Columns in df_metadata:", df_metadata.columns)

# Sample data preview
print(df_metadata[["file_path_100", "file_path_500", "label"]].head())

# Helper: read a single ECG record from disk
def load_ecg(record_path):
    """Read one WFDB record and return its signal as a NumPy array.

    Args:
        record_path: Record path handed unchanged to `wfdb.rdrecord`.

    Returns:
        `np.ndarray` built from the record's `p_signal` attribute.
    """
    return np.array(wfdb.rdrecord(record_path).p_signal)

# Split the metadata by class and put the two parts back together.
# No down-sampling is applied here: both classes are kept in full, so
# `df_balanced` has the same class counts as `df_metadata`.
is_norm = df_metadata["diagnostic_class"] == "NORM"
is_abn = df_metadata["diagnostic_class"] == "ABN"
norm_samples = df_metadata[is_norm]
abnorm_samples = df_metadata[is_abn]

# Concatenate, shuffle reproducibly (fixed seed) and renumber the rows
df_balanced = (
    pd.concat([norm_samples, abnorm_samples])
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
)

# Class counts after shuffling
print("✅ New Dataset Size:")
print(df_balanced["diagnostic_class"].value_counts())

# Load the 100 Hz recording of every row (subset_size currently selects all rows)
subset_size = len(df_balanced)
record_paths_100 = df_balanced["file_path_100"][:subset_size]
loaded_signals = []
for record_path in tqdm(record_paths_100):
    loaded_signals.append(load_ecg(record_path))
ecg_100Hz = np.array(loaded_signals)

# Shape of the stacked signal array
print("✅ Loaded ECG Subset Shape (100Hz):", ecg_100Hz.shape)

# Class counts once more, next to the loaded data
print("✅ Label Distribution in Processed Data:")
print(df_balanced["diagnostic_class"].value_counts())

# Debug: preview of file paths and labels
preview_cols = ["file_path_100", "file_path_500", "diagnostic_class", "label"]
print(df_balanced[preview_cols].head())

# Plot the first lead of one randomly chosen record.
# ecg_100Hz is indexed (record, time, lead) — the same layout the rest of the
# notebook relies on (`X[idx, :, 0]`, model input shape (1000, 12)) — so the
# time series of lead 0 is [idx, :, 0]; [idx, 0, :] would be the 12 lead values
# of the very first sample only.
sample_idx = np.random.randint(0, ecg_100Hz.shape[0])
lead_signal = ecg_100Hz[sample_idx, :, 0]

# Samples are 100 Hz, so sample k is at k / 100 seconds
time_s = np.arange(lead_signal.shape[0]) / 100

plt.figure(figsize=(12, 4))
plt.plot(time_s, lead_signal, label="Lead 1 ECG Signal (100Hz)", linestyle="dashed")
plt.xlabel("Time (s)")
# NOTE(review): the unit is whatever the WFDB header declares for p_signal
# (believed to be mV for PTB-XL, not µV) — confirm and adjust this label.
plt.ylabel("Amplitude (µV)")
plt.title(f"ECG Signal from Record {sample_idx}")
plt.legend()
plt.show()


In [None]:
from sklearn.model_selection import train_test_split

# Features and labels
X = ecg_100Hz  # Shape: (samples, time, leads)
y = df_balanced["label"].values  # 0 = NORM, 1 = ABN

# Row indices of each class in the full dataset
healthy_indices = np.where(y == 0)[0]
abnorm_indices = np.where(y == 1)[0]

# All healthy recordings (kept for reference / downstream use)
x_healthy = X[healthy_indices]

# Split the whole dataset ONCE (stratified) ...
x_all_train, x_all_test, y_all_train, y_all_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# ... and take the autoencoder's healthy-only train/validation data from the
# matching side of that split. Splitting the healthy records independently would
# let recordings the autoencoder was trained on reappear in `x_all_test`,
# leaking training data into the evaluation set.
x_healthy_train = x_all_train[y_all_train == 0]
x_healthy_test = x_all_test[y_all_test == 0]

# Final checks
print("\n✅ Autoencoder Training Data (Healthy Only):")
print("Train:", x_healthy_train.shape, " Test:", x_healthy_test.shape)

print("\n✅ Evaluation Dataset (All Data):")
print("Train:", x_all_train.shape, " Test:", x_all_test.shape)
print("Train labels:", np.bincount(y_all_train))
print("Test labels:", np.bincount(y_all_test))


In [None]:
import numpy as np

# Number of test samples per label value (0 = normal, 1 = anomalous)
n_test_normal = np.sum(y_all_test == 0)
n_test_anomalous = np.sum(y_all_test == 1)

print("✅ Test Label Distribution:")
print("Normal     (0):", n_test_normal)
print("Anomalous  (1):", n_test_anomalous)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Display names for the binary labels, indexed by label value (0, 1)
label_title = ["NORM", "ABN"]


def plot_ecg_samples(X_data, y_data, dataset_name):
    """Plot the first lead of one example recording per class, one subplot per class.

    Args:
        X_data: Signal array indexed as [sample, time, lead].
        y_data: Integer label per sample; each value must be a valid index
            into `label_title`.
        dataset_name: Text used as prefix in each subplot title.

    Raises:
        ValueError: If `y_data` contains no labels at all.
    """
    unique_labels = np.unique(y_data)
    if len(unique_labels) == 0:
        raise ValueError("plot_ecg_samples needs at least one labelled sample")

    # squeeze=False always returns a 2-D array of Axes; without it a single
    # class yields a bare Axes object and `axes[i]` would fail.
    fig, axes = plt.subplots(len(unique_labels), 1, figsize=(12, 6), sharex=True, squeeze=False)
    axes = axes[:, 0]

    for ax, label in zip(axes, unique_labels):
        # First sample carrying this label
        sample_idx = np.where(y_data == label)[0][0]

        # Plot only the first channel (Lead I)
        ax.plot(X_data[sample_idx, :, 0], label=f"Label {label} - {label_title[label]}")
        ax.set_title(f"{dataset_name} - Class: {label_title[label]}")
        ax.legend()
        ax.grid(True)

    # The x axis is shared, so label only the bottom subplot
    axes[-1].set_xlabel("Time Steps")
    plt.tight_layout()
    plt.show()


# Show one sample per class from the training and test splits
plot_ecg_samples(x_all_train, y_all_train, "Training (Binary Labels)")
plot_ecg_samples(x_all_test, y_all_test, "Testing (Binary Labels)")


In [None]:
# import tensorflow as tf
# from tensorflow.keras.models import Model
# from tensorflow.keras.layers import (
#     Input, Conv1D, BatchNormalization, Dropout, LSTM, Dense, Flatten,
#     Conv1DTranspose, Reshape, TimeDistributed, RepeatVector
# )

# # ✅ Define input shape from healthy training data
# input_shape = (x_healthy_train.shape[1], x_healthy_train.shape[2])  # e.g., (1000, 12)

# # ✅ Input Layer
# input_layer = Input(shape=input_shape)

# # ✅ CNN Encoder
# x = Conv1D(128, kernel_size=7, strides=1, activation="relu", padding="same")(input_layer)
# x = BatchNormalization()(x)
# x = Dropout(0.3)(x)

# x = Conv1D(256, kernel_size=5, strides=1, activation="relu", padding="same")(x)
# x = BatchNormalization()(x)
# x = Dropout(0.3)(x)

# x = Flatten()(x)
# encoded = Dense(256, activation="relu")(x)

# # ✅ LSTM Decoder
# x = RepeatVector(input_shape[0])(encoded)
# x = LSTM(256, return_sequences=True)(x)
# x = LSTM(128, return_sequences=True)(x)
# x = LSTM(64, return_sequences=True)(x)

# # ✅ CNN Decoder
# x = Conv1DTranspose(128, kernel_size=5, strides=1, activation="relu", padding="same")(x)
# x = BatchNormalization()(x)

# x = Conv1DTranspose(64, kernel_size=7, strides=1, activation="relu", padding="same")(x)
# x = BatchNormalization()(x)

# # ✅ Output layer with sigmoid activation (normalized output)
# decoded = Conv1DTranspose(input_shape[1], kernel_size=7, strides=1, activation="sigmoid", padding="same")(x)

# # ✅ Compile Autoencoder
# autoencoder = Model(inputs=input_layer, outputs=decoded)
# autoencoder.compile(optimizer="adam", loss="mae")

# # ✅ Summary
# autoencoder.summary()

# # ✅ Early Stopping Callback
# early_stopping = tf.keras.callbacks.EarlyStopping(
#     monitor="val_loss",
#     patience=10,
#     restore_best_weights=True
# )

# # ✅ Train Autoencoder on healthy ECGs only
# history = autoencoder.fit(
#     x_healthy_train, x_healthy_train,  # Self-reconstruction
#     validation_data=(x_healthy_test, x_healthy_test),
#     epochs=1000,
#     batch_size=64,
#     callbacks=[early_stopping],
#     verbose=1
# )


In [None]:
# import tensorflow as tf
# from tensorflow.keras.models import Model
# from tensorflow.keras.layers import (
#     Input, Conv1D, MaxPooling1D, Dropout, BatchNormalization,
#     Bidirectional, LSTM, RepeatVector, TimeDistributed, Dense, UpSampling1D
# )

# # ✅ Input shape (e.g., 1000 timesteps, 12 leads)
# input_shape = (x_healthy_train.shape[1], x_healthy_train.shape[2])  # (1000, 12)

# # ✅ Encoder
# inputs = Input(shape=input_shape)

# x = Conv1D(64, kernel_size=7, activation="relu", padding="same")(inputs)
# x = MaxPooling1D(pool_size=2)(x)
# x = BatchNormalization()(x)

# x = Conv1D(128, kernel_size=5, activation="relu", padding="same")(x)
# x = MaxPooling1D(pool_size=2)(x)
# x = BatchNormalization()(x)
# x = Dropout(0.3)(x)

# # ✅ Bottleneck: BiLSTM
# x = Bidirectional(LSTM(64, return_sequences=False))(x)
# encoded = Dense(128, activation="relu")(x)

# # ✅ Decoder
# x = RepeatVector(input_shape[0] // 4)(encoded)
# x = LSTM(64, return_sequences=True)(x)
# x = UpSampling1D(size=2)(x)

# x = Conv1D(128, kernel_size=5, activation="relu", padding="same")(x)
# x = BatchNormalization()(x)
# x = UpSampling1D(size=2)(x)

# x = Conv1D(64, kernel_size=7, activation="relu", padding="same")(x)
# x = BatchNormalization()(x)

# # ✅ Output Layer
# decoded = Conv1D(input_shape[1], kernel_size=7, activation="sigmoid", padding="same")(x)

# # ✅ Compile Model
# autoencoder = Model(inputs, decoded)
# autoencoder.compile(optimizer="adam", loss="mae")
# autoencoder.summary()

# # ✅ Early Stopping
# early_stopping = tf.keras.callbacks.EarlyStopping(
#     monitor="val_loss",
#     patience=10,
#     restore_best_weights=True
# )

# # ✅ Train
# history = autoencoder.fit(
#     x_healthy_train, x_healthy_train,
#     validation_data=(x_healthy_test, x_healthy_test),
#     epochs=50,
#     batch_size=32,
#     # callbacks=[early_stopping],
#     verbose=1
# )


In [None]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, BatchNormalization, Dropout, Activation, Reshape, Permute

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, BatchNormalization, Dropout, Activation, Reshape, Permute

# Convolutional autoencoder over a (time=1000, leads=12) input.
inputs = Input(shape=(1000, 12))

# Treat the recording as a 12 x 1000 single-channel image
x = Permute((2, 1))(inputs)          # (1000, 12) -> (12, 1000)
x = Reshape((12, 1000, 1))(x)        # add channel dim -> (12, 1000, 1)

# Encoder: three Conv2D -> BatchNorm -> Dropout stages.
# Strides halve the time axis each stage; the last stage also halves the lead axis.
encoder_stages = ((32, (1, 2)), (64, (1, 2)), (128, (2, 2)))
for n_filters, stage_strides in encoder_stages:
    x = Conv2D(n_filters, (3, 3), strides=stage_strides, padding='same', activation='tanh')(x)
    x = BatchNormalization()(x)
    x = Dropout(0.2)(x)

# Decoder: mirror the encoder strides with transposed convolutions
decoder_stages = ((64, (2, 2)), (32, (1, 2)))
for n_filters, stage_strides in decoder_stages:
    x = Conv2DTranspose(n_filters, (3, 3), strides=stage_strides, padding='same', activation='tanh')(x)
    x = BatchNormalization()(x)

# Final transposed conv restores a single channel at 12 x 1000; tanh keeps outputs in [-1, 1]
x = Conv2DTranspose(1, (3, 3), strides=(1, 2), padding='same', activation='tanh')(x)

# Back to the input layout (1000, 12)
x = Reshape((12, 1000))(x)
decoded = Permute((2, 1))(x)

# Assemble and compile with mean absolute error as reconstruction loss
autoencoder = Model(inputs, decoded)
autoencoder.compile(optimizer='adam', loss='mae')
autoencoder.summary()

def _scale_by_record_peak(signals):
    """Divide every record by its own maximum absolute amplitude.

    Args:
        signals: Array indexed as [record, axis1, axis2]; the peak is taken
            over the last two axes of each record.

    Returns:
        Array of the same shape with each record scaled into [-1, 1]. A record
        whose peak is 0 (all zeros) is left unchanged instead of being divided
        by zero, which would fill it with NaN and poison the loss.
    """
    peak = np.max(np.abs(signals), axis=(1, 2), keepdims=True)
    safe_peak = np.where(peak == 0, 1.0, peak)
    return signals / safe_peak


# Normalize input (per record, matching the tanh output range of the model)
x_healthy_train_scaled = _scale_by_record_peak(x_healthy_train)
x_healthy_test_scaled = _scale_by_record_peak(x_healthy_test)

# Train the autoencoder to reconstruct healthy recordings only
autoencoder.fit(
    x_healthy_train_scaled, x_healthy_train_scaled,
    validation_data=(x_healthy_test_scaled, x_healthy_test_scaled),
    epochs=1000,
    batch_size=32
)

autoencoder.save('autoencoder_v3.h5')


In [None]:
from tensorflow.keras.models import load_model
from tensorflow.keras.losses import MeanAbsoluteError

# Load the saved model (compiled with 'mae' loss)
autoencoder = load_model(
    "/kaggle/input/autoencoder_v3/keras/default/1/autoencoder_v3.h5",
    custom_objects={"mae": MeanAbsoluteError()}
)

print("✅ Autoencoder loaded successfully.")

# Scale the test data exactly like the training data: each record divided by
# its own peak |amplitude|. (A global min-max to [0, 1] would feed the model a
# value range it never saw during training and inflate every reconstruction error.)
X_test = np.copy(x_all_test)
test_peak = np.max(np.abs(X_test), axis=(1, 2), keepdims=True)
X_test = X_test / np.where(test_peak == 0, 1.0, test_peak)  # all-zero records stay zero

print(f"Data SHAPE {X_test.shape}")

# Reconstruct the test recordings
X_test_reconstructed = autoencoder.predict(X_test)

# Reconstruction error: mean absolute error per sample
reconstruction_errors = np.mean(np.abs(X_test - X_test_reconstructed), axis=(1, 2))

# Anomaly threshold: a percentile of the test-set errors. The value is named
# once so the computation and the printed message cannot drift apart.
THRESHOLD_PERCENTILE = 90
threshold = np.percentile(reconstruction_errors, THRESHOLD_PERCENTILE)
pred_label = (reconstruction_errors > threshold).astype(int)  # 1 = Anomalous, 0 = Normal

# Print summary
print(f"Threshold ({THRESHOLD_PERCENTILE}th percentile): {threshold}")
print("\n✅ Reconstruction Error Stats:")
print(f"Mean: {np.mean(reconstruction_errors):.5f} | Std: {np.std(reconstruction_errors):.5f}")
print("\n✅ Anomaly Predictions:")
print(f"Predicted Normals: {(pred_label == 0).sum()} | Anomalies: {(pred_label == 1).sum()}")

# Compare the threshold predictions with the ground-truth labels
from sklearn.metrics import classification_report, confusion_matrix

print("\n✅ Classification Report (Autoencoder vs. Ground Truth):")
print(classification_report(y_all_test, pred_label, target_names=["Normal", "Abnormal"]))


def _plot_original_vs_reconstruction(idx, kind, color, linestyle="-"):
    """Plot lead 0 of test sample `idx`: original on top, reconstruction below."""
    fig, axes = plt.subplots(2, 1, figsize=(12, 6), sharex=True)
    axes[0].plot(X_test[idx, :, 0], label=f"Original ECG - {kind}")
    axes[0].set_title(f"Original ECG - {kind}")
    axes[1].plot(X_test_reconstructed[idx, :, 0], linestyle=linestyle,
                 label=f"Reconstructed ECG - {kind}", color=color)
    axes[1].set_title(f"Reconstructed ECG - {kind}")
    plt.xlabel("Time Steps")
    plt.tight_layout()
    plt.show()


# Visualize the first sample predicted normal (if any)
normal_indices = np.where(pred_label == 0)[0]
if len(normal_indices) > 0:
    _plot_original_vs_reconstruction(normal_indices[0], "Normal", color="green", linestyle="--")

# Visualize the first sample predicted anomalous (if any)
anomalous_indices = np.where(pred_label == 1)[0]
if len(anomalous_indices) > 0:
    _plot_original_vs_reconstruction(anomalous_indices[0], "Anomalous", color="red")


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, roc_curve, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# Step 1: Reconstruction error (MSE per sample).
# The model was trained on records divided by their own peak |amplitude|, so the
# evaluation input must be scaled the same way before predicting and comparing.
eval_peak = np.max(np.abs(x_all_test), axis=(1, 2), keepdims=True)
x_all_test_scaled = x_all_test / np.where(eval_peak == 0, 1.0, eval_peak)  # all-zero records stay zero

reconstructed_ecg = autoencoder.predict(x_all_test_scaled)
reconstruction_errors = np.mean(np.power(x_all_test_scaled - reconstructed_ecg, 2), axis=(1, 2))

# Step 2: Threshold = mean + 2 standard deviations of the errors
threshold = np.mean(reconstruction_errors) + 2 * np.std(reconstruction_errors)
predicted_labels = (reconstruction_errors > threshold).astype(int)

# Step 3: Ground-truth labels (from y_all_test)
true_labels = y_all_test  # Already 0 = NORM, 1 = ABN

# Step 4: Metrics. AUC uses the raw errors as scores, so it does not depend on the threshold.
precision = precision_score(true_labels, predicted_labels)
recall = recall_score(true_labels, predicted_labels)
f1 = f1_score(true_labels, predicted_labels)
auc = roc_auc_score(true_labels, reconstruction_errors)

print(f"✅ Precision: {precision:.4f}")
print(f"✅ Recall:    {recall:.4f}")
print(f"✅ F1-Score:  {f1:.4f}")
print(f"✅ AUC-ROC:   {auc:.4f}")

# Step 5: ROC curve
fpr, tpr, _ = roc_curve(true_labels, reconstruction_errors)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f"AUC = {auc:.4f}", color="blue")
plt.plot([0, 1], [0, 1], linestyle="--", color="grey")  # chance diagonal
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve for Autoencoder-Based Anomaly Detection")
plt.legend()
plt.grid(True)
plt.show()

# Step 6: Confusion matrix
cm = confusion_matrix(true_labels, predicted_labels)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=["Normal", "Anomalous"],
            yticklabels=["Normal", "Anomalous"])
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()


In [None]:
pip install stable-baselines3

In [None]:
# import gym
# import numpy as np
# from gym import spaces
# from stable_baselines3 import PPO

# class ECGAnomalyEnv(gym.Env):
#     def __init__(self, reconstruction_errors, true_labels):
#         super(ECGAnomalyEnv, self).__init__()

#         # Save input data (MSE reconstruction errors and labels)
#         self.reconstruction_errors = reconstruction_errors
#         self.true_labels = true_labels  # 0 = Normal, 1 = Anomalous

#         # Define state space (Normalized reconstruction error)
#         self.observation_space = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)

#         # Define action space (Increase, Decrease, or Maintain threshold)
#         self.action_space = spaces.Discrete(3)  # [0: Decrease, 1: Maintain, 2: Increase]

#         # Initialize threshold and step index
#         self.threshold = np.percentile(self.reconstruction_errors, 90)  # Start with high threshold
#         self.current_step = 0

#     def step(self, action):
#         """
#         RL Agent takes an action to adjust the anomaly threshold.
#         """
#         # Adjust threshold based on action
#         if action == 0:  # Decrease threshold
#             self.threshold *= 0.95
#         elif action == 2:  # Increase threshold
#             self.threshold *= 1.05

#         # Get current sample reconstruction error & ground truth label
#         recon_error = self.reconstruction_errors[self.current_step]
#         true_label = self.true_labels[self.current_step]

#         # Apply threshold-based classification
#         predicted_label = 1 if recon_error > self.threshold else 0

#         # Compute reward
#         if predicted_label == true_label:  # Correct classification
#             reward = 1
#         elif predicted_label == 1 and true_label == 0:  # False Positive (Penalize)
#             reward = -1
#         else:  # False Negative (Critical Error)
#             reward = -2

#         # Move to next step
#         self.current_step += 1
#         done = self.current_step >= len(self.reconstruction_errors)  # Episode ends at dataset end

#         # Return new state, reward, and done flag
#         return np.array([recon_error], dtype=np.float32), reward, done, {}

#     def reset(self):
#         """
#         Reset the environment at the beginning of each episode.
#         """
#         self.current_step = 0
#         self.threshold = np.percentile(self.reconstruction_errors, 90)  # Reset threshold
#         return np.array([self.reconstruction_errors[self.current_step]], dtype=np.float32)

# import csv
# import os
# from stable_baselines3.common.callbacks import BaseCallback

# # ✅ Define Custom Logging Callback to Save Training Logs in CSV
# class PPOLoggingCallback(BaseCallback):
#     def __init__(self, log_file="ppo_training_logs.csv", verbose=1):
#         super(PPOLoggingCallback, self).__init__(verbose)
#         self.log_file = log_file
#         self.first_write = not os.path.exists(log_file)  # Check if file exists

#     def _on_step(self) -> bool:
#         # Collect training data
#         log_data = {
#             "timesteps": self.num_timesteps,
#             "policy_loss": float(self.model.logger.name_to_value.get("train/policy_gradient_loss", 0)),
#             "value_loss": float(self.model.logger.name_to_value.get("train/value_loss", 0)),
#             "explained_variance": float(self.model.logger.name_to_value.get("train/explained_variance", 0)),
#             "entropy_loss": float(self.model.logger.name_to_value.get("train/entropy_loss", 0)),
#             "clip_fraction": float(self.model.logger.name_to_value.get("train/clip_fraction", 0)),
#             "approx_kl": float(self.model.logger.name_to_value.get("train/approx_kl", 0))
#         }

#         # Write logs to CSV file
#         with open(self.log_file, mode="a", newline="") as f:
#             writer = csv.DictWriter(f, fieldnames=log_data.keys())

#             if self.first_write:
#                 writer.writeheader()  # Write headers only once
#                 self.first_write = False

#             writer.writerow(log_data)  # Append new log data

#         return True  # Continue training

# # ✅ Integrate Logging Callback into PPO Training
# log_callback = PPOLoggingCallback(log_file="ppo_training_logs.csv")

# # ✅ Now logs will be saved in `ppo_training_logs.csv` during training! 🎯




In [None]:
import gym
import numpy as np
import os
import csv
import matplotlib.pyplot as plt
from gym import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback

class ECGAnomalyEnv(gym.Env):
    """Threshold-tuning environment over pre-computed reconstruction errors.

    One episode is one pass over all samples. At every step the action first
    adjusts a global anomaly threshold (0 = lower by 5%, 1 = keep, 2 = raise by
    5%); the current sample is then classified as anomalous when its
    reconstruction error exceeds the threshold, and the reward reflects whether
    that matches the true label, weighted by the patient's age bracket.

    Observation: ``[reconstruction_error, age]`` of the sample the NEXT action
    will be applied to.

    NOTE(review): this uses the classic gym API (``reset() -> obs``,
    ``step() -> (obs, reward, done, info)``). Recent stable-baselines3 releases
    are believed to expect gymnasium — confirm the installed versions match.
    """

    def __init__(self, reconstruction_errors, true_labels, patient_ages):
        """Store the per-sample inputs and initialise threshold and logs.

        Args:
            reconstruction_errors: Per-sample reconstruction error.
            true_labels: Per-sample ground truth (0 = normal, 1 = anomalous).
            patient_ages: Per-sample patient age.

        Raises:
            ValueError: If the inputs are empty or differ in length.
        """
        super().__init__()

        n_samples = len(reconstruction_errors)
        if n_samples == 0:
            raise ValueError("reconstruction_errors must not be empty")
        if not (len(true_labels) == len(patient_ages) == n_samples):
            raise ValueError(
                "reconstruction_errors, true_labels and patient_ages must have the same length "
                f"(got {n_samples}, {len(true_labels)}, {len(patient_ages)})"
            )

        self.reconstruction_errors = reconstruction_errors
        self.true_labels = true_labels
        self.patient_ages = patient_ages

        # State is [reconstruction error, age]. Neither value is normalised, so
        # the only bound that actually holds is the lower one (both are >= 0);
        # declaring high=1 would misdescribe the observations.
        self.observation_space = spaces.Box(low=0.0, high=np.inf, shape=(2,), dtype=np.float32)
        self.action_space = spaces.Discrete(3)

        # Start from the 90th percentile of the errors
        self.threshold = np.percentile(self.reconstruction_errors, 90)
        self.current_step = 0
        self.logged_thresholds = []
        self.logged_rewards = []

    def _observation(self, index):
        """Return the [reconstruction_error, age] observation of sample `index`."""
        return np.array(
            [self.reconstruction_errors[index], self.patient_ages[index]],
            dtype=np.float32,
        )

    def step(self, action):
        """Adjust the threshold, classify the current sample, and advance.

        Returns:
            (observation, reward, done, info). `observation` describes the next
            sample; on the final step, when there is no next sample, the last
            sample's observation is returned as a placeholder.
        """
        # Adjust threshold based on action (action 1 leaves it unchanged)
        if action == 0:
            self.threshold *= 0.95
        elif action == 2:
            self.threshold *= 1.05

        # Sample being classified in this step
        index = self.current_step
        recon_error = self.reconstruction_errors[index]
        true_label = self.true_labels[index]
        age = self.patient_ages[index]

        # Predicted label based on reconstruction error
        predicted_label = 1 if recon_error > self.threshold else 0

        reward = self.compute_reward(predicted_label, true_label, age)

        # Log the threshold used and the reward obtained (cleared on reset)
        self.logged_thresholds.append(self.threshold)
        self.logged_rewards.append(reward)

        self.current_step += 1
        done = self.current_step >= len(self.reconstruction_errors)

        # Hand back the NEXT sample so the agent's following action is based on
        # the sample it will actually be applied to (reset() returns sample 0
        # for the same reason).
        next_index = index if done else self.current_step
        return self._observation(next_index), reward, done, {}

    def reset(self):
        """Restart the episode: first sample, initial threshold, empty logs."""
        self.current_step = 0
        self.threshold = np.percentile(self.reconstruction_errors, 90)
        self.logged_thresholds = []
        self.logged_rewards = []
        return self._observation(self.current_step)

    def compute_reward(self, predicted_label, true_label, age):
        """Score one prediction, weighted by the patient's age bracket.

        Base reward: +1 correct, -1 false positive, -2 false negative.
        Multiplier: 1.2 for ages 18-35, 1.0 for ages above 35 up to 60, and
        0.8 for everything else (over 60, under 18, or a missing age).

        Returns:
            The weighted reward as a float.
        """
        if predicted_label == true_label:
            reward = 1  # Correct detection
        elif predicted_label == 1 and true_label == 0:
            reward = -1  # False positive
        else:
            reward = -2  # False negative (critical error)

        # Brackets are contiguous so a fractional age such as 35.5 lands in the
        # middle bracket instead of falling through to the last branch.
        if 18 <= age <= 35:
            weight = 1.2  # Young adults
        elif 35 < age <= 60:
            weight = 1.0  # Middle-aged adults
        else:
            weight = 0.8  # Seniors (also applies to ages under 18 / NaN)

        return float(reward * weight)


# PPO logging callback: appends training metrics to a CSV file
class PPOLoggingCallback(BaseCallback):
    """Append one CSV row of logger metrics on every callback step.

    The header is written only if the log file did not exist when the callback
    was created; otherwise rows are appended to the existing file.
    """

    # CSV column name -> key looked up in the model logger's `name_to_value`
    _METRIC_KEYS = (
        ("policy_loss", "train/policy_gradient_loss"),
        ("value_loss", "train/value_loss"),
        ("explained_variance", "train/explained_variance"),
        ("entropy_loss", "train/entropy_loss"),
        ("clip_fraction", "train/clip_fraction"),
        ("approx_kl", "train/approx_kl"),
    )

    def __init__(self, log_file="ppo_training_logs.csv", verbose=1):
        super().__init__(verbose)
        self.log_file = log_file
        self.first_write = not os.path.exists(log_file)

    def _on_step(self) -> bool:
        """Write the current metrics (0 for any not yet logged); always continue training."""
        logged_values = self.model.logger.name_to_value
        log_data = {"timesteps": self.num_timesteps}
        for column, logger_key in self._METRIC_KEYS:
            log_data[column] = float(logged_values.get(logger_key, 0))

        with open(self.log_file, mode="a", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=log_data.keys())
            if self.first_write:
                writer.writeheader()
                self.first_write = False
            writer.writerow(log_data)
        return True

# Callback that dumps the environment's threshold/reward logs when training ends
class PPOExtendedLoggingCallback(BaseCallback):
    """Save `env.logged_thresholds` and `env.logged_rewards` as .npy files at training end."""

    def __init__(self, env, log_dir="ppo_logs", verbose=1):
        super().__init__(verbose)
        self.env = env
        self.log_dir = log_dir
        # Create the output directory up front
        os.makedirs(log_dir, exist_ok=True)

    def _on_step(self) -> bool:
        # Nothing to do per step; returning True keeps training running
        return True

    def _on_training_end(self):
        outputs = (
            ("dynamic_thresholds.npy", self.env.logged_thresholds),
            ("reward_progression.npy", self.env.logged_rewards),
        )
        for file_name, values in outputs:
            np.save(os.path.join(self.log_dir, file_name), np.array(values))

In [None]:
import numpy as np

# Works on data loaded earlier in the notebook:
#   ecg_100Hz            - stacked ECG signals
#   x_all_test/y_all_test - evaluation split

# Global min-max normalisation of all signals
signal_min = np.min(ecg_100Hz)
signal_max = np.max(ecg_100Hz)
X_normalized = (ecg_100Hz - signal_min) / (signal_max - signal_min)

# If axis 1 has length 12 (leads first), swap to (samples, time, leads)
if X_normalized.shape[1] == 12:
    X_normalized = X_normalized.transpose(0, 2, 1)  # (N, 12, T) -> (N, T, 12)

# Shapes of the evaluation split
print("Test Set Shape:", x_all_test.shape)
print("Test Labels Shape:", y_all_test.shape)

# Per-label counts in the test set
n_normal = np.sum(y_all_test == 0)
n_anomalous = np.sum(y_all_test == 1)
print("\n✅ Test Label Distribution:")
print("Normal     (0):", n_normal)
print("Anomalous  (1):", n_anomalous)


In [None]:
import numpy as np

# Bring the signals into (samples, time, leads) layout if they are leads-first
X_all = ecg_100Hz
if X_all.shape[1] == 12:
    X_all = np.transpose(X_all, (0, 2, 1))  # From (N, 12, T) to (N, T, 12)

# Scale every record by its own peak |amplitude| — the scaling the autoencoder
# was trained with. A global min-max to [0, 1] would present inputs in a range
# the model never saw and distort the reconstruction errors.
all_peak = np.max(np.abs(X_all), axis=(1, 2), keepdims=True)
X_all = X_all / np.where(all_peak == 0, 1.0, all_peak)  # all-zero records stay zero

# Reconstruct every recording
reconstructed_ecg = autoencoder.predict(X_all)

# Reconstruction error (MSE per sample)
reconstruction_errors = np.mean(np.power(X_all - reconstructed_ecg, 2), axis=(1, 2))

# True labels (0 = normal, 1 = anomalous), in the same row order as ecg_100Hz
y_true = df_balanced["label"].values

# Save both arrays for reuse
np.save("reconstruction_errors.npy", reconstruction_errors)
np.save("true_labels.npy", y_true)

print("✅ Saved reconstruction_errors.npy and true_labels.npy")


In [None]:
# Training Process
# The environment needs three per-sample arrays in the same row order.
# `reconstruction_errors` and `y_true` were both computed over all of
# df_balanced, so the ages are taken from df_balanced as well (`true_labels`
# only covers the test split and would not line up). Missing ages are filled
# with the median, as done for the metadata table earlier in the notebook.
patient_ages = df_balanced["age"].fillna(df_balanced["age"].median()).to_numpy()

if not (len(reconstruction_errors) == len(y_true) == len(patient_ages)):
    raise ValueError(
        "reconstruction_errors, y_true and patient_ages must be aligned "
        f"(got {len(reconstruction_errors)}, {len(y_true)}, {len(patient_ages)}); "
        "re-run the cell that computes reconstruction errors for the full dataset"
    )

env = ECGAnomalyEnv(reconstruction_errors, y_true, patient_ages)
ppo_model = PPO(
    "MlpPolicy", env, verbose=1,
    learning_rate=0.0005,
    gamma=0.99,
    n_steps=512,
    batch_size=64,
    clip_range=0.2,
    ent_coef=0.02,
    vf_coef=0.7,
    max_grad_norm=0.5
)

# Train PPO model with both callbacks: the CSV metrics logger and the callback
# that writes ppo_logs/*.npy at the end of training (the files loaded below
# only exist if this second callback is registered).
log_callback = PPOLoggingCallback("ppo_training_logs.csv")
extended_callback = PPOExtendedLoggingCallback(env, log_dir="ppo_logs")
ppo_model.learn(total_timesteps=100000, callback=[log_callback, extended_callback])

# Save trained PPO model
ppo_model.save("ppo_ecg_anomaly_model")

# Visualization after training.
# The environment clears its logs on every reset, so these arrays cover the
# steps since the most recent episode start.
thresholds = np.load("ppo_logs/dynamic_thresholds.npy")
rewards = np.load("ppo_logs/reward_progression.npy")

plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(thresholds, color="purple")
plt.title("Dynamic Threshold Over Time")
plt.xlabel("Timestep")
plt.ylabel("Threshold")

plt.subplot(1, 2, 2)
# 50-step moving average of the rewards
plt.plot(np.convolve(rewards, np.ones(50)/50, mode='valid'), color="orange")
plt.title("Reward Progression")
plt.xlabel("Timestep")
plt.ylabel("Reward")

plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Read the metrics written by the PPO CSV logging callback
df_logs = pd.read_csv("ppo_training_logs.csv")

# Normalise column names (trim whitespace, lower-case) before indexing
df_logs.columns = df_logs.columns.str.strip().str.lower()

timesteps = df_logs["timesteps"]

# Learning curves, two panels side by side
plt.figure(figsize=(12, 5))

# Left panel: policy loss
plt.subplot(1, 2, 1)
plt.plot(timesteps, df_logs["policy_loss"], label="Policy Loss", color="blue")
plt.xlabel("Timesteps")
plt.ylabel("Loss")
plt.title("PPO Policy Loss Over Time")
plt.legend()

# Right panel: value loss and explained variance on a shared axis
plt.subplot(1, 2, 2)
for column, curve_label, curve_color in (
    ("value_loss", "Value Loss", "red"),
    ("explained_variance", "Explained Variance", "green"),
):
    plt.plot(timesteps, df_logs[column], label=curve_label, color=curve_color)
plt.xlabel("Timesteps")
plt.ylabel("Value")
plt.title("PPO Value Loss & Explained Variance")
plt.legend()

plt.show()
