In [None]:
import numpy as np
import pandas as pd
import warnings
import time
from scipy import signal
from scipy.signal import butter, lfilter
import matplotlib.pyplot as plt
import math
import pickle
import statistics
import random
from sklearn.metrics.pairwise import cosine_similarity
from math import log
from sklearn import preprocessing
from sklearn import metrics
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
import sktime as sktime
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import cross_val_score
from sktime.datasets import load_from_tsfile_to_dataframe
from sktime.alignment.dtw_python import AlignerDTWfromDist
from sktime.dists_kernels.scipy_dist import ScipyDist
from sktime.dists_kernels.compose_from_align import DistFromAligner
import os
import sys
%matplotlib inline
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import RidgeClassifierCV
from sktime.transformations.panel.rocket import Rocket
from sktime.classification.distance_based import KNeighborsTimeSeriesClassifier
from sktime.classification.hybrid import HIVECOTEV2
from sktime.classification.dictionary_based import MUSE
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

### Load normal and abnormal datasets

In [None]:
final_normal_df = pd.read_pickle('normal_EMO_pain.pkl')
final_abnormal_df = pd.read_pickle('abnormal_df.pkl')

In [None]:
final_abnormal_df = final_abnormal_df.drop(columns=['subject_number','activity'])
final_abnormal_df.rename(columns={'new_subject_number': 'Subject'}, inplace=True)

In [None]:
abnormal_indices = (final_abnormal_df['Subject']).unique()
abnormal_no = (final_abnormal_df['Subject']).nunique()

In [None]:
# Specify the file path of the saved pkl file
file_path = 'final_bins_67.pkl'

# Load the dictionary from the pkl file
with open(file_path, 'rb') as file:
    final_numbers_dictionary = pickle.load(file)
# Now, 'loaded_dictionary' contains the dictionary loaded from the pkl file
display("Dictionary loaded from pickle file:")

In [None]:
dictionary_keys = final_numbers_dictionary.keys()
keys_list = list(dictionary_keys)

### Function to find the length of the time series

In [None]:
# Define a function to z-normalize a column
def z_normalize(column):
    mean = column.mean()
    std = column.std()
    z_normalized = (column - mean) / std
    return z_normalized

In [None]:
def time_series_length_finding(df):
    train_1_abnormal = df.copy()
    train_1_ab_sub = (train_1_abnormal['Subject'].unique()).tolist()
    row_count = []
    for i in train_1_ab_sub:
        train_1_abnormal_new = train_1_abnormal[(train_1_abnormal['Subject']== i)]
        no_rows = train_1_abnormal_new.shape[0]
        row_count.append(no_rows)
    display("Avg length:",statistics.mean(row_count))
    avg_len_ts = int(statistics.mean(row_count))
    return(avg_len_ts)

In [None]:
def preprocessing_normal_df(df,desired_length):
    train_2_normal = df.copy()
    df_train_3 = pd.DataFrame(columns = train_2_normal.columns)
    num_dim = df_train_3.shape[1]
    display(num_dim)
    df_train_3.columns = range(len(df_train_3.columns))
    train_2_sub = (train_2_normal['Subject'].unique()).tolist()
    m = 0
    for i in train_2_sub:
        train_2_normal_new = train_2_normal[(train_2_normal['Subject']== i)]
        df_row_count = train_2_normal_new.shape[0]
        if df_row_count < desired_length:
            rows_to_add = desired_length - df_row_count
            train_2_normal_new.tail()
            last_row = train_2_normal_new.ffill().iloc[[-1]]  # Extract the last row
            new_rows_df = pd.concat([last_row] * rows_to_add, ignore_index=True)
            train_2_normal_initial = pd.concat([train_2_normal_new, new_rows_df], ignore_index=True)
            train_2_normal_initial_no_sub = train_2_normal_initial.drop(columns='Subject')
            train_2_normal_final = z_normalize(train_2_normal_initial_no_sub)
            train_2_normal_final.insert(0, 'Subject', i)

        else:
            train_2_normal_initial = train_2_normal_new.iloc[:desired_length,:]
            train_2_normal_initial_no_sub = train_2_normal_initial.drop(columns='Subject')
            train_2_normal_final = z_normalize(train_2_normal_initial_no_sub)
            train_2_normal_final.insert(0, 'Subject', i)
            
        train_2_normal_final.columns = range(len(train_2_normal_final.columns))
        
        df_train_3.at[m,0] = i
        j = 0
        while j < num_dim:
            df_train_3.at[m,j] = train_2_normal_final[j]
            
            j += 1
        m += 1    
    df_train_3['class'] = 'normal'
    df_normal_baseline = df_train_3.iloc[:,1:]
    return(df_normal_baseline)

