In [1]:
from tkinter import Tk, Label
from PIL import Image, ImageTk
import wfdb
import numpy as np
from scipy.signal import butter, filtfilt

# Paths to the ECG data for the subjects
subjects = {
    's2': r'subjects\p_156\s0299lre',
    's3': r'subjects\p_165\s0322lre',
    's5': r'subjects\p_180\s0374lre',
    's7': r'subjects\p_184\s0363lre',
    's8': r'subjects\p_166\s0275lre',
    's10': r'subjects\sub_150\s0287lre',
    's13': r'subjects\sub260\s0496_re'
}

def load_ecg(subject_path):
    record = wfdb.rdrecord(subject_path)
    ecg_signal = record.p_signal[:, 0]  # Assuming the first channel is used
    return ecg_signal

In [2]:

def check_channels(ecg_signal):
    if ecg_signal.ndim == 1:
        return "Single-channel"
    elif ecg_signal.ndim == 2:
        num_channels = ecg_signal.shape[1]
        return f"Multi-channel with {num_channels} channels"
    else:
        return "Unknown format"

# Load ECG data and check the number of channels for each subject
for subject, path in subjects.items():
    ecg_signal = load_ecg(path)
    channel_info = check_channels(ecg_signal)
    print(f"Subject {subject}: {channel_info}")


Subject s2: Single-channel
Subject s3: Single-channel
Subject s5: Single-channel
Subject s7: Single-channel
Subject s8: Single-channel
Subject s10: Single-channel
Subject s13: Single-channel


In [3]:
from biosppy.signals import ecg

def preprocess_with_drift_correction(ecg_signal):
    # Apply baseline drift correction
    corrected_ecg = ecg.ecg(signal=ecg_signal, sampling_rate=1000, show=False)
    corrected_signal = corrected_ecg['filtered']
    
    # Apply bandpass filtering
    nyquist = 0.5 * 1000
    low = 0.5 / nyquist
    high = 45 / nyquist
    b, a = butter(1, [low, high], btype='band')
    filtered_ecg = filtfilt(b, a, corrected_signal)
    
    return filtered_ecg

ecg_data = {subject: preprocess_with_drift_correction(load_ecg(path)) for subject, path in subjects.items()}


In [4]:
from scipy.signal import find_peaks

def segment_ecg(ecg_signal, segment_length=600, threshold=None):
    # Find R-peaks
    peaks, _ = find_peaks(ecg_signal, distance=200, height=threshold)
    segments = []
    for peak in peaks:
        start = max(0, peak - 200)
        end = start + segment_length
        if end <= len(ecg_signal):
            segments.append(ecg_signal[start:end])
    return segments

segmented_data = {subject: segment_ecg(ecg, threshold=0.2) for subject, ecg in ecg_data.items()}


In [5]:
segment_counts = {subject: len(segments) for subject, segments in segmented_data.items()}

for subject, count in segment_counts.items():
    print(f"Subject {subject} has {count} segments.")

Subject s2 has 56 segments.
Subject s3 has 224 segments.
Subject s5 has 130 segments.
Subject s7 has 165 segments.
Subject s8 has 230 segments.
Subject s10 has 255 segments.
Subject s13 has 118 segments.


In [6]:
import matplotlib.pyplot as plt

subject = 's2'
raw_ecg = load_ecg(subjects[subject])
preprocessed_ecg = preprocess_with_drift_correction(raw_ecg)



In [7]:
from scipy.fftpack import dct
import pywt

def extract_features(segment, method='dct'):
    if method == 'dct':
        return dct(segment, norm='ortho')
    elif method == 'wavelet':
        coeffs, _ = pywt.dwt(segment, 'db1')
        return coeffs

# Extract features for all segments
features_dct = {subject: [extract_features(seg, method='dct') for seg in segments] for subject, segments in segmented_data.items()}
features_wavelet = {subject: [extract_features(seg, method='wavelet') for seg in segments] for subject, segments in segmented_data.items()}


In [8]:
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split

# Prepare data for classification
def prepare_data(features):
    X, y = [], []
    for subject, seg_features in features.items():
        X.extend(seg_features)
        y.extend([subject] * len(seg_features))
    return np.array(X), np.array(y)

X_dct, y_dct = prepare_data(features_dct)
X_wavelet, y_wavelet = prepare_data(features_wavelet)

# Split data into training and testing sets
X_train_dct, X_test_dct, y_train_dct, y_test_dct = train_test_split(X_dct, y_dct, test_size=0.2)
X_train_wavelet, X_test_wavelet, y_train_wavelet, y_test_wavelet = train_test_split(X_wavelet, y_wavelet, test_size=0.2)


In [21]:
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from collections import Counter
import joblib
import os

# Assuming X_train_dct, X_test_dct, y_train_dct, y_test_dct, X_train_wavelet, X_test_wavelet, y_train_wavelet, y_test_wavelet are already defined

# Define paths to save the models
model_dir = 'models'
os.makedirs(model_dir, exist_ok=True)
svm_dct_path = os.path.join(model_dir, 'svm_dct_model.pkl')
svm_wavelet_path = os.path.join(model_dir, 'svm_wavelet_model.pkl')
rf_dct_path = os.path.join(model_dir, 'rf_dct_model.pkl')
rf_wavelet_path = os.path.join(model_dir, 'rf_wavelet_model.pkl')

# Train SVM
svm_dct = SVC(kernel='rbf', C=1, probability=True)
svm_wavelet = SVC(kernel='rbf', C=1, probability=True)
svm_dct.fit(X_train_dct, y_train_dct)
svm_wavelet.fit(X_train_wavelet, y_train_wavelet)

