In [1]:
import os
import tensorflow as tf
import wfdb
from scipy import signal
import numpy as np
import pandas as pd
import csv

In [2]:
# Configuration
MODEL_PATH = "./model/model.keras"
REQUIRED_LENGTH = 1000
NUM_SAMPLES_TO_PLOT = 1
TARGET_FS = 100
SaMiTrop = True

In [3]:
def get_dat_files(directory):
    files = os.listdir(directory)
    dat_files = [file for file in files if file.endswith('.dat')]
    return dat_files

In [4]:
def process_hea_files(directory, dat_files,true_file,false_file):
    true_files = []
    false_files = []
    
    for dat_file in dat_files:
        hea_file = dat_file.replace('.dat', '.hea')
        hea_path = os.path.join(directory, hea_file)

        if os.path.exists(hea_path):
            with open(hea_path, 'r') as file:
                content = file.read()
                
                if "Chagas label: True" in content:
                    true_files.append(dat_file)
                elif "Chagas label: False" in content:
                    false_files.append(dat_file)

    with open(true_file, 'w') as true_file:
        for file_name in true_files:
            true_file.write(file_name.split('.dat')[0] + '\n')
    
    with open(false_file, 'w') as false_file:
        for file_name in false_files:
            false_file.write(file_name.split('.dat')[0] + '\n')

In [5]:
def split_csv(input_csv, true_output, false_output):
    with open(input_csv, 'r', newline='', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)
        
        with open(true_output, 'w', encoding='utf-8') as true_file, open(false_output, 'w', encoding='utf-8') as false_file:
            for row in reader:
                if row[reader.fieldnames[2]].strip().upper() == 'TRUE':
                    true_file.write(row[reader.fieldnames[0]] + '\n')
                else:
                    false_file.write(row[reader.fieldnames[0]] + '\n')

In [6]:
def check_files(input_txt, directory, output_txt):
    with open(input_txt, 'r', encoding='utf-8') as file:
        filenames = [line.strip() for line in file] 
    
    existing_files = [f for f in filenames if os.path.isfile(os.path.join(directory, f + '.dat'))]
    
    with open(output_txt, 'w', encoding='utf-8') as output_file:
        for file in existing_files:
            output_file.write(file + '\n')

In [7]:
def load_model(model_path):
    if not tf.io.gfile.exists(model_path):
        raise FileNotFoundError(f"Model file {model_path} not found.")
    
    model = tf.keras.models.load_model(model_path, compile=False)
    return model

In [8]:
def resample_signal(original_signal, original_fs, target_fs=100):
    if original_fs == target_fs:
        return original_signal
    fs_ratio = target_fs / original_fs
    return signal.resample(original_signal, int(original_signal.shape[0] * fs_ratio))

In [9]:
def adjust_signal_length(signal, target_length):
    """Pad or truncate signal to target length"""
    if signal.shape[0] < target_length:
        return np.pad(signal, ((0, target_length - signal.shape[0]), (0, 0)), mode='constant')
    return signal[:target_length]

In [10]:
#Helper
def get_header_file(record):
    if not record.endswith('.hea'):
        header_file = record + '.hea'
    else:
        header_file = record
    return header_file

def load_text(filename):
    with open(filename, 'r') as f:
        string = f.read()
    return string

def load_header(record):
    header_file = get_header_file(record)
    header = load_text(header_file)
    return header

def load_signals(record):
    signal, fields = wfdb.rdsamp(record)
    return signal, fields

def get_variable(string, variable_name):
    variable = ''
    has_variable = False
    for l in string.split('\n'):
        if l.startswith(variable_name):
            variable = l[len(variable_name):].strip()
            has_variable = True
    return variable, has_variable

def remove_extra_characters(x):
    x = str(x)
    x = x.replace('"', '').replace("'", "")
    x = x.replace('(', '').replace(')', '').replace('[', '').replace(']', '').replace('{', '').replace('}', '')
    x = x.replace(' ', '').replace('\t', '')
    x = x.strip()
    return x

def is_number(x):
    try:
        float(x)
        return True
    except (ValueError, TypeError):
        return False

def sanitize_boolean_value(x):
    x = remove_extra_characters(x)
    if (is_number(x) and float(x)==0) or (remove_extra_characters(x).casefold() in ('false', 'f', 'no', 'n')):
        return 0
    elif (is_number(x) and float(x)==1) or (remove_extra_characters(x).casefold() in ('true', 't', 'yes', 'y')):
        return 1
    else:
        return float('nan')

def get_label(string, allow_missing=False):
    label, has_label = get_variable(string, label_string)
    if not has_label and not allow_missing:
        raise Exception('No label is available: are you trying to load the labels from the held-out data?')
    label = sanitize_boolean_value(label)
    return label

def load_label(record):
    header = load_header(record)
    label = get_label(header)
    return label

In [11]:
def process_and_save_predictions(directory,file_list, output_excel,truefalse):
    results = []
    count = 0;
    
    with open(file_list, "r") as f:
        for line in f:
            o_f = line.strip()
            dat_file = directory + o_f
            
            # Load the ECG signal
            ecg, text = load_signals(dat_file)
            original_fs = int(text["fs"])
            original_signal = ecg
            
            # Process the signal
            resampled_signal = resample_signal(original_signal, original_fs, TARGET_FS)
            adjusted_signal = adjust_signal_length(resampled_signal, REQUIRED_LENGTH)
            input_tensor = tf.convert_to_tensor(np.expand_dims(adjusted_signal, 0), dtype=tf.float32)
            
            # Make prediction
            probability_output = model.predict(input_tensor)
            
            # Store the results in a list
            results.append([o_f, probability_output[0][0]])
            count = count+1
            if count == 200 and not truefalse:
                break
    
    # Convert results to a DataFrame and sort by prediction result in descending order
    df = pd.DataFrame(results, columns=["File Name", "Prediction Result"])
    if truefalse:
        df = df.sort_values(by="Prediction Result", ascending=False)
    else:
        df = df.sort_values(by="Prediction Result", ascending=True)
    
    # Save to an Excel file
    df.to_excel(output_excel, index=False)

In [12]:
model = load_model(MODEL_PATH)

if SaMiTrop:
    directory = '../samitrop_processed/'
    dat_files = get_dat_files(directory)
    process_hea_files(directory, dat_files,'existing_true_files.txt','existing_false_files.txt')
    
else:
    directory = '../GMC2025/code15_wfdb/'
    chagas_csv_label = '../GMC2025/code15_hdf5/code15_chagas_labels.csv'
    split_csv(chagas_csv_label, 'true_values.txt', 'false_values.txt')
    check_files('true_values.txt', directory, 'existing_true_files.txt')
    check_files('false_values.txt', directory, 'existing_false_files.txt')

In [13]:
process_and_save_predictions(directory,"existing_true_files.txt", "true_file_predictions.xlsx",True)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 210ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2

In [15]:
process_and_save_predictions(directory,"existing_false_files.txt", "false_file_predictions.xlsx",False)