In [20]:
import numbers
import sys
from operator import itemgetter

import numpy as np
from fastdtw import fastdtw
from numpy.linalg import norm
from sklearn.base import BaseEstimator
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
%run ./prepare.ipynb

In [21]:
def euclidean(u, v):
    """Return the Euclidean (L2) distance between two points.

    Parameters
    ----------
    u, v : number or array-like
        Two scalars, or two array-likes (ndarray, list, tuple) whose
        shapes are compatible for subtraction.

    Returns
    -------
    ``abs(u - v)`` when ``u`` is a plain number or the difference is
    zero-dimensional, otherwise ``numpy.linalg.norm(u - v, ord=2)``.

    Notes
    -----
    Intended for scalars and 1-D vectors.  For 2-D inputs numpy's
    ``ord=2`` is the matrix 2-norm rather than an element-wise distance.
    """
    # Scalar fast path: avoids building arrays for 1-D time series samples.
    if isinstance(u, numbers.Number):
        return abs(u - v)

    # asarray lets plain lists/tuples be subtracted element-wise; it does
    # not copy inputs that already are ndarrays.
    diff = np.asarray(u) - np.asarray(v)

    # norm(..., ord=2) rejects zero-dimensional input, so treat it as a scalar.
    if np.ndim(diff) == 0:
        return abs(diff)
    return norm(diff, ord=2)

In [25]:
class KNN:
    """Nearest-neighbour ranking of time series under windowed DTW.

    ``fit`` memorises the training series and their labels.  ``predict``
    returns, for every test series, all training labels ordered from the
    closest training series to the farthest one.

    Parameters
    ----------
    max_warping_window : int, default=20
        Half-width of the warping band: an interior cell ``(i, j)`` of the
        DTW cost matrix is evaluated only when
        ``|i - j| <= max_warping_window``.
    use_emg : bool, default=False
        When true, every training and test series is passed through
        ``threshold_cut`` before distances are computed.
        NOTE(review): ``threshold_cut`` is not defined in this cell;
        presumably it comes from ./prepare.ipynb -- confirm.
    """

    def __init__(self, max_warping_window=20, use_emg=False):
        self.max_warping_window = max_warping_window
        self.use_emg = use_emg

    def fit(self, X, y):
        """Store the training series and labels.

        Parameters
        ----------
        X : sequence of series
            Copied (or, with ``use_emg``, replaced by the
            ``threshold_cut`` of a copy of each series).
        y : sequence of labels
            Must support ``.copy()`` and have as many entries as ``X``.

        Returns
        -------
        self, so calls can be chained (scikit-learn convention).

        Raises
        ------
        AssertionError
            If the number of series and labels differ.
        """
        if self.use_emg:
            # Each series is copied before cutting so the caller's data is
            # never handed to threshold_cut directly.
            train = [threshold_cut(series.copy()) for series in X]
        else:
            train = X.copy()
        labels = y.copy()

        # Validate before touching self.* so a failed fit leaves the
        # previous state intact.
        assert len(train) == len(labels), "X and y must have the same length"
        self.X = train
        self.y = labels
        return self

    def _DTW(self, ts_a, ts_b, d=None):
        """Return the DTW distance between two series within the band.

        Parameters
        ----------
        ts_a, ts_b : array-like
            The two (non-empty) series to align.
        d : callable, optional
            Point-wise distance ``d(a_i, b_j)``.  Defaults to the
            module-level ``euclidean``, looked up at call time so that
            re-running the cell defining it takes effect immediately.

        Returns
        -------
        The accumulated cost at the last cell of the cost matrix.  If that
        cell cannot be reached inside the band the value is (at least)
        ``sys.maxsize``, which acts as "infinitely far".
        """
        if d is None:
            d = euclidean

        ts_a, ts_b = np.array(ts_a), np.array(ts_b)
        M, N = len(ts_a), len(ts_b)
        window = self.max_warping_window

        # Every cell starts at a huge sentinel meaning "not reachable".
        cost = np.full((M, N), float(sys.maxsize))

        # First row and column are plain cumulative sums (filled in full,
        # independently of the window).
        cost[0, 0] = d(ts_a[0], ts_b[0])
        for i in range(1, M):
            cost[i, 0] = cost[i - 1, 0] + d(ts_a[i], ts_b[0])
        for j in range(1, N):
            cost[0, j] = cost[0, j - 1] + d(ts_a[0], ts_b[j])

        # Interior cells inside the band |i - j| <= window.  The "+ 1"
        # makes the upper bound inclusive like the lower one; without it
        # the band is lopsided and _DTW(a, b) can differ from _DTW(b, a).
        for i in range(1, M):
            for j in range(max(1, i - window), min(N, i + window + 1)):
                cheapest_prev = min(cost[i - 1, j - 1],
                                    cost[i, j - 1],
                                    cost[i - 1, j])
                cost[i, j] = cheapest_prev + d(ts_a[i], ts_b[j])

        return cost[-1, -1]

    def _DTW_distances(self, X_test):
        """Return the matrix of DTW distances from test to training series.

        Row ``i`` holds the distances from ``X_test[i]`` to every stored
        training series, in training order (dtype float64).
        """
        n_samples = len(self.X)
        rows = []

        for i in range(len(X_test)):
            # Prepare the test series once per row rather than once per
            # training sample; the copy keeps threshold_cut away from the
            # caller's data.
            query = X_test[i].copy()
            if self.use_emg:
                query = threshold_cut(query)

            row = [self._DTW(query, self.X[j]) for j in range(n_samples)]
            rows.append(np.array(row, dtype='float64'))

        return np.array(rows)

    def predict(self, X_test):
        """Rank the training labels for every test series.

        Returns
        -------
        list of lists: entry ``i`` contains all training labels sorted by
        increasing DTW distance to ``X_test[i]``.  An empty ``X_test``
        yields an empty list.
        """
        if len(X_test) == 0:
            return []

        distances = self._DTW_distances(X_test)
        order = np.argsort(distances, axis=1)

        # Index the labels one by one: itemgetter(*idx) returns a bare
        # item (not a tuple) when there is exactly one training sample.
        return [[self.y[k] for k in row] for row in order]

In [26]:
class CrossValKNN(BaseEstimator, KNN):
    """1-nearest-neighbour DTW classifier usable with scikit-learn tooling.

    Differs from ``KNN`` only in ``predict``, which returns a single label
    per test series (that of the closest training series) instead of the
    full ranking, and in exposing its hyper-parameters via ``get_params``.
    """

    def predict(self, X_test):
        """Return the label of the nearest training series for each test series.

        Returns
        -------
        list with one label per entry of ``X_test``; an empty ``X_test``
        yields an empty list.  When several training series are equally
        close, the one that comes first in training order wins.
        """
        if len(X_test) == 0:
            return []

        distances = self._DTW_distances(X_test)

        # Only the closest neighbour is needed, so argmin replaces a full
        # sort.  Labels are indexed directly: itemgetter(*idx) returns a
        # bare item when there is exactly one training sample, which the
        # former list(...)[0] then mangled.
        nearest = np.argmin(distances, axis=1)
        return [self.y[k] for k in nearest]

    def get_params(self, deep=True):
        """Return the constructor hyper-parameters as a dict.

        Every ``__init__`` argument must be listed: scikit-learn's
        ``clone`` rebuilds the estimator from this dict, so an omitted
        ``use_emg`` would silently fall back to its default in every
        cross-validation fold.  ``deep`` is accepted for API
        compatibility; there are no nested estimators.
        """
        return {
            'max_warping_window': self.max_warping_window,
            'use_emg': self.use_emg,
        }