### 1. Setup & Imports

In [None]:
!pip install wfdb

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import wfdb  # PhysioNet's Waveform Database library
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, Conv1D, MaxPooling1D, UpSampling1D, Reshape
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import os

### Download dataset

In [None]:
!wget -r -N -c -np https://physionet.org/files/mitdb/1.0.0/
!mv physionet.org/files/mitdb/1.0.0 ./mitdb  # Move files to a simpler path
!rm -r physionet.org  # Clean up

### 2. Load MIT-BIH Data

In [None]:
# List all records (e.g., '100', '101', ...). Sorted because os.listdir returns
# entries in arbitrary order, which would make downstream ordering irreproducible.
records = sorted(f.split('.')[0] for f in os.listdir('mitdb') if f.endswith('.dat'))

# Load signals and annotations
def load_ecg_record(record_name):
    """Load one MIT-BIH record and return ``(signal, annotation)``.

    Args:
        record_name: Record identifier such as ``'100'``; files are read from
            the local ``mitdb/`` directory.

    Returns:
        signal: 1-D array with the MLII lead when the record has one,
            otherwise the record's first channel.
        annotation: The ``wfdb`` annotation object read from the ``.atr`` file.
    """
    record = wfdb.rdrecord(f'mitdb/{record_name}')

    # Channel 0 is not MLII in every record (e.g. 114 stores V5 first), so pick
    # the lead by name and only fall back to channel 0 when MLII is absent.
    lead_names = list(record.sig_name or [])
    channel = lead_names.index('MLII') if 'MLII' in lead_names else 0
    signal = record.p_signal[:, channel]

    annotation = wfdb.rdann(f'mitdb/{record_name}', 'atr')
    return signal, annotation

# Sanity check: read record 100 and report how many samples it holds
ecg, ann = load_ecg_record(record_name='100')
print(ecg.shape[0])

In [None]:
# List all records (e.g., '100', '101', ...). Sorted because os.listdir returns
# entries in arbitrary order, which would make downstream ordering irreproducible.
records = sorted(f.split('.')[0] for f in os.listdir('mitdb') if f.endswith('.dat'))

import numpy as np
import pandas as pd
import wfdb

def load_ecg_with_labels(record_name, window_size=256):
    """Cut one MIT-BIH record into fixed windows and label each window.

    A window is labelled 1 (anomaly) when it contains at least one *beat*
    annotation other than ``'N'``; otherwise it is labelled 0 (normal).
    Non-beat annotations (rhythm changes ``'+'``, signal-quality marks ``'~'``,
    comments, ...) do not affect the label.

    Args:
        record_name: Record identifier such as ``'100'`` (read from ``mitdb/``).
        window_size: Number of samples per non-overlapping window.

    Returns:
        DataFrame with columns ``"ECG_Segment"`` (1-D array per row) and
        ``"Label"`` (0 or 1). Trailing samples that do not fill a whole window
        are dropped.
    """
    # Beat annotation codes from the PhysioNet annotation table; every other
    # code describes rhythm, signal quality or comments rather than a beat.
    beat_symbols = list('NLRBAaJSVrFejnE/fQ?')

    record = wfdb.rdrecord(f'mitdb/{record_name}')
    # Prefer the MLII lead by name; channel 0 is only a fallback.
    lead_names = list(record.sig_name or [])
    channel = lead_names.index('MLII') if 'MLII' in lead_names else 0
    signal = record.p_signal[:, channel]
    annotation = wfdb.rdann(f'mitdb/{record_name}', 'atr')

    samples = np.asarray(annotation.sample)
    symbols = np.asarray(annotation.symbol, dtype=str)
    abnormal_positions = samples[np.isin(symbols, beat_symbols) & (symbols != 'N')]

    segments = []
    labels = []

    # "+ 1" keeps the final window when len(signal) is an exact multiple of window_size
    for start in range(0, len(signal) - window_size + 1, window_size):
        end = start + window_size
        segments.append(signal[start:end])
        has_abnormal_beat = np.any((abnormal_positions >= start) & (abnormal_positions < end))
        labels.append(int(has_abnormal_beat))

    return pd.DataFrame({
        "ECG_Segment": segments,
        "Label": labels
    })

# Window record 100 and show a preview plus the class balance
df = load_ecg_with_labels(record_name='100', window_size=256)
for summary in (df.head(), df['Label'].value_counts()):
    print(summary)


