In [None]:
!pip install wfdb antropy EMD-signal

^C




In [4]:
import os
import wfdb
import pandas as pd
import numpy as np
from scipy.signal import welch
import antropy as ant
import json
from joblib import Parallel, delayed
import pywt
from PyEMD import EMD
from scipy.signal import hilbert
from xgboost import XGBClassifier
from sklearn.metrics import f1_score,accuracy_score,recall_score,precision_score

In [None]:
project_directory="als_datasets"  # content/drive/MyDrive/Colab Notebooks/Diplomska
emglab_dataset=os.path.join(project_directory,"emglab_dataset_bb")


als_myopathy_dataset=os.path.join(project_directory,"als_myopathy_dataset")
control_myopathy_dataset=os.path.join(project_directory,"control_myopathy_dataset")
control_als_dataset=os.path.join(project_directory,"control_als_dataset")
control_als_myopathy_dataset=os.path.join(project_directory,"control_als_myopathy_dataset")


In [None]:
def create_folder_if_not_exist(folder_path):
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)


create_folder_if_not_exist(als_myopathy_dataset)
create_folder_if_not_exist(control_myopathy_dataset)
create_folder_if_not_exist(control_als_dataset)
create_folder_if_not_exist(control_als_myopathy_dataset)

In [None]:
def parse_binary_files(database_dir):
    signal_data_list=[]
    record_name_list=[]

    hea_files=[f for f in os.listdir(database_dir) if f.endswith(".hea")]

    for file in hea_files:
        record_name=os.path.splitext(file)[0]
        record_path=os.path.join(database_dir,record_name)
        record=wfdb.rdrecord(record_path)

        signal_data=np.array(record.p_signal).flatten()
        
        signal_data_list.append(signal_data)
        record_name_list.append(record_name)

    signal_data_array=np.stack(signal_data_list)

    data = list(zip(record_name_list, signal_data_array))
    sorted_data = sorted(data, key=lambda x: x[0])

    record_name_list=[x[0] for x in sorted_data]
    signal_data_array=[x[1] for x in sorted_data]

    return signal_data_array,record_name_list

In [None]:
def generate_database(data,file_names):
    
    def classification_func(row):
        return {"C":0,"M":2, "A":1}[row["Diagnosis"]]

    df_data=[
        {
            "Diagnosis":file_names[i][5],
            "Patient":int(file_names[i][6:8]),
            "Signal":data[i]
        } for i in range(0,len(data))
    ]

    df=pd.DataFrame(df_data)

    df["Class"]=df.apply(classification_func,axis=1)
    df.drop(columns=["Diagnosis"],inplace=True)
    return df

In [None]:
def segmentation(signal,fs=23437,window_length=1):
    window_size = int(round(fs * window_length))
    segments = [signal[i:i + window_size] for i in range(0, len(signal), window_size)]
    segments = [s for s in segments if len(s) == window_size]
    return segments

In [None]:
def extract_time_features(segment):
    mav = np.mean(np.abs(segment))  # Mean Absolute Value
    wl = np.sum(np.abs(np.diff(segment)))  # Waveform Length
    zc = np.sum(np.diff(np.sign(segment)) != 0)  # Zero Crossing
    ssc = np.sum((segment[1:-1] - segment[:-2]) * (segment[1:-1] - segment[2:]) > 0)  # Slope Sign Changes
    return [mav, wl, zc, ssc]

def extract_frequency_features(segment, fs=23437.5):
    freqs, psd = welch(segment, fs=fs, nperseg=len(segment))

    if len(psd) == 0 or np.sum(psd) == 0: 
        return [0, 0]  
    
    mnf = np.sum(freqs * psd) / np.sum(psd)  # Mean Frequency

    
    cumulative_psd = np.cumsum(psd)
    valid_indices = np.where(cumulative_psd >= np.sum(psd) / 2)[0]
    
    if len(valid_indices) == 0:  
        mdf = 0
    else:
        mdf = freqs[valid_indices[0]]  # Median Frequency

    return [mnf, mdf]

def extract_nonlinear_features(segment):
    apen = ant.app_entropy(segment)  # Approximate Entropy
    sampen = ant.sample_entropy(segment)  # Sample Entropy
    fd = ant.higuchi_fd(segment)  # Fractal Dimension
    return [apen, sampen, fd]


def extract_spectral_features(segment,fs=23437.5):
    coefficients, _ = pywt.cwt(segment, np.arange(1, 128), 'morl')
    we=np.sum(coefficients ** 2)  # Wavelength Energy

    emd = EMD()
    imfs = emd(segment)

    all_freqs = []
    for imf in imfs:
        analytic_signal = hilbert(imf)
        instantaneous_phase = np.unwrap(np.angle(analytic_signal))
        instantaneous_freq = np.diff(instantaneous_phase) / (2.0 * np.pi) * fs
        all_freqs.append(np.mean(instantaneous_freq))
    mif = np.mean(all_freqs)  # Mean Instantaneous Frequency

    return [we,mif]