# Train Random Forest
rf_dct = RandomForestClassifier(n_estimators=100)
rf_wavelet = RandomForestClassifier(n_estimators=100)
rf_dct.fit(X_train_dct, y_train_dct)
rf_wavelet.fit(X_train_wavelet, y_train_wavelet)

# Save the models
joblib.dump(svm_dct, svm_dct_path)
joblib.dump(svm_wavelet, svm_wavelet_path)
joblib.dump(rf_dct, rf_dct_path)
joblib.dump(rf_wavelet, rf_wavelet_path)

# Evaluate classifiers
svm_dct_accuracy = svm_dct.score(X_test_dct, y_test_dct)
svm_wavelet_accuracy = svm_wavelet.score(X_test_wavelet, y_test_wavelet)
rf_dct_accuracy = rf_dct.score(X_test_dct, y_test_dct)
rf_wavelet_accuracy = rf_wavelet.score(X_test_wavelet, y_test_wavelet)

print(f"SVM DCT Accuracy: {svm_dct_accuracy}")
print(f"SVM Wavelet Accuracy: {svm_wavelet_accuracy}")
print(f"Random Forest DCT Accuracy: {rf_dct_accuracy}")
print(f"Random Forest Wavelet Accuracy: {rf_wavelet_accuracy}")


SVM DCT Accuracy: 1.0
SVM Wavelet Accuracy: 0.9957627118644068
Random Forest DCT Accuracy: 0.9957627118644068
Random Forest Wavelet Accuracy: 0.9957627118644068


In [22]:
# Load models for testing
svm_dct = joblib.load(svm_dct_path)
svm_wavelet = joblib.load(svm_wavelet_path)
rf_dct = joblib.load(rf_dct_path)
rf_wavelet = joblib.load(rf_wavelet_path)

def identify_subject(ecg_segment, method='dct', threshold=0.5):
    features = extract_features(ecg_segment, method=method)
    if method == 'dct':
        svm_pred = svm_dct.predict([features])
        rf_pred = rf_dct.predict([features])
        svm_confidence = svm_dct.predict_proba([features])[0]
        rf_confidence = rf_dct.predict_proba([features])[0]
    else:
        svm_pred = svm_wavelet.predict([features])
        rf_pred = rf_wavelet.predict([features])
        svm_confidence = svm_wavelet.predict_proba([features])[0]
        rf_confidence = rf_wavelet.predict_proba([features])[0]

    # Get the class with the highest confidence for each model
    svm_max_confidence = max(svm_confidence)
    rf_max_confidence = max(rf_confidence)

    if svm_max_confidence >= rf_max_confidence:
        final_pred = svm_pred[0]
        final_confidence = svm_max_confidence
    else:
        final_pred = rf_pred[0]
        final_confidence = rf_max_confidence

    if final_confidence > threshold:
        return final_pred, final_confidence
    else:
        return 'unidentified', final_confidence



Identified Subject: s13 with confidence 0.9625164239330287


In [26]:
# Example usage
ecg_segment = segmented_data['s13'][117]  # Use a segment from subject s13 as an example
identified_subject, confidence = identify_subject(ecg_segment, method='dct', threshold=0.66)
print(f"Identified Subject: {identified_subject} with confidence {confidence}")

Identified Subject: s13 with confidence 0.99


In [12]:
subjects = {
    's2': r'subjects\p_156\s0299lre',
    's3': r'subjects\p_165\s0322lre',
    's5': r'subjects\p_180\s0374lre',
    's7': r'subjects\p_184\s0363lre',
    's8': r'subjects\p_185\s0336lre',
    's10': r'subjects\sub_150\s0287lre',
    's13': r'subjects\sub260\s0496_re'
}

In [27]:

subjects_test = {
    's1': r'subjects\p_155\s0301lre',
    's2': r'subjects\sub_170\s0274lre',
}
ecg_data_test = {subject: preprocess_with_drift_correction(load_ecg(path)) for subject, path in subjects_test.items()}
segmented_data_test = {subject: segment_ecg(ecg, threshold=0.2) for subject, ecg in ecg_data_test.items()}
ecg_segment_test = segmented_data_test['s1'][0]


identified_subject = identify_subject(ecg_segment_test, method='dct', threshold=0.6)
print(f"Identified Subject: {identified_subject}")

Identified Subject: ('unidentified', 0.47)


In [28]:
from tkinter import Tk, Label, Button
from PIL import Image, ImageTk

# Paths to subject photos
photos = {
    's2': r'imgs/photo1.jpg',
    's3': r'imgs/photo2.jpg',
    's5': r'imgs/photo3.jpg',
    's7': r'imgs/photo4.jpg',
    's8': r'imgs/photo5.jpg',
    's9': r'imgs/photo6.jpg',
    's10': r'imgs/photo7.jpg',
    's13': r'imgs/photo8.jpg'
}


# Load images
def load_image(subject):
    photo_path = photos.get(subject, 'imgs/photo3.jpg')
    image = Image.open(photo_path)
    photo = ImageTk.PhotoImage(image)
    return photo

# Show photo in the UI
def show_photo(subject):
    photo = load_image(subject)
    label.config(image=photo)
    label.image = photo  # Keep a reference to the image to prevent garbage collection

# Function to simulate ECG segment acquisition and identification
def on_ecg_acquired():
    ecg_segment = segmented_data['s10'][0]  # Use a segment from subject s2 as an example
    svm_pred, knn_pred = identify_subject(ecg_segment, method='dct')
    identified_subject = majority_voting([svm_pred, knn_pred])
    print("subb"+ identified_subject)
    show_photo(identified_subject)

on_ecg_acquired

<function __main__.on_ecg_acquired()>