### Basic EDA

In [None]:
# Tally how often each annotation symbol occurs in the currently loaded `ann`
symbols, counts = np.unique(ann.symbol, return_counts=True)
readable_counts = dict(zip(map(str, symbols), map(int, counts)))
print("Symbols and their counts: ",readable_counts)

In [None]:
# ------------------------------
# Show dataset distribution
# ------------------------------
label_column = df['Label']
total_samples = len(label_column)
normal_samples = (label_column == 0).sum()   # windows labelled 0
anomaly_samples = (label_column == 1).sum()  # windows labelled 1

print(f"Total ECG Segments: {total_samples}")
print(f"Normal Segments: {normal_samples}")
print(f"Anomaly Segments: {anomaly_samples}")


In [None]:
# Bar chart of the normal / anomaly window counts computed in the previous cell
fig, ax = plt.subplots(figsize=(5, 4))
ax.bar(["Normal", "Anomaly"], [normal_samples, anomaly_samples], color=["green", "red"])
ax.set_title("ECG Data Distribution")
ax.set_ylabel("Count")
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Each entry of the "ECG_Segment" column is already a 1-D array, so the first
# window can be plotted directly.
segment = df["ECG_Segment"].iloc[0]
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(segment)
ax.set_title("First ECG Segment")
ax.set_xlabel("Sample")
ax.set_ylabel("Amplitude (mV)")
plt.show()

In [None]:
# List all .dat files (each corresponds to one record). Sorted because os.listdir
# order is arbitrary, and the seeded train/test split below is only reproducible
# when the segments are always extracted in the same order.
records = sorted(f.split('.')[0] for f in os.listdir('mitdb') if f.endswith('.dat'))
print(f"Found {len(records)} records: {records[:5]}...")

### 3. Preprocessing

3.1 Segment ECG into Normal Beats

In [None]:
window_size = 256
half_window = window_size // 2
normal_labels = ['N', 'L', 'R', 'e', 'j']  # Normal variants
all_segments = []

for record in records:
    try:
        ecg, ann = load_ecg_record(record)

        # Annotation positions whose symbol is one of the "normal" labels
        r_peaks = ann.sample[np.isin(ann.symbol, normal_labels)]

        # Windows start half a window before the peak (clamped at 0); windows
        # that would run past the end of the signal are dropped.
        starts = [max(0, peak - half_window) for peak in r_peaks]
        all_segments.extend(
            ecg[start:start + window_size]
            for start in starts
            if start + window_size <= len(ecg)
        )
    except Exception as e:
        # Best effort: report the record that failed and move on to the next one
        print(f"⚠️ Error processing {record}: {e}")
        continue

all_segments = np.array(all_segments)
print(f"✅ Total segments extracted: {len(all_segments)}")

In [None]:
# One row per extracted segment, one column per sample offset inside the window
df_beats = pd.DataFrame(data=all_segments)
first_beat = df_beats.iloc[0]

print("\nShape of segmented beats array:", df_beats.shape)
print("First beat segment:\n", first_beat)

In [None]:
# Rows of df_beats are segments and columns are sample offsets. Plotting the
# frame directly would draw one line per *column* across segments, so transpose
# a handful of rows to draw one line per segment instead.
n_preview = min(5, len(df_beats))
plt.figure(figsize=(10, 4))
plt.plot(df_beats.iloc[:n_preview].T.to_numpy())
plt.title(f"First {n_preview} Extracted Beat Segments")
plt.xlabel("Sample")
plt.ylabel("Amplitude (mV)")
plt.show()

3.2 Normalize Data

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Scale every sample offset to [-1, 1] across all segments, then append a
# channel axis so the result has shape (n_segments, window_size, 1).
# NOTE(review): a later cell reassigns `scaler` and `segments_normalized`.
scaler = MinMaxScaler(feature_range=(-1, 1))
segments_2d = all_segments.reshape(-1, window_size)
segments_normalized = scaler.fit_transform(segments_2d)[..., np.newaxis]

In [None]:
# Preview a small slice (first 3 segments, first 10 samples) rather than dumping
# ten complete windows into the output.
segments_normalized[:3, :10, 0]

### Dataset before and after preprocessing

In [None]:
import numpy as np
import pandas as pd
import wfdb
from sklearn.preprocessing import MinMaxScaler

