In [1]:
import sys
import time

import math
import numpy as np
import matplotlib.pyplot as plt

In [2]:
import sys
import os
import pandas as pd
import numpy as np

module_path = os.path.abspath(os.path.join('..'))

sys.path.insert(1, module_path + '/src')
import audio_time_series_classification as preproject
import utility

sys.path.insert(1, module_path + '/src/models/')
import helper

from sklearn.decomposition import PCA
from sktime.utils.data_io import load_from_tsfile_to_dataframe
from sklearn.feature_selection import SelectKBest, chi2
from sktime.utils.data_processing import from_nested_to_2d_array
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
import matplotlib.pyplot as plt
from sklearn.cross_decomposition import PLSRegression

figure_path = module_path + '/figures/'

In [3]:
ts_path = module_path + '/data/ts_files/crackleNoCrackle_FULL.ts'
#NB ! This file has already been downsampled and denoised before saving
kwargs_ls = {
    'ts_file_path': ts_path,
    'sr' : 8000,
    'UCR_file': False,
    'n_mfcc' : 30,
    'denoise' : False,
    'downsample' : False,
    'downsample_new_sr' : 8000,
    'update' : False,
    'name' : 'lungsound',
    'module_path' : module_path
}

ls = preproject.overproduced_audio_time_series_features(**kwargs_ls)
X_train, X_test, y_train, y_test = ls.get_X_y_transformed(train_test_split = True)





In [13]:
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, train_size=0.75, random_state=1)

indices_0 = np.where(y_train.astype(int) == 0)[0]
indices_1 = np.where(y_train.astype(int) == 1)[0]
indices = np.concatenate((indices_0[0:1929], indices_1))

In [5]:
def sort_it(list_to_sort):    
    try:
        for i in range(len(list_to_sort)):
            for j in range(len(list_to_sort) - 1):
                if list_to_sort[j] > list_to_sort[j+1]:
                    list_to_sort[j], list_to_sort[j + 1] = list_to_sort[j + 1], list_to_sort[j]
                
        return list_to_sort
    except:
        print(sys.exc_info())
        
def run_prediction(X_train, y_train, X_test, y_test, k=1):
    
    try:
        predicted_list = []
    
        # training on X_train, y_train by calculating Euclidean distances
        for d in range(len(X_test)):
            eucl_distances = []
            for i in range(len(X_train)):
                #eucl_distances.append([np.sqrt(np.sum(np.power(X_test[d,:] - X_train[i,:], 2))), i])
                eucl_distances.append([np.linalg.norm(X_test[d,:] - X_train[i,:]), i]) #used linalg for faster calculation time
                
            eucl_distances = sort_it(eucl_distances)
            #print(eucl_distances)
    
            targets = [y_train[eucl_distances[i][1]] for i in range(k)]
            #print(targets)
        
            predicted_list.append(max(targets, key=targets.count))

        #print(predicted_list)
        prediction['predicted_list'] = predicted_list
        prediction['accuracy'] = np.mean(prediction['predicted_list'] == y_test)
        return prediction
    
    except:
        print(sys.exc_info())

In [8]:
try :
    nn = run_prediction(X_train.iloc[indices].values, y_train.iloc[indices].values,
                        X_val.iloc[indices].values, y_val.iloc[indices].values, k=1)
    print(nn["accuracy"])
except:
    print(sys.exc_info())

(<class 'KeyboardInterrupt'>, KeyboardInterrupt(), <traceback object at 0x7ff2d1fe3690>)
(<class 'TypeError'>, TypeError("'NoneType' object is not subscriptable"), <traceback object at 0x7ff33bfedeb0>)
(<class 'TypeError'>, TypeError("'NoneType' object is not subscriptable"), <traceback object at 0x7ff33bfedeb0>)


In [18]:
def calculate_distance(X_tr, y_tr):
    train_length = X_tr.shape[0]
    same_class_dist, other_class_dist = [[math.inf for i in range(train_length)] for j in range(2)]

    for i in range(train_length-1):
        for j in range(i+1,train_length):
            distance = np.linalg.norm(X_tr[i]-X_tr[j])

            if y_tr[i]==y_tr[j]:
                if distance < same_class_dist[i]:
                    same_class_dist[i] = distance
                if distance < same_class_dist[j]:
                    same_class_dist[j] = distance
            else:
                if distance < other_class_dist[i]:
                    other_class_dist[i] = distance
                if distance < other_class_dist[j]:
                    other_class_dist[j] = distance

    return [same_class_dist, other_class_dist]

def conformal(X_train, y_train, X_test, y_test):
    predicted_list, p_values = [[] for i in range(2)]
    lenrange = len(list(set(y_train)))
    same_class_dist, other_class_dist = calculate_distance(X_train, y_train)

    for i in range(len(X_test)):
        conformity_scores = [[] for j in range(lenrange)]
        curr_testXval = X_test[i]
        for j in range(lenrange):
            new_same_dist = np.append(same_class_dist, math.inf)
            new_other_class_dist = np.append(other_class_dist, math.inf)
            extended_X = np.concatenate((X_train, [curr_testXval]), axis = 0)
            extended_y = np.concatenate((y_train, [j]), axis = 0)

            for curr_idx, curr_elem in enumerate(extended_X):
                distance = np.linalg.norm(curr_elem - curr_testXval)
                idx = len(extended_X)-1

                if distance != 0: #to avoid duplicate value
                    if j == extended_y[curr_idx]:
                        if distance < new_same_dist[idx]:
                            new_same_dist[idx] = distance
                    else:
                        if distance < new_other_class_dist[idx]:
                            new_other_class_dist[idx] = distance

                if new_same_dist[curr_idx] == 0: #to avoid duplicate value
                    conformity_scores[j].append(0)
                else:
                    conformity_scores[j].append(new_other_class_dist[curr_idx]/new_same_dist[curr_idx])

        p_vals = []
        for k in range(lenrange):
            p_vals.append(np.mean(conformity_scores[k]<=conformity_scores[k][X_train.shape[0]]))

        predicted_list.append(p_vals.index(max(p_vals)))
        p_values.append(p_vals)

    falsep = []
    for i, p in enumerate(p_values):
        sumval = 0;
        for j, q in enumerate(p):
            if j != y_test[i]:
                sumval += q
        falsep.append(sumval)

    false_p_value = np.sum(falsep)/(len(falsep)*2)
    accuracy = np.mean(predicted_list == y_test)

    print(
          "The average false p-value : {} \n"
          "The accuracy of prediction : {} \n"
          "The test error rate is : {}"
          .format(false_p_value, accuracy, 1-accuracy))

In [19]:
start_time = time.time()
try :
    conformal(X_train.iloc[indices].values, y_train.iloc[indices].values,
              X_val.values, y_val.values)
except:
    print(sys.exc_info()) 
print("\nTime elapsed : %s seconds" % (time.time() - start_time))

The average false p-value : 0.0003305785123966942 
The accuracy of prediction : 0.0 
The test error rate is : 1.0

Time elapsed : 190.9539680480957 seconds
