In [3]:
# Scratch cell: prints a banner string; it defines no names used elsewhere.
print("This is Cardio Scan")

This is Cardio Scan


In [4]:
# Scratch cell: prints a greeting; it defines no names used elsewhere.
print("Hello World")

Hello World


In [None]:
import scipy.io
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import pathlib as pl
import numpy as np
import statistics as stat
import random
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv1D, BatchNormalization, Activation, GlobalAveragePooling1D, Dense
from tensorflow.keras.models import Model
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import StratifiedKFold,KFold

In [None]:
def read_heafile(file_name):
    """Read a .hea header file and return its lines.

    Parameters
    ----------
    file_name : path-like
        Location of the header file; opened in text mode for reading.

    Returns
    -------
    list of str
        Every line of the file, in order, with line endings preserved.
    """
    with open(file_name, 'r') as handle:
        return list(handle)

In [None]:
def create_array(hea_content):
    """Extract ``[ID, age, gender, abnormality]`` from the lines of a .hea file.

    The record ID is the first whitespace-separated token of the first line.
    Age, gender and abnormality are the third whitespace-separated token of
    the lines at index 13, 14 and 15 respectively (these fixed positions
    presumably correspond to a 12-lead header layout -- TODO confirm).

    Parameters
    ----------
    hea_content : list of str
        Header lines, e.g. as returned by ``read_heafile``.

    Returns
    -------
    list
        ``[ID, age, gender, abnormality]``. ``age`` is an int and falls back
        to 0 when the line is missing, too short or not a plain number;
        ``gender`` and ``abnormality`` are strings and fall back to
        ``'Unknown'`` when their line is missing or too short.

    Raises
    ------
    IndexError
        If ``hea_content`` is empty or its first line is blank.
    """
    def third_token(line_index):
        # Return the value token of a metadata line, or None if the line is
        # absent or has fewer than three tokens.
        if len(hea_content) <= line_index:
            return None
        tokens = hea_content[line_index].strip().split()
        return tokens[2] if len(tokens) > 2 else None

    ID = hea_content[0].strip().split()[0]

    age_text = third_token(13)
    age = int(age_text) if age_text is not None and age_text.isdigit() else 0

    gender = third_token(14) or 'Unknown'
    abnormality = third_token(15) or 'Unknown'

    return [ID, age, gender, abnormality]

In [None]:
def create_dataframes(training_directory):
    """Build one patient-metadata DataFrame per source folder.

    Every immediate sub-directory of ``training_directory`` is treated as a
    data source. Inside each source, every sub-directory is scanned for
    ``*.hea`` files, each of which is parsed with ``read_heafile`` and
    ``create_array``.

    Parameters
    ----------
    training_directory : path-like
        Root directory containing the source folders.

    Returns
    -------
    dict
        Maps ``'<source folder name>_df'`` to a DataFrame with the columns
        ``ID``, ``Age``, ``Gender`` and ``Abnormality``. Rows follow the order
        in which headers were first encountered. If the same ID occurs in
        several headers of one source, a single row is kept (at the position
        of the first occurrence, holding the values of the last one).
    """
    columns = ['ID', 'Age', 'Gender', 'Abnormality']
    dataframes = {}

    for source_folder_path in pl.Path(training_directory).iterdir():
        if not source_folder_path.is_dir():
            continue

        patient_data = {}  # ID -> {'Age': ..., 'Gender': ..., 'Abnormality': ...}

        for subdir in source_folder_path.iterdir():
            if not subdir.is_dir():
                continue

            # glob() already yields full paths, so no re-joining is needed.
            for header_path in subdir.glob('*.hea'):
                patient_id, age, gender, abnormality = create_array(read_heafile(header_path))
                patient_data.setdefault(patient_id, {}).update(
                    Age=age, Gender=gender, Abnormality=abnormality
                )

        patient_rows = [{'ID': patient_id, **info} for patient_id, info in patient_data.items()]

        # Build the frame in one step; `columns` keeps the schema even when
        # the source folder contains no headers.
        dataframes[f'{source_folder_path.name}_df'] = pd.DataFrame(patient_rows, columns=columns)

    return dataframes