# -----------------------------
# Load raw ECG signal (Record 100)
# -----------------------------
record_name = "100"
signal = wfdb.rdrecord(f"mitdb/{record_name}").p_signal[:, 0]  # Lead II
annotation = wfdb.rdann(f"mitdb/{record_name}", "atr")

# Annotations are sparse: annotation.symbol[k] belongs to the sample index
# annotation.sample[k]. Look each displayed sample up by position instead of
# pairing the k-th symbol with the k-th sample.
n_preview = 20
symbol_at_sample = dict(zip(np.asarray(annotation.sample).tolist(), annotation.symbol))

# Create DataFrame for raw dataset (first 20 samples for display)
raw_df = pd.DataFrame({
    "Index": range(n_preview),
    "ECG_Value": signal[:n_preview],
    "Annotation": [symbol_at_sample.get(i) for i in range(n_preview)]
})

print("Raw Dataset (Record 100 - first 10 samples):")
display(raw_df.head(10))
print("\n\n")
# -----------------------------
# Preprocessing: Segment & Label
# -----------------------------
window_size = 256
df_preprocessed = load_ecg_with_labels(record_name="100", window_size=window_size)

# Keep only NORMAL segments
df_normal = df_preprocessed[df_preprocessed["Label"] == 0].reset_index(drop=True)

# -----------------------------
# Normalization (Min-Max scaling to [-1, 1])
# -----------------------------
# NOTE(review): this reassigns `scaler` and `segments_normalized`, so the
# train/test split further down operates on record 100's normal windows rather
# than on the segments extracted from all records — confirm this is intended.
scaler = MinMaxScaler(feature_range=(-1, 1))
segments_array = np.stack(df_normal["ECG_Segment"].values)  # shape: (n_samples, window_size)
segments_normalized = scaler.fit_transform(segments_array)

# Add normalized data back into DataFrame
df_normal["Normalized_Segment"] = list(segments_normalized)
# --- create a clean table (first 10 values of each normalized segment) ---
normalized_df = pd.DataFrame(
    segments_normalized[:, :10],  # take first 10 values for readability
    columns=[f"Val_{i+1}" for i in range(10)]
)

# (Optional) Add the Label column for clarity
normalized_df["Label"] = df_normal["Label"].values
print("Preprocessed ECG Segments (showing first 10 values of each segment):")
display(normalized_df.head(10))


### Display Anomaly data

In [None]:
import pandas as pd
import numpy as np

# `df` holds the windows of record 100 with columns 'ECG_Segment' and 'Label'
# (Label 0 = normal, 1 = anomalous).

# Rows labelled anomalous, re-indexed from zero
df_anomaly = df.loc[df["Label"] == 1].reset_index(drop=True)

# Stack the per-row arrays into one 2-D array (one row per segment)
segments_array = np.stack(df_anomaly["ECG_Segment"].values)

# Table with only the first 10 values of each segment, for readability
preview_columns = [f"Val_{i+1}" for i in range(10)]
anomaly_df = pd.DataFrame(segments_array[:, :10], columns=preview_columns)

# Carry the label along
anomaly_df["Label"] = df_anomaly["Label"].values

print("Anomalous ECG Segments from Record 100 (first 10 values shown):")
display(anomaly_df.head(10))


3.3 Train-Test Split

In [None]:
# Hold out 10% of the normalized segments (fixed seed), then give both parts the
# trailing channel axis that Conv1D expects.
X_train, X_test = (
    part.reshape(-1, window_size, 1)
    for part in train_test_split(segments_normalized, test_size=0.1, random_state=42)
)

### 4. Build GAN Model

In [None]:
from tensorflow.keras.layers import Input, Dense, Reshape, Conv1DTranspose, BatchNormalization, LeakyReLU
from tensorflow.keras.models import Model

def build_generator(latent_dim=100, window_size=256):
    """Build the generator: a dense network mapping noise to one ECG window.

    Args:
        latent_dim: Length of the input noise vector.
        window_size: Number of samples in the generated window.

    Returns:
        Uncompiled Keras ``Model`` taking ``(batch, latent_dim)`` noise and
        producing ``(batch, window_size, 1)`` output. The output layer has no
        activation.
    """
    noise = Input(shape=(latent_dim,))

    # Two hidden stages: Dense -> LeakyReLU -> BatchNormalization
    x = noise
    for units in (64, 128):
        x = Dense(units)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = BatchNormalization()(x)

    # Linear projection to the window length, reshaped to add a channel axis
    x = Dense(window_size)(x)
    x = Reshape((window_size, 1))(x)

    return Model(noise, x)

