Skip to content

Commit

Permalink
Merge pull request #47 from iain-buclaw-sociomantic/archive_slice
Browse files Browse the repository at this point in the history
(Another) fixed reading archived data
  • Loading branch information
deniszh committed Apr 3, 2017
2 parents 69f50cd + 7b3caf5 commit 9e7c5d0
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 12 deletions.
98 changes: 91 additions & 7 deletions ceres.py
Expand Up @@ -20,7 +20,7 @@
import struct
import json
import errno
from math import isnan
from math import ceil, isnan
from os.path import isdir, exists, join, dirname, abspath, getsize, getmtime
from glob import glob
from bisect import bisect_left
Expand All @@ -38,12 +38,15 @@
TIMESTAMP_SIZE = struct.calcsize(TIMESTAMP_FORMAT)
DATAPOINT_FORMAT = "!d"
DATAPOINT_SIZE = struct.calcsize(DATAPOINT_FORMAT)
INFINITY = float('inf')
NAN = float('nan')
PACKED_NAN = struct.pack(DATAPOINT_FORMAT, NAN)
MAX_DATA_POINTS = INFINITY
MAX_SLICE_GAP = 80
DEFAULT_TIMESTEP = 60
DEFAULT_NODE_CACHING_BEHAVIOR = 'all'
DEFAULT_SLICE_CACHING_BEHAVIOR = 'none'
SLICE_AGGREGATION_METHODS = ['average', 'sum', 'last', 'max', 'min']
SLICE_PERMS = 0o644
DIR_PERMS = 0o755

Expand Down Expand Up @@ -259,7 +262,7 @@ class CeresNode(object):
.. seealso:: :func:`setDefaultSliceCachingBehavior` to adjust caching behavior
"""
__slots__ = ('tree', 'nodePath', 'fsPath',
'metadataFile', 'timeStep',
'metadataFile', 'timeStep', 'aggregationMethod',
'sliceCache', 'sliceCachingBehavior')

def __init__(self, tree, nodePath, fsPath):
Expand All @@ -268,6 +271,7 @@ def __init__(self, tree, nodePath, fsPath):
self.fsPath = fsPath
self.metadataFile = join(fsPath, '.ceres-node')
self.timeStep = None
self.aggregationMethod = 'average'
self.sliceCache = None
self.sliceCachingBehavior = DEFAULT_SLICE_CACHING_BEHAVIOR

Expand Down Expand Up @@ -354,6 +358,8 @@ def readMetadata(self):
try:
metadata = json.load(fh)
self.timeStep = int(metadata['timeStep'])
if metadata.get('aggregationMethod'):
self.aggregationMethod = metadata['aggregationMethod']
return metadata
except (KeyError, IOError, ValueError) as e:
raise CorruptNode(self, "Unable to parse node metadata: %s" % e.args)
Expand Down Expand Up @@ -485,6 +491,8 @@ def read(self, fromTime, untilTime):
sliceBoundary = None # to know when to split up queries across slices
resultValues = []
earliestData = None
timeStep = self.timeStep
method = self.aggregationMethod

for slice in self.slices:
# If there was a prior slice covering the requested interval, dont ask for that data again
Expand All @@ -500,9 +508,21 @@ def read(self, fromTime, untilTime):
except NoData:
break

if series.timeStep != timeStep:
if len(resultValues) == 0:
# First slice holding series data, this becomes the default timeStep.
timeStep = series.timeStep
elif series.timeStep < timeStep:
# Series is at a different precision, aggregate to fit our current set.
series.values = aggregateSeries(method, series.timeStep, timeStep, series.values)
else:
# Normalize current set to fit new series data.
resultValues = aggregateSeries(method, timeStep, series.timeStep, resultValues)
timeStep = series.timeStep

earliestData = series.startTime

rightMissing = (requestUntilTime - series.endTime) // self.timeStep
rightMissing = (requestUntilTime - series.endTime) // timeStep
rightNulls = [None for i in range(rightMissing)]
resultValues = series.values + rightNulls + resultValues
break
Expand All @@ -514,9 +534,21 @@ def read(self, fromTime, untilTime):
except NoData:
continue

if series.timeStep != timeStep:
if len(resultValues) == 0:
# First slice holding series data, this becomes the default timeStep.
timeStep = series.timeStep
elif series.timeStep < timeStep:
# Series is at a different precision, aggregate to fit our current set.
series.values = aggregateSeries(method, series.timeStep, timeStep, series.values)
else:
# Normalize current set to fit new series data.
resultValues = aggregateSeries(method, timeStep, series.timeStep, resultValues)
timeStep = series.timeStep

earliestData = series.startTime

rightMissing = (requestUntilTime - series.endTime) // self.timeStep
rightMissing = (requestUntilTime - series.endTime) // timeStep
rightNulls = [None for i in range(rightMissing)]
resultValues = series.values + rightNulls + resultValues

Expand All @@ -525,16 +557,24 @@ def read(self, fromTime, untilTime):

# The end of the requested interval predates all slices
if earliestData is None:
missing = int(untilTime - fromTime) // self.timeStep
missing = int(untilTime - fromTime) // timeStep
resultValues = [None for i in range(missing)]

# Left pad nulls if the start of the requested interval predates all slices
else:
leftMissing = (earliestData - fromTime) // self.timeStep
leftMissing = (earliestData - fromTime) // timeStep
leftNulls = [None for i in range(leftMissing)]
resultValues = leftNulls + resultValues

return TimeSeriesData(fromTime, untilTime, self.timeStep, resultValues)
# Finally, normalize the entire dataset for any added left padded nulls.
# This is an optimization for reducing large sets of data.
if MAX_DATA_POINTS < len(resultValues):
factor = int(ceil(float(len(resultValues)) / float(MAX_DATA_POINTS)))
newTimeStep = factor * timeStep
resultValues = aggregateSeries(method, timeStep, newTimeStep, resultValues)
timeStep = newTimeStep

return TimeSeriesData(fromTime, untilTime, timeStep, resultValues)

def write(self, datapoints):
"""Writes datapoints to underlying slices. Datapoints that round to the same timestamp for the
Expand Down Expand Up @@ -858,6 +898,10 @@ class InvalidRequest(Exception):
pass