In [None]:
# Parse the headers under 'training'.
# Note: `df` is a dict of DataFrames keyed '<folder name>_df', not a single DataFrame.
df = create_dataframes('training')


In [None]:
# Show which '<folder name>_df' entries were built.
df.keys()

In [None]:
# Load the diagnosis map; later cells read its 'SNOMED CT Code' column.
data = pd.read_csv('Dx_map.csv')

In [None]:
def create_anomalies_array(data):
    """Collect the SNOMED CT codes of a diagnosis-map DataFrame.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame with a ``'SNOMED CT Code'`` column (e.g. the parsed Dx map).

    Returns
    -------
    list
        The ``'SNOMED CT Code'`` value of every row, in row order.
    """
    return [record['SNOMED CT Code'] for _, record in data.iterrows()]

In [None]:
# Flat list of every SNOMED CT code in the diagnosis map, in file order.
anomalies = create_anomalies_array(data)

In [None]:
def create_single_output_array(array, anomalies):
    """Encode one patient's anomaly codes as a binary (multi-hot) vector.

    Parameters
    ----------
    array : sequence
        Anomaly codes recorded for one patient.
    anomalies : pandas.DataFrame or iterable
        Either the diagnosis-map DataFrame (its ``'SNOMED CT Code'`` column is
        read via ``create_anomalies_array``) or an already extracted iterable
        of codes. It is not modified.

    Returns
    -------
    list of int
        One entry per known anomaly code, in the order of ``anomalies``:
        1 when the code occurs in ``array``, otherwise 0.
    """
    # Accept both the raw diagnosis map and a pre-computed code list, so the
    # function matches its documented "anomalies array" input as well as the
    # DataFrame that existing callers pass.
    if hasattr(anomalies, 'iterrows'):
        codes = create_anomalies_array(anomalies)
    else:
        codes = list(anomalies)

    return [1 if code in array else 0 for code in codes]

In [None]:
def create_output_array(df, anomalies):
    """Build the label set (Y) for every row of a patient DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Patient metadata with an ``'Abnormality'`` column holding
        comma-separated integer codes.
    anomalies : object
        Passed unchanged to ``create_single_output_array`` as the source of
        the known anomaly codes.

    Returns
    -------
    list
        One binary vector per row of ``df``, in row order. Tokens that are not
        integers (for example the ``'Unknown'`` placeholder) are ignored, so a
        row without any parsable code still yields a vector and the number of
        outputs always equals the number of rows.
    """
    Y = []

    for _, row in df.iterrows():
        patient_codes = []
        for token in str(row['Abnormality']).split(","):
            try:
                patient_codes.append(int(token))
            except ValueError:
                # Non-numeric placeholder: skip it rather than abort the run.
                continue

        Y.append(create_single_output_array(patient_codes, anomalies))

    return Y

In [None]:
# Keys of the per-source metadata frames, in the order the labels are stacked.
srce_files = ['cpsc_2018_df', 'cpsc_2018_extra_df', 'georgia_df', 'ptb_df', 'ptb-xl_df', 'st_petersburg_incart_df']

# Label vectors of all sources, appended source after source.
Y = []
for source_key in srce_files:
    Y.extend(create_output_array(df[source_key], data))
    

In [None]:
# Shape of the stacked label vectors.
np.array(Y).shape

In [None]:
def normalize_wave(array, nrm_freq, freq):
    """Bring a multi-row waveform to a common sampling frequency by decimation.

    Every row of ``array`` is reduced by keeping each ``factor``-th sample,
    where ``factor = round(freq / nrm_freq)``. No filtering is applied.

    Parameters
    ----------
    array : sequence of sequences
        The waveform, one row per signal.
    nrm_freq : number
        Target frequency.
    freq : number
        Frequency at which ``array`` was sampled.

    Returns
    -------
    tuple
        ``(length, rows)``: the number of samples in the first decimated row
        (0 when ``array`` has no rows) and the list of decimated rows.
    """
    # A signal sampled well below the target rounds to a factor of 0, which is
    # not a valid slice step; keep such signals unchanged (factor 1) instead.
    factor = max(1, round(freq / nrm_freq))

    normalized_array = [row[::factor] for row in array]

    length = len(normalized_array[0]) if normalized_array else 0
    return length, normalized_array


