Skip to content

Commit

Permalink
Now getting paths from all targets before fetching data.
Browse files Browse the repository at this point in the history
Fetching data now occurs outside of evaluateTargets. Paths are now
collected from all targets prior to evaluating targets. The paths
are then sent to fetchData to create a data store. evaluateTarget
pulls data from the data store instead of fetching data. This
allows greater usage of fetch_multi for requests that send
multiple targets.
  • Loading branch information
james absalon committed Jul 20, 2015
1 parent 2baa0b7 commit 189cfc2
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 89 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.tox
*.egg-info
*.pyc
.coverage
htmlcov
docs/_build
Expand Down
52 changes: 42 additions & 10 deletions graphite_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,16 @@ def render():
'data': [],
}

# Gather all data to take advantage of backends with fetch_multi
paths = []
for target in request_options['targets']:
if request_options['graphType'] == 'pie':
if ':' in target:
continue
if target.strip():
paths += pathsFromTarget(target)
data_store = fetchData(context, paths)

if request_options['graphType'] == 'pie':
for target in request_options['targets']:
if ':' in target:
Expand All @@ -389,7 +399,7 @@ def render():
errors['target'] = "Invalid target: '{0}'.".format(target)
context['data'].append((name, value))
else:
series_list = evaluateTarget(context, target)
series_list = evaluateTarget(context, target, data_store)

for series in series_list:
func = app.functions[request_options['pieMode']]
Expand All @@ -403,7 +413,7 @@ def render():
for target in request_options['targets']:
if not target.strip():
continue
series_list = evaluateTarget(context, target)
series_list = evaluateTarget(context, target, data_store)
context['data'].extend(series_list)

request_options['format'] = request_options.get('format')
Expand Down Expand Up @@ -477,31 +487,53 @@ def render():
return response


def evaluateTarget(requestContext, target):
def pathsFromTarget(target):
paths = []
tokens = grammar.parseString(target)
pathsFromTokens(tokens, paths)
return paths


def pathsFromTokens(tokens, paths):
if tokens.expression:
pathsFromTokens(tokens.expression, paths)
elif tokens.pathExpression:
paths.append(tokens.pathExpression)
elif tokens.call:
for arg in tokens.call.args:
pathsFromTokens(arg, paths)
for kwarg in tokens.call.kwargs:
pathsFromTokens(kwarg.args[0], paths)


def evaluateTarget(requestContext, target, data_store):
tokens = grammar.parseString(target)
result = evaluateTokens(requestContext, tokens)
result = evaluateTokens(requestContext, tokens, data_store)

if isinstance(result, TimeSeries):
return [result] # we have to return a list of TimeSeries objects

return result


def evaluateTokens(requestContext, tokens):
def evaluateTokens(requestContext, tokens, data_store):
if tokens.expression:
return evaluateTokens(requestContext, tokens.expression)
return evaluateTokens(requestContext, tokens.expression, data_store)

elif tokens.pathExpression:
return fetchData(requestContext, tokens.pathExpression)
return data_store.get_series_list(tokens.pathExpression)

elif tokens.call:
func = app.functions[tokens.call.funcname]
args = [evaluateTokens(requestContext,
arg) for arg in tokens.call.args]
arg, data_store) for arg in tokens.call.args]
kwargs = dict([(kwarg.argname,
evaluateTokens(requestContext, kwarg.args[0]))
evaluateTokens(requestContext,
kwarg.args[0],
data_store))
for kwarg in tokens.call.kwargs])
return func(requestContext, *args, **kwargs)
ret = func(requestContext, *args, **kwargs)
return ret

elif tokens.number:
if tokens.number.integer:
Expand Down
61 changes: 48 additions & 13 deletions graphite_api/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from .render.attime import parseTimeOffset, parseATTime
from .render.glyph import format_units
from .render.datalib import TimeSeries
from .render.datalib import TimeSeries, fetchData
from .utils import to_seconds, epoch