In [None]:
def preprocessing_dataframe(df,desired_length):
    train_1_abnormal = df.copy()
    df_train_2 = pd.DataFrame(columns = train_1_abnormal.columns)
    num_dim_ab = df_train_2.shape[1]

    df_train_2.columns = range(len(df_train_2.columns))
 
    train_1_ab_sub = (train_1_abnormal['Subject'].unique()).tolist()
    m = 0
    for i in train_1_ab_sub:
        train_1_abnormal_new = train_1_abnormal[(train_1_abnormal['Subject']== i)]
        df_row_count = train_1_abnormal_new.shape[0]
        if df_row_count < desired_length:
            rows_to_add = desired_length - df_row_count

            last_row = train_1_abnormal_new.ffill().iloc[[-1]]  # Extract the last row
            new_rows_df = pd.concat([last_row] * rows_to_add, ignore_index=True)
            train_1_abnormal_initial = pd.concat([train_1_abnormal_new, new_rows_df], ignore_index=True)
            train_1_abnormal_initial_no_sub = train_1_abnormal_initial.drop(columns='Subject')
            train_1_abnormal_final = z_normalize(train_1_abnormal_initial_no_sub)
            train_1_abnormal_final.insert(0, 'Subject', i)

        else:
            train_1_abnormal_initial = train_1_abnormal_new.iloc[:desired_length,:]
            train_1_abnormal_initial_no_sub = train_1_abnormal_initial.drop(columns='Subject')
            train_1_abnormal_final = z_normalize(train_1_abnormal_initial_no_sub)
            train_1_abnormal_final.insert(0, 'Subject', i)
        train_1_abnormal_final.columns = range(len(train_1_abnormal_final.columns))

        df_train_2.at[m,0] = i
        j = 0
        while j < num_dim_ab:
            df_train_2.at[m,j] = train_1_abnormal_final[j]
            j += 1
        m += 1    
    df_train_2['class'] = 'abnormal'
    df_final = df_train_2.iloc[:,1:]
    return(df_final)

In [None]:
def ts_classifier(df_class_1):
    acc_mean_list = []
    prec_mean_list = []
    recall_mean_list = []
    f1_mean_list = []

    X = df_class_1.iloc[:,:-1]
    y = df_class_1.iloc[:,-1]

    for i in range(1, 31):
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=i, stratify=y)
        
        accuracy_dict = {} 
        precision_dict = {}
        recall_dict = {}
        f1_dict = {}
        
        comb = 1
        for a in[2,4,6]:
            for b in [4,8]:
                clf = MUSE(window_inc=a,alphabet_size=b,use_first_order_differences=False, random_state=42)
                clf.fit(X_train, y_train)
                
                # Predict using the final model
                y_pred = clf.predict(X_test)

                pos_label = 'abnormal'

                acc = accuracy_score(y_test, y_pred)
                prec = precision_score(y_test, y_pred, pos_label=pos_label)
                recall = recall_score(y_test, y_pred, pos_label=pos_label)
                f1 = f1_score(y_test, y_pred, pos_label=pos_label)
                
                accuracy_dict[(a, b)] = acc
                precision_dict[(a, b)] = prec
                recall_dict[(a, b)] = recall
                f1_dict[(a, b)] = f1
                
                display("Completed comb",comb)
                comb += 1
        
        accuracy_max_dict = max(accuracy_dict.items(), key=lambda x: x[1])
        display("Accuracy Dictionary:",accuracy_dict)
        accuracy_max = accuracy_max_dict[1]

        max_a_b = accuracy_max_dict[0]
        display("(a, b) combination for maximum accuracy:",max_a_b)

        display('Accuracy: %f' % accuracy_max)
        acc_mean_list.append(accuracy_max)

        precision_max = precision_dict[max_a_b]
        display('Precision: %f' % precision_max)
        prec_mean_list.append(precision_max)

        recall_max = recall_dict[max_a_b]
        display('Recall: %f' % recall_max)
        recall_mean_list.append(recall_max)

        f1_score_max = f1_dict[max_a_b]
        display('f1_score: %f' % f1_score_max)
        f1_mean_list.append(f1_score_max)

        display("Completed", i)

    acc_mean = np.mean(acc_mean_list)
    acc_std_dev = np.std(acc_mean_list)
    display('Accuracy Mean: %f' % acc_mean)

    prec_mean = np.mean(prec_mean_list)
    prec_std_dev = np.std(prec_mean_list)

    recall_mean = np.mean(recall_mean_list)
    recall_std_dev = np.std(recall_mean_list)

    f1_mean = np.mean(f1_mean_list)
    f1_std_dev = np.std(f1_mean_list)
    display('F1 Mean: %f' % f1_mean)

    return (acc_mean, acc_std_dev, prec_mean, prec_std_dev, recall_mean, recall_std_dev, f1_mean, f1_std_dev)

