<a href="https://colab.research.google.com/github/MoyoMbongeni/ML/blob/main/Untitled6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Help from Chat-GPT

In [1]:
# =====================================================
# ECG Noise Detection & Severity Ranking
# SVM (feature-based) vs CNN (raw-signal, multi-label)
# =====================================================

import numpy as np
import pywt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, GlobalAveragePooling1D, Dense
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.svm import SVC
from sklearn.multiclass import OneVsRestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, hamming_loss


In [2]:
np.random.seed(42)

def simulate_ecg(noise_type, length=1800):
    t = np.linspace(0, 5, length)
    base = np.sin(2*np.pi*1.2*t)

    if noise_type == 'BW':
        base += 0.6 * np.sin(2*np.pi*0.2*t)
    if noise_type == 'MA':
        base += 0.3 * np.random.randn(length)
    if noise_type == 'EM':
        base += 0.5 * np.sign(np.sin(2*np.pi*0.5*t))

    return base

labels = ['BW', 'MA', 'EM']
X, Y = [], []

for _ in range(300):
    sig = np.zeros(1800)
    y = [0, 0, 0]

    for i, n in enumerate(labels):
        if np.random.rand() > 0.5:
            sig += simulate_ecg(n)
            y[i] = 1

    sig = (sig - sig.min()) / (sig.max() - sig.min() + 1e-8)
    X.append(sig)
    Y.append(y)

X = np.array(X)
Y = np.array(Y)


In [3]:
def dwt_features(sig):
    coeffs = pywt.wavedec(sig, 'db4', level=3)
    feats = []
    for c in coeffs:
        feats.append(np.sum(c**2))
        feats.append(np.std(c))
    return feats  # 8 features

def terma_features(sig, fs=360):
    sq = sig**2
    short = int(0.05*fs)
    long = int(0.2*fs)
    ma_s = np.convolve(sq, np.ones(short)/short, 'same')
    ma_l = np.convolve(sq, np.ones(long)/long, 'same')
    terma = ma_s - ma_l
    return [np.mean(terma), np.std(terma), np.sum(terma > np.mean(terma))]

X_svm = np.array([dwt_features(x) + terma_features(x) for x in X])


In [4]:
X_cnn = X.reshape(-1, 1800, 1)


In [4]:
import numpy as np
import pywt
from sklearn.model_selection import train_test_split

np.random.seed(42)

def simulate_ecg(noise_type, length=1800):
    t = np.linspace(0, 5, length)
    base = np.sin(2*np.pi*1.2*t)

    if noise_type == 'BW':
        base += 0.6 * np.sin(2*np.pi*0.2*t)
    if noise_type == 'MA':
        base += 0.3 * np.random.randn(length)
    if noise_type == 'EM':
        base += 0.5 * np.sign(np.sin(2*np.pi*0.5*t))

    return base

labels = ['BW', 'MA', 'EM']
X, Y = [], []

for _ in range(300):
    sig = np.zeros(1800)
    y = [0, 0, 0]

    for i, n in enumerate(labels):
        if np.random.rand() > 0.5:
            sig += simulate_ecg(n)
            y[i] = 1

    sig = (sig - sig.min()) / (sig.max() - sig.min() + 1e-8)
    X.append(sig)
    Y.append(y)

X = np.array(X)
Y = np.array(Y)

def dwt_features(sig):
    coeffs = pywt.wavedec(sig, 'db4', level=3)
    feats = []
    for c in coeffs:
        feats.append(np.sum(c**2))
        feats.append(np.std(c))
    return feats

def terma_features(sig, fs=360):
    sq = sig**2
    short = int(0.05*fs)
    long = int(0.2*fs)
    ma_s = np.convolve(sq, np.ones(short)/short, 'same')
    ma_l = np.convolve(sq, np.ones(long)/long, 'same')
    terma = ma_s - ma_l
    return [np.mean(terma), np.std(terma), np.sum(terma > np.mean(terma))]

X_svm = np.array([dwt_features(x) + terma_features(x) for x in X])

X_cnn = X.reshape(-1, 1800, 1)

X_train_cnn, X_temp_cnn, Y_train, Y_temp = train_test_split(
    X_cnn, Y, test_size=0.30, random_state=42)

X_val_cnn, X_test_cnn, Y_val, Y_test = train_test_split(
    X_temp_cnn, Y_temp, test_size=0.50, random_state=42)

X_train_svm, X_temp_svm, _, _ = train_test_split(
    X_svm, Y, test_size=0.30, random_state=42)

X_val_svm, X_test_svm, _, _ = train_test_split(
    X_temp_svm, Y_temp, test_size=0.50, random_state=42)

In [7]:
from sklearn.multiclass import OneVsRestClassifier
from sklearn.svm import SVC

svm = OneVsRestClassifier(
    SVC(kernel='rbf', C=1.0, gamma='scale')
)
svm.fit(X_train_svm, Y_train)

In [9]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, GlobalAveragePooling1D, Dense
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow as tf

cnn = Sequential([
    Conv1D(32, 7, activation='relu', input_shape=(1800,1)),
    Conv1D(64, 5, activation='relu'),
    Conv1D(64, 3, activation='relu'),
    GlobalAveragePooling1D(),
    Dense(32, activation='relu'),
    Dense(3, activation='sigmoid')
])

cnn.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss='binary_crossentropy'
)

cnn.fit(
    X_train_cnn, Y_train,
    validation_data=(X_val_cnn, Y_val),
    epochs=50,
    batch_size=32,
    callbacks=[EarlyStopping(patience=5, restore_best_weights=True)],
    verbose=1
)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 445ms/step - loss: 0.6906 - val_loss: 0.6851
Epoch 2/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step - loss: 0.6860 - val_loss: 0.6841
Epoch 3/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step - loss: 0.6810 - val_loss: 0.6854
Epoch 4/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step - loss: 0.6809 - val_loss: 0.6821
Epoch 5/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step - loss: 0.6777 - val_loss: 0.6767
Epoch 6/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step - loss: 0.6801 - val_loss: 0.6757
Epoch 7/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step - loss: 0.6690 - val_loss: 0.6683
Epoch 8/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step - loss: 0.6568 - val_loss: 0.6509
Epoch 9/50
[1m7/7[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 

<keras.src.callbacks.history.History at 0x7e048333f320>

In [11]:
def analyze_ecg_segment(signal, model):
    signal = (signal - signal.min()) / (signal.max() - signal.min() + 1e-8)
    pred = model.predict(signal.reshape(1,1800,1))[0]

    detected = {
        'baseline_wander': bool(pred[0] > 0.5),
        'muscle_artifact': bool(pred[1] > 0.5),
        'electrode_motion': bool(pred[2] > 0.5)
    }

    severity = {
        k: round(v*100,1)
        for k,v in zip(detected.keys(), pred)
        if v > 0.5
    }

    dominant = max(severity, key=severity.get) if severity else 'clean'
    quality = 'good' if sum(pred) < 0.5 else 'moderate' if sum(pred) < 1.5 else 'poor'

    return {
        'detected_noises': detected,
        'severity_scores': severity,
        'dominant_noise': dominant,
        'overall_quality': quality
    }
