Skip to content

Commit

Permalink
Merge pull request #92 from penpen/master
Browse files (browse the repository at this point in the history)
support for 'avg_zero' aggregation
  • Loading branch information
SEJeff committed Jun 17, 2015
2 parents: 4a1a615 + 17aee5f; commit 75e35fd
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 4 deletions.
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
django
2 changes: 1 addition & 1 deletion bin/whisper-resize.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
if 1.0*len(non_none)/len(newvalues) >= xff:
newdatapoints.append([tinterval[0],
whisper.aggregate(aggregationMethod,
non_none)])
non_none, newvalues)])
whisper.update_many(newfile, newdatapoints)
else:
print('Migrating data without aggregation...')
Expand Down
7 changes: 7 additions & 0 deletions test_whisper.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ def test_aggregate(self):
self.assertEqual(whisper.aggregate('sum', [10, 2, 3, 4]), 19)
# average of the list elements
self.assertEqual(whisper.aggregate('average', [1, 2, 3, 4]), 2.5)
avg_zero = [1, 2, 3, 4, None, None, None, None]
non_null = [i for i in avg_zero if i is not None]
self.assertEqual(whisper.aggregate('avg_zero', non_null, avg_zero), 1.25)
# avg_zero without neighborValues must raise InvalidAggregationMethod

with self.assertRaises(whisper.InvalidAggregationMethod):
whisper.aggregate('avg_zero', non_null)

with AssertRaisesException(whisper.InvalidAggregationMethod('Unrecognized aggregation method derp')):
whisper.aggregate('derp', [12, 2, 3123, 1])
Expand Down
12 changes: 9 additions & 3 deletions whisper.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def _py_fallocate(fd, offset, len_):
2: 'sum',
3: 'last',
4: 'max',
5: 'min'
5: 'min',
6: 'avg_zero'
})
aggregationMethodToType = dict([[v, k] for k, v in aggregationTypeToMethod.items()])
aggregationMethods = aggregationTypeToMethod.values()
Expand Down Expand Up @@ -442,7 +443,7 @@ def create(path, archiveList, xFilesFactor=None, aggregationMethod=None, sparse=
raise


def aggregate(aggregationMethod, knownValues):
def aggregate(aggregationMethod, knownValues, neighborValues=None):
if aggregationMethod == 'average':
return float(sum(knownValues)) / float(len(knownValues))
elif aggregationMethod == 'sum':
Expand All @@ -453,6 +454,11 @@ def aggregate(aggregationMethod, knownValues):
return max(knownValues)
elif aggregationMethod == 'min':
return min(knownValues)
elif aggregationMethod == 'avg_zero':
if not neighborValues:
raise InvalidAggregationMethod("Using avg_zero without neighborValues")
values = [x or 0 for x in neighborValues]
return float(sum(values)) / float(len(values))
else:
raise InvalidAggregationMethod("Unrecognized aggregation method %s" %
aggregationMethod)
Expand Down Expand Up @@ -516,7 +522,7 @@ def __propagate(fh, header, timestamp, higher, lower):

knownPercent = float(len(knownValues)) / float(len(neighborValues))
if knownPercent >= xff: # We have enough data to propagate a value!
aggregateValue = aggregate(aggregationMethod, knownValues)
aggregateValue = aggregate(aggregationMethod, knownValues, neighborValues)
myPackedPoint = struct.pack(pointFormat, lowerIntervalStart, aggregateValue)
fh.seek(lower['offset'])
packedPoint = fh.read(pointSize)
Expand Down

0 comments on commit 75e35fd

Please sign in to comment.