In [None]:
# Create a new dictionary with combined values
# Keys to combine
key_to_combine1 = 'sample_numbers_list_0.0_to_0.1'
key_to_combine2 = 'sample_numbers_list_0.1_to_0.2'

# Create a new dictionary with combined values
hb_dict = {'hb_key': final_numbers_dictionary[key_to_combine1] + final_numbers_dictionary[key_to_combine2]}
display(hb_dict)

In [None]:
classifier_dict_hb = {}
key = 'hb_key'
normal_sample_numbers = [] 
normal_sample_numbers.extend(hb_dict[key])
normal_sample_length = len(normal_sample_numbers)
display(normal_sample_length)

if len(normal_sample_numbers) <= abnormal_no:
    df_normal = final_normal_df[final_normal_df['Subject'].isin(normal_sample_numbers)]
    display(df_normal.head(2))

    random.seed(42)
    abnormal_sample_numbers = random.sample(abnormal_indices.tolist(), normal_sample_length)
    df_abnormal = final_abnormal_df[final_abnormal_df['Subject'].isin(abnormal_sample_numbers)]
    display(df_abnormal.head(2))

    desired_length_for_ts = time_series_length_finding(df_abnormal)

    df_normal_preprocessed = preprocessing_normal_df(df_normal,desired_length_for_ts)
    df_abnormal_preprocessed = preprocessing_dataframe(df_abnormal,desired_length_for_ts)

    df_classifier = (pd.concat([df_normal_preprocessed,df_abnormal_preprocessed]).reset_index()).drop(columns='index')
    acc_mean,acc_std_dev,prec_mean,prec_std_dev,recall_mean,recall_std_dev,f1_mean,f1_std_dev = ts_classifier(df_classifier)

    classifier_dict_hb[key] = {'acc_mean': acc_mean,'acc_std_dev': acc_std_dev,'prec_mean': prec_mean,'prec_std_dev': prec_std_dev,'recall_mean': recall_mean,'recall_std_dev': recall_std_dev,'f1_mean': f1_mean,'f1_std_dev': f1_std_dev}

else:
    df_abnormal = final_abnormal_df

    random.seed(42)
    random_sample_numbers = random.sample(normal_sample_numbers, abnormal_no)
    df_normal = final_normal_df[final_normal_df['Subject'].isin(random_sample_numbers)]
    display(df_normal)

    desired_length_for_ts = time_series_length_finding(df_abnormal)

    df_normal_preprocessed = preprocessing_normal_df(df_normal,desired_length_for_ts)
    df_abnormal_preprocessed = preprocessing_dataframe(df_abnormal,desired_length_for_ts)

    df_classifier = (pd.concat([df_normal_preprocessed,df_abnormal_preprocessed]).reset_index()).drop(columns='index')
    acc_mean,acc_std_dev,prec_mean,prec_std_dev,recall_mean,recall_std_dev,f1_mean,f1_std_dev = ts_classifier(df_classifier)

    classifier_dict_hb[key] = {'acc_mean': acc_mean,'acc_std_dev': acc_std_dev,'prec_mean': prec_mean,'prec_std_dev': prec_std_dev,'recall_mean': recall_mean,'recall_std_dev': recall_std_dev,'f1_mean': f1_mean,'f1_std_dev': f1_std_dev}

