In [1]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score

from sklearn import preprocessing
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import KNeighborsClassifier

## Task 1: 1-NN DTW

1. Use the DTW function in the notebook to implement a simple 1-NN classifier for time-series data.

In [2]:
#Load the data set.
df = pd.read_csv("UMD_TEST.txt",error_bad_lines=False ,header=None, delim_whitespace=True)
df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,141,142,143,144,145,146,147,148,149,150
0,1.0,0.017644,0.030949,0.050555,0.044484,0.053277,0.041576,0.030947,0.027086,0.013764,...,0.024575,0.033780,0.026589,0.013932,0.024928,0.022589,0.038248,0.049838,0.053419,0.040420
1,1.0,0.041296,0.003551,0.027470,0.013158,0.009571,0.008074,0.043743,0.040592,0.012190,...,0.060539,0.046991,0.023586,0.001562,-0.002196,0.036730,0.039027,0.007754,0.004697,0.031440
2,1.0,-0.000720,0.013283,0.029450,0.045201,0.006317,0.018805,0.028901,0.013832,0.015240,...,0.016442,0.039508,0.015171,0.034708,0.010835,0.002942,0.006924,0.029502,0.040786,0.023144
3,1.0,0.005201,0.013363,0.025733,0.026653,0.038946,0.012494,0.028303,0.032011,0.009467,...,0.006383,0.037448,0.044335,0.011143,-0.003624,0.001467,0.020991,0.027675,0.001621,0.015858
4,1.0,0.022926,0.027036,0.011668,0.019500,0.036049,-0.001297,0.019717,0.039583,0.020628,...,0.026997,0.036653,0.018117,0.018314,0.012536,0.040599,0.016590,0.032730,0.002498,0.011260
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
139,3.0,0.008298,0.017240,0.010756,0.049488,0.005331,0.036829,0.045530,0.046181,0.044288,...,0.015062,0.028319,0.028443,0.051677,0.036134,0.038030,0.047388,0.006875,0.052691,0.051594
140,3.0,0.004801,0.042964,0.034519,-0.003426,-0.003353,0.005231,0.002309,0.029905,0.011177,...,0.017705,0.045264,0.041434,0.040670,0.034613,0.001551,0.017716,0.001042,0.047787,0.020318
141,3.0,0.033658,0.038235,0.009848,0.016061,0.026287,0.029979,0.024099,0.023647,0.017566,...,0.019306,0.022595,0.014942,0.009259,0.019186,0.035905,0.031095,0.006592,0.017948,-0.003614
142,3.0,0.022805,0.020270,0.015546,0.038309,0.016303,0.011456,0.018286,0.015730,0.007319,...,0.013016,0.030138,0.025736,0.005955,0.050157,0.014435,0.005638,0.036126,0.010751,-0.005682


In [3]:
#We will define y and X
y = pd.DataFrame(df[0])
X = df.drop(columns=0)

In [4]:
#Here we will split the whole dataset into 70% training set and 30% testing set, and set the random_state = 1
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3,random_state=1)

print("original range is: ",df.shape[0])
print("training range (70%):\t rows 0 to", round(X_train.shape[0]))
print("test range (30%): \t rows", round(X_train.shape[0]), "to", round(X_train.shape[0]) + X_test.shape[0])

original range is:  144
training range (70%):	 rows 0 to 100
test range (30%): 	 rows 100 to 144


In [5]:
#Reset the index
X_train.reset_index(drop=True, inplace=True)
y_train.reset_index(drop=True, inplace=True)
X_test.reset_index(drop=True, inplace=True)
y_test.reset_index(drop=True, inplace=True)

In [6]:
#Code from https://towardsdatascience.com/dynamic-time-warping-3933f25fcdd
#This code implements a simple DTW measure between two series represented as lists.

def dtw(s, t, window):
    n, m = len(s), len(t)
    w = np.max([window, abs(n-m)]) # warping cannot be less than the difference in lengths. 
    dtw_matrix = np.zeros((n+1, m+1))
    
    for i in range(n+1):
        for j in range(m+1):
            dtw_matrix[i, j] = np.inf
    dtw_matrix[0, 0] = 0
    
    for i in range(1, n+1):
        for j in range(np.max([1, i-w]), np.min([m, i+w])+1):
            dtw_matrix[i, j] = 0
    
    for i in range(1, n+1):
        for j in range(np.max([1, i-w]), np.min([m, i+w])+1):
            cost = abs(s[i-1] - t[j-1])
            # take last min from a square box
            last_min = np.min([dtw_matrix[i-1, j], dtw_matrix[i, j-1], dtw_matrix[i-1, j-1]])
            dtw_matrix[i, j] = cost + last_min
    return dtw_matrix[-1,-1]