class InvalidAggregationMethod(Exception):
    """Raised when an unrecognized aggregation method name is supplied."""


class SliceGapTooLarge(Exception):
"For internal use only"

Expand All @@ -866,6 +910,46 @@ class SliceDeleted(Exception):
pass


def aggregate(aggregationMethod, values):
    """Collapse a list of datapoint values into a single value.

    :param aggregationMethod: one of 'average', 'sum', 'last', 'max', 'min'
    :param values: sequence of numeric values, possibly containing ``None``
                   entries for missing datapoints

    :returns: the aggregated value, or ``None`` if every value is ``None``
    :raises InvalidAggregationMethod: if ``aggregationMethod`` is unrecognized
    """
    # None marks a missing datapoint; it must not contribute to the result.
    knownValues = [v for v in values if v is not None]
    # BUG FIX: original used `len(knownValues) is 0`, comparing an int with
    # `is` — identity, not equality. It only "worked" via CPython small-int
    # interning and warns on modern interpreters. Use truthiness instead.
    if not knownValues:
        return None
    # Aggregate based on method
    if aggregationMethod == 'average':
        return float(sum(knownValues)) / float(len(knownValues))
    elif aggregationMethod == 'sum':
        return float(sum(knownValues))
    elif aggregationMethod == 'last':
        return knownValues[-1]
    elif aggregationMethod == 'max':
        return max(knownValues)
    elif aggregationMethod == 'min':
        return min(knownValues)
    else:
        raise InvalidAggregationMethod("Unrecognized aggregation method %s" %
                                       aggregationMethod)