In [None]:
import pickle

# Specify the file path where you want to save the pickle file
pickle_file_path = 'hb_EMO_pain_MUSE_normalized.pickle'

# Save the dictionary to a pickle file
with open(pickle_file_path, 'wb') as file:
    pickle.dump(classifier_dict_hb, file)

print(f"Dictionary saved to {pickle_file_path}")

In [None]:
classifier_dict_hb

In [None]:
all_values = [value for sublist in final_numbers_dictionary.values() for value in sublist]
healthy_dict = {'healthy_key': all_values}
display(healthy_dict)

In [None]:
classifier_dict_healthy = {}
key = 'healthy_key'
normal_sample_numbers = []
normal_sample_numbers.extend(healthy_dict[key])
normal_sample_length = len(normal_sample_numbers)
display(normal_sample_length)

if len(normal_sample_numbers) <= abnormal_no:
    df_normal = final_normal_df[final_normal_df['Subject'].isin(normal_sample_numbers)]
    display(df_normal.head())

    random.seed(42)
    abnormal_sample_numbers = random.sample(abnormal_indices.tolist(), normal_sample_length)
    df_abnormal = final_abnormal_df[final_abnormal_df['Subject'].isin(abnormal_sample_numbers)]
    display(df_abnormal.head())

    desired_length_for_ts = time_series_length_finding(df_abnormal)

    df_normal_preprocessed = preprocessing_normal_df(df_normal,desired_length_for_ts)
    df_abnormal_preprocessed = preprocessing_dataframe(df_abnormal,desired_length_for_ts)

    df_classifier = (pd.concat([df_normal_preprocessed,df_abnormal_preprocessed]).reset_index()).drop(columns='index')
    acc_mean,acc_std_dev,prec_mean,prec_std_dev,recall_mean,recall_std_dev,f1_mean,f1_std_dev = ts_classifier(df_classifier)

    classifier_dict_healthy[key] = {'acc_mean': acc_mean,'acc_std_dev': acc_std_dev,'prec_mean': prec_mean,'prec_std_dev': prec_std_dev,'recall_mean': recall_mean,'recall_std_dev': recall_std_dev,'f1_mean': f1_mean,'f1_std_dev': f1_std_dev}

else:
    df_abnormal = final_abnormal_df

    random.seed(42)
    random_sample_numbers = random.sample(normal_sample_numbers, abnormal_no)
    df_normal = final_normal_df[final_normal_df['Subject'].isin(random_sample_numbers)]
    display(df_normal)

    desired_length_for_ts = time_series_length_finding(df_abnormal)

    df_normal_preprocessed = preprocessing_normal_df(df_normal,desired_length_for_ts)
    df_abnormal_preprocessed = preprocessing_dataframe(df_abnormal,desired_length_for_ts)

    df_classifier = (pd.concat([df_normal_preprocessed,df_abnormal_preprocessed]).reset_index()).drop(columns='index')
    acc_mean,acc_std_dev,prec_mean,prec_std_dev,recall_mean,recall_std_dev,f1_mean,f1_std_dev = ts_classifier(df_classifier)

    classifier_dict_healthy[key] = {'acc_mean': acc_mean,'acc_std_dev': acc_std_dev,'prec_mean': prec_mean,'prec_std_dev': prec_std_dev,'recall_mean': recall_mean,'recall_std_dev': recall_std_dev,'f1_mean': f1_mean,'f1_std_dev': f1_std_dev}

In [None]:
classifier_dict_healthy

In [None]:
import pickle

# Specify the file path where you want to save the pickle file
pickle_file_path = 'healthy_EMO_pain_MUSE_normalized.pickle'

# Save the dictionary to a pickle file
with open(pickle_file_path, 'wb') as file:
    pickle.dump(classifier_dict_healthy, file)

print(f"Dictionary saved to {pickle_file_path}")