In [7]:
#Define a class called "my1nn()"
class my1nn():
    def __init__(self):
        self.X = None
        self.y = None
    
    def dtw(self, s, t, window):
        n, m = len(s), len(t)
        w = np.max([window, abs(n-m)]) # warping cannot be less than the difference in lengths. 
        dtw_matrix = np.zeros((n+1, m+1))
    
        for i in range(n+1):
            for j in range(m+1):
                dtw_matrix[i, j] = np.inf
        dtw_matrix[0, 0] = 0
    
        for i in range(1, n+1):
            for j in range(np.max([1, i-w]), np.min([m, i+w])+1):
                dtw_matrix[i, j] = 0

        for i in range(1, n+1):
            for j in range(np.max([1, i-w]), np.min([m, i+w])+1):
                cost = abs(s[i-1] - t[j-1])
                # take last min from a square box
                last_min = np.min([dtw_matrix[i-1, j], dtw_matrix[i, j-1], dtw_matrix[i-1, j-1]])
                dtw_matrix[i, j] = cost + last_min
        return dtw_matrix[-1,-1]

    #Pass the value of X, y to the class.
    def fit(self, X, y):
        self.X = X
        self.y = y
    
    #Define a function to predict the values in the testing set.
    def cal_distance(self, X, window):
        #"result_container" to store the predicting results.
        result_container = []
        
        for j in range(len(X)):
            #"Container" is created to store the distrance values from the testing element to all the training elements.
            container = []
            test_element = X.loc[j].tolist()

            for i in range(len(self.X)):
                #Append all the distance values to "container" list.
                train_element = self.X.loc[i].tolist()
                distance = self.dtw(test_element,train_element,window)
                container.append(distance)
            
            #Find the shortest distance in the 'container' list and get the index.
            index_number = container.index(min(container))
            result_container.append(self.y.loc[index_number].values[0])
        
        return(pd.DataFrame(result_container))



In [8]:
#Call the function in the class to predict the results.
m = my1nn()
m.fit(X_train, y_train)
y_result = m.cal_distance(X_test, 3)
y_result

Unnamed: 0,0
0,2.0
1,2.0
2,2.0
3,2.0
4,2.0
5,1.0
6,1.0
7,1.0
8,3.0
9,1.0


In [9]:
y_test

Unnamed: 0,0
0,2.0
1,2.0
2,2.0
3,2.0
4,2.0
5,1.0
6,1.0
7,1.0
8,3.0
9,3.0


2. Test the performance

In [10]:
#Create a funtion to test the performance.
def test_perf(test,predict):
    acc = accuracy_score(test,predict)
    print("Accuracy: {0:.2f}".format(acc))
    confusion = confusion_matrix(test,predict)
    print("Confusion matrix:\n{}".format(confusion))

In [11]:
test_perf(y_test,y_result)

Accuracy: 0.95
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 2  0  9]]


The accuracy of the self-designed 1nn model ('my1nn' class) is 0.95; The confusion matrix shows that 18 elements are correctly classify as class 1, 15 elements correctly classify as class 2, and 9 elements are correctly classify as class 3 but 2 elements are classified as class 1, which should be in class 3.

3. Given that the time-series are all the same length (150 ticks) Euclidean distance can also be used as distance metric. Compare the 1-NN DTW performance with 1-NN Euclidean. Use the scikit-learn implementation for the Euclidean model.

In [12]:
# Default 1-NN metric is Minkowski with p = 2, i.e. Euclidean

#Create a KNN classifier
forecast_KNN = KNeighborsClassifier(n_neighbors=1,metric='euclidean') 
forecast_KNN.fit(X_train,y_train)

  return self._fit(X, y)


KNeighborsClassifier(metric='euclidean', n_neighbors=1)

In [13]:
#Use knn classifier to predict the result of X_test.
y_result_knn = forecast_KNN.predict(X_test)
y_result_knn

array([2., 2., 2., 2., 2., 1., 1., 1., 3., 1., 1., 1., 2., 3., 2., 1., 2.,
       3., 1., 2., 1., 1., 2., 2., 3., 2., 3., 1., 1., 1., 3., 3., 2., 1.,
       2., 2., 3., 2., 1., 1., 3., 1., 1., 3.])

In [14]:
#Check the performance
test_perf(y_test,y_result_knn)

Accuracy: 0.95
Confusion matrix:
[[17  1  0]
 [ 0 15  0]
 [ 1  0 10]]


As we see from the performance above, the 1-NN Euclidean model shares the same accuracy with the self-designed 1NN model, which is 0.95. When 1 comes to the confusion matrix, 1 element belongs to the Class 1 was classified to Class 2 mistakenly, and 1 element belongs to the Class 3 was classified to Class 1 mistakenly.

4. Find the best value for the window parameter for this dataset.

In [15]:
#Create a list from 1 to 10, and find the optimal value for the window.
for i in list(range(1,11,1)):
    m = my1nn()
    m.fit(X_train, y_train)
    y_result = m.cal_distance(X_test, window=i)
    
    print("The performance when window = ",i,":")
    test_perf(y_test,y_result)

The performance when window =  1 :
Accuracy: 0.95
Confusion matrix:
[[17  1  0]
 [ 0 15  0]
 [ 1  0 10]]
The performance when window =  2 :
Accuracy: 0.98
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 1  0 10]]
The performance when window =  3 :
Accuracy: 0.95
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 2  0  9]]
The performance when window =  4 :
Accuracy: 0.98
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 1  0 10]]
The performance when window =  5 :
Accuracy: 1.00
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 0  0 11]]
The performance when window =  6 :
Accuracy: 1.00
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 0  0 11]]
The performance when window =  7 :
Accuracy: 1.00
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 0  0 11]]
The performance when window =  8 :
Accuracy: 1.00
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 0  0 11]]
The performance when window =  9 :
Accuracy: 1.00
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 0  0 11]]
The performance when window =  10 :
Accuracy: 1.00
Conf

We can see that when the window value went beyond 5, the Accuracy reached to 100%. Therefore, a window value that greater than 5 could bring us the optimal accuracy for the provided dataset using the self-designed 1NN model.

## Task 2: k-NN DTW

1. The k-NN classifier in scikit-learn has a metric parameter that allows foruser-defined distance metrics. Adapt the DTW code provided so that it can be incorporated in a scikit-learn k-NN classifier.

In [51]:
#Based on the fingings about the optimal window value, we update our DTW function
def dtw(s, t, window=5):
    n, m = len(s), len(t)
    w = np.max([window, abs(n-m)]) # warping cannot be less than the difference in lengths. 
    dtw_matrix = np.zeros((n+1, m+1))
    
    for i in range(n+1):
        for j in range(m+1):
            dtw_matrix[i, j] = np.inf
    dtw_matrix[0, 0] = 0
    
    for i in range(1, n+1):
        for j in range(np.max([1, i-w]), np.min([m, i+w])+1):
            dtw_matrix[i, j] = 0
    
    for i in range(1, n+1):
        for j in range(np.max([1, i-w]), np.min([m, i+w])+1):
            cost = abs(s[i-1] - t[j-1])
            # take last min from a square box
            last_min = np.min([dtw_matrix[i-1, j], dtw_matrix[i, j-1], dtw_matrix[i-1, j-1]])
            dtw_matrix[i, j] = cost + last_min
    return dtw_matrix[-1,-1]

In [52]:
# create a 1NN classifier that adapt the DTW code
forecast_KNN_new = KNeighborsClassifier(n_neighbors=1,metric=dtw) 
forecast_KNN_new.fit(X_train,y_train)

  return self._fit(X, y)


KNeighborsClassifier(metric=<function dtw at 0x7fda6a953d30>, n_neighbors=1)

In [53]:
#Predict the result using the DTW incorporated KNN model
y_result_knn_new = forecast_KNN_new.predict(X_test)
y_result_knn_new

array([2., 2., 2., 2., 2., 1., 1., 1., 3., 3., 1., 1., 2., 3., 2., 1., 2.,
       3., 1., 2., 1., 1., 2., 2., 3., 2., 3., 1., 1., 1., 3., 3., 1., 1.,
       2., 2., 3., 2., 1., 1., 3., 1., 1., 3.])

2. Test the performance of this classifier and compare with the 1-NN results from Task 1. Verify that the 1-NN results are consistent.

In [28]:
#The peformance of self-defined 1NN model when window = 5
m = my1nn()
m.fit(X_train, y_train)
y_result = m.cal_distance(X_test, window=5)
test_perf(y_test,y_result)

Accuracy: 1.00
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 0  0 11]]


In [54]:
#The performance of DTW-incorporated model.
test_perf(y_test,y_result_knn_new)

Accuracy: 1.00
Confusion matrix:
[[18  0  0]
 [ 0 15  0]
 [ 0  0 11]]


From the perfomance above, we can see that the accuracy value and the Confusion matrix are exactly the same. It proves that our self-designed 1NN classifier model functions in the same way as the KNN classifier model.

3. Compare with k-NN Euclidean.

In [55]:
#Create a KNN classifier and set the n_neighbor as 3.
forecast_3NN = KNeighborsClassifier(n_neighbors=3,metric='euclidean') 
forecast_3NN.fit(X_train,y_train)

#Predict the result
y_result_3NN = forecast_3NN.predict(X_test)

  return self._fit(X, y)


In [56]:
# create a 3NN classifier that adapt the DTW code
forecast_3NN_DTW = KNeighborsClassifier(n_neighbors=3,metric=dtw) 
forecast_3NN_DTW.fit(X_train,y_train)

#Predict the result
y_result_3NN_DTW = forecast_3NN_DTW.predict(X_test)

  return self._fit(X, y)


In [57]:
#Check the performance
test_perf(y_test,y_result_3NN)

Accuracy: 0.89
Confusion matrix:
[[16  2  0]
 [ 0 15  0]
 [ 1  2  8]]


In [58]:
test_perf(y_test,y_result_3NN_DTW)

Accuracy: 0.95
Confusion matrix:
[[18  0  0]
 [ 0 14  1]
 [ 0  1 10]]


From the performance above, we can see that the DTW-incorporated classifier has better performance than the Euclidean KNN classifier when we set the n to 3.