generator = build_generator()
generator.summary()

In [None]:
from tensorflow.keras.layers import (Input, Conv1D, LeakyReLU, LayerNormalization,
                                    Dropout, GlobalMaxPooling1D, Dense)
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

def build_stable_discriminator(window_size=256):
    """Build and compile the discriminator (real vs. generated ECG window).

    Args:
        window_size: Number of samples per input window.

    Returns:
        Keras ``Model`` taking ``(batch, window_size, 1)`` input and producing a
        sigmoid score per sample, compiled with binary cross-entropy,
        ``Adam(learning_rate=0.0002, beta_1=0.5)`` and an accuracy metric.
    """
    ecg_input = Input(shape=(window_size, 1))

    # Three strided Conv1D stages with LeakyReLU and LayerNormalization;
    # dropout follows the first two stages only.
    x = ecg_input
    for filters, use_dropout in ((64, True), (128, True), (256, False)):
        x = Conv1D(filters, 5, strides=2, padding='same')(x)
        x = LeakyReLU(0.2)(x)
        x = LayerNormalization()(x)
        if use_dropout:
            x = Dropout(0.3)(x)

    # Decision layer: pool over time, then a single sigmoid unit
    x = GlobalMaxPooling1D()(x)
    x = Dense(1, activation='sigmoid')(x)

    discriminator = Model(ecg_input, x)
    discriminator.compile(loss='binary_crossentropy',
                          optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
                          metrics=['accuracy'])
    return discriminator

# Usage
discriminator = build_stable_discriminator(window_size=256)
discriminator.summary()

In [None]:
from tensorflow.keras.optimizers import Adam

# The combined model should only update the generator, so the discriminator is
# marked non-trainable before it is wired in.
# NOTE(review): whether this flag also affects the already-compiled
# discriminator's own train_on_batch depends on the Keras version — confirm.
discriminator.trainable = False

# noise -> generator -> discriminator
gan_input = Input(shape=(100,))
fake_ecg = generator(gan_input)
gan_output = discriminator(fake_ecg)

# Combined model, trained with binary cross-entropy
gan = Model(inputs=gan_input, outputs=gan_output)
gan.compile(loss='binary_crossentropy',
            optimizer=Adam(learning_rate=0.0002, beta_1=0.5))

### 5. Train the Model


In [None]:
import numpy as np
from tensorflow.keras.optimizers import Adam

