-
Notifications
You must be signed in to change notification settings - Fork 327
/
whisper-resize.py
executable file
·183 lines (154 loc) · 5.87 KB
/
whisper-resize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python
import os
import sys
import time
import bisect
import signal
import optparse
import traceback
try:
import whisper
except ImportError:
raise SystemExit('[ERROR] Please make sure whisper is installed properly')
# Ignore SIGPIPE
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
now = int(time.time())
option_parser = optparse.OptionParser(
usage='''%prog path timePerPoint:timeToStore [timePerPoint:timeToStore]*
timePerPoint and timeToStore specify lengths of time, for example:
60:1440 60 seconds per datapoint, 1440 datapoints = 1 day of retention
15m:8 15 minutes per datapoint, 8 datapoints = 2 hours of retention
1h:7d 1 hour per datapoint, 7 days of retention
12h:2y 12 hours per datapoint, 2 years of retention
''')
option_parser.add_option(
'--xFilesFactor', default=None,
type='float', help="Change the xFilesFactor")
option_parser.add_option(
'--aggregationMethod', default=None,
type='string', help="Change the aggregation function (%s)" %
', '.join(whisper.aggregationMethods))
option_parser.add_option(
'--force', default=False, action='store_true',
help="Perform a destructive change")
option_parser.add_option(
'--newfile', default=None, action='store',
help="Create a new database file without removing the existing one")
option_parser.add_option(
'--nobackup', action='store_true',
help='Delete the .bak file after successful execution')
option_parser.add_option(
'--aggregate', action='store_true',
help='Try to aggregate the values to fit the new archive better.'
' Note that this will make things slower and use more memory.')
(options, args) = option_parser.parse_args()
if len(args) < 2:
option_parser.print_help()
sys.exit(1)
path = args[0]
if not os.path.exists(path):
sys.stderr.write("[ERROR] File '%s' does not exist!\n\n" % path)
option_parser.print_help()
sys.exit(1)
info = whisper.info(path)
new_archives = [whisper.parseRetentionDef(retentionDef)
for retentionDef in args[1:]]
old_archives = info['archives']
# sort by precision, lowest to highest
old_archives.sort(key=lambda a: a['secondsPerPoint'], reverse=True)
if options.xFilesFactor is None:
xff = info['xFilesFactor']
else:
xff = options.xFilesFactor
if options.aggregationMethod is None:
aggregationMethod = info['aggregationMethod']
else:
aggregationMethod = options.aggregationMethod
print('Retrieving all data from the archives')
for archive in old_archives:
fromTime = now - archive['retention'] + archive['secondsPerPoint']
untilTime = now
timeinfo, values = whisper.fetch(path, fromTime, untilTime)
archive['data'] = (timeinfo, values)
if options.newfile is None:
tmpfile = path + '.tmp'
if os.path.exists(tmpfile):
print('Removing previous temporary database file: %s' % tmpfile)
os.unlink(tmpfile)
newfile = tmpfile
else:
newfile = options.newfile
print('Creating new whisper database: %s' % newfile)
whisper.create(newfile, new_archives, xFilesFactor=xff, aggregationMethod=aggregationMethod)
size = os.stat(newfile).st_size
print('Created: %s (%d bytes)' % (newfile, size))
if options.aggregate:
# This is where data will be interpolated (best effort)
print('Migrating data with aggregation...')
all_datapoints = []
for archive in old_archives:
# Loading all datapoints into memory for fast querying
timeinfo, values = archive['data']
new_datapoints = zip(range(*timeinfo), values)
if all_datapoints:
last_timestamp = all_datapoints[-1][0]
slice_end = 0
for i, (timestamp, value) in enumerate(new_datapoints):
if timestamp > last_timestamp:
slice_end = i
break
all_datapoints += new_datapoints[i:]
else:
all_datapoints += new_datapoints
oldtimestamps = map(lambda p: p[0], all_datapoints)
oldvalues = map(lambda p: p[1], all_datapoints)
print("oldtimestamps: %s" % oldtimestamps)
# Simply cleaning up some used memory
del all_datapoints
new_info = whisper.info(newfile)
new_archives = new_info['archives']
for archive in new_archives:
step = archive['secondsPerPoint']
fromTime = now - archive['retention'] + now % step
untilTime = now + now % step + step
print("(%s,%s,%s)" % (fromTime, untilTime, step))
timepoints_to_update = range(fromTime, untilTime, step)
print("timepoints_to_update: %s" % timepoints_to_update)
newdatapoints = []
for tinterval in zip(timepoints_to_update[:-1], timepoints_to_update[1:]):
# TODO: Setting lo= parameter for 'lefti' based on righti from previous
# iteration. Obviously, this can only be done if
# timepoints_to_update is always updated. Is it?
lefti = bisect.bisect_left(oldtimestamps, tinterval[0])
righti = bisect.bisect_left(oldtimestamps, tinterval[1], lo=lefti)
newvalues = oldvalues[lefti:righti]
if newvalues:
non_none = filter(lambda x: x is not None, newvalues)
if 1.0 * len(non_none) / len(newvalues) >= xff:
newdatapoints.append([tinterval[0],
whisper.aggregate(aggregationMethod,
non_none, newvalues)])
whisper.update_many(newfile, newdatapoints)
else:
print('Migrating data without aggregation...')
for archive in old_archives:
timeinfo, values = archive['data']
datapoints = zip(range(*timeinfo), values)
datapoints = filter(lambda p: p[1] is not None, datapoints)
whisper.update_many(newfile, datapoints)
if options.newfile is not None:
sys.exit(0)
backup = path + '.bak'
print('Renaming old database to: %s' % backup)
os.rename(path, backup)
try:
print('Renaming new database to: %s' % path)
os.rename(tmpfile, path)
except (OSError):
traceback.print_exc()
print('\nOperation failed, restoring backup')
os.rename(backup, path)
sys.exit(1)
if options.nobackup:
print("Unlinking backup: %s" % backup)
os.unlink(backup)