-
Notifications
You must be signed in to change notification settings - Fork 2
/
loc.py
428 lines (398 loc) · 16.3 KB
/
loc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
#!/usr/bin/python
# python loc.py <locations_file> [<data_file>]
#
# Performs location type learning on the given data file and outputs either the
# learned model, or the confusion matrices and accuracy for a 3-fold
# cross-validation test.
#
# Requires files: l.translate, locations
# Written by Diane J. Cook, Washington State University.
# Copyright (c) 2020. Washington State University (WSU). All rights reserved.
# Code and data may not be used or distributed without permission from WSU.
import math
import os.path
import sys
import joblib
import numpy
from numpy import mean
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
import features
import gps
import utils
class Location:
    """ Learn and apply a classifier that maps a window of mobile sensor
    readings (plus date/time values) onto a location type.

    Location names are looked up in a cached list of (latitude, longitude,
    name) triples, falling back to the gps module, and are then generalized
    to a location type through the mappings read from l.translate.
    """

    # Sensor channels, in the order their statistical features are placed in
    # the feature vector.
    _MOTION_CHANNELS = ('yaw', 'pitch', 'roll', 'rotx', 'roty', 'rotz',
                        'accx', 'accy', 'accz', 'acctotal')
    _POSITION_CHANNELS = ('latitude', 'longitude', 'alt')
    _GPS_CHANNELS = ('course', 'speed', 'hacc', 'vacc')

    # Index value of each known location type; any other name maps to 7.
    _LOCATION_NUMS = {'attraction': 0, 'house': 1, 'restaurant': 2,
                      'road': 3, 'service': 4, 'store': 5, 'work': 6}

    def __init__(self, filename=None):
        """ Constructor

        filename names the file holding the cached locations; when it is
        None the file "locations" is used.
        """
        self.lmappings = dict()
        self.lmappings['other'] = 'other'
        self.infile = "locations" if filename is None else filename
        self.locations = list()
        self.local = 1  # Use the local GPS values
        self.cross_validation = 0  # number of validation rounds (0 = train and store)
        self.xdata = list()
        self.ydata = list()
        self.numseconds = 5  # Number of seconds of data to include in sequence
        self.samplerate = 1  # Number of samples per second
        self.samplesize = self.numseconds * self.samplerate
        self.clf = RandomForestClassifier(n_estimators=50, bootstrap=True,
                                          criterion="entropy",
                                          class_weight="balanced",
                                          max_depth=5)

    @staticmethod
    def read_entry(infile):
        """ Parse a single line from a text file containing a sensor reading.
        The format is "date time sensorname sensorname value <activitylabel|0>".

        Returns a 7-tuple (valid, date, time, sensor1, sensor2, value, label).
        The label is the string 'None' when the line has no sixth field;
        otherwise spaces inside the label are replaced by underscores.
        valid is False, and every other element is None, at end of file or
        when the line has fewer than five fields (this includes blank lines).
        """
        failure = (False, None, None, None, None, None, None)
        try:
            line = infile.readline()
        except (OSError, ValueError):
            # Unreadable or already-closed stream: report "no entry" instead
            # of propagating, as callers use the flag to stop reading.
            return failure
        fields = str(line).strip().split(' ', 5)
        if len(fields) < 5:
            return failure
        if len(fields) < 6:
            return (True, fields[0], fields[1], fields[2], fields[3],
                    fields[4], 'None')
        label = fields[5].replace(' ', '_')
        return True, fields[0], fields[1], fields[2], fields[3], fields[4], label

    def map_location_name(self, name):
        """ Return the location type that is associated with a specific location
        name, using the stored list of location mappings. Names without a
        mapping are reported as 'other'.
        """
        mapped = self.lmappings.get(name)
        return 'other' if mapped is None else mapped

    @staticmethod
    def generate_location_num(name):
        """ Transform a location type into an index value (0-6 for the known
        types, 7 for anything else).
        """
        return Location._LOCATION_NUMS.get(name, 7)

    def read_location_mappings(self):
        """ Generate a translate list for location names.
        This function assumes that file l.translate exists in the current
        working directory. File l.translate contains an arbitrary number of
        lines, each with syntax "specificType mappedType". This function maps
        locations of specificType to the corresponding, more general,
        mappedType. Lines with fewer than two fields are skipped.
        """
        with open('l.translate', "r") as file:
            for line in file:
                fields = str(line).strip().split(' ', 2)
                if len(fields) < 2:
                    continue  # blank or incomplete line
                self.lmappings[fields[0]] = fields[1]

    def read_locations(self):
        """ Read and store list of locations and corresponding location types.
        The file named by self.infile ("locations" by default) contains an
        arbitrary number of lines, each with syntax
        "latitude longitude type1 type2 type3". Open street maps
        return as many as three location types associated with a lat,long
        location. They can provide alternate type names or levels of abstraction.
        In this function, only the first type is stored with the latitude
        and longitude. Lines with fewer than three fields are skipped.

        Returns the number of locations that were read.
        """
        num_read = 0
        with open(self.infile, "r") as file:
            for line in file:
                fields = str(line).strip().split(' ', 3)
                if len(fields) < 3:
                    continue  # blank or incomplete line
                self.locations.append(
                    [float(fields[0]), float(fields[1]), fields[2]])
                num_read += 1
        return num_read

    def find_location(self, latitude, longitude):
        """ Determine whether the input location is close (within a threshold
        distance) to the locations already stored in the external list.

        Returns the name stored with the first such location, or None when
        no stored location is close enough. The distance is the plain
        Euclidean distance between the raw latitude/longitude values.
        """
        threshold = 0.005
        for tlat, tlong, name in self.locations:
            if math.hypot(tlat - latitude, tlong - longitude) < threshold:
                return name
        return None

    def generate_gps_features(self, latitude, longitude):
        """ Generate location features.

        Returns the name of a nearby stored location when there is one.
        Otherwise the name is requested from the gps module and the new
        (latitude, longitude, name) triple is added to the stored list.
        """
        known = self.find_location(latitude, longitude)
        if known is not None:
            return known
        location = [latitude, longitude]
        # NOTE(review): the second argument looks like the name of the
        # locations cache file; it is left as the literal 'locations' and may
        # need to follow self.infile - confirm against gps.get_location_type.
        gps_type = gps.get_location_type(location, 'locations')
        location.append(gps_type)
        self.locations.append(location)
        return gps_type

    @classmethod
    def _new_window(cls):
        """ Return an empty list of values for every sensor channel.
        """
        names = cls._MOTION_CHANNELS + cls._POSITION_CHANNELS + cls._GPS_CHANNELS
        return {name: list() for name in names}

    @staticmethod
    def _build_feature_vector(channels, scalars):
        """ Concatenate the statistical features of each channel, in order,
        followed by the scalar features.
        """
        xpoint = list()
        for series in channels:
            xpoint.extend(features.generate_statistical_features(series))
        xpoint.extend(scalars)
        return xpoint

    def _emit_sample(self, window, dt):
        """ Build a feature vector from the current window and, unless the
        location name is the string 'None', append the vector and its
        location type to self.xdata and self.ydata.
        """
        latitude = window['latitude']
        longitude = window['longitude']
        # Extent of the window; min()/max() are safe for a 0.0 coordinate.
        lat_span = max(latitude) - min(latitude)
        long_span = max(longitude) - min(longitude)
        distance = math.sqrt((lat_span * lat_span) + (long_span * long_span))
        hcr = features.heading_change_rate(window['course'], distance)
        sr = features.stop_rate(latitude, longitude, distance)
        trajectory = features.trajectory(latitude, longitude)

        channels = list()
        for name in self._MOTION_CHANNELS:
            series = window[name]
            excess = len(series) - self.samplesize
            if excess > 0:  # remove elements outside window
                del series[:excess]
            channels.append(series)
        if self.local == 1:
            channels.extend(window[name] for name in self._POSITION_CHANNELS)
        channels.extend(window[name] for name in self._GPS_CHANNELS)

        scalars = [distance, hcr, sr, trajectory,
                   dt.month, dt.weekday(), dt.hour,
                   (dt.hour * 60) + dt.minute,
                   (dt.hour * 3600) + (dt.minute * 60) + dt.second]
        xpoint = self._build_feature_vector(channels, scalars)

        place = self.generate_gps_features(mean(latitude), mean(longitude))
        if place != 'None':
            self.xdata.append(xpoint)
            self.ydata.append(self.map_location_name(place))

    def extract_features(self, infile):
        """ Extract a feature vector that will be input to a location classifier.

        The data file is read as consecutive records of 16 lines whose values
        are taken positionally (sensor names are not checked): yaw, pitch,
        roll, rotx, roty, rotz, accx, accy, accz, latitude, longitude, alt,
        course, speed, hacc, vacc. Feature vectors and their location types
        are appended to self.xdata and self.ydata.

        Exits the program if infile does not exist. Reading stops quietly at
        an empty file or at a record that ends before its hacc line; a
        missing final vacc line is replaced by 0.0.
        """
        if not os.path.isfile(infile):
            print(infile, "does not exist")
            sys.exit()
        with open(infile, "r") as feature_datafile:
            self._extract_from_stream(feature_datafile)

    def _extract_from_stream(self, datafile):
        """ Read records from the open data file and emit feature vectors.
        """
        valid, date, feature_time, _, _, v1, _ = self.read_entry(datafile)
        if not valid:
            return  # nothing to read
        prevdt = utils.get_datetime(date, feature_time)
        count = 0
        gen = 0
        window = self._new_window()
        while valid:
            dt = utils.get_datetime(date, feature_time)
            delta = dt - prevdt
            # Start a new window after a time gap, after a vector was emitted,
            # or at a window boundary. timedelta.seconds is never negative, so
            # time running backwards (or a gap of whole days) shows up in
            # delta.days instead.
            # NOTE(review): because the window is also cleared whenever
            # count % samplesize == 0, every emitted vector is computed from
            # the single most recent record - confirm this is intended.
            if (delta.days != 0 or delta.seconds > 2 or gen == 1
                    or (count % self.samplesize) == 0):
                gen = 0
                window = self._new_window()

            # The yaw line was already read; read pitch through hacc.
            values = [float(v1)]
            truncated = False
            for _ in range(14):
                valid, date, feature_time, _, _, v1, _ = \
                    self.read_entry(datafile)
                if not valid:
                    truncated = True
                    break
                values.append(float(v1))
            if truncated:
                break  # incomplete record at end of data
            pdt = utils.get_datetime(date, feature_time)

            valid, date, feature_time, _, _, v1, _ = self.read_entry(datafile)
            # Handle case where last VerticalAcc value missing
            if valid:
                vacc_value = float(v1)
                dt = utils.get_datetime(date, feature_time)
            else:
                vacc_value = 0.0
                dt = pdt

            window['yaw'].append(utils.clean(values[0], -1.6, 1.6))
            window['pitch'].append(utils.clean(values[1], -1.6, 1.6))
            window['roll'].append(utils.clean(values[2], -1.6, 1.6))
            window['rotx'].append(values[3])
            window['roty'].append(values[4])
            window['rotz'].append(values[5])
            ax = utils.clean(values[6], -1.0, 1.0)
            ay = utils.clean(values[7], -1.0, 1.0)
            az = utils.clean(values[8], -1.0, 1.0)
            window['accx'].append(ax)
            window['accy'].append(ay)
            window['accz'].append(az)
            # compute combined acceleration
            window['acctotal'].append(
                numpy.sqrt((ax * ax) + (ay * ay) + (az * az)))
            window['latitude'].append(values[9])
            window['longitude'].append(values[10])
            window['alt'].append(values[11])
            window['course'].append(values[12])
            window['speed'].append(values[13])
            window['hacc'].append(values[14])
            window['vacc'].append(vacc_value)

            if (count % self.samplesize) == 0:
                gen = 1
                self._emit_sample(window, dt)

            # dt is the time of the last line read for this record
            prevdt = dt
            count += 1
            if (count % 100000) == 0:
                print('count', count)
            valid, date, feature_time, _, _, v1, _ = self.read_entry(datafile)

    def label_loc(self, clf, yaw, pitch, roll, rotx, roty, rotz, accx, accy, accz,
                  acctotal, latitude, longitude, alt, course, speed, hacc, vacc,
                  distance, hcr, sr, trajectory,
                  month, dayofweek, hours, minutes, seconds):
        """ Use the pretrained location classifier to extract features from the
        input sensor values and map the feature vector onto a location type.

        clf is the classifier used for the prediction; when it is None the
        classifier stored in self.clf is used instead. The sensor arguments
        are sequences of values, the remaining arguments are scalars.
        Returns the predicted label. The stored training data (self.xdata)
        is left untouched.
        """
        xpoint = self._build_feature_vector(
            [yaw, pitch, roll, rotx, roty, rotz, accx, accy, accz, acctotal,
             latitude, longitude, alt,
             course, speed, hacc, vacc],
            [distance, hcr, sr, trajectory,
             month, dayofweek, hours, minutes, seconds])
        model = self.clf if clf is None else clf
        labels = model.predict([xpoint])
        return labels[0]

    def train_location_model(self):
        """ Train a model to map a feature vector (statistical operations
        applied to sensor values and raw location values) onto a location type.

        When self.cross_validation is positive, that many rounds are run;
        each round trains on a random 67% of the data, then prints the
        predicted labels, the confusion matrix and the accuracy for the
        remaining 33%. Otherwise the model is trained on all of the data and
        stored in locmodel.pkl.
        """
        if self.cross_validation > 0:
            for round_num in range(self.cross_validation):
                xtrain, xtest, ytrain, ytest = train_test_split(
                    self.xdata, self.ydata,
                    test_size=0.33, random_state=round_num)
                self.clf.fit(xtrain, ytrain)
                newlabels = self.clf.predict(xtest)
                print('newlabels', newlabels)
                matrix = confusion_matrix(ytest, newlabels)
                print('matrix', matrix)
                numright = sum(1 for predicted, actual in zip(newlabels, ytest)
                               if predicted == actual)
                print('accuracy', float(numright) / float(len(ytest)))
        else:  # store the learned model
            self.clf.fit(self.xdata, self.ydata)
            joblib.dump(self.clf, "locmodel.pkl")

    @staticmethod
    def load_location_model(modelfilename):
        """ Load a pretrained model that maps a feature vector
        (statistical operations applied to sensor values and raw location values)
        onto a location type.

        The model is read from modelfilename when that names an existing
        file; otherwise the default file locmodel.pkl is read, which keeps
        callers that relied on the default working.
        """
        filename = "locmodel.pkl"
        if isinstance(modelfilename, (str, os.PathLike)) and \
                os.path.isfile(modelfilename):
            filename = modelfilename
        # joblib files are pickles: only load model files from a trusted source.
        return joblib.load(filename)
# Script entry point: python loc.py <locations_file> [<data_file>]
# The location mappings and the locations are always loaded; a model is
# trained only when a sensor data file is supplied as the second argument.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Supply a file of locations")
        sys.exit(1)  # usage error: report failure to the calling shell
    loc = Location(sys.argv[1])
    loc.read_location_mappings()
    loc.read_locations()
    if len(sys.argv) > 2:
        loc.extract_features(sys.argv[2])
        loc.train_location_model()