def aggregateSeries(method, oldTimeStep, newTimeStep, values):
    """Downsample *values* from ``oldTimeStep`` resolution to ``newTimeStep``.

    Makes the assumption that the caller has already guaranteed that
    ``newTimeStep`` is bigger than ``oldTimeStep``.

    :param method: aggregation method name passed through to ``aggregate``
    :param values: sequence of datapoint values at ``oldTimeStep`` resolution
    :returns: new list of values, one per ``newTimeStep`` interval
    """
    factor = int(newTimeStep // oldTimeStep)
    # Aggregate each consecutive run of `factor` points; a trailing short
    # run (when len(values) is not a multiple of factor) is aggregated too.
    return [aggregate(method, values[start:start + factor])
            for start in range(0, len(values), factor)]


def getTree(path):
while path not in (os.sep, ''):
if isdir(join(path, '.ceres-tree')):
Expand Down
114 changes: 109 additions & 5 deletions tests/test_ceres.py
Expand Up @@ -15,7 +15,7 @@
from ceres import DATAPOINT_SIZE, DEFAULT_NODE_CACHING_BEHAVIOR, DEFAULT_SLICE_CACHING_BEHAVIOR,\
DEFAULT_TIMESTEP, DIR_PERMS, MAX_SLICE_GAP
from ceres import getTree, CorruptNode, NoData, NodeDeleted, NodeNotFound, SliceDeleted,\
SliceGapTooLarge, TimeSeriesData
SliceGapTooLarge, TimeSeriesData, InvalidAggregationMethod


def fetch_mock_open_writes(open_mock):
Expand All @@ -38,7 +38,7 @@ def side_effect(*args, **kwargs):
result_start = max(startTime, start)
result_end = min(endTime, end)
points = (result_end - result_start) // step
return TimeSeriesData(result_start, result_end, step, [0] * points)
return TimeSeriesData(result_start, result_end, step, [float(x) for x in range(points)])

slice_mock.read.side_effect = side_effect
return slice_mock
Expand Down Expand Up @@ -675,7 +675,7 @@ def test_read_returns_empty_time_series_if_slice_has_no_data(self):

def test_read_pads_points_missing_before_series(self):
    # Request starts one step before the earliest slice datapoint
    # (presumably — fixture setUp is defined elsewhere; confirm there),
    # so exactly one leading None is padded onto the result.
    result = self.ceres_node.read(540, 1200)
    self.assertEqual([None, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9], result.values)
self.assertEqual([None, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9], result.values)

def test_read_pads_points_missing_after_series(self):
result = self.ceres_node.read(1200, 1860)
Expand All @@ -688,7 +688,7 @@ def test_read_goes_across_slices(self):

def test_read_across_slices_merges_results(self):
    # A read spanning two slices concatenates each slice's values in
    # time order; the mock yields 0..4 per slice segment, hence the
    # repeated 0-4 run.
    result = self.ceres_node.read(900, 1500)
    self.assertEqual([0, 1, 2, 3, 4, 0, 1, 2, 3, 4], result.values)
self.assertEqual([0, 1, 2, 3, 4, 0, 1, 2, 3, 4], result.values)

def test_read_pads_points_missing_after_series_across_slices(self):
result = self.ceres_node.read(900, 1860)
Expand All @@ -697,7 +697,7 @@ def test_read_pads_points_missing_after_series_across_slices(self):
def test_read_pads_points_missing_between_slices(self):
    # Replace the older slice with one ending at 1140 so there is a
    # one-point gap before the next slice; read() must fill the gap
    # with a single None between the two slices' values.
    self.ceres_slices[1] = make_slice_mock(600, 1140, 60)
    result = self.ceres_node.read(900, 1500)
    self.assertEqual([0, 1, 2, 3, None, 0, 1, 2, 3, 4], result.values)
self.assertEqual([0, 1, 2, 3, None, 0, 1, 2, 3, 4], result.values)


class CeresSliceTest(TestCase):
Expand Down Expand Up @@ -898,3 +898,107 @@ def test_correct_size_written_one_point_gap(self, open_mock):
self.ceres_slice.write([(420, 0)])
# one empty point, one real point = two points total written
self.assertEqual(2 * DATAPOINT_SIZE, len(fetch_mock_open_writes(open_mock)))


class CeresArchiveNodeReadTest(TestCase):
    """Tests CeresNode.read() over slices with *different* time steps.

    Two slices are mocked: a 30s-resolution slice covering 1200-1800 and an
    older 60s-resolution slice covering 600-1200. Reads spanning both
    exercise the cross-precision aggregation path (average/sum/last/max/min).
    """

    def setUp(self):
        # Bypass filesystem checks so the tree/node can be built without disk.
        with patch('ceres.isdir', new=Mock(return_value=True)):
            with patch('ceres.exists', new=Mock(return_value=True)):
                self.ceres_tree = CeresTree('/graphite/storage/ceres')
                self.ceres_node = CeresNode(
                    self.ceres_tree,
                    'sample_metric',
                    '/graphite/storage/ceres/sample_metric')
                self.ceres_node.timeStep = 30

        # (startTime, endTime, timeStep) — newest slice listed first.
        slice_configs = [
            (1200, 1800, 30),
            (600, 1200, 60)]

        self.ceres_slices = []
        for start, end, step in slice_configs:
            slice_mock = make_slice_mock(start, end, step)
            self.ceres_slices.append(slice_mock)

        # NOTE(review): `slices` is patched with an *iterator*, so it can be
        # consumed only once — fine here because each test performs one read.
        self.ceres_slices_patch = patch('ceres.CeresNode.slices', new=iter(self.ceres_slices))
        self.ceres_slices_patch.start()

    def tearDown(self):
        self.ceres_slices_patch.stop()

    def test_archives_read_loads_metadata_if_timestep_unknown(self):
        # read() must call readMetadata() when timeStep is not yet known.
        with patch('ceres.CeresNode.readMetadata', new=Mock(side_effect=Exception))\
                as read_metadata_mock:
            self.ceres_node.timeStep = None
            try:  # Raise Exception as a cheap exit out of the function once we have the call we want
                self.ceres_node.read(600, 660)
            except Exception:
                pass
            read_metadata_mock.assert_called_once_with()

    def test_archives_read_normalizes_from_time(self):
        # fromTime 1210 is rounded down to the 1200 step boundary.
        self.ceres_node.read(1210, 1260)
        self.ceres_slices[0].read.assert_called_once_with(1200, 1260)

    def test_archives_read_normalizes_until_time(self):
        # untilTime 1270 is rounded down to the 1260 step boundary.
        self.ceres_node.read(1200, 1270)
        self.ceres_slices[0].read.assert_called_once_with(1200, 1260)

    def test_archives_read_returns_empty_time_series_if_before_slices(self):
        # Entire interval predates all slices: all-None result.
        result = self.ceres_node.read(0, 300)
        self.assertEqual([None] * 10, result.values)

    def test_archives_read_returns_empty_time_series_if_slice_has_no_data(self):
        self.ceres_slices[0].read.side_effect = NoData
        result = self.ceres_node.read(1200, 1500)
        self.assertEqual([None] * 10, result.values)

    def test_archives_read_pads_points_missing_before_series(self):
        result = self.ceres_node.read(300, 1200)
        self.assertEqual(None, result.values[0])

    def test_archives_read_pads_points_missing_after_series(self):
        result = self.ceres_node.read(1200, 1860)
        self.assertEqual(None, result.values[-1])

    def test_archives_read_goes_across_slices(self):
        # Each slice is asked only for the sub-interval it covers.
        self.ceres_node.read(900, 1500)
        self.ceres_slices[0].read.assert_called_once_with(1200, 1500)
        self.ceres_slices[1].read.assert_called_once_with(900, 1200)

    def test_archives_read_across_slices_merges_results_average(self):
        # The 30s slice's pairs are averaged down to the 60s resolution.
        result = self.ceres_node.read(900, 1470)
        self.assertEqual([0, 1, 2, 3, 4, 0.5, 2.5, 4.5, 6.5, 8], result.values)

    def test_archives_read_across_slices_merges_results_sum(self):
        self.ceres_node.aggregationMethod = 'sum'
        result = self.ceres_node.read(900, 1470)
        self.assertEqual([0, 1, 2, 3, 4, 1, 5, 9, 13, 8], result.values)

    def test_archives_read_across_slices_merges_results_last(self):
        self.ceres_node.aggregationMethod = 'last'
        result = self.ceres_node.read(900, 1470)
        self.assertEqual([0, 1, 2, 3, 4, 1, 3, 5, 7, 8], result.values)

    def test_archives_read_across_slices_merges_results_max(self):
        self.ceres_node.aggregationMethod = 'max'
        result = self.ceres_node.read(900, 1470)
        self.assertEqual([0, 1, 2, 3, 4, 1, 3, 5, 7, 8], result.values)

    def test_archives_read_across_slices_merges_results_min(self):
        self.ceres_node.aggregationMethod = 'min'
        result = self.ceres_node.read(900, 1470)
        self.assertEqual([0, 1, 2, 3, 4, 0, 2, 4, 6, 8], result.values)

    def test_archives_invalid_aggregation_method(self):
        # An unknown method name must surface as InvalidAggregationMethod.
        self.ceres_node.aggregationMethod = 'invalid'
        self.assertRaises(InvalidAggregationMethod, self.ceres_node.read, 900, 1500)

    def test_archives_read_pads_points_missing_after_series_across_slices(self):
        result = self.ceres_node.read(900, 1860)
        self.assertEqual(None, result.values[-1])

    def test_archives_read_pads_points_missing_between_slices(self):
        # Older slice at 300s resolution ending at 900, gap to the 1200 slice;
        # the whole result is normalized to the coarser step with a None gap.
        self.ceres_slices[1] = make_slice_mock(600, 900, 300)
        result = self.ceres_node.read(600, 1500)
        self.assertEqual([0, None, 4.5], result.values)

0 comments on commit 9e7c5d0

Please sign in to comment.