In [None]:
def normalize_mats(dir_path):
    """Load and frequency-normalize every recording below a source directory.

    Each immediate sub-directory of ``dir_path`` is scanned for ``*.hea``
    files. For every header the recording with the same name and a ``.mat``
    suffix is loaded (its ``'val'`` entry), the sampling frequency is taken
    from the third token of the header's first line, and the signal is passed
    to ``normalize_wave`` with a target frequency of 250.

    Parameters
    ----------
    dir_path : path-like
        Directory whose sub-directories hold the ``.hea`` / ``.mat`` pairs.

    Returns
    -------
    tuple
        ``(lengths, normalized_waves)``: two lists with one entry per header,
        in the order the headers were listed.

    Raises
    ------
    FileNotFoundError
        If a header has no matching ``.mat`` file.
    """
    normalized_waves = []
    lengths = []

    for subdir in pl.Path(dir_path).iterdir():
        if not subdir.is_dir():
            continue

        for head_file_path in subdir.glob('*.hea'):
            # Derive the recording from the header name instead of pairing two
            # independent directory listings by position, which breaks as soon
            # as the listings differ in order or length.
            mat_file_path = head_file_path.with_suffix('.mat')
            if not mat_file_path.is_file():
                raise FileNotFoundError(f"No .mat recording found for header {head_file_path}")

            data = scipy.io.loadmat(mat_file_path)['val']
            current_frequency = int(read_heafile(head_file_path)[0].split()[2])
            length, normalized_wave = normalize_wave(data, 250, current_frequency)

            normalized_waves.append(normalized_wave)
            lengths.append(length)

    return lengths, normalized_waves

In [None]:
# Source folders under 'training/', in the order the waveforms are stacked.
srce_files = ['cpsc_2018', 'cpsc_2018_extra', 'georgia', 'ptb', 'ptb-xl', 'st_petersburg_incart']

X = []        # normalized waveforms of all sources
lengths = []  # samples per row of each normalized waveform, aligned with X
for source_name in srce_files:
    source_lengths, source_waves = normalize_mats('training/' + source_name)
    lengths.extend(source_lengths)
    X.extend(source_waves)


In [None]:
# Shallow copies: plain assignment would only create a second name for the same
# list, so element writes made through x_copy / y_copy would also alter X / Y.
x_copy = list(X)
y_copy = list(Y)

In [None]:
# Line plot of recording lengths, leaving out the final 74 entries
# (presumably the last, much longer source -- TODO confirm the meaning of 74).
fig, ax = plt.subplots(figsize=(20, 10))
ax.plot(lengths[:(len(lengths) - 74)])
plt.show()

In [None]:
# Half-open length buckets [low, high): 1000-5000, then 5000-wide steps up to 40000.
ranges = [(1000, 5000)] + [(low, low + 5000) for low in range(5000, 40000, 5000)]

# Lengths that fall into each bucket; values outside every bucket are ignored.
range_values = {bucket: [] for bucket in ranges}

for value in lengths:
    bucket = next((candidate for candidate in ranges if candidate[0] <= value < candidate[1]), None)
    if bucket is not None:
        range_values[bucket].append(value)

In [None]:
# One label and one count per length bucket, in bucket order.
range_labels = [f"{low}-{high}" for low, high in range_values]
lengths_values = list(map(len, range_values.values()))

# Bar chart of how many recordings fall into each bucket.
fig, ax = plt.subplots(figsize=(20, 10))
ax.bar(range_labels, lengths_values)
ax.set_xlabel('Ranges')
ax.set_ylabel('No of Data points')
ax.set_title('No of data points for Ranges')

