-
Notifications
You must be signed in to change notification settings - Fork 1
/
multithreaded_tarzan.py
78 lines (66 loc) · 1.99 KB
/
multithreaded_tarzan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import pandas as pd
import numpy as np
from tarzan import *
import queue
from argparse import ArgumentParser
import threading
# Module-level state shared between main() and the worker() threads.

# Work queue: main() puts one task dict per meter column on it, and later one
# None per worker thread as a shutdown sentinel; worker() takes items off it.
q = queue.Queue()
# Thread objects started by main(), kept so main() can join them at the end.
threads = []
# Printed by main() once all tasks are done. Intended to collect
# (surprising_windows, column_name) tuples produced by worker().
surprises = []
# (scores, column_name) tuples appended by worker(); printed by main().
results = []
def main():
    """Run ``tarzan`` over every meter column of a CSV with a thread pool.

    Command-line arguments:
        csv: Path of the CSV file to read. It must contain a ``'datetime'``
            column; every column after the first one is treated as a meter.
        num_worker_threads: Number of worker threads to start (at least 1).

    One task dict per meter is put on the module-level queue ``q`` and
    consumed by ``worker`` threads. After every task has been marked done the
    workers are stopped with ``None`` sentinels and the module-level
    ``results`` and ``surprises`` lists are printed.
    """
    parser = ArgumentParser()
    parser.add_argument('csv')
    # Let argparse convert the count so a non-numeric value is reported as a
    # usage error instead of a ValueError traceback.
    parser.add_argument('num_worker_threads', type=int)
    args = parser.parse_args()

    num_worker_threads = args.num_worker_threads
    if num_worker_threads < 1:
        # Without at least one worker nothing drains the queue, so the
        # q.join() below would block forever.
        parser.error('num_worker_threads must be at least 1')

    df = pd.read_csv(args.csv)

    # The first column is skipped; each remaining column becomes one task
    # holding that column together with the 'datetime' column.
    for meter in df.columns[1:]:
        ts = df.loc[:, ['datetime', meter]]
        var_dict = {
            'series': ts,
            'alpha_size': 16,
            'window_length': 8,
            'feature_length': 4,
            'col_name': meter,
            'threshold': 1.5,
        }
        q.put(var_dict)

    for _ in range(num_worker_threads):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)

    # Block until every queued task has been marked done by a worker.
    q.join()

    # Stop the workers: one None sentinel per thread, then wait for them.
    for _ in range(num_worker_threads):
        q.put(None)
    for t in threads:
        t.join()

    print(results, surprises)
def worker():
    """Process tasks from the module-level queue ``q`` until a ``None`` arrives.

    Each task is a dict that is passed to ``tarzan`` as keyword arguments and
    must contain a ``'col_name'`` entry. ``tarzan`` is expected to return four
    values; the first and third are recorded, tagged with the column name, in
    the module-level ``surprises`` and ``results`` lists respectively.

    Any exception raised while handling a task is printed and the task is
    skipped. ``q.task_done()`` is called for every non-sentinel item, whether
    it succeeded or failed, so that ``q.join()`` in the caller can return.
    """
    while True:
        item = q.get()
        if item is None:
            # Shutdown sentinel: no more work for this thread.
            break
        col_name = item['col_name']
        try:
            print(col_name)
            # The second return value is deliberately NOT bound to the name
            # `surprises`: assigning to that name here would make it local to
            # this function, and the append below would then never reach the
            # module-level list that main() prints.
            surprising_windows, _window_surprises, scores, x = tarzan(**item)
            print(x)
            print(scores)
            print()
            surprises.append((surprising_windows, col_name))
            results.append((scores, col_name))
        except Exception as e:
            # Possible exceptions include numpy.polyfit failing on NaN or
            # other values. Additionally, if there isn't enough variation in
            # the data, pandas is unable to create the bins used to build an
            # alphabet. This usually happens when the meter is unused and is
            # all 0s.
            print(e)
            print(col_name + ": Excepted\n")
        finally:
            # Runs on success and on failure, so the queue's unfinished-task
            # count always goes down.
            print(col_name + ":done\n")
            q.task_done()
# Run the analysis only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()