In [1]:
#made by: Eloise Bisbee 8/12/19 eloise.bisbee@tufts.edu
#This program is a handwritten k nearest neighbors classifier
#that takes data from clean_comedy_data_ts.csv and trains and
#tests the classifier.

#it is expected that_clean_comedy_data_ts.csv has columns:
#PerformanceId, JokeId, TimeStamp, Pitch,Intensity, HumanScore, HumanScorePostJokeOnly

#the program prints the percent of the test data correctly classified
#in decimal form

import numpy as np
import matplotlib.pyplot as plt
import csv
from matplotlib.colors import ListedColormap
from sklearn import neighbors, datasets
from scipy.spatial.distance import euclidean
from sklearn.model_selection import train_test_split
from fastdtw import fastdtw

#this function constructs a similarity matrix of elements
#of the X array passed in, which is expected to have time
#series data. The similarity matrix is constructed based
#on euclidean dynamic time warping distance, and is returned
def SimMatrix(X):
    sim_matrix = []
    for i in range(0, len(X)):
        sim_matrix.append([0] * len(X))
    for i in range(0, len(X)):
        #print(str(i))
        for j in range(0, len(X)):
            distance, path = fastdtw(np.array(X[i]), np.array(X[j]), dist=euclidean)
            sim_matrix[i][j] = distance
            #print(distance)
    return sim_matrix

#adapted from https://www.geeksforgeeks.org/python-program-for-insertion-sort/
#this program takes an array arr and a secondary array y that are expected
#to be an array of distances to be ordered in sorted order (arr) and then 
#an array y corresponding to the ground truth ratings of the distances, that
#is to be ordered in the same way as arr.
def ModifiedInsertionSort(arr, y):
    for i in range(1, len(arr)): 
        key = arr[i]
        y_key = y[i]
        # Move elements of arr[0..i-1], that are 
        # greater than key, to one position ahead 
        # of their current position 
        j = i-1
        while j >=0 and key < arr[j] : 
                arr[j+1] = arr[j] 
                y[j+1] = y[j]
                j -= 1
        arr[j+1] = key
        y[j+1] = y_key
    return(arr, y)

#This class is the k nearest neighbors classifier
class Knn:
    #init takes X (train time series data), 
    #y (ground truth ratings) and neighbs, which
    #is the number of neighbors to be used in 
    #classification
    def __init__(self, X, y, neighbs):
        if neighbs > len(X):
            self.n = len(X)
        else:
            self.n = neighbs
        self.data = X
        self.truth = y
    #prints all attributes of self    
    def printAll(self):
        print(self.n)
        print(self.data)
        print(self.truth)
    #takes a point and returns a classification, which is just the 
    #classification of the n closest points to the point passed in
    def classify(self, point):
        neighbs = []
        neighbs_y = []
        for i in range(0, len(X)):
            if i < self.n:
                distance, path = fastdtw(np.array(X[i]), np.array(point), dist=euclidean)
                neighbs.append(distance)
                neighbs_y.append(y[i])
                neighbs, neighbs_y = ModifiedInsertionSort(neighbs, neighbs_y)
            else:
                distance, path = fastdtw(np.array(X[i]), np.array(point), dist=euclidean)
                if distance < neighbs[self.n - 1]:
                    neighbs[self.n - 1] = distance
                    neighbs_y[self.n - 1] = y[i]
                    neighbs, neighbs_y = ModifiedInsertionSort(neighbs, neighbs_y)
        neg_one_count = 0
        zero_count = 0
        one_count = 0
        for i in range(0, len(neighbs_y)):
            if neighbs_y[i] == -1:
                neg_one_count += 1
            elif neighbs_y[i] == 0:
                zero_count += 1
            else:
                one_count += 1
        max_count = max(neg_one_count, zero_count, one_count)
        if max_count == neg_one_count:
            return -1
        elif max_count == zero_count:
            return 0
        else:
            return 1
    #Takes an array X of test data and y of ground truth for X,
    #returns the number of points in X that the classifier 
    #correctly classifies
    def accuracyScore(self, X, y):
        num_correct = 0
        for i in range(0, len(X)):
            guess = self.classify(X[i])
            if guess == y[i]:
                num_correct += 1
        return float(num_correct) / len(X)
            
                       
#read in csv file
raw_com_data = []
com_data_cols = []
with open('clean_comedy_data_ts.csv') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    count = 0
    curr_per = -1
    curr_joke = -1
    data_index = -1
    for row in csv_reader:
        if count == 0:
            com_data_cols = row 
        else:
            if curr_per != int(row[0]) or curr_joke != int(row[1]):
                curr_per = int(row[0])
                curr_joke = int(row[1])
                temp = []
                temp_time = []
                raw_com_data.append([int(row[0]), int(row[1]), temp_time, temp, int(row[5]), int(row[6])])
                temp_time.append(float(row[2]))
                temp.append([float(row[3]), float(row[4])])
                data_index += 1
            else:
                curr_per = int(row[0])
                curr_joke = int(row[1])
                raw_com_data[data_index][2].append(float(row[2]))
                raw_com_data[data_index][3].append([float(row[3]), float(row[4])])               
        count += 1
#makes X and y
y = []
X = []
for i in range(0, 300):
    X.append(raw_com_data[i][3])
    y.append(raw_com_data[i][5])
#makes X and y test
y_test = []
X_test = []
for i in range(301, 500):
    X_test.append(raw_com_data[i][3])
    y_test.append(raw_com_data[i][5])
#this creates the classifer and prints the accuracy score
clf = Knn(X, y, 10)
#clf.printAll()
print(clf.accuracyScore(X_test, y_test))



0.5979899497487438