plt.show()

In [None]:
# Recordings per bucket, followed by the total of every bucket except the first
# (i.e. recordings whose length is 5000 or more but below 40000).
print(lengths_values)
print(sum(lengths_values[1:]))

In [None]:
# Keep only recordings whose normalized length lies within these inclusive bounds.
MIN_LENGTH, MAX_LENGTH = 1000, 5000

# X, Y and lengths are index-aligned; a mismatch would silently pair waveforms
# with the wrong labels, so fail loudly instead.
if not (len(X) == len(Y) == len(lengths)):
    raise ValueError(
        f"X ({len(X)}), Y ({len(Y)}) and lengths ({len(lengths)}) must have the same number of entries"
    )

keep_mask = [MIN_LENGTH <= length <= MAX_LENGTH for length in lengths]

# Build the filtered lists from the untouched X / Y rather than overwriting
# rejected entries with a placeholder, so re-running this cell gives the same result.
new_sizes = [length for length, keep in zip(lengths, keep_mask) if keep]
x_copy = [sample for sample, keep in zip(X, keep_mask) if keep]
y_copy = [label for label, keep in zip(Y, keep_mask) if keep]

In [None]:
# Number of recordings before length filtering.
len(lengths)

In [None]:
# Discard entries that are plain ints (placeholders); everything else is kept in order.
x_copy = [sample for sample in x_copy if type(sample) is not int]
y_copy = [label for label in y_copy if type(label) is not int]

In [None]:
# Summary statistics of the lengths that passed the filter.
print('max :',max(new_sizes))
print('min :',min(new_sizes))
print('average :',round(stat.mean(new_sizes)))

In [None]:
# Number of samples every lead is padded or cropped to
# (presumably the rounded mean length reported earlier -- TODO confirm).
TARGET_LENGTH = 2617


def _fit_lead_to_length(lead, target_length):
    """Return one lead as an ndarray with exactly ``target_length`` samples.

    Longer leads are centre-cropped. Shorter leads are padded on both sides
    with random integers drawn between the lead's own minimum and maximum.
    """
    samples = list(lead)
    size = len(samples)

    if size >= target_length:
        # Centre crop. A lead that already has the target length is returned
        # whole (first == 0) instead of collapsing to an empty slice.
        first = (size - target_length) // 2
        return np.array(samples[first:first + target_length])

    pad_before = round((target_length - size) / 2)
    pad_after = target_length - size - pad_before

    # random.randint needs plain integers; the samples may be numpy scalars.
    lower_bound, upper_bound = int(min(samples)), int(max(samples))
    start_list = [random.randint(lower_bound, upper_bound) for _ in range(pad_before)]
    end_list = [random.randint(lower_bound, upper_bound) for _ in range(pad_after)]

    return np.array(start_list + samples + end_list)


# Comprehension variables stay local, so no notebook-level name (such as the
# diagnosis map `data`) is overwritten by this cell.
x_copy_new = [
    [_fit_lead_to_length(lead, TARGET_LENGTH) for lead in sample]
    for sample in x_copy
]
    

Training the model

In [None]:
# Replace, in place, every sample (a sequence of leads) by a single ndarray.
x_copy_new[:] = [np.array(sample) for sample in x_copy_new]

In [None]:
# Shape of every sample array, kept for inspection.
sizes_np_arrays = [sample.shape for sample in x_copy_new]

In [None]:
# Replace, in place, every label vector by an ndarray.
y_copy[:] = [np.array(label) for label in y_copy]

In [None]:
# Stacking the samples gives (n_samples, n_leads, n_timesteps). Conv1D convolves
# along axis 1 and treats axis 2 as channels, so move time to axis 1 and the
# leads to axis 2: (n_samples, n_timesteps, n_leads).
X_all = np.array(x_copy_new).transpose(0, 2, 1)
Y_all = np.array(y_copy)

# Hold out 10 % of the samples for testing; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(X_all, Y_all, test_size=0.1, random_state=42)

In [None]:
# Shape of the training labels.
y_train.shape

