In [4]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, roc_auc_score, roc_curve, f1_score
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, GlobalAveragePooling1D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.regularizers import l2
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
from imblearn.under_sampling import RandomUnderSampler

# Load data
df_normal = pd.read_csv("data/ptbdb_normal.csv", header=None)
df_abnormal = pd.read_csv("data/ptbdb_abnormal.csv", header=None)

# Merge and shuffle data
df = pd.concat([df_normal, df_abnormal])
df = shuffle(df, random_state=42)

# Separate signals and labels
X = df.iloc[:, :-1].values
y = df.iloc[:, -1].values

# Check class distribution
print(f"Original class distribution: Normal = {sum(y==0)}, Abnormal = {sum(y==1)}")

# Standardize the data (per channel)
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Reshape signals to (samples, signal_length, channels)
X = X.reshape((X.shape[0], X.shape[1], 1))

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Handle class imbalance - Using combination of under-sampling and SMOTE
print("\nHandling class imbalance...")
rus = RandomUnderSampler(random_state=42)
X_train_flat = X_train.reshape(X_train.shape[0], -1)
X_train_res, y_train_res = rus.fit_resample(X_train_flat, y_train)
X_train_res = X_train_res.reshape(-1, X.shape[1], 1)

# Now apply SMOTE to the under-sampled data
smote = SMOTE(random_state=42)
X_train_flat = X_train_res.reshape(X_train_res.shape[0], -1)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train_flat, y_train_res)
X_train_resampled = X_train_resampled.reshape(-1, X.shape[1], 1)

print(f"Resampled class distribution: Normal = {sum(y_train_resampled==0)}, Abnormal = {sum(y_train_resampled==1)}")

# Custom F1 metric for monitoring
class F1Score(tf.keras.metrics.Metric):
    def __init__(self, name='f1_score', **kwargs):
        super(F1Score, self).__init__(name=name, **kwargs)
        self.precision = tf.keras.metrics.Precision()
        self.recall = tf.keras.metrics.Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.round(y_pred)
        self.precision.update_state(y_true, y_pred, sample_weight)
        self.recall.update_state(y_true, y_pred, sample_weight)

    def result(self):
        p = self.precision.result()
        r = self.recall.result()
        return 2 * ((p * r) / (p + r + tf.keras.backend.epsilon()))

    def reset_states(self):
        self.precision.reset_states()
        self.recall.reset_states()

# Build improved 1D CNN model with better architecture
def create_model(input_shape):
    model = Sequential([
        Conv1D(filters=32, kernel_size=25, activation='relu', input_shape=input_shape,
               kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        MaxPooling1D(pool_size=3),
        Dropout(0.3),
        
        Conv1D(filters=64, kernel_size=15, activation='relu', 
               kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        MaxPooling1D(pool_size=2),
        Dropout(0.3),
        
        Conv1D(filters=128, kernel_size=7, activation='relu',
               kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        MaxPooling1D(pool_size=2),
        Dropout(0.4),
        
        Conv1D(filters=256, kernel_size=3, activation='relu',
               kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        GlobalAveragePooling1D(),
        Dropout(0.5),
        
        Dense(128, activation='relu', kernel_regularizer=l2(0.001)),
        BatchNormalization(),
        Dropout(0.5),
        
        Dense(1, activation='sigmoid')
    ])
    return model

model = create_model((X.shape[1], 1))

# Compile the model with adjusted class weights
# More weight to normal class since it's the minority in original data
class_weight = {0: 3., 1: 1.}

optimizer = Adam(learning_rate=0.0005)
model.compile(optimizer=optimizer, 
              loss='binary_crossentropy', 
              metrics=['accuracy', 
                       tf.keras.metrics.Precision(name='precision'),
                       tf.keras.metrics.Recall(name='recall'),
                       tf.keras.metrics.AUC(name='auc'),
                       F1Score()])

# Enhanced callbacks
checkpoint = ModelCheckpoint("best_model.h5", 
                           monitor='val_f1_score', 
                           mode='max', 
                           save_best_only=True, 
                           save_weights_only=False,
                           verbose=1)

reduce_lr = ReduceLROnPlateau(monitor='val_loss', 
                            factor=0.5, 
                            patience=5, 
                            min_lr=1e-6, 
                            verbose=1)

early_stopping = EarlyStopping(monitor='val_f1_score', 
                             mode='max',
                             patience=20, 
                             restore_best_weights=True,
                             verbose=1)

# Train the model
print("\nTraining model...")
history = model.fit(X_train_resampled, y_train_resampled,
                  epochs=100,
                  batch_size=64,
                  validation_split=0.15,
                  callbacks=[checkpoint, reduce_lr, early_stopping],
                  class_weight=class_weight,
                  verbose=1)

# Load best model using the exact same architecture
model = load_model("best_model.h5", custom_objects={'F1Score': F1Score})

# Evaluation
print("\nEvaluating model...")
y_pred = (model.predict(X_test) > 0.5).astype("int32")
y_probs = model.predict(X_test)

# Metrics
accuracy = accuracy_score(y_test, y_pred)
auc_score = roc_auc_score(y_test, y_probs)
f1 = f1_score(y_test, y_pred)

print(f"\n✅ Test Accuracy: {accuracy * 100:.2f}%")
print(f"🎯 AUC Score: {auc_score * 100:.2f}%")
print(f"🔍 F1 Score: {f1 * 100:.2f}%")
print("\n📊 Classification Report:")
print(classification_report(y_test, y_pred, target_names=["normal", "abnormal"], zero_division=0))

# Confusion matrix
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", 
           xticklabels=["normal", "abnormal"], 
           yticklabels=["normal", "abnormal"])
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.tight_layout()
plt.savefig("confusion_matrix.png")
plt.show()

# ROC Curve
fpr, tpr, thresholds = roc_curve(y_test, y_probs)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'AUC = {auc_score:.2f}')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc='lower right')
plt.savefig("roc_curve.png")
plt.show()

# Plot training history
plt.figure(figsize=(15, 6))

# Accuracy plot
plt.subplot(1, 3, 1)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()

# Loss plot
plt.subplot(1, 3, 2)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend()

# F1 Score plot
plt.subplot(1, 3, 3)
plt.plot(history.history['f1_score'], label='Train F1')
plt.plot(history.history['val_f1_score'], label='Validation F1')
plt.title('F1 Score')
plt.ylabel('F1 Score')
plt.xlabel('Epoch')
plt.legend()

plt.tight_layout()
plt.savefig("training_history.png")
plt.show()

# Save predictions for further analysis
results = pd.DataFrame({
    'true_label': y_test,
    'predicted_label': y_pred.flatten(),
    'prediction_prob': y_probs.flatten()
})
results.to_csv("predictions.csv", index=False)

Original class distribution: Normal = 4046, Abnormal = 10506

Handling class imbalance...
Resampled class distribution: Normal = 3237, Abnormal = 3237


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)



Training model...
Epoch 1/100
[1m86/86[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step - accuracy: 0.5948 - auc: 0.6290 - f1_score: 0.5204 - loss: 2.1986 - precision: 0.5124 - recall: 0.5305
Epoch 1: val_f1_score improved from -inf to 0.07525, saving model to best_model.h5




[1m86/86[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 70ms/step - accuracy: 0.5955 - auc: 0.6297 - f1_score: 0.5209 - loss: 2.1953 - precision: 0.5133 - recall: 0.5307 - val_accuracy: 0.0391 - val_auc: 0.0000e+00 - val_f1_score: 0.0752 - val_loss: 1.7052 - val_precision: 1.0000 - val_recall: 0.0391 - learning_rate: 5.0000e-04
Epoch 2/100
[1m53/86[0m [32m━━━━━━━━━━━━[0m[37m━━━━━━━━[0m [1m1s[0m 39ms/step - accuracy: 0.6920 - auc: 0.7198 - f1_score: 0.5629 - loss: 1.6816 - precision: 0.6750 - recall: 0.4834

KeyboardInterrupt: 