def train_gan(X_train, generator, discriminator, gan,
             epochs, batch_size, window_size=256, log_every=None):
    """Alternate discriminator and generator updates for ``epochs`` iterations.

    Each iteration draws one random batch of real windows, generates one batch
    of fake windows, updates the discriminator on both, then updates the
    generator through the combined ``gan`` model.

    Args:
        X_train: Array of real windows; indexed along its first axis.
        generator: Model exposing ``input_shape`` and ``predict``.
        discriminator: Model whose ``train_on_batch`` returns ``[loss, accuracy]``.
        gan: Combined model whose ``train_on_batch`` returns the generator loss.
        epochs: Number of training iterations (one batch each).
        batch_size: Number of real and of fake samples per iteration.
        window_size: Unused; kept so existing calls keep working.
        log_every: Print progress every this many iterations. Defaults to
            roughly ten reports per run; the last iteration is always reported.

    Returns:
        None. Progress is printed.
    """
    # Read the noise size from the generator instead of assuming 100
    latent_dim = generator.input_shape[1]
    if log_every is None:
        log_every = max(1, epochs // 10)

    # Adversarial ground truths
    valid = np.ones((batch_size, 1))
    fake = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        # ---------------------
        #  Train Discriminator
        # ---------------------
        idx = np.random.randint(0, X_train.shape[0], batch_size)
        real_ecgs = X_train[idx]

        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        # verbose=0: otherwise predict prints a progress bar on every iteration
        fake_ecgs = generator.predict(noise, verbose=0)

        d_loss_real = discriminator.train_on_batch(real_ecgs, valid)
        d_loss_fake = discriminator.train_on_batch(fake_ecgs, fake)
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # ---------------------
        #  Train Generator
        # ---------------------
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        g_loss = gan.train_on_batch(noise, valid)

        if epoch % log_every == 0 or epoch == epochs - 1:
            g_loss_value = float(np.ravel(g_loss)[0])
            print(f"Epoch {epoch} | D Loss: {d_loss[0]:.4f} | D Acc: {100*d_loss[1]:.2f}% | G Loss: {g_loss_value:.4f}")


# Train on the preprocessed windows (shape: [n_samples, window_size, 1]).
# batch_size can be adjusted to fit GPU memory; window_size must match the
# segment length used during preprocessing.
train_gan(
    X_train,
    generator,
    discriminator,
    gan,
    epochs=1000,
    batch_size=64,
    window_size=256,
)

### 6. Reconstruction and Anomaly Detection

6.1 Compute Reconstruction Error


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from scipy.optimize import minimize

# Ask TensorFlow to allocate GPU memory on demand when a GPU is visible
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    try:
        tf.config.experimental.set_memory_growth(physical_devices[0], True)
        print("✅ Using GPU for TensorFlow operations")
    except (RuntimeError, ValueError) as err:
        # Memory growth cannot be changed once the GPU has been initialized
        # (earlier cells already built and trained models), so report it
        # instead of silently swallowing the error.
        print(f"⚠️ GPU found, but memory growth could not be enabled: {err}")
else:
    print("⚠️ No GPU found — running on CPU")

def ecg_to_latent(real_ecg, generator, iterations=500):
    """Search for the latent vector whose generated window best matches ``real_ecg``.

    Minimizes the mean squared error between ``generator(z)`` and the input with
    L-BFGS-B, starting from a random normal ``z``.

    Args:
        real_ecg: Array holding one ECG window; reshaped to ``(1, -1, 1)``.
        generator: Model mapping ``(1, latent_dim)`` noise to a window.
        iterations: Maximum number of L-BFGS-B iterations.

    Returns:
        1-D array of length ``latent_dim`` (the optimizer's final ``x``).
    """
    latent_dim = generator.input_shape[1]
    target = tf.convert_to_tensor(real_ecg.reshape(1, -1, 1).astype(np.float32))

    def loss_and_grad(z):
        """Return the reconstruction MSE and its gradient with respect to z."""
        z_tensor = tf.convert_to_tensor(z.reshape(1, latent_dim), dtype=tf.float32)
        with tf.GradientTape() as tape:
            tape.watch(z_tensor)
            generated = generator(z_tensor, training=False)
            loss_value = tf.reduce_mean(tf.square(generated - target))
        grad = tape.gradient(loss_value, z_tensor)
        # SciPy works in float64; hand back a flat float64 gradient
        return float(loss_value.numpy()), grad.numpy().reshape(-1).astype(np.float64)

    # jac=True: the objective returns (loss, gradient). Without it SciPy falls
    # back to finite differences whose ~1e-8 steps vanish in a float32 loss.
    result = minimize(loss_and_grad,
                      x0=np.random.randn(latent_dim),
                      jac=True,
                      method='L-BFGS-B',
                      options={'maxiter': iterations})
    return result.x

def reconstruct_with_gan(ecg_signal, generator):
    """Return the generator's output for the latent code fitted to ``ecg_signal``.

    Args:
        ecg_signal: One ECG window, forwarded to ``ecg_to_latent``.
        generator: Model mapping latent vectors to windows.

    Returns:
        NumPy array: the first (only) item of the generated batch.
    """
    best_z = ecg_to_latent(ecg_signal, generator).reshape(1, -1)
    output = generator(tf.convert_to_tensor(best_z, dtype=tf.float32), training=False)
    return output.numpy()[0]

# Example: reconstruct the first held-out window and report its error
original_ecg = X_test[0]
reconstructed = reconstruct_with_gan(original_ecg, generator)
mse = np.mean(np.square(original_ecg - reconstructed))
print(f"Reconstruction MSE: {mse:.4f}")


6.2 Visualize Reconstruction

In [None]:
import matplotlib.pyplot as plt

def plot_reconstruction(original, reconstructed, title=""):
    """Plot an ECG window against its reconstruction, with the MSE in the title.

    Args:
        original: Array holding the original window (any shape; flattened).
        reconstructed: Array holding the reconstruction (any shape; flattened).
        title: Text placed before the MSE in the figure title.
    """
    # Flatten both first so e.g. (N, 1) vs (N,) inputs are compared element by
    # element instead of being broadcast against each other.
    original_flat = np.asarray(original).reshape(-1)
    reconstructed_flat = np.asarray(reconstructed).reshape(-1)
    mse = np.mean((original_flat - reconstructed_flat) ** 2)

    plt.figure(figsize=(10,4))
    plt.plot(original_flat, label='Original', linewidth=2)
    plt.plot(reconstructed_flat, label='Reconstructed', linestyle='--')
    plt.title(f"{title} (MSE: {mse:.4f})")
    plt.legend()
    plt.xlabel('Samples')
    plt.ylabel('Amplitude (normalized)')
    plt.show()

# Visualize a typical window next to the one the discriminator trusts least
normal_ecg = X_test[0]  # Replace with your normal ECG

# The discriminator outputs a "real" probability, so the most anomalous-looking
# window is the one with the LOWEST score (argmin, not argmax).
realness_scores = discriminator.predict(X_test, verbose=0)
abnormal_ecg = X_test[np.argmin(realness_scores)]

for sample_ecg, label in [(normal_ecg, "Normal ECG"),
                          (abnormal_ecg, "Abnormal ECG")]:
    reconstructed = reconstruct_with_gan(sample_ecg, generator)
    plot_reconstruction(sample_ecg, reconstructed, label)

6.3 Detect Anomalies

In [None]:
def gan_anomaly_detector(ecg_signal, generator, discriminator, threshold=0.3):
    """Score one ECG window by reconstruction error and discriminator output.

    The score is ``0.7 * mse + 0.3 * (1 - realness)``, where ``mse`` is the mean
    squared error between the window and its GAN reconstruction and
    ``realness`` is the discriminator's output for the window.

    Args:
        ecg_signal: One ECG window of any shape (flattened internally).
        generator: Generator passed on to ``reconstruct_with_gan``.
        discriminator: Model whose ``predict`` takes ``(1, n_samples, 1)`` input.
        threshold: Scores above this value are reported as anomalies.

    Returns:
        The anomaly score as a float. A verdict is also printed.
    """
    # Flatten both sides so the error is element-wise whatever shape the caller
    # passes, e.g. (N,), (N, 1) or (1, N, 1).
    signal = np.asarray(ecg_signal).reshape(-1)
    reconstructed = np.asarray(reconstruct_with_gan(signal, generator)).reshape(-1)
    mse = float(np.mean((signal - reconstructed)**2))

    # Discriminator confidence (verbose=0 suppresses the progress bar)
    realness = float(discriminator.predict(signal.reshape(1, -1, 1), verbose=0)[0][0])

    # Combined score (weighted)
    anomaly_score = 0.7*mse + 0.3*(1-realness)  # Adjust weights as needed

    if anomaly_score > threshold:
        print(f"🚨 Anomaly Detected (Score: {anomaly_score:.3f})")
        print(f" - Reconstruction MSE: {mse:.4f}")
        print(f" - Discriminator 'real' confidence: {realness*100:.1f}%")
    else:
        print(f"✅ Normal ECG (Score: {anomaly_score:.3f})")

    return anomaly_score

# Example: score one 256-sample window taken from record 124
abnormal_ecg, _ = load_ecg_record('124')
segment_start = 1000
abnormal_segment = abnormal_ecg[segment_start:segment_start + 256]
# Reuse the fitted scaler, then shape the window as (1, 256, 1)
abnormal_segment = scaler.transform(abnormal_segment[np.newaxis, :]).reshape(1, 256, 1)
gan_anomaly_detector(abnormal_segment, generator, discriminator)

### 7. Save & Export

In [None]:
# Write the trained generator to 'generator.h5'; the confusion-matrix cell
# below reloads this file with load_model.
generator.save('generator.h5')  # For later use

### Confusion matrix

In [None]:
import numpy as np
import wfdb
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score, accuracy_score
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
import tensorflow as tf

# ------------------------------
# 1. Load ECG Record + Annotation
# ------------------------------
def load_ecg_record(record_name):
    """Load one MIT-BIH record and return ``(signal, annotation)``.

    Args:
        record_name: Record identifier such as ``'100'``; files are read from
            the local ``mitdb/`` directory.

    Returns:
        signal: 1-D array with the MLII lead when the record has one,
            otherwise the record's first channel.
        annotation: The ``wfdb`` annotation object read from the ``.atr`` file.
    """
    record = wfdb.rdrecord(f"mitdb/{record_name}")

    # Channel 0 is not MLII in every record (e.g. 114 stores V5 first), so pick
    # the lead by name and only fall back to channel 0 when MLII is absent.
    lead_names = list(record.sig_name or [])
    channel = lead_names.index('MLII') if 'MLII' in lead_names else 0
    signal = record.p_signal[:, channel]

    annotation = wfdb.rdann(f"mitdb/{record_name}", 'atr')
    return signal, annotation

# ------------------------------
# 2. Segment ECG & Create Labels
# ------------------------------
def get_labels_from_annotations(signal, ann, window_size=256):
    """Split a signal into fixed windows and label each one from its annotations.

    A window is labelled 1 (anomaly) when at least one *beat* annotation other
    than ``'N'`` falls inside it, otherwise 0 (normal). Non-beat annotations
    (rhythm changes ``'+'``, signal-quality marks ``'~'``, comments, ...) are
    ignored.

    Args:
        signal: Array of samples; windows are cut along its first axis.
        ann: Object with ``sample`` (annotation positions) and ``symbol``
            (annotation codes) of equal length.
        window_size: Number of samples per non-overlapping window.

    Returns:
        ``(segments, labels)``: ``segments`` has one row per window and
        ``labels`` is an integer array of the same length. Trailing samples
        that do not fill a whole window are dropped.

    Raises:
        ValueError: If ``window_size`` is not positive.
    """
    if window_size <= 0:
        raise ValueError("window_size must be a positive integer")

    normal_beats = ['N']  # only 'N' considered normal
    # Beat annotation codes from the PhysioNet annotation table; every other
    # code describes rhythm, signal quality or comments rather than a beat.
    beat_symbols = list('NLRBAaJSVrFejnE/fQ?')

    signal = np.asarray(signal)
    samples = np.asarray(ann.sample, dtype=np.int64)
    symbols = np.asarray(ann.symbol, dtype=str)
    is_abnormal_beat = np.isin(symbols, beat_symbols) & ~np.isin(symbols, normal_beats)

    # Integer division keeps the last full window even when the signal length
    # is an exact multiple of window_size.
    n_windows = len(signal) // window_size
    segments = signal[:n_windows * window_size].reshape(
        (n_windows, window_size) + signal.shape[1:]).copy()

    # Map each abnormal beat straight to its window index instead of scanning
    # every annotation once per window.
    window_of_beat = samples[is_abnormal_beat] // window_size
    window_of_beat = window_of_beat[(window_of_beat >= 0) & (window_of_beat < n_windows)]
    labels = np.zeros(n_windows, dtype=int)
    labels[window_of_beat] = 1

    return segments, labels

# ------------------------------
# 3. Load & Process Data
# ------------------------------
ecg, ann = load_ecg_record('100')  # Change record ID as needed
segments, labels = get_labels_from_annotations(ecg, ann, window_size=256)

print(f"Segments shape: {segments.shape}")
print(f"Labels distribution  [normal, anomaly]: {np.bincount(labels)}")
print(f"Total Segments: {len(segments)}")

# ------------------------------
# Train only on Normal ECGs
# ------------------------------
normal_indices = np.where(labels == 0)[0]
anomaly_indices = np.where(labels == 1)[0]

segments_normal = segments[normal_indices]
labels_normal = labels[normal_indices]

# Normalize (fit only on normal data). The generator was trained on windows
# scaled with MinMaxScaler(feature_range=(-1, 1)), so the evaluation data must
# use the same scaling; standardized inputs would live in a different value
# range than the generator's outputs and distort the reconstruction error.
scaler = MinMaxScaler(feature_range=(-1, 1))
segments_normalized = scaler.fit_transform(segments_normal)

# Train/test split for NORMAL data only
X_train, X_test_normal, y_train, y_test_normal = train_test_split(
    segments_normalized, labels_normal, test_size=0.1, random_state=42
)

# Keep anomalies only for testing
X_test_anomaly = segments[anomaly_indices]
y_test_anomaly = labels[anomaly_indices]

# Normalize anomalies using the same scaler
X_test_anomaly = scaler.transform(X_test_anomaly)

# Combine normal test + anomaly test
X_test = np.vstack([X_test_normal, X_test_anomaly])
y_test = np.hstack([y_test_normal, y_test_anomaly])

# Reshape for Conv1D
window_size = 256
X_train = X_train.reshape(-1, window_size, 1)
X_test = X_test.reshape(-1, window_size, 1)

# Info
print(f"\nTraining set contains ONLY normal ECGs:")
print(f"   X_train: {len(X_train)}, y_train distribution: {np.bincount(y_train)}")

print(f"\nTesting set contains normal + anomalies:")
print(f"   X_test: {len(X_test)}, y_test distribution: {np.bincount(y_test)}")

# ------------------------------
# 4. Load Generator
# ------------------------------
# Reload the generator from 'generator.h5'. compile=False skips restoring any
# training configuration; below the model is only called for forward passes.
generator = load_model("generator.h5", compile=False)

# ------------------------------
# 5. GPU-Optimized Reconstruction Function
# ------------------------------
def reconstruct_with_gan_gpu(ecg_signal, generator, steps=50, lr=0.05):
    """Reconstruct a window by gradient descent on the generator's latent input.

    Starts from a random normal latent vector and runs ``steps`` Adam updates
    that minimize the mean squared error between the generated window and
    ``ecg_signal``.

    Args:
        ecg_signal: Array holding one window; reshaped to ``(1, -1, 1)``.
        generator: Model mapping ``(1, latent_dim)`` noise to a window.
        steps: Number of optimization steps.
        lr: Adam learning rate.

    Returns:
        NumPy array: the generator's output for the final latent vector
        (first item of the batch).
    """
    latent_dim = generator.input_shape[1]
    target = tf.convert_to_tensor(ecg_signal.reshape(1, -1, 1), dtype=tf.float32)

    z = tf.Variable(tf.random.normal([1, latent_dim], dtype=tf.float32))
    optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

    def reconstruction_loss():
        # Mean squared error between the current generation and the target
        return tf.reduce_mean(tf.square(generator(z, training=False) - target))

    step = 0
    while step < steps:
        with tf.GradientTape() as tape:
            loss = reconstruction_loss()
        optimizer.apply_gradients([(tape.gradient(loss, z), z)])
        step += 1

    return generator(z, training=False)[0].numpy()

# ------------------------------
# 6. Compute Reconstruction Errors
# ------------------------------
# One reconstruction MSE per test window, in X_test order
errors = np.array([
    np.mean((sample - reconstruct_with_gan_gpu(sample, generator, steps=50)) ** 2)
    for sample in X_test
])

# ------------------------------
# 7. Evaluation (Confusion Matrix & Metrics)
# ------------------------------
# Flag every window whose error exceeds the 70th percentile of all test errors.
# NOTE(review): the threshold is derived from the same test errors it is
# evaluated on — confirm this is acceptable for the reported metrics.
threshold = np.percentile(errors, 70)  # select threshold
y_pred = np.where(errors > threshold, 1, 0)

cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(cm, display_labels=["Normal", "Anomaly"])
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

# Metrics (zero_division=0 returns 0 instead of warning when a class is never predicted)
accuracy = accuracy_score(y_test, y_pred)
precision, recall, f1 = (
    metric(y_test, y_pred, zero_division=0)
    for metric in (precision_score, recall_score, f1_score)
)

print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")


#### Confusion matrix for a high threshold (90th percentile)

In [None]:

# ------------------------------
# 90th Percentile Evaluation
# ------------------------------

import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score, precision_score, recall_score, f1_score

# Recap of the data prepared in the previous cell
print(f"Segments shape: {segments.shape}")
print(f"Labels distribution  [normal, anomaly]: {np.bincount(labels)}")
print(f"Total Segments: {len(segments)}")

print(f"\nTraining set contains ONLY normal ECGs:")
print(f"   X_train: {len(X_train)}, y_train distribution: {np.bincount(y_train)}")

print(f"\nTesting set contains normal + anomalies:")
print(f"   X_test: {len(X_test)}, y_test distribution: {np.bincount(y_test)}")


# Stricter cut-off: only errors above the 90th percentile count as anomalies
threshold = np.percentile(errors, 90)  # 90th percentile
y_pred = np.where(errors > threshold, 1, 0)

# Confusion Matrix
fig, ax = plt.subplots(figsize=(5,5))  # smaller figure prevents hanging
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(cm, display_labels=["Normal", "Anomaly"])
disp.plot(cmap=plt.cm.Blues, ax=ax)
plt.title("Confusion Matrix")
plt.show()

# Metrics (zero_division=0 returns 0 instead of warning when a class is never predicted)
accuracy = accuracy_score(y_test, y_pred)
precision, recall, f1 = (
    metric(y_test, y_pred, zero_division=0)
    for metric in (precision_score, recall_score, f1_score)
)

print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")