In [None]:
# Per-sample input shape handed to the network: axes 1 and 2 of X_train.
# Keras Conv1D treats the first of these as the steps (time) axis and the second
# as channels, so X_train must be laid out accordingly.
input_shape = (X_train.shape[1], X_train.shape[2])  # Shape: (sequence_length, num_leads)
num_classes = y_train.shape[1]  # Number of anomaly classes

In [None]:
def residual_block(x, filters, kernel_size=3, stride=1):
    """Apply a two-convolution 1D residual block to ``x``.

    Main path: Conv1D(stride) -> BatchNorm -> ReLU -> Conv1D -> BatchNorm.
    The result is added to the shortcut and passed through a final ReLU.

    Parameters
    ----------
    x : tensor
        Input feature map.
    filters : int
        Number of output channels of both convolutions.
    kernel_size : int, default 3
        Kernel size of both convolutions.
    stride : int, default 1
        Stride of the first convolution.

    Returns
    -------
    tensor
        Output of the block, with ``filters`` channels.
    """
    identity = x

    x = Conv1D(filters, kernel_size, strides=stride, padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv1D(filters, kernel_size, padding='same')(x)
    x = BatchNormalization()(x)

    # The shortcut must match the main path in both length and channel count
    # before the addition; project it with a 1x1 convolution whenever the
    # stride or the number of channels changes.
    if stride != 1 or identity.shape[-1] != filters:
        identity = Conv1D(filters, 1, strides=stride)(identity)

    x = tf.keras.layers.add([x, identity])
    x = Activation('relu')(x)

    return x

In [None]:
# Stem: strided convolution followed by batch normalization and ReLU.
inputs = Input(shape=input_shape)
x = Conv1D(64, 7, strides=2, padding='same')(inputs)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# Add residual blocks
x = residual_block(x, 64, stride=1)
x = residual_block(x, 64, stride=1)

# Head: average over the steps axis, then one sigmoid unit per anomaly class
# (independent probabilities, i.e. a multi-label output).
x = GlobalAveragePooling1D()(x)
outputs = Dense(num_classes, activation='sigmoid')(x)

# Assemble and compile the network so that `model` exists for the cells below.
# Binary cross-entropy matches the per-class sigmoid outputs; a single metric
# makes `model.evaluate` return exactly (loss, accuracy).
model = Model(inputs=inputs, outputs=outputs)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# NOTE(review): no training step (model.fit) is visible in this notebook; the
# model must be trained before its evaluation results are meaningful.

In [None]:
# Evaluate the model on the test set
# Requires a compiled `model` in the kernel namespace; unpacking into two names
# assumes it was compiled with exactly one metric besides the loss.
test_loss, test_accuracy = model.evaluate(X_test, y_test)

In [None]:
# Report the metrics returned by the evaluation cell.
print("Test Accuracy : ",test_accuracy)
print("Test Loss : ",test_loss)

In [None]:
# Persist the model to disk; the '.h5' suffix selects Keras' HDF5 format.
# NOTE(review): newer Keras releases recommend the native '.keras' format -- confirm before changing.
model.save('CardioScanPro_resnet_model.h5')

In [None]:
from tensorflow.keras.models import load_model

In [None]:
def get_best_(array):
    """Print every entry of ``array`` as ``"<index> ==> <value>"``, highest value first.

    Parameters
    ----------
    array : sequence
        One-dimensional, indexable collection of comparable values (a list or
        a 1-D ndarray), e.g. the predicted probability of each anomaly.

    Returns
    -------
    None
        The ranking is written to standard output, one entry per line. Equal
        values are listed in ascending index order, each with its own index.
    """
    # Rank positions rather than values: looking a value up again with
    # ``array.index`` reports the first position for every duplicate, is
    # quadratic, and is not available on ndarrays.
    ranked_indices = sorted(range(len(array)), key=lambda idx: array[idx], reverse=True)
    for idx in ranked_indices:
        print(str(idx) + " ==> " + str(array[idx]))