In [None]:
import pandas as pd
import os, glob
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report,confusion_matrix,accuracy_score,f1_score
from numpy import mean
from numpy import std
from sklearn.datasets import make_classification
from sklearn.model_selection import RepeatedKFold,KFold
from sklearn.model_selection import cross_val_score,cross_validate
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from scipy.spatial import distance

In [None]:
import sys
import collections
import itertools
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import mode
from scipy.spatial.distance import squareform

plt.style.use('bmh')
%matplotlib inline

try:
    from IPython.display import clear_output
    have_ipython = True
except ImportError:
    have_ipython = False

class KnnDtw(object):
    """K-nearest neighbor classifier using dynamic time warping
    as the distance measure between pairs of time series arrays

    Arguments
    ---------
    n_neighbors : int, optional (default = 5)
        Number of neighbors to use by default for KNN

    max_warping_window : int, optional (default = 10000)
        Maximum warping window allowed by the DTW dynamic
        programming function. The default is simply a value larger
        than any realistic series length, i.e. effectively unbounded.

    subsample_step : int, optional (default = 1)
        Step size for the timeseries array. By setting subsample_step = 2,
        the timeseries length will be reduced by 50% because every second
        item is skipped. Implemented by x[:, ::subsample_step]
    """

    def __init__(self, n_neighbors=5, max_warping_window=10000, subsample_step=1):
        self.n_neighbors = n_neighbors
        self.max_warping_window = max_warping_window
        self.subsample_step = subsample_step

    def fit(self, x, l):
        """Fit the model using x as training data and l as class labels

        KNN is a lazy learner: fitting only stores the training set.

        Arguments
        ---------
        x : array-like of shape [n_samples, n_timepoints]
            Training data set for input into KNN classifier

        l : array-like of shape [n_samples]
            Training labels for input into KNN classifier
        """

        # Coerce to ndarrays: predict() fancy-indexes the labels with an index
        # array and _dist_matrix() slices rows as x[i, ::step]; neither works
        # on plain Python lists.
        self.x = np.asarray(x)
        self.l = np.asarray(l)

    def _dtw_distance(self, ts_a, ts_b, d = lambda x,y: abs(x-y)):
        """Returns the DTW distance between two 1-D timeseries.

        Arguments
        ---------
        ts_a, ts_b : array-like of shape [n_timepoints]
            The two series to compare. They may have different lengths.

        d : callable (default = abs(x-y))
            the distance measure used for A_i - B_j in the
            DTW dynamic programming function

        Returns
        -------
        DTW distance between A and B (the accumulated cost at the last
        cell of the cost matrix)
        """

        # Cost matrix pre-filled with a very large value so that cells
        # outside the warping window are never chosen by min().
        ts_a, ts_b = np.array(ts_a), np.array(ts_b)
        M, N = len(ts_a), len(ts_b)
        cost = sys.maxsize * np.ones((M, N))

        # Initialize the first row and column (filled in full, regardless of
        # the warping window).
        cost[0, 0] = d(ts_a[0], ts_b[0])
        for i in range(1, M):
            cost[i, 0] = cost[i-1, 0] + d(ts_a[i], ts_b[0])

        for j in range(1, N):
            cost[0, j] = cost[0, j-1] + d(ts_a[0], ts_b[j])

        # Populate rest of cost matrix within window.
        # NOTE(review): range() is half-open, so column j = i + window is
        # excluded while j = i - window is included. Kept as-is so existing
        # results do not change.
        window = self.max_warping_window
        for i in range(1, M):
            for j in range(max(1, i - window), min(N, i + window)):
                choices = cost[i - 1, j - 1], cost[i, j-1], cost[i-1, j]
                cost[i, j] = min(choices) + d(ts_a[i], ts_b[j])

        # Return DTW distance given window
        return cost[-1, -1]

    def _dist_matrix(self, x, y):
        """Computes the pairwise DTW distance matrix between the rows of
        x and the rows of y

        Arguments
        ---------
        x : array of shape [n_samples_x, n_timepoints]

        y : array of shape [n_samples_y, n_timepoints]

        Returns
        -------
        Distance matrix of shape [n_samples_x, n_samples_y] where entry
        (i, j) is the DTW distance between x[i] and y[j]
        """

        x, y = np.asarray(x), np.asarray(y)
        step = self.subsample_step

        # Number of distances computed so far (drives the progress bar)
        dm_count = 0

        # Compute condensed distance matrix (upper triangle) of pairwise dtw distances
        # when x and y are the same array
        if np.array_equal(x, y):
            n_samples = x.shape[0]
            dm = np.zeros((n_samples * (n_samples - 1)) // 2, dtype=np.double)

            p = ProgressBar(dm.shape[0])

            for i in range(0, n_samples - 1):
                for j in range(i + 1, n_samples):
                    dm[dm_count] = self._dtw_distance(x[i, ::step],
                                                      y[j, ::step])

                    dm_count += 1
                    if dm_count%100==0:
                        p.animate(dm_count)

            # Convert to squareform (symmetric matrix with a zero diagonal)
            return squareform(dm)

        # Compute full distance matrix of dtw distances between x and y
        else:
            n_x, n_y = x.shape[0], y.shape[0]
            dm = np.zeros((n_x, n_y))

            p = ProgressBar(n_x * n_y)

            for i in range(0, n_x):
                for j in range(0, n_y):
                    dm[i, j] = self._dtw_distance(x[i, ::step],
                                                  y[j, ::step])
                    # Update progress bar
                    dm_count += 1
                    if dm_count%100==0:
                        p.animate(dm_count)

            return dm

    def predict(self, x):
        """Predict the class labels and the neighbour vote fraction for
        the provided data

        Arguments
        ---------
          x : array of shape [n_samples, n_timepoints]
              Array containing the testing data set to be classified

        Returns
        -------
          2 arrays representing:
              (1) the predicted class labels
              (2) the knn label count probability, i.e. the number of
                  neighbours voting for the predicted label divided by
                  n_neighbors
        """

        # Rows are test samples, columns are training samples
        dm = self._dist_matrix(np.asarray(x), self.x)

        # Identify the k nearest neighbors
        knn_idx = dm.argsort()[:, :self.n_neighbors]

        # Identify k nearest labels
        knn_labels = self.l[knn_idx]

        # Majority vote among the neighbours
        mode_data = mode(knn_labels, axis=1)
        mode_label = mode_data[0]
        mode_proba = mode_data[1]/self.n_neighbors

        return mode_label.ravel(), mode_proba.ravel()

class ProgressBar:
    """Fixed-width text progress bar (adapted from PyMC).

    The rendered bar is 40 characters wide with the percentage written
    near its centre, followed by an "<done> of <total> complete" suffix once
    ``update_iteration`` has been called.

    Arguments
    ---------
    iterations : int
        Total number of iterations that corresponds to 100%
    """
    def __init__(self, iterations):
        self.iterations = iterations
        self.prog_bar = '[]'
        self.fill_char = '*'
        self.width = 40
        self.__update_amount(0)
        # `have_ipython` is the module-level flag set by the guarded
        # IPython import at the top of this cell.
        if have_ipython:
            self.animate = self.animate_ipython
        else:
            self.animate = self.animate_noipython

    def animate_ipython(self, iter):
        """Redraw the bar in place, then advance it to iteration ``iter + 1``."""
        # end='' keeps the cursor on the same line so the next '\r' overwrites
        # the bar instead of emitting one output line per update.
        print('\r', self, end='')
        sys.stdout.flush()
        self.update_iteration(iter + 1)

    def animate_noipython(self, iter):
        """Plain-terminal fallback used when IPython is not importable."""
        sys.stdout.write('\r' + str(self))
        sys.stdout.flush()
        self.update_iteration(iter + 1)

    def update_iteration(self, elapsed_iter):
        """Recompute the bar for ``elapsed_iter`` completed iterations."""
        # A total of 0 iterations is treated as already complete to avoid a
        # division by zero.
        if self.iterations:
            fraction = elapsed_iter / float(self.iterations)
        else:
            fraction = 1.0
        self.__update_amount(fraction * 100.0)
        self.prog_bar += '  %d of %s complete' % (elapsed_iter, self.iterations)

    def __update_amount(self, new_amount):
        """Rebuild ``self.prog_bar`` for a completion percentage in 0-100."""
        percent_done = int(round((new_amount / 100.0) * 100.0))
        all_full = self.width - 2  # characters available between '[' and ']'
        num_hashes = int(round((percent_done / 100.0) * all_full))
        self.prog_bar = '[' + self.fill_char * num_hashes + ' ' * (all_full - num_hashes) + ']'
        # Overwrite the middle of the bar with the percentage text
        pct_place = (len(self.prog_bar) // 2) - len(str(percent_done))
        pct_string = '%d%%' % percent_done
        self.prog_bar = self.prog_bar[0:pct_place] + \
            (pct_string + self.prog_bar[pct_place + len(pct_string):])

    def __str__(self):
        return str(self.prog_bar)

In [None]:
# organized function

def bradycardia_detection(input_trace_array,training_X_dir,training_Y_dir,n_neighbors0=7, max_warping_window0=40):
    """Predict whether each heart-rate trace shows bradycardia.

    A KnnDtw classifier is fitted on the training set loaded from disk and
    applied to the input traces after baseline normalisation and cropping.

    Arguments
    ---------
      input_trace_array : array of shape [n_samples, n_timepoints]
          Heart-rate traces to classify. A single 1-D trace is also
          accepted and treated as one sample.

      training_X_dir : str
          Path of the ``.npy`` file holding the training traces.
          NOTE(review): presumably already normalised and cropped the same
          way as below -- confirm against how the file was written.

      training_Y_dir : str
          Path of the ``.npy`` file holding the training labels.

      n_neighbors0 : int, optional (default = 7)
          Number of neighbours used by the classifier.

      max_warping_window0 : int, optional (default = 40)
          DTW warping window used by the classifier.

    Returns
    -------
       Array with one entry per input trace: bradycardia(1) or not(0)

    """

    X10 = np.load(training_X_dir)
    Y10 = np.load(training_Y_dir)

    # The window argument is now honoured (it used to be hard-coded to 40).
    clf = KnnDtw(n_neighbors=n_neighbors0, max_warping_window=max_warping_window0)
    clf.fit(X10, Y10)

    traces = np.atleast_2d(np.asarray(input_trace_array))

    trace_list = []
    for t in range(0,traces.shape[0]):
        # Normalise each trace by its mean over frames 300-898
        temp_trace = traces[t,:]/np.mean(traces[t,300:899])
        trace_list.append(temp_trace)
    # Only frames 800-1199 are passed to the classifier
    npy_trace_list = np.array(trace_list)[:,800:1200]

    dectection_result, proba = clf.predict(npy_trace_list)

    return dectection_result

In [None]:
# Classify the held-out traces with the packaged detector (k=7, window=40).
# NOTE(review): `x_test` is only assigned by the train_test_split cell further
# down, so on a fresh kernel this cell fails unless that cell has run first.
test = bradycardia_detection(np.array(x_test),'training_set_X.npy','training_set_Y.npy',n_neighbors0=7, max_warping_window0=40)

In [None]:
# cross validation

def cv_knndtw(X,Y,n_neighbors0=5, max_warping_window0=40,k=5,repeats=1,shuffle=None,random_state=0):
    """(Repeated) k-fold cross-validation of a KnnDtw classifier.

    Arguments
    ---------
      X : array of shape [n_samples, n_timepoints]
          Traces to cross-validate on.

      Y : array of shape [n_samples]
          Labels matching the rows of X.

      n_neighbors0 : int, optional (default = 5)
          Number of neighbours used by the classifier.

      max_warping_window0 : int, optional (default = 40)
          DTW warping window used by the classifier.

      k : int, optional (default = 5)
          Number of folds.

      repeats : int, optional (default = 1)
          Number of times the k-fold procedure is repeated.

      shuffle : bool or None, optional (default = None)
          Whether samples are shuffled before being split into folds.
          None means "shuffle only when repeats > 1": without shuffling
          every repeat evaluates exactly the same folds and merely
          duplicates the scores, while a single repeat keeps the original
          ordered split.

      random_state : int or None, optional (default = 0)
          Base seed for shuffling; repeat ``r`` uses ``random_state + r``.
          Ignored when no shuffling takes place.

    Returns
    -------
      (list_f1score, list_accuracy) : two lists with one entry per fold
      and repeat, in evaluation order.
    """
    X = np.asarray(X)
    Y = np.asarray(Y)
    if shuffle is None:
        shuffle = repeats > 1

    list_f1score = []
    list_accuracy = []
    for rep in range(0,repeats):
        if shuffle:
            seed = None if random_state is None else random_state + rep
            KF = KFold(n_splits=k, shuffle=True, random_state=seed)
        else:
            # KFold rejects a random_state when shuffle is False
            KF = KFold(n_splits=k)
        for train_index, test_index in KF.split(X):
            X_train, X_test = X[train_index], X[test_index]
            Y_train, Y_test = Y[train_index], Y[test_index]
            clf_knndtw = KnnDtw(n_neighbors=n_neighbors0, max_warping_window=max_warping_window0)
            clf_knndtw.fit(np.array(X_train), np.array(Y_train))
            label, proba = clf_knndtw.predict(np.array(X_test))
            list_f1score.append(f1_score(Y_test,label))
            list_accuracy.append(accuracy_score(Y_test,label))
    return list_f1score,list_accuracy

In [None]:
# Build the annotated data set: X collects one baseline-normalised heart-rate
# trace per annotated trial, Y the matching manual annotation (as int).
# Every trial is also plotted for visual inspection.
# NOTE(review): the two following cells rebuild X and Y from scratch with the
# same loading logic; whichever ran last determines X and Y.
dataset_list = ['HSA_Luminance_7','LCr_UV_HSA_Lum_Low_1','LCr_UV_HSA_Luminance_4','LCr_UV_HSA_Luminance_5','size_and_speed_raw','behavior_color']
X = []
Y = []
for dataset in dataset_list:
    # One training_dataset* file per recording folder under F:\<dataset>\*\behavior\*\
    trainingset_list = glob.glob('F:\\'+dataset+'\\*\\behavior\\*\\training_dataset*')
    for trainingset_dir in trainingset_list:
        trainingset_name = trainingset_dir.split('\\')[-1]
        # Folder containing the training file (path minus the file name)
        temp_main_dir = trainingset_dir[:-len(trainingset_name)]
        # Characters [-21:-4] of the file name identify the recording; files
        # whose id starts with 20230312 use the '_new' workbook.
        if trainingset_name[-21:-13] =='20230312':
            bradyinfo_dir = temp_main_dir+'Bradyinfo_'+trainingset_name[-21:-4]+'_new.xlsx'
        else:
            bradyinfo_dir = temp_main_dir+'Bradyinfo_'+trainingset_name[-21:-4]+'.xlsx'
#         print(bradyinfo_dir)
        temp_heart = pd.read_excel(bradyinfo_dir,sheet_name = 'heart_rate_trace')
        temp_training_csv = pd.read_csv(trainingset_dir)
        if temp_training_csv.shape[0]>0:
            for i in range(0,temp_training_csv.shape[0]):
                # 'trial index' is used as row number + 1 of the trace sheet
                # (presumably 1-based trial numbering -- confirm).
                trial_idx = temp_training_csv['trial index'][i]
                Y.append(int(temp_training_csv.Manual_Annotation[i]))
                
                # Normalise the trace by its mean over columns 300-898
                temp_heart_i = temp_heart.iloc[trial_idx-1,:]/np.mean(temp_heart.iloc[trial_idx-1,300:899])
                fig, (ax1) = plt.subplots(1, 1, figsize=(16, 6), sharex=True, sharey=True)

                plt.plot(temp_heart_i,linewidth = 3,c = 'b')
                # Shaded band over frames 900-999
                x1 = np.arange(900,1000)
                plt.fill_between(x1,2,-2,linewidth = 1,color = 'violet',alpha = 0.3)
                plt.ylim(0,2)
                # Reference line at 80% of the baseline
                plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
                plt.xticks([0,900,1000,1100,1200,1300])  # Set label locations.
                plt.ylabel('Normalized Heart Rate')
                plt.xlabel('frame')
                plt.title(trainingset_name[-21:-4]+' trial'+str(trial_idx)+', '+str(temp_training_csv.Manual_Annotation[i]))
                plt.show()
                # The full-length normalised trace (a pandas Series) is stored
                X.append(temp_heart_i)

In [None]:
# Example trials for supplementary figure 2: same loading as the data-set cell,
# but each trace is drawn without axes, coloured by its manual annotation
# (dodgerblue = True, goldenrod = False), at 800 dpi.
# This cell also rebuilds X and Y from scratch.
dataset_list = ['HSA_Luminance_7','LCr_UV_HSA_Lum_Low_1','LCr_UV_HSA_Luminance_4','LCr_UV_HSA_Luminance_5','size_and_speed_raw','behavior_color']
X = []
Y = []
for dataset in dataset_list:
    trainingset_list = glob.glob('F:\\'+dataset+'\\*\\behavior\\*\\training_dataset*')
    for trainingset_dir in trainingset_list:
        trainingset_name = trainingset_dir.split('\\')[-1]
        temp_main_dir = trainingset_dir[:-len(trainingset_name)]
        # Recordings whose id starts with 20230312 use the '_new' workbook
        if trainingset_name[-21:-13] =='20230312':
            bradyinfo_dir = temp_main_dir+'Bradyinfo_'+trainingset_name[-21:-4]+'_new.xlsx'
        else:
            bradyinfo_dir = temp_main_dir+'Bradyinfo_'+trainingset_name[-21:-4]+'.xlsx'
#         print(bradyinfo_dir)
        temp_heart = pd.read_excel(bradyinfo_dir,sheet_name = 'heart_rate_trace')
        temp_training_csv = pd.read_csv(trainingset_dir)
        if temp_training_csv.shape[0]>0:
            for i in range(0,temp_training_csv.shape[0]):
                trial_idx = temp_training_csv['trial index'][i]
                Y.append(int(temp_training_csv.Manual_Annotation[i]))
                
                # Normalise the trace by its mean over columns 300-898
                temp_heart_i = temp_heart.iloc[trial_idx-1,:]/np.mean(temp_heart.iloc[trial_idx-1,300:899])
                fig, (ax1) = plt.subplots(1, 1, figsize=(13, 6), sharex=True, sharey=True,dpi = 800)
                # Line colour encodes the manual annotation
                if temp_training_csv.Manual_Annotation[i] == True:
                    plt.plot(temp_heart_i,linewidth = 6,c = 'dodgerblue')
                if temp_training_csv.Manual_Annotation[i] == False:
                    plt.plot(temp_heart_i,linewidth = 6,c = 'goldenrod') 
                # Shaded band over frames 900-999
                x1 = np.arange(900,1000)
                plt.fill_between(x1,2,-2,linewidth = 1,color = 'violet',alpha = 0.3)
                plt.ylim(0.7,1.2)
                
#                 plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
                plt.xticks([0,900,1000,1100,1200,1300])  # Set label locations.
                plt.ylabel('Normalized Heart Rate')
                plt.xlabel('frame')
                # The annotation is printed because the title is disabled below
                print(str(temp_training_csv.Manual_Annotation[i]))
#                 ax1.axvline(x = 800,ymin = 0.1,ymax = 0.9,color = 'grey',alpha = 0.4,linewidth =2)
#                 ax1.axvline(x = 1200,ymin = 0.1,ymax = 0.9,color = 'grey',alpha = 0.4,linewidth =2)
#                 ax1.axhline(y = 1.8,xmin = 6/11,xmax = 10/11,color = 'grey',alpha = 0.4,linewidth =2)
#                 ax1.axhline(y = 0.2,xmin = 6/11,xmax = 10/11,color = 'grey',alpha = 0.4,linewidth =2)
#                 plt.title(trainingset_name[-21:-4]+' trial'+str(trial_idx)+', '+str(temp_training_csv.Manual_Annotation[i]))
                # Axes are hidden, so the tick and label settings above are not visible
                plt.axis('off')
                plt.xlim(400,1300)
                plt.show()
                X.append(temp_heart_i)

In [None]:
# Example trials for supplementary figure 2, second variant: every trace is
# drawn in coral on a 0-2 y-range from frame 200 onwards, without axes.
# This cell rebuilds X and Y from scratch; as the last loading cell in the
# notebook it is the one whose X and Y the cells below see when run in order.
dataset_list = ['HSA_Luminance_7','LCr_UV_HSA_Lum_Low_1','LCr_UV_HSA_Luminance_4','LCr_UV_HSA_Luminance_5','size_and_speed_raw','behavior_color']
X = []
Y = []
for dataset in dataset_list:
    trainingset_list = glob.glob('F:\\'+dataset+'\\*\\behavior\\*\\training_dataset*')
    for trainingset_dir in trainingset_list:
        trainingset_name = trainingset_dir.split('\\')[-1]
        temp_main_dir = trainingset_dir[:-len(trainingset_name)]
        # Recordings whose id starts with 20230312 use the '_new' workbook
        if trainingset_name[-21:-13] =='20230312':
            bradyinfo_dir = temp_main_dir+'Bradyinfo_'+trainingset_name[-21:-4]+'_new.xlsx'
        else:
            bradyinfo_dir = temp_main_dir+'Bradyinfo_'+trainingset_name[-21:-4]+'.xlsx'
#         print(bradyinfo_dir)
        temp_heart = pd.read_excel(bradyinfo_dir,sheet_name = 'heart_rate_trace')
        temp_training_csv = pd.read_csv(trainingset_dir)
        if temp_training_csv.shape[0]>0:
            for i in range(0,temp_training_csv.shape[0]):
                trial_idx = temp_training_csv['trial index'][i]
                Y.append(int(temp_training_csv.Manual_Annotation[i]))
                
                # Normalise the trace by its mean over columns 300-898
                temp_heart_i = temp_heart.iloc[trial_idx-1,:]/np.mean(temp_heart.iloc[trial_idx-1,300:899])
                fig, (ax1) = plt.subplots(1, 1, figsize=(16, 6), sharex=True, sharey=True,dpi = 800)
                plt.plot(temp_heart_i,linewidth = 6,c = 'coral') 
                # Shaded band over frames 900-999
                x1 = np.arange(900,1000)
                plt.fill_between(x1,2,-2,linewidth = 1,color = 'violet',alpha = 0.3)
                plt.ylim(0,2)
                
#                 plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
                plt.xticks([0,900,1000,1100,1200,1300])  # Set label locations.
                plt.ylabel('Normalized Heart Rate')
                plt.xlabel('frame')
                # The annotation is printed because the title is disabled below
                print(str(temp_training_csv.Manual_Annotation[i]))
#                 ax1.axvline(x = 800,ymin = 0.1,ymax = 0.9,color = 'grey',alpha = 0.4,linewidth =2)
#                 ax1.axvline(x = 1200,ymin = 0.1,ymax = 0.9,color = 'grey',alpha = 0.4,linewidth =2)
#                 ax1.axhline(y = 1.8,xmin = 6/11,xmax = 10/11,color = 'grey',alpha = 0.4,linewidth =2)
#                 ax1.axhline(y = 0.2,xmin = 6/11,xmax = 10/11,color = 'grey',alpha = 0.4,linewidth =2)
#                 plt.title(trainingset_name[-21:-4]+' trial'+str(trial_idx)+', '+str(temp_training_csv.Manual_Annotation[i]))
                # Axes are hidden, so the tick and label settings above are not visible
                plt.axis('off')
                plt.xlim(200,1300)
                plt.show()
                X.append(temp_heart_i)

In [None]:
# Hold out 20% of the annotated trials for testing. The seed is fixed so that
# the split -- and therefore every test-cell result below -- is the same after
# a kernel restart.
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size = 0.2, random_state = 0)

In [None]:
# test 1
# Baseline: k=5, warping window 10, on the full-length (uncropped) traces.
m1 = KnnDtw(n_neighbors=5, max_warping_window=10)
m1.fit(np.array(x_train), np.array(y_train))
label1, proba1 = m1.predict(np.array(x_test))

print(classification_report(y_test,label1))
print(confusion_matrix(y_test, label1))

In [None]:
# test2
# k=5, warping window 10, traces cropped to frames 800-1199.
x_train2_cropped = np.array(x_train)[:,800:1200]
x_test2_cropped = np.array(x_test)[:,800:1200]

m2 = KnnDtw(n_neighbors=5, max_warping_window=10)
m2.fit(np.array(x_train2_cropped), np.array(y_train))
label2, proba2 = m2.predict(np.array(x_test2_cropped))
print(classification_report(y_test,label2))
print(confusion_matrix(y_test, label2))

In [None]:
# test 3
# k=5, warping window 100, traces cropped to frames 800-1199.
x_train3_cropped = np.array(x_train)[:,800:1200]
x_test3_cropped = np.array(x_test)[:,800:1200]

m3 = KnnDtw(n_neighbors=5, max_warping_window=100)
m3.fit(np.array(x_train3_cropped), np.array(y_train))
label3, proba3 = m3.predict(np.array(x_test3_cropped))
print(classification_report(y_test,label3))
print(confusion_matrix(y_test, label3))

In [None]:
# test 4
# k=5, warping window 150, traces cropped to frames 800-1199.
x_train4_cropped = np.array(x_train)[:,800:1200]
x_test4_cropped = np.array(x_test)[:,800:1200]

m4 = KnnDtw(n_neighbors=5, max_warping_window=150)
m4.fit(np.array(x_train4_cropped), np.array(y_train))
label4, proba4 = m4.predict(np.array(x_test4_cropped))
print(classification_report(y_test,label4))
print(confusion_matrix(y_test, label4))

In [None]:
# test 5
# k=5, warping window 50, traces cropped to frames 800-1199.
x_train5_cropped = np.array(x_train)[:,800:1200]
x_test5_cropped = np.array(x_test)[:,800:1200]

m5 = KnnDtw(n_neighbors=5, max_warping_window=50)
m5.fit(np.array(x_train5_cropped), np.array(y_train))
label5, proba5 = m5.predict(np.array(x_test5_cropped))
print(classification_report(y_test,label5))
print(confusion_matrix(y_test, label5))

In [None]:
# test 6
# k=5, warping window 30, traces cropped to frames 800-1199.
x_train6_cropped = np.array(x_train)[:,800:1200]
x_test6_cropped = np.array(x_test)[:,800:1200]

m6 = KnnDtw(n_neighbors=5, max_warping_window=30)
m6.fit(np.array(x_train6_cropped), np.array(y_train))
label6, proba6 = m6.predict(np.array(x_test6_cropped))
print(classification_report(y_test,label6))
print(confusion_matrix(y_test, label6))

In [None]:
# test 7
# k=5, warping window 30, with a narrower crop: frames 850-1149.
x_train7_cropped = np.array(x_train)[:,850:1150]
x_test7_cropped = np.array(x_test)[:,850:1150]

m7 = KnnDtw(n_neighbors=5, max_warping_window=30)
m7.fit(np.array(x_train7_cropped), np.array(y_train))
label7, proba7 = m7.predict(np.array(x_test7_cropped))
print(classification_report(y_test,label7))
print(confusion_matrix(y_test, label7))

In [None]:
# test 8
# k=5, warping window 40, traces cropped to frames 800-1199.
# label8 is reused by the FN / FP inspection cells that follow.
x_train8_cropped = np.array(x_train)[:,800:1200]
x_test8_cropped = np.array(x_test)[:,800:1200]

m8 = KnnDtw(n_neighbors=5, max_warping_window=40)
m8.fit(np.array(x_train8_cropped), np.array(y_train))
label8, proba8 = m8.predict(np.array(x_test8_cropped))
print(classification_report(y_test,label8))
print(confusion_matrix(y_test, label8))

In [None]:
# Plot the false negatives of test 8: held-out trials predicted 0
# (label8 == 0) whose manual annotation y_test[i] is truthy.
temp_trial_no = 0
for i in np.where(label8==0)[0]:
    if bool(y_test[i]):
        temp_trial_no = temp_trial_no+1
        fig, (ax1) = plt.subplots(1, 1, figsize=(16, 6), sharex=True, sharey=True)

        # x_test holds the full-length normalised traces
        plt.plot(x_test[i],linewidth = 3,c = 'b')
        # Shaded band over frames 900-999
        x1 = np.arange(900,1000)
        plt.fill_between(x1,2,-2,linewidth = 1,color = 'lightcoral',alpha = 0.2)
        plt.ylim(0,2)
        plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
        plt.xticks([0,900,1000,1100,1200,1300])  # Set label locations.
        plt.ylabel('Normalized Heart Rate')
        plt.xlabel('frame')
        # The title is the negated annotation, i.e. 'False Trial' for every
        # figure drawn here (matches the predicted label).
        plt.title(str(not bool(y_test[i]))+' Trial')
        plt.show()
print('FN trials, in total '+str(temp_trial_no)+' trials')

In [None]:
# Plot the false positives of test 8: held-out trials predicted 1
# (label8 == 1) whose manual annotation y_test[i] is falsy.
temp_trial_no = 0
for i in np.where(label8==1)[0]:
    if not bool(y_test[i]):
        temp_trial_no = temp_trial_no+1
        fig, (ax1) = plt.subplots(1, 1, figsize=(16, 6), sharex=True, sharey=True)

        # x_test holds the full-length normalised traces
        plt.plot(x_test[i],linewidth = 3,c = 'b')
        # Shaded band over frames 900-999
        x1 = np.arange(900,1000)
        plt.fill_between(x1,2,-2,linewidth = 1,color = 'lightcoral',alpha = 0.2)
        plt.ylim(0,2)
        plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
        plt.xticks([0,900,1000,1100,1200,1300])  # Set label locations.
        plt.ylabel('Normalized Heart Rate')
        plt.xlabel('frame')
        # The title is the annotation itself, i.e. 'False Trial' for every
        # figure drawn here (the FN cell instead shows the negated annotation).
        plt.title(str(bool(y_test[i]))+' Trial')
        plt.show()
print('FP trials, in total '+str(temp_trial_no)+' trials')

In [None]:
# Cross-validate the test-8 configuration (k=5 neighbours, window 40, frames
# 800-1199) on all annotated trials: 5 folds, 2 repeats.
list_f1score8,list_accuracy8 = cv_knndtw(np.array(X)[:,800:1200],np.array(Y),n_neighbors0=5, max_warping_window0=40,k=5,repeats=2)

In [None]:
# Mean (standard deviation) of the per-fold scores from the previous cell
print('Accuracy: %.3f (%.3f)' % (mean(list_accuracy8), std(list_accuracy8)))
print('f1: %.3f (%.3f)' % (mean(list_f1score8), std(list_f1score8)))

In [None]:
# Display the individual per-fold accuracies
list_accuracy8

In [None]:
# Display the individual per-fold F1 scores
list_f1score8

In [None]:
# Same configuration as above with 10 folds and a single repeat; overwrites
# list_f1score8 / list_accuracy8.
list_f1score8,list_accuracy8 = cv_knndtw(np.array(X)[:,800:1200],np.array(Y),n_neighbors0=5, max_warping_window0=40,k=10,repeats=1)
print('Accuracy: %.3f (%.3f)' % (mean(list_accuracy8), std(list_accuracy8)))
print('f1: %.3f (%.3f)' % (mean(list_f1score8), std(list_f1score8)))

In [None]:
# test 9
# k=5, warping window 40, with the narrower crop: frames 850-1149.
x_train9_cropped = np.array(x_train)[:,850:1150]
x_test9_cropped = np.array(x_test)[:,850:1150]

m9 = KnnDtw(n_neighbors=5, max_warping_window=40)
m9.fit(np.array(x_train9_cropped), np.array(y_train))
label9, proba9 = m9.predict(np.array(x_test9_cropped))
print(classification_report(y_test,label9))
print(confusion_matrix(y_test, label9))

In [None]:
# test 10
# k=7, warping window 40, traces cropped to frames 800-1199.
x_train10_cropped = np.array(x_train)[:,800:1200]
x_test10_cropped = np.array(x_test)[:,800:1200]

m10 = KnnDtw(n_neighbors=7, max_warping_window=40)
m10.fit(np.array(x_train10_cropped), np.array(y_train))
label10, proba10 = m10.predict(np.array(x_test10_cropped))
print(classification_report(y_test,label10))
print(confusion_matrix(y_test, label10))

In [None]:
##### main classifier in the jupyter notebook
# 10-fold cross-validation (single repeat) of the k=7, window=40 classifier on
# all annotated trials, cropped to frames 800-1199.
list_f1score10,list_accuracy10 = cv_knndtw(np.array(X)[:,800:1200],np.array(Y),n_neighbors0=7, max_warping_window0=40,k=10,repeats=1)
print('Accuracy: %.3f (%.3f)' % (mean(list_accuracy10), std(list_accuracy10)))
print('f1: %.3f (%.3f)' % (mean(list_f1score10), std(list_f1score10)))
########

In [None]:
# 1-nearest-neighbour variant (window 40, frames 800-1199): 10 folds, 3 repeats.
list_f1score10_1,list_accuracy10_1 = cv_knndtw(np.array(X)[:,800:1200],np.array(Y),n_neighbors0=1, max_warping_window0=40,k=10,repeats=3)
print('Accuracy: %.3f (%.3f)' % (mean(list_accuracy10_1), std(list_accuracy10_1)))
print('f1: %.3f (%.3f)' % (mean(list_f1score10_1), std(list_f1score10_1)))

In [None]:
# test 11
# k=9, warping window 40, traces cropped to frames 800-1199.
x_train11_cropped = np.array(x_train)[:,800:1200]
x_test11_cropped = np.array(x_test)[:,800:1200]

m11 = KnnDtw(n_neighbors=9, max_warping_window=40)
m11.fit(np.array(x_train11_cropped), np.array(y_train))
label11, proba11 = m11.predict(np.array(x_test11_cropped))
print(classification_report(y_test,label11))
print(confusion_matrix(y_test, label11))

In [None]:
# test 12
# k=5, warping window 40, frames 800-1199, with every trace additionally
# min-max rescaled to the range [0, 1] before classification.
x_train12_cropped = np.array(x_train)[:, 800:1200]
x_test12_cropped = np.array(x_test)[:, 800:1200]

# Iterating a 2-D array yields row views, so writing through `row[:]`
# rescales the cropped arrays in place.
for cropped in (x_train12_cropped, x_test12_cropped):
    for row in cropped:
        row[:] = (row - np.min(row)) / (np.max(row) - np.min(row))

m12 = KnnDtw(n_neighbors=5, max_warping_window=40)
m12.fit(np.array(x_train12_cropped), np.array(y_train))
label12, proba12 = m12.predict(np.array(x_test12_cropped))

print(classification_report(y_test, label12))
print(confusion_matrix(y_test, label12))

In [None]:
# test 13
# k=5, warping window 40, frames 850-1199, with every trace min-max rescaled
# to the range [0, 1] before classification.
x_train13_cropped = np.array(x_train)[:, 850:1200]
x_test13_cropped = np.array(x_test)[:, 850:1200]

# Iterating a 2-D array yields row views, so writing through `row[:]`
# rescales the cropped arrays in place.
for cropped in (x_train13_cropped, x_test13_cropped):
    for row in cropped:
        row[:] = (row - np.min(row)) / (np.max(row) - np.min(row))

m13 = KnnDtw(n_neighbors=5, max_warping_window=40)
m13.fit(np.array(x_train13_cropped), np.array(y_train))
label13, proba13 = m13.predict(np.array(x_test13_cropped))

print(classification_report(y_test, label13))
print(confusion_matrix(y_test, label13))

In [None]:
# Scratch cell: evaluates the literal 1 and has no effect on the analysis.
1

In [None]:
# test 14
# k=7, warping window 40, frames 800-1199, with every trace min-max rescaled
# to the range [0, 1] before classification.
x_train14_cropped = np.array(x_train)[:, 800:1200]
x_test14_cropped = np.array(x_test)[:, 800:1200]

# Iterating a 2-D array yields row views, so writing through `row[:]`
# rescales the cropped arrays in place.
for cropped in (x_train14_cropped, x_test14_cropped):
    for row in cropped:
        row[:] = (row - np.min(row)) / (np.max(row) - np.min(row))

m14 = KnnDtw(n_neighbors=7, max_warping_window=40)
m14.fit(np.array(x_train14_cropped), np.array(y_train))
label14, proba14 = m14.predict(np.array(x_test14_cropped))

print(classification_report(y_test, label14))
print(confusion_matrix(y_test, label14))

In [None]:
# Cross-validate the min-max-normalised variant (k=7, window 40, frames
# 800-1199) on all annotated trials: 5 folds, 3 repeats.
X14 = np.array(X)[:, 800:1200]
Y14 = np.array(Y)

# Rescale each trace to [0, 1] in place (rows are views into X14)
for row in X14:
    row[:] = (row - np.min(row)) / (np.max(row) - np.min(row))

list_f1score14, list_accuracy14 = cv_knndtw(
    X14, Y14,
    n_neighbors0=7, max_warping_window0=40,
    k=5, repeats=3,
)

print('Accuracy: %.3f (%.3f)' % (mean(list_accuracy14), std(list_accuracy14)))
print('f1: %.3f (%.3f)' % (mean(list_f1score14), std(list_f1score14)))

In [None]:
# Cross-validate the min-max-normalised variant (k=7, window 40, frames
# 800-1199) on all annotated trials: 10 folds, 3 repeats.
X14 = np.array(X)[:, 800:1200]
Y14 = np.array(Y)

# Rescale each trace to [0, 1] in place (rows are views into X14)
for row in X14:
    row[:] = (row - np.min(row)) / (np.max(row) - np.min(row))

list_f1score14, list_accuracy14 = cv_knndtw(
    X14, Y14,
    n_neighbors0=7, max_warping_window0=40,
    k=10, repeats=3,
)

print('Accuracy: %.3f (%.3f)' % (mean(list_accuracy14), std(list_accuracy14)))
print('f1: %.3f (%.3f)' % (mean(list_f1score14), std(list_f1score14)))

In [None]:
# TEST CLF8 AND CLF14 in low elevation day1
# Collect the evaluation set: baseline-normalised traces (np_trace, cropped to
# frames 800-1199) and manual annotations (np_brady) of every trial whose
# Video_Quality flag is True.

main_dir = 'F:\\LCr_UV_HSA_Lum_Low_1\\'
heart_list = glob.glob(main_dir+'\\*\\behavior\\*\\Bradyinfo*new.xlsx')

trace_list, brady_list = [], []

for heart in heart_list:
    trace = pd.read_excel(heart, sheet_name='heart_rate_trace')
    Brady_info = pd.read_excel(heart, sheet_name='Bradyinfo')

    for t in range(Brady_info.shape[0]):
        # Skip trials that are not flagged as usable
        if not (Brady_info.Video_Quality[t] == True):
            continue

        # Normalise by the mean over columns 300-898
        row = trace.iloc[t, :]
        temp_trace = row / np.mean(row.iloc[300:899])

        trace_list.append(temp_trace)
        brady_list.append(int(Brady_info.Manual_Annotation[t]))

np_trace = np.array(trace_list)[:, 800:1200]
np_brady = np.array(brady_list)

In [None]:
# CLF8
# Refit the k=5, window=40 classifier on every annotated trial in X / Y and
# evaluate it on the low-elevation traces (np_trace / np_brady).
# Note that this rebinds `m8`, which test 8 also assigns.
X8 = np.array(X)[:,800:1200]
Y8 = np.array(Y)

m8 = KnnDtw(n_neighbors=5, max_warping_window=40)
m8.fit(X8, Y8)


label8_lowelev, proba8_lowelev = m8.predict(np_trace)
print(classification_report(np_brady,label8_lowelev))
print(confusion_matrix(np_brady, label8_lowelev))

In [None]:
# CLF10
# Refit the k=7, window=40 classifier on every annotated trial in X / Y and
# evaluate it on the low-elevation traces (np_trace / np_brady).
# Note that this rebinds `m10`, which test 10 also assigns.
X10 = np.array(X)[:,800:1200]
Y10 = np.array(Y)

m10 = KnnDtw(n_neighbors=7, max_warping_window=40)
m10.fit(X10, Y10)


label10_lowelev, proba10_lowelev = m10.predict(np_trace)
print(classification_report(np_brady,label10_lowelev))
print(confusion_matrix(np_brady, label10_lowelev))

In [None]:
# Summary scores for CLF10 on the low-elevation set. The F1 here is the
# class-weighted average, unlike the default F1 used inside cv_knndtw.
print(accuracy_score(np_brady, label10_lowelev))
print(f1_score(np_brady, label10_lowelev, average='weighted'))

In [None]:
# CLF10_1
# 1-nearest-neighbour variant (window 40) fitted on every annotated trial and
# evaluated on the low-elevation traces (np_trace / np_brady).
X10_1 = np.array(X)[:,800:1200]
Y10_1 = np.array(Y)

m10_1 = KnnDtw(n_neighbors=1, max_warping_window=40)
m10_1.fit(X10_1, Y10_1)


label10_1_lowelev, proba10_1_lowelev = m10_1.predict(np_trace)
print(classification_report(np_brady,label10_1_lowelev))
print(confusion_matrix(np_brady, label10_1_lowelev))

In [None]:
# Plot the false negatives of CLF10 on the low-elevation set: trials predicted
# 0 (label10_lowelev == 0) whose manual annotation np_brady[i] is truthy.
temp_trial_no = 0
for i in np.where(label10_lowelev==0)[0]:
    if bool(np_brady[i]):
        temp_trial_no = temp_trial_no+1
        fig, (ax1) = plt.subplots(1, 1, figsize=(16, 6), sharex=True, sharey=True)

        # trace_list holds the full-length normalised traces (np_trace is the cropped copy)
        plt.plot(trace_list[i],linewidth = 3,c = 'b')
        # Shaded band over frames 900-999
        x1 = np.arange(900,1000)
        plt.fill_between(x1,2,-2,linewidth = 1,color = 'lightcoral',alpha = 0.2)
        plt.ylim(0,2)
        plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
        plt.xticks([0,900,1000,1100,1200,1300])  # Set label locations.
        plt.ylabel('Normalized Heart Rate')
        plt.xlabel('frame')
        # The title is the negated annotation, i.e. 'False Trial' for every
        # figure drawn here (matches the predicted label).
        plt.title(str(not bool(np_brady[i]))+' Trial')
        plt.show()
print('FN trials, in total '+str(temp_trial_no)+' trials')

In [None]:
# Plot the false positives of CLF10 on the low-elevation set: trials predicted
# 1 (label10_lowelev == 1) whose manual annotation np_brady[i] is falsy.
temp_trial_no = 0
for i in np.where(label10_lowelev==1)[0]:
    if not bool(np_brady[i]):
        temp_trial_no = temp_trial_no+1
        fig, (ax1) = plt.subplots(1, 1, figsize=(16, 6), sharex=True, sharey=True)

        # trace_list holds the full-length normalised traces (np_trace is the cropped copy)
        plt.plot(trace_list[i],linewidth = 3,c = 'b')
        # Shaded band over frames 900-999
        x1 = np.arange(900,1000)
        plt.fill_between(x1,2,-2,linewidth = 1,color = 'lightcoral',alpha = 0.2)
        plt.ylim(0,2)
        plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
        plt.xticks([0,900,1000,1100,1200,1300])  # Set label locations.
        plt.ylabel('Normalized Heart Rate')
        plt.xlabel('frame')
        # The title is the annotation itself, i.e. 'False Trial' for every
        # figure drawn here (the FN cell instead shows the negated annotation).
        plt.title(str(bool(np_brady[i]))+' Trial')
        plt.show()
print('FP trials, in total '+str(temp_trial_no)+' trials')

In [None]:
# CLF14
# Min-max-normalised classifier (k=7, window 40) fitted on every annotated
# trial and evaluated on the low-elevation traces.
X14 = np.array(X)[:,800:1200]
Y14 = np.array(Y)

# Rescale every training trace to [0, 1]
for i in range(0,X14.shape[0]):
    temp_x = X14[i,:]
    temp_x = (temp_x-np.min(temp_x))/(np.max(temp_x)-np.min(temp_x))
    X14[i,:] = temp_x

# The evaluation traces must go through the same per-trace min-max scaling as
# the training traces (as test 14 does for its test split); otherwise the DTW
# distances compare [0, 1]-scaled curves against baseline-relative ones.
# A copy is scaled because np_trace is shared with the other CLF cells.
np_trace14 = np.array(np_trace, dtype=float)
for i in range(0,np_trace14.shape[0]):
    temp_x = np_trace14[i,:]
    np_trace14[i,:] = (temp_x-np.min(temp_x))/(np.max(temp_x)-np.min(temp_x))

m14 = KnnDtw(n_neighbors=7, max_warping_window=40)
m14.fit(X14, Y14)
label14_lowelev, proba14_lowelev= m14.predict(np_trace14)
print(classification_report(np_brady,label14_lowelev))
print(confusion_matrix(np_brady,label14_lowelev))

In [None]:
# Rebuild the unscaled cropped training matrix (frames 800-1199) for inspection
X8 = np.array(X)[:,800:1200]

In [None]:
# Display the unscaled cropped traces
X8

In [None]:
# Display the min-max-scaled cropped traces for comparison with X8
X14

In [None]:
# low elev2 blind test
#
# For every Bradyinfo workbook found under main_dir, draw 8 trials: up to 4
# that the automatic classification marked 1, the remainder from those marked
# 0. For each drawn trial, store the automatic label and a simple threshold
# label in df_sampling_result, copy the matching heart video to
# blind_test\video_<n>.avi, and plot the normalized heart-rate trace.
import random
import shutil

main_dir = 'F:\\LCr_UV_HSA_Lum_Low_2\\'
blind_test_dir = os.path.join(main_dir, 'blind_test')
# shutil.copyfile does not create missing folders, so make sure it exists.
os.makedirs(blind_test_dir, exist_ok=True)

# Dedicated seeded generator + sorted file list: re-running the cell selects
# the same trials in the same order instead of a fresh random draw.
blind_test_rng = random.Random(0)
bradyinfo_list = sorted(glob.glob(main_dir+'*\\behavior\\*\\Bradyinfo*.xlsx'))

# Pre-allocated for 80 videos (8 per workbook).
# NOTE(review): this presumes 10 workbooks are found — confirm.
df_sampling_result = pd.DataFrame(columns = ['video_idx','Bradycardia_auto','threh_4perc'])
df_sampling_result['video_idx'] = range(0,80)
video_idx = 0

for bradyinfo_dir in bradyinfo_list:
    Brady_info = pd.read_excel(bradyinfo_dir,sheet_name = 'Bradyinfo')
    heart_trace = pd.read_excel(bradyinfo_dir,sheet_name = 'heart_rate_trace')
    Brady_classification = Brady_info['Bradycardia_Classification']
    Brady_1_idx = np.where(Brady_classification == 1)[0]
    Brady_0_idx = np.where(Brady_classification == 0)[0]

    # 4 + 4 when at least 4 trials are marked 1; otherwise take all of them
    # and fill up to 8 with trials marked 0.
    n_brady = min(4, Brady_1_idx.shape[0])
    sample_idx = np.sort(
        blind_test_rng.sample(list(Brady_1_idx), n_brady)
        + blind_test_rng.sample(list(Brady_0_idx), 8 - n_brady)
    )

    # With main_dir and the glob pattern above, the path splits into
    # drive / root folder / <date> / 'behavior' / <fish> / workbook name.
    path_parts = bradyinfo_dir.split('\\')
    date = path_parts[2]
    fish = path_parts[4]

    for idx in sample_idx:
        # .loc writes into the frame itself; chained df[col][row] = value may
        # write to a temporary copy and is a no-op under pandas Copy-on-Write.
        df_sampling_result.loc[video_idx, 'Bradycardia_auto'] = Brady_classification[idx]

        # Normalize the trial by the mean of positions 300..898.
        temp_heart = heart_trace.iloc[idx,:]
        temp_heart = temp_heart/np.mean(temp_heart[300:899])

        # Threshold label: 1 when the normalized trace drops below 0.96
        # anywhere in positions 900..1099, else 0.
        df_sampling_result.loc[video_idx, 'threh_4perc'] = int(np.min(temp_heart[900:1100]) < 0.96)

        # NOTE(review): [:-32] assumes the workbook file name is exactly
        # 32 characters long, leaving the folder with its trailing separator.
        source_path = bradyinfo_dir[:-32]+date+'_'+fish+'_heart_'+str(idx+1)+'.avi'
        target_path = os.path.join(blind_test_dir, 'video_'+str(video_idx)+'.avi')
        shutil.copyfile(source_path, target_path)

        fig, (ax1) = plt.subplots(1, 1, figsize=(16, 6))
        ax1.grid(False)
        plt.axis('off')
        plt.plot(temp_heart,linewidth = 4,c = 'navy')
        # Light red band across positions 900-999.
        x1 = np.arange(900,1000)
        plt.fill_between(x1,1.25,0.7,linewidth = 1,color = 'lightcoral',alpha = 0.2)
        plt.ylim(0.5,1.5)
        plt.yticks([0.5,0.75,1,1.25,1.5],fontsize = 22)
#         plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
        plt.xticks([0,200,400,600,800,1000,1200])  # Set label locations.
        # Tick labels are the tick positions divided by 100.
        ax1.set_xticklabels([0,2,4,6,8,10,12],fontsize = 22)
        plt.ylabel('Normalized Heart Rate',fontsize = 24)
        plt.xlabel('Seconds',fontsize = 24)
#         plt.savefig(os.path.join(blind_test_dir, 'heart_rate_'+str(video_idx)+'.png'))
        plt.show()
        # Release the figure so many iterations do not accumulate open figures.
        plt.close(fig)

        video_idx = video_idx+1

In [None]:
# Manual labels for the blind-test videos.
manual_csv = pd.read_csv('F:\\LCr_UV_HSA_Lum_Low_2\\blind_test\\blind_test_manual.csv')
test_info = manual_csv['Bradycardia_manual'].to_numpy(copy=True)

# Automatic labels: the 'Bradycardia_auto' column and the 'threh_4perc' column.
auto_csv = pd.read_csv('F:\\LCr_UV_HSA_Lum_Low_2\\blind_test\\blind_test_auto.csv')
KNN_label = auto_csv['Bradycardia_auto'].to_numpy(copy=True)
threshold4_label = auto_csv['threh_4perc'].to_numpy(copy=True)



In [None]:
# Threshold labels vs. manual labels: per-class report, then confusion matrix.
for summarize in (classification_report, confusion_matrix):
    print(summarize(test_info, threshold4_label))

In [None]:
# Threshold labels vs. manual labels: accuracy, then support-weighted F1.
threshold4_accuracy = accuracy_score(test_info, threshold4_label)
threshold4_f1 = f1_score(test_info, threshold4_label, average='weighted')
print(threshold4_accuracy)
print(threshold4_f1)

In [None]:
# KNN labels vs. manual labels: per-class report, then confusion matrix.
for summarize in (classification_report, confusion_matrix):
    print(summarize(test_info, KNN_label))

In [None]:
# KNN labels vs. manual labels: accuracy, then support-weighted F1.
knn_accuracy = accuracy_score(test_info, KNN_label)
knn_f1 = f1_score(test_info, KNN_label, average='weighted')
print(knn_accuracy)
print(knn_f1)

In [None]:
# Hand-entered 2x2 rate matrix; each row sums to 1. Rows are labelled
# 'Actuals' and columns 'Predictions' on the figure.
conf_matrix = np.array([[0.82, 0.18], [0.06, 0.94]])
class_names = ['Bradycardia', 'Non-Bradycardia']

fig, ax = plt.subplots(figsize=(5.8, 5.8), dpi=800)
ax.matshow(conf_matrix, cmap=plt.cm.Blues, alpha=0.4)

# One centred caption per cell, given as (column, row, text).
cell_captions = [
    (0, 0, 'TPR: 82%'),
    (0, 1, 'FPR: 6%'),
    (1, 0, 'FNR: 18%'),
    (1, 1, 'TNR: 94%'),
]
for col, row, caption in cell_captions:
    ax.text(x=col, y=row, s=caption, va='center', ha='center', size=22)

ax.set_xticks([0, 1])
ax.set_xticklabels(class_names, fontsize=20)
ax.set_yticks([0, 1])
ax.set_yticklabels(class_names, rotation=90, va='center', fontsize=20)
ax.grid(False)
# Keep the tick labels but hide the tick marks themselves.
ax.tick_params(axis='both', which='both', length=0)
ax.set_xlabel('Predictions', fontsize=22)
ax.xaxis.set_label_position('top')
ax.set_ylabel('Actuals', fontsize=22)
# plt.title('Confusion Matrix', fontsize=18)
plt.show()

In [None]:
# Scratch check of the 4 + 4 sampling expression. Brady_1_idx and Brady_0_idx
# are not defined in this cell; it uses whatever values the kernel currently
# holds for them.
import random
np.sort(random.sample(list(Brady_1_idx), 4)+random.sample(list(Brady_0_idx), 4))

In [None]:
# Scratch check: show bradyinfo_dir with its last 32 characters removed.
# bradyinfo_dir is not defined in this cell; the kernel's current value is used.
bradyinfo_dir[:-32]

In [None]:
# draw the illustration
X = np.load('training_set_X.npy')
Y = np.load('training_set_Y.npy')

In [None]:
# First three positions where Y equals 0, and first three where Y equals 1.
nonbrady_idx = np.nonzero(Y == 0)[0][:3]
brady_idx = np.nonzero(Y == 1)[0][:3]

In [None]:
# One figure per selected example: the Y == 0 picks first, then the Y == 1 picks.
for idx in [*nonbrady_idx, *brady_idx]:
    fig, ax1 = plt.subplots(nrows=1, ncols=1, figsize=(16, 6), sharex=True, sharey=True)
    ax1.plot(X[idx, :], linewidth=3, c='b')

    # Light red band across frames 900-999, spanning the full visible y-range.
    x1 = np.arange(900, 1000)
    ax1.fill_between(x1, 2, -2, linewidth=1, color='lightcoral', alpha=0.2)
    ax1.set_ylim(0, 2)
    #         plt.axhline(y = 0.8,xmin = 0, xmax =1 , alpha = 0.4)
    ax1.set_xticks([0, 900, 1000, 1100, 1200, 1300])
    ax1.set_ylabel('Normalized Heart Rate')
    ax1.set_xlabel('frame')
    plt.show()