In [None]:
def dataset_formatting(dataframe,columns_to_drop):
    dataset=[]
    for i, row in dataframe.drop(columns=columns_to_drop).iterrows():
        sample={}
        prompt="Quantitative EMG: "
        for key,value in row.items():
            if(key != "Class"):
                prompt+=f"{key} = {value}, "
        completion="Healthy" if row["Class"] == 0 else "ALS" if row["Class"] == 1 else "Myopathy"
        sample["prompt"]=prompt[:-2]
        sample["completion"]=completion
        
        dataset.append(sample)
    
    return dataset

In [None]:
def save_jsonl(data, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item) + "\n")

In [None]:
signal_data_array,record_name_list=parse_binary_files(emglab_dataset)
df=generate_database(signal_data_array,record_name_list)

df["Signal_Segment"]=df["Signal"].apply(lambda sig:segmentation(sig,window_length=0.25))

In [None]:
df_interval=pd.DataFrame(columns=["Patient","Class","Interval"])

for i,row in df.iterrows():
    for seg in row["Signal_Segment"]:
        new_row=[row["Patient"],row["Class"],seg]
        df_interval.loc[len(df_interval)]=new_row

In [None]:
df_interval["Interval"].apply(lambda seg:len(seg)).value_counts()

Interval
5859    20900
Name: count, dtype: int64

In [None]:
df_interval["Interval"]=df_interval["Interval"].apply(lambda seg:[s for s in seg if not np.isnan(s)])
df_interval["To_Drop"]=df_interval["Interval"].apply(lambda seg:len(seg) == 5859)
df_interval=df_interval[df_interval["To_Drop"] == True].drop(columns=["To_Drop"])

In [None]:
df_interval[["MAV","WL", "ZC", "SSC"]]=df_interval["Interval"].apply(lambda seg:extract_time_features(np.array(seg))).apply(pd.Series)
df_interval[["MNF","MDF"]]=df_interval["Interval"].apply(lambda seg:extract_frequency_features(np.array(seg))).apply(pd.Series)

nonlinear_results = Parallel(n_jobs=-1)(
    delayed(extract_nonlinear_features)(np.array(seg)) for seg in df_interval["Interval"]
)
df_interval[["APEN", "SampEn", "FracDim"]] = pd.DataFrame(nonlinear_results)

spectral_results=Parallel(n_jobs=-1)(
    delayed(extract_spectral_features)(np.array(seg)) for seg in df_interval["Interval"]
)
df_interval[["WE","MIF"]]=pd.DataFrame(spectral_results)

In [None]:
df_interval.dropna(inplace=True)
train_patients=[3,4,5,6,7]
test_patients=[1,2,8,9]

train_dataset=df_interval[df_interval["Patient"].isin(train_patients)].drop(columns=["Interval","Patient"])
test_dataset=df_interval[df_interval["Patient"].isin(test_patients)].drop(columns=["Interval","Patient"])

# train_dataset.to_csv("train.csv",index=False)
# test_dataset.to_csv("test.csv",index=False)

In [None]:
# train_dataset=pd.read_csv("train.csv")
# test_dataset=pd.read_csv("test.csv")

In [6]:
train_control=train_dataset[train_dataset["Class"] == 0]
train_als=train_dataset[train_dataset["Class"] == 1]
train_myopathy=train_dataset[train_dataset["Class"] == 2]

test_control=test_dataset[test_dataset["Class"] == 0]
test_als=test_dataset[test_dataset["Class"] == 1]
test_myopathy=test_dataset[test_dataset["Class"] == 2]

In [7]:
print("Train Control samples:",train_control.shape[0],"\t","Test Control samples:",test_control.shape[0])
print("Train ALS samples:",train_als.shape[0],"\t","Test ALS samples:",test_als.shape[0])
print("Train Myopathy samples:",train_myopathy.shape[0],"\t","Test Myopathy samples:",test_myopathy.shape[0])

Train Control samples: 6597 	 Test Control samples: 5280
Train ALS samples: 3062 	 Test ALS samples: 1173
Train Myopathy samples: 3562 	 Test Myopathy samples: 1095


In [8]:
train_control_sample=train_control.sample(3300)
test_control_sample=test_control.sample(1100)

In [9]:
train_control_als_df=pd.concat([train_control_sample,train_als])
train_control_myopathy_df=pd.concat([train_control_sample,train_myopathy])
train_als_myopathy_df=pd.concat([train_als,train_myopathy])
train_control_als_myopathy_df=pd.concat([train_control_sample,train_als,train_myopathy])


test_control_als_df=pd.concat([test_control_sample,test_als])
test_control_myopathy_df=pd.concat([test_control_sample,test_myopathy])
test_als_myopathy_df=pd.concat([test_als,test_myopathy])
test_control_als_myopathy_df=pd.concat([test_control_sample,test_als,test_myopathy])

In [None]:
train_control_als_df=dataset_formatting(train_control_als_df,[])
train_control_myopathy_df=dataset_formatting(train_control_myopathy_df,[])
train_als_myopathy_df=dataset_formatting(train_als_myopathy_df,[])
train_control_als_myopathy_df=dataset_formatting(train_control_als_myopathy_df,[])
test_control_als_df=dataset_formatting(test_control_als_df,[])
test_control_myopathy_df=dataset_formatting(test_control_myopathy_df,[])
test_als_myopathy_df=dataset_formatting(test_als_myopathy_df,[])
test_control_als_myopathy_df=dataset_formatting(test_control_als_myopathy_df,[])

save_jsonl(train_control_als_df,os.path.join(control_als_dataset,"train.jsonl"))
save_jsonl(train_control_myopathy_df,os.path.join(control_myopathy_dataset,"train.jsonl"))
save_jsonl(train_als_myopathy_df,os.path.join(als_myopathy_dataset,"train.jsonl"))
save_jsonl(train_control_als_myopathy_df,os.path.join(control_als_myopathy_dataset,"train.jsonl"))
save_jsonl(test_control_als_df,os.path.join(control_als_dataset,"test.jsonl"))
save_jsonl(test_control_myopathy_df,os.path.join(control_myopathy_dataset,"test.jsonl"))
save_jsonl(test_als_myopathy_df,os.path.join(als_myopathy_dataset,"test.jsonl"))
save_jsonl(test_control_als_myopathy_df,os.path.join(control_als_myopathy_dataset,"test.jsonl"))

In [11]:
train_control_myopathy_df["Class"]=train_control_myopathy_df["Class"].apply(lambda c:c if c != 2 else 1)
train_als_myopathy_df["Class"]=train_als_myopathy_df["Class"].apply(lambda c:c if c != 2 else 0)

test_control_myopathy_df["Class"]=test_control_myopathy_df["Class"].apply(lambda c:c if c !=2 else 1)
test_als_myopathy_df["Class"]=test_als_myopathy_df["Class"].apply(lambda c:c if c !=2 else 0)

In [None]:
model=XGBClassifier()
model.fit(train_control_als_df.drop(columns=["Class"]),train_control_als_df["Class"])
y_true=test_control_als_df["Class"]
y_pred=model.predict(test_control_als_df.drop(columns=["Class"]))
print("Control - ALS Binary Dataset")
print()
print("Precision:",precision_score(y_true,y_pred))
print("Recall:",recall_score(y_true,y_pred))
print("F1 Score:",f1_score(y_true,y_pred))
print("Accuracy:",accuracy_score(y_true,y_pred))

Control - ALS Binary Dataset

Precision: 0.725609756097561
Recall: 0.8115942028985508
F1 Score: 0.7661971830985915
Accuracy: 0.7443906731192257


In [None]:
model=XGBClassifier()
model.fit(train_control_myopathy_df.drop(columns=["Class"]),train_control_myopathy_df["Class"])
y_true=test_control_myopathy_df["Class"]
y_pred=model.predict(test_control_myopathy_df.drop(columns=["Class"]))
print("Control - Myopathy Binary Dataset")
print()
print("Precision:",precision_score(y_true,y_pred))
print("Recall:",recall_score(y_true,y_pred))
print("F1 Score:",f1_score(y_true,y_pred))
print("Accuracy:",accuracy_score(y_true,y_pred))

Control - Myopathy Binary Dataset

Precision: 0.7615571776155717
Recall: 0.5716894977168949
F1 Score: 0.6531038080333855
Accuracy: 0.6970387243735763


In [None]:
model=XGBClassifier()
model.fit(train_als_myopathy_df.drop(columns=["Class"]),train_als_myopathy_df["Class"])
y_true=test_als_myopathy_df["Class"]
y_pred=model.predict(test_als_myopathy_df.drop(columns=["Class"]))
print("ALS - Myopathy Binary Dataset")
print()
print("Precision:",precision_score(y_true,y_pred))
print("Recall:",recall_score(y_true,y_pred))
print("F1 Score:",f1_score(y_true,y_pred))
print("Accuracy:",accuracy_score(y_true,y_pred))

ALS - Myopathy Binary Dataset

Precision: 0.8113348247576435
Recall: 0.927536231884058
F1 Score: 0.8655529037390612
Accuracy: 0.8509700176366843


In [12]:
model=XGBClassifier()
model.fit(train_control_als_myopathy_df.drop(columns=["Class"]),train_control_als_myopathy_df["Class"])
y_true=test_control_als_myopathy_df["Class"]
y_pred=model.predict(test_control_als_myopathy_df.drop(columns=["Class"]))
print("Control - ALS - Myopathy Binary Dataset")
print()
print("Precision:",precision_score(y_true,y_pred,average="macro"))
print("Recall:",recall_score(y_true,y_pred,average="macro"))
print("F1 Score:",f1_score(y_true,y_pred,average="macro"))
print("Accuracy:",accuracy_score(y_true,y_pred))

Control - ALS - Myopathy Binary Dataset

Precision: 0.672957032239012
Recall: 0.6571336388797763
F1 Score: 0.6574641821140785
Accuracy: 0.6603325415676959