NAN = float('NaN')
Expand Down Expand Up @@ -1928,13 +1928,15 @@ def useSeriesAbove(requestContext, seriesList, value, search, replace):
&target=useSeriesAbove(ganglia.metric1.reqs,10,"reqs","time")
"""
from .app import evaluateTarget
from .app import evaluateTarget, pathsFromTarget
newSeries = []

for series in seriesList:
newname = re.sub(search, replace, series.name)
if safeMax(series) > value:
n = evaluateTarget(requestContext, newname)
paths = pathsFromTarget(newname)
data_store = fetchData(requestContext, paths)
n = evaluateTarget(requestContext, newname, data_store)
if n is not None and len(n) > 0:
newSeries.append(n[0])

Expand Down Expand Up @@ -2055,19 +2057,32 @@ def _fetchWithBootstrap(requestContext, seriesList, **delta_kwargs):
"""
Request the same data but with a bootstrap period at the beginning.
"""
from .app import evaluateTarget
from .app import evaluateTarget, pathsFromTarget
bootstrapContext = requestContext.copy()
bootstrapContext['startTime'] = (
requestContext['startTime'] - timedelta(**delta_kwargs))
bootstrapContext['endTime'] = requestContext['startTime']

bootstrapList = []

# Get all paths to fetch
paths = []
for series in seriesList:
if series.pathExpression in [b.pathExpression for b in bootstrapList]:
continue
paths.extend(pathsFromTarget(series.pathExpression))

# Fetch all paths
data_store = fetchData(bootstrapContext, paths)

for series in seriesList:
if series.pathExpression in [b.pathExpression for b in bootstrapList]:
# This pathExpression returns multiple series and we already
# fetched it
continue
bootstraps = evaluateTarget(bootstrapContext, series.pathExpression)
bootstraps = evaluateTarget(bootstrapContext,
series.pathExpression,
data_store)
bootstrapList.extend(bootstraps)

newSeriesList = []
Expand Down Expand Up @@ -2392,7 +2407,7 @@ def timeStack(requestContext, seriesList, timeShiftUnit, timeShiftStart,
# create a series for today and each of the previous 7 days
&target=timeStack(Sales.widgets.largeBlue,"1d",0,7)
"""
from .app import evaluateTarget
from .app import evaluateTarget, pathsFromTarget
# Default to negative. parseTimeOffset defaults to +
if timeShiftUnit[0].isdigit():
timeShiftUnit = '-' + timeShiftUnit
Expand All @@ -2409,7 +2424,10 @@ def timeStack(requestContext, seriesList, timeShiftUnit, timeShiftStart,
innerDelta = delta * shft
myContext['startTime'] = requestContext['startTime'] + innerDelta
myContext['endTime'] = requestContext['endTime'] + innerDelta
for shiftedSeries in evaluateTarget(myContext, series.pathExpression):
paths = pathsFromTarget(series.pathExpression)
for shiftedSeries in evaluateTarget(myContext,
series.pathExpression,
fetchData(myContext, paths)):
shiftedSeries.name = 'timeShift(%s, %s, %s)' % (shiftedSeries.name,
timeShiftUnit,
shft)
Expand Down Expand Up @@ -2447,7 +2465,7 @@ def timeShift(requestContext, seriesList, timeShift, resetEnd=True):
&target=timeShift(Sales.widgets.largeBlue,"+1h")
"""
from .app import evaluateTarget
from .app import evaluateTarget, pathsFromTarget
# Default to negative. parseTimeOffset defaults to +
if timeShift[0].isdigit():
timeShift = '-' + timeShift
Expand All @@ -2463,7 +2481,10 @@ def timeShift(requestContext, seriesList, timeShift, resetEnd=True):
# which is all we care about.
series = seriesList[0]

for shiftedSeries in evaluateTarget(myContext, series.pathExpression):
paths = pathsFromTarget(series.pathExpression)
for shiftedSeries in evaluateTarget(myContext,
series.pathExpression,
fetchData(myContext, paths)):
shiftedSeries.name = 'timeShift(%s, %s)' % (shiftedSeries.name,
timeShift)
if resetEnd:
Expand Down Expand Up @@ -2874,7 +2895,7 @@ def smartSummarize(requestContext, seriesList, intervalString, func='sum'):
"""
Smarter experimental version of summarize.
"""
from .app import evaluateTarget
from .app import evaluateTarget, pathsFromTarget
results = []
delta = parseTimeOffset(intervalString)
interval = to_seconds(delta)
Expand All @@ -2893,10 +2914,17 @@ def smartSummarize(requestContext, seriesList, intervalString, func='sum'):
requestContext['startTime'] = datetime(s.year, s.month, s.day, s.hour,
s.minute, tzinfo=tzinfo)

paths = []
for series in seriesList:
paths.extend(pathsFromTarget(series.pathExpression))
data_store = fetchData(requestContext, paths)

for i, series in enumerate(seriesList):
# XXX: breaks with summarize(metric.{a,b})
# each series.pathExpression == metric.{a,b}
newSeries = evaluateTarget(requestContext, series.pathExpression)[0]
newSeries = evaluateTarget(requestContext,
series.pathExpression,
data_store)[0]
series[0:len(series)] = newSeries
series.start = newSeries.start
series.end = newSeries.end
Expand Down Expand Up @@ -3076,7 +3104,7 @@ def hitcount(requestContext, seriesList, intervalString,
or coarse-grained records) and handles rarely-occurring events
gracefully.
"""
from .app import evaluateTarget
from .app import evaluateTarget, pathsFromTarget
results = []
delta = parseTimeOffset(intervalString)
interval = to_seconds(delta)
Expand All @@ -3093,9 +3121,16 @@ def hitcount(requestContext, seriesList, intervalString,
requestContext['startTime'] = datetime(s.year, s.month, s.day,
s.hour, s.minute)

# Gather all paths first, then the data
paths = []
for series in seriesList:
paths.extend(pathsFromTarget(series.pathExpression))
data_store = fetchData(requestContext, paths)

for i, series in enumerate(seriesList):
newSeries = evaluateTarget(requestContext,
series.pathExpression)[0]
series.pathExpression,
data_store)[0]
intervalCount = int((series.end - series.start) / interval)
series[0:len(series)] = newSeries
series.start = newSeries.start
Expand Down
Loading

0 comments on commit 189cfc2

Please sign in to comment.