-
Notifications
You must be signed in to change notification settings - Fork 0
/
knn.py
executable file
·215 lines (186 loc) · 6.77 KB
/
knn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
__author__ = 'deeksha'
from sklearn import cross_validation
import csv
import sys
from sklearn import preprocessing
import numpy as np
def is_float(value):
    '''Return True if value can be parsed as a float, else False.

    The parameter was renamed from 'str', which shadowed the builtin.
    Note: a TypeError (e.g. for None) is deliberately not caught, matching
    the original behaviour — inputs are expected to be CSV strings.
    '''
    try:
        float(value)
        return True
    except ValueError:
        return False
def is_int(value):
    '''Return True if value can be parsed as an int, else False.

    The parameter was renamed from 'str', which shadowed the builtin.
    Floats written with a decimal point (e.g. "3.5") return False.
    '''
    try:
        int(value)
        return True
    except ValueError:
        return False
def cvrt_to_num_if_can(text):
    ''' If text is really a number,
    convert it to same, preferring ints.
    Otherwise return text unchanged.

    The parameter was renamed from 'str', which shadowed the builtin;
    the checks are done EAFP-style instead of via is_int/is_float,
    with identical results.
    '''
    try:
        return int(text)
    except ValueError:
        pass
    try:
        return float(text)
    except ValueError:
        return text
#extract data from a CSV file
def extractData(fileName, targetName, delim=','):
    '''Given the name of a file, the name of the target variable column,
    and optionally the column deliminator (',' is the default),
    Return a sklearn-style dictionary with the keys
    'feature_names', 'target_names', 'target' and 'data'.

    Raises IOError if the file cannot be opened, and ValueError if
    targetName does not appear in the header row.
    '''
    try:
        # The original mode 'rUb' mixed universal-newline text with binary,
        # which Python 3 rejects outright; plain 'r' is what csv.reader needs.
        in_file = open(fileName, 'r')
    except IOError as e:
        print("I/O error({0}): {1}".format(e.errno, e.strerror))
        raise
    try:
        reader = csv.reader(in_file, delimiter=delim, quotechar='"')
        #initialization
        dataDict = {'feature_names': [], 'target_names': [],
                    'target': [], 'data': []}
        #read the header row, keeping only the non-empty column names
        fieldNames = []
        for row in reader:
            fieldNames = [field for field in row if field != '']
            break
        #find the index of the target value, if exists
        try:
            targetIdx = fieldNames.index(targetName)
        except ValueError:
            print("Target %s not in fields %s" % (targetName, fieldNames))
            raise
        fieldNames = fieldNames[:targetIdx] + fieldNames[targetIdx + 1:]
        dataDict['feature_names'] = fieldNames
        #read the data
        for row in reader:
            #We may want to later have more sophistication if values are missing,
            # but for now we fill the example with "None"
            rowData = [None] * len(fieldNames)
            #add one to length because the target is also there
            if len(row) != len(fieldNames) + 1:
                print("found a bad row?  %s" % (row,))
            dataIdx = 0
            for colIdx in range(len(row)):
                if colIdx == targetIdx:
                    dataDict['target'].append(cvrt_to_num_if_can(row[colIdx]))
                elif row[colIdx] != r'\N' and row[colIdx] != "":
                    # '\N' is a common NULL marker (e.g. MySQL dumps)
                    rowData[dataIdx] = cvrt_to_num_if_can(row[colIdx])
                    dataIdx += 1
            dataDict['data'].append(rowData)
        #get unique targets
        dataDict['target_names'] = list(set(dataDict['target']))
        return dataDict
    finally:
        # The original leaked the file handle; always close it.
        in_file.close()
#split the data into training and testing splits
def split(inData, TrainProp=0.9):
    ''' Given an sklearn dataset, return two lists:
    1. the first TrainProp proportion of the data, [X_train, Y_Train]
    2. the second (1-TrainProp) proportion of the data, [X_test, Y_test]
    '''
    features = inData['data']
    targets = inData['target']
    # Index at which the training slice ends and the test slice begins.
    cut = int(TrainProp * len(features))
    training_split = [features[:cut], targets[:cut]]
    testing_split = [features[cut:], targets[cut:]]
    return training_split, testing_split
#function to calculate the euclidean distance
def euclidean_distance(querypoint, data_array):
    '''Return a list with the Euclidean distance from querypoint to
    each row of data_array.'''
    query = np.array(querypoint)
    return [np.sqrt(np.sum((query - row) ** 2)) for row in data_array]
#function which return the majority class of the k nearest neighbour
def k_nearest_neighbours(n_neighbours, training_data, queryPoint, rescale):
    '''Return the majority class among the n_neighbours training examples
    closest to queryPoint in Euclidean distance.

    training_data is [X, y].  If rescale == "TRUE", the features are
    standardised (zero mean, unit variance) before distances are taken.
    '''
    x_train = np.array(training_data[0])
    if rescale == "TRUE":
        scaler = preprocessing.StandardScaler().fit(x_train)
        x_train = scaler.transform(x_train)
        # Bug fix: the query must be rescaled with the *training set's*
        # statistics.  The original fitted a second scaler on the single
        # query point, which maps the query to the zero vector.
        queryPoint = scaler.transform(np.array(queryPoint).reshape(1, -1))[0]
    # Euclidean distance from the query to every training row (vectorised).
    diffs = x_train - np.asarray(queryPoint)
    distances = np.sqrt((diffs ** 2).sum(axis=1))
    y_train = np.array(training_data[1])
    # Sort by distance only.  The original sorted bare (distance, label)
    # pairs (in a variable shadowing builtin 'tuple'), which falls back to
    # comparing labels whenever two distances tie.
    ranked = sorted(zip(distances, y_train), key=lambda pair: pair[0])
    votes = [label for _, label in ranked[:n_neighbours]]
    return max(set(votes), key=votes.count)
#function which return the k classifier for fixed querypoint
def question3a(fileName, target):
    '''Answer question 3a: classify the fixed query point [2.5, 2.5] with
    k-NN for k in {1, 3, 5} and print the predicted class for each k.
    Features are rescaled before the distance computation.
    '''
    dataDict = extractData(fileName, target, delim=',')
    data = [dataDict['data'], dataDict['target']]
    rescale = "TRUE"
    queryPoint = [2.5, 2.5]
    for k in [1, 3, 5]:
        knn = k_nearest_neighbours(k, data, queryPoint, rescale)
        # Single-string parenthesised print works on Python 2 and 3 with
        # identical output; the original multi-item print statement is
        # Python-2 only.
        print("For k: %s Classifier: %s" % (k, knn))
def question3b(fileName, target):
    '''Answer question 3b: hold out the last 10% of the data, classify each
    held-out point with 1-NN (rescaled features), and print the accuracy.
    '''
    dataDict = extractData(fileName, target, delim=',')
    [x_train, y_train], [x_test, y_test] = split(dataDict, TrainProp=0.9)
    training_data = [x_train, y_train]
    n_neighbours = 1
    rescale = "TRUE"
    error = 0
    # Loop variable renamed from 'target', which shadowed the parameter.
    for query, expected in zip(x_test, y_test):
        knn = k_nearest_neighbours(n_neighbours, training_data, query, rescale)
        # '<>' was removed in Python 3; '!=' is the portable spelling.
        if knn != expected:
            error += 1
    score = float(len(x_test) - error) / len(x_test)
    print(score)
def kfold(fileName, target):
    '''Answer question 4: 5-fold cross-validate k-NN (no rescaling) for
    k = 1, 3, ..., 11 and print the mean accuracy for each k.
    '''
    rescale = "FALSE"
    dataDict = extractData(fileName, target, delim=',')
    # Shuffle the examples so the folds are randomly composed.
    perm = list(np.random.permutation(len(dataDict['data'])))
    x_all = [dataDict['data'][p] for p in perm]
    y_all = [dataDict['target'][p] for p in perm]
    k_fold = cross_validation.KFold(n=len(x_all), n_folds=5, indices=True)
    # Odd k only (the original built this via filter(lambda x: x % 2,
    # range(0, 13))); odd k avoids ties in a two-class vote.
    for k in range(1, 13, 2):
        scores = []
        for train_idx, test_idx in k_fold:
            training_data = [[x_all[t] for t in train_idx],
                             [y_all[t] for t in train_idx]]
            test_x = [x_all[t] for t in test_idx]
            test_y = [y_all[t] for t in test_idx]
            error = 0
            for query, expected in zip(test_x, test_y):
                knn = k_nearest_neighbours(k, training_data, query, rescale)
                # '<>' was removed in Python 3; '!=' is the portable spelling.
                if knn != expected:
                    error += 1
            scores.append(float(len(test_x) - error) / len(test_x))
        print("For k: %s Score: %s" % (k, np.average(scores)))
# Guard the script entry points so that importing this module does not
# trigger the (slow, file-reading) computations.
if __name__ == "__main__":
    #function that returns result to question 3a
    question3a("MLHW3.csv", "y")
    # function that returns result to question 4
    kfold("wineHeaders.csv", "Target")
    #function which returns the answer to question 3b
    #question3b("BioResponseKaggleTrain.csv", "Activity")