<a href="https://colab.research.google.com/github/Nakib-Nasrullah/Python/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# cell 1
# Install + import
!pip install wfdb numpy pandas scipy matplotlib

import wfdb
import numpy as np
import pandas as pd
import os

# Download dataset
DATA_DIR = "mitdb"
# Fetch the database when the directory is missing *or* empty, so an aborted
# earlier attempt that only created the folder does not leave us with no records.
if not os.path.isdir(DATA_DIR) or not os.listdir(DATA_DIR):
    wfdb.dl_database('mitdb', dl_dir=DATA_DIR)

# Parameters
WINDOW = 187          # number of samples kept per beat
HALF = WINDOW // 2    # samples taken before the annotated beat position

# Annotation symbol -> integer class code. Beats whose symbol is not listed
# here are skipped. Codes: 0 = normal group, 1 = supraventricular group,
# 2 = ventricular group, 3 = fusion.
label_map = {
    'N': 0, 'L': 0, 'R': 0, 'e': 0, 'j': 0,
    'A': 1, 'a': 1, 'J': 1, 'S': 1,
    'V': 2, 'E': 2,
    'F': 3
}

beats = []
skipped_records = []  # records that could not be read, kept for inspection

# One record per .dat file; splitext keeps record names that contain dots intact.
records = sorted(
    os.path.splitext(f)[0] for f in os.listdir(DATA_DIR) if f.endswith('.dat')
)

for record in records:
    record_path = os.path.join(DATA_DIR, record)
    try:
        signal, _ = wfdb.rdsamp(record_path)
        ann = wfdb.rdann(record_path, 'atr')
    except Exception as exc:
        # Best-effort: carry on with the remaining records, but say which one
        # was dropped and why (a bare `except:` would also hide Ctrl-C).
        print(f"Skipping record {record}: {exc!r}")
        skipped_records.append(record)
        continue

    ecg = signal[:, 0]  # only the first channel of the recording is used
    for r, sym in zip(ann.sample, ann.symbol):
        if sym not in label_map:
            continue
        # Window of exactly WINDOW samples starting HALF samples before the
        # annotation; beats too close to either end of the signal are dropped.
        start = r - HALF
        stop = start + WINDOW
        if start < 0 or stop > len(ecg):
            continue
        beats.append([record] + ecg[start:stop].tolist() + [label_map[sym]])

# One row per beat: record id, WINDOW sample columns f0..f{WINDOW-1}, class code.
columns = ["record_id"] + [f"f{i}" for i in range(WINDOW)] + ["label"]
df = pd.DataFrame(beats, columns=columns)
df.to_csv("mitbih_patient_level.csv", index=False)

print("mitbih_patient_level.csv created")



Collecting wfdb
  Downloading wfdb-4.3.0-py3-none-any.whl.metadata (3.8 kB)
Collecting pandas
  Downloading pandas-2.3.3-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
Downloading wfdb-4.3.0-py3-none-any.whl (163 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/163.8 kB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pandas-2.3.3-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (12.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m92.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pandas, wfdb
  Attempting uninstall: pandas
    Found existing installation: pandas 2.2.2
    Uninstalling pandas-2.2.2:
      Successfully uninstalled pandas-2.2.2
[31mERROR: pip's dependency resolver does not currently take into account all the packages

In [4]:
# cell 2
from sklearn.model_selection import train_test_split
import pandas as pd

# Load the per-beat table written by the extraction step.
df = pd.read_csv("mitbih_patient_level.csv")

# Split on record ids rather than on rows, so every record lands entirely in
# one subset.
patients = df['record_id'].unique()
train_patients, test_patients = train_test_split(patients, random_state=42, test_size=0.30)

in_train = df['record_id'].isin(train_patients)
in_test = df['record_id'].isin(test_patients)
train_df = df.loc[in_train]
test_df = df.loc[in_test]

# Leakage guard: the two subsets must not have any record id in common.
assert not (set(train_df['record_id']) & set(test_df['record_id']))

for subset, out_path in (
    (train_df, "mitbih_train_patient.csv"),
    (test_df, "mitbih_test_patient.csv"),
):
    subset.to_csv(out_path, index=False)

print(" Patient-independent split saved")

#cell 3
import numpy as np
import pandas as pd

# Read both subsets back from the CSV files produced by the split step.
train_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in ("mitbih_train_patient.csv", "mitbih_test_patient.csv")
)

print(train_df.shape, test_df.shape)

 Patient-independent split saved
(69432, 189) (31994, 189)


In [5]:
# Map to AAMI 3-class
def aami_3class(label):
    """Collapse a beat class code into one of three classes.

    Codes 0 (N) and 1 (SVEB) are kept as they are; every other value is
    mapped to 2 (VEB + Fusion).
    """
    for kept_code in (0, 1):
        if label == kept_code:
            return kept_code
    return 2

# Re-code the label column of both subsets element by element.
for frame in (train_df, test_df):
    frame['label'] = frame['label'].map(aami_3class)

train_counts = train_df['label'].value_counts()
test_counts = test_df['label'].value_counts()
print("Train label distribution:\n", train_counts)
print("Test label distribution:\n", test_counts)

Train label distribution:
 label
0    61392
2     5704
1     2336
Name: count, dtype: int64
Test label distribution:
 label
0    29216
2     2333
1      445
Name: count, dtype: int64


In [6]:
# cell 4
def _beats_to_cnn_input(frame):
    """Turn a beat table into a (n_beats, 187, 1) array of standardized samples.

    The first and last columns (record id and label) are dropped; each remaining
    row is shifted to zero mean and divided by its own standard deviation.
    """
    samples = frame.iloc[:, 1:-1].values
    centered = samples - samples.mean(axis=1, keepdims=True)
    # The small constant keeps the division finite for a constant row.
    scaled = centered / (samples.std(axis=1, keepdims=True) + 1e-8)
    return scaled.reshape(-1, 187, 1)


X_train = _beats_to_cnn_input(train_df)
X_test = _beats_to_cnn_input(test_df)

y_train = train_df['label'].values.astype(int)
y_test = test_df['label'].values.astype(int)

print("CNN input:", X_train.shape)

CNN input: (69432, 187, 1)


In [7]:
# cell 5
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv1D, MaxPooling1D, Dense, Dropout, BatchNormalization, Flatten
from tensorflow.keras.optimizers import Adam

# The labels are collapsed to three classes (0, 1, 2) before training, so the
# softmax head needs exactly three units; a fourth unit would never see a target.
NUM_CLASSES = 3

# 1D CNN over single-channel beats of 187 samples: four Conv -> BatchNorm ->
# MaxPool stages with growing filter counts, then a dense classifier head.
model = Sequential([
    # Explicit Input layer instead of `input_shape=` on the first Conv1D, which
    # Keras warns against for Sequential models.
    Input(shape=(187, 1)),

    Conv1D(32, 7, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(64, 5, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(128, 3, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(2),

    Conv1D(256, 3, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(2),

    Flatten(),
    Dense(256, activation='relu'),
    Dropout(0.4),

    Dense(NUM_CLASSES, activation='softmax')
])

# Sparse categorical cross-entropy because the targets are integer class ids,
# not one-hot vectors.
model.compile(
    optimizer=Adam(learning_rate=0.0003),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [8]:
# cell 6
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Distinct labels present in the training targets, computed once and reused
# both for the weight computation and as the dictionary keys.
train_classes = np.unique(y_train)

class_weights = compute_class_weight(class_weight='balanced', classes=train_classes, y=y_train)

# label -> weight mapping in the form expected by the `class_weight` argument of fit().
class_weight_dict = dict(zip(train_classes, class_weights))
print("Class weights:", class_weight_dict)

Class weights: {np.int64(0): np.float64(0.37698722960646336), np.int64(1): np.float64(9.907534246575343), np.int64(2): np.float64(4.0575035063113605)}


In [None]:
# cell 7


from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

# Stop when val_loss has gone 6 epochs without improving, and roll the model
# back to the best weights seen.
early_stop = EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True, verbose=1)

# Halve the learning rate after 3 epochs without val_loss improvement, never
# going below 1e-5.
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-5, verbose=1)

training_callbacks = [lr_scheduler, early_stop]

# NOTE(review): Keras carves `validation_split` off the end of the arrays, so
# the validation beats are not a random or stratified sample - confirm this is
# what is wanted here.
history = model.fit(
    X_train,
    y_train,
    batch_size=32,
    epochs=40,
    validation_split=0.1,
    # Per-class loss weights computed from the training label frequencies.
    class_weight=class_weight_dict,
    callbacks=training_callbacks,
    verbose=1,
)

Epoch 1/40
[1m1953/1953[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m112s[0m 55ms/step - accuracy: 0.8576 - loss: 0.4155 - val_accuracy: 0.9748 - val_loss: 0.0989 - learning_rate: 3.0000e-04
Epoch 2/40
[1m1458/1953[0m [32m━━━━━━━━━━━━━━[0m[37m━━━━━━[0m [1m27s[0m 55ms/step - accuracy: 0.9419 - loss: 0.1517

In [None]:
# Overall loss / accuracy on the held-out records.
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print(" Patient-independent Test Accuracy:", test_acc)

# Roughly nine out of ten test beats belong to class 0, so accuracy alone says
# little about the minority classes; report per-class results as well.
from sklearn.metrics import classification_report, confusion_matrix

class_ids = [0, 1, 2]
class_names = ["N", "SVEB", "VEB+Fusion"]

# Predicted class = index of the largest softmax output for each beat.
y_pred = model.predict(X_test, verbose=0).argmax(axis=1)

print("Confusion matrix (rows = true class, columns = predicted class):")
print(confusion_matrix(y_test, y_pred, labels=class_ids))
print(classification_report(
    y_test, y_pred,
    labels=class_ids,
    target_names=class_names,
    digits=4,
    zero_division=0,  # a class that is never predicted reports 0 instead of warning
))