Skip to content

Commit

Permalink
Tag federation & py3 fixes (#2128)
Browse files Browse the repository at this point in the history
* support passing seriesByTag calls to finders that support it
* remove unused noPrefetch arg
* switch from StringIO to BytesIO
  • Loading branch information
DanCech committed Dec 6, 2017
1 parent 1be03ab commit 4c5a7c4
Show file tree
Hide file tree
Showing 16 changed files with 849 additions and 188 deletions.
68 changes: 68 additions & 0 deletions webapp/graphite/finders/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self, host):
self.url = parsed['url']
self.params = parsed['params']
self.last_failure = 0
self.tags = not self.params.get('noTags')

@property
def disabled(self):
Expand Down Expand Up @@ -178,6 +179,73 @@ def get_index(self, requestContext):

return results

def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None):
"""
Return auto-complete suggestions for tags based on the matches for the specified expressions, optionally filtered by tag prefix
"""
if limit is None:
limit = settings.TAGDB_AUTOCOMPLETE_LIMIT

fields = [
('tagPrefix', tagPrefix or ''),
('limit', str(limit)),
]
for expr in exprs:
fields.append(('expr', expr))

result = self.request(
'/tags/autoComplete/tags',
fields,
headers=requestContext.get('forwardHeaders') if requestContext else None,
timeout=settings.REMOTE_FIND_TIMEOUT)
try:
reader = codecs.getreader('utf-8')
results = json.load(reader(result))
except Exception as err:
self.fail()
log.exception(
"RemoteFinder[%s] Error decoding autocomplete tags response from %s: %s" %
(self.host, result.url_full, err))
raise Exception("Error decoding autocomplete tags response from %s: %s" % (result.url_full, err))
finally:
result.release_conn()

return results

def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None):
"""
Return auto-complete suggestions for tags and values based on the matches for the specified expressions, optionally filtered by tag and/or value prefix
"""
if limit is None:
limit = settings.TAGDB_AUTOCOMPLETE_LIMIT

fields = [
('tag', tag or ''),
('valuePrefix', valuePrefix or ''),
('limit', str(limit)),
]
for expr in exprs:
fields.append(('expr', expr))

result = self.request(
'/tags/autoComplete/values',
fields,
headers=requestContext.get('forwardHeaders') if requestContext else None,
timeout=settings.REMOTE_FIND_TIMEOUT)
try:
reader = codecs.getreader('utf-8')
results = json.load(reader(result))
except Exception as err:
self.fail()
log.exception(
"RemoteFinder[%s] Error decoding autocomplete values response from %s: %s" %
(self.host, result.url_full, err))
raise Exception("Error decoding autocomplete values response from %s: %s" % (result.url_full, err))
finally:
result.release_conn()

return results

def request(self, path, fields=None, headers=None, timeout=None):
url = "%s%s" % (self.url, path)
url_full = "%s?%s" % (url, urlencode(fields))
Expand Down
8 changes: 8 additions & 0 deletions webapp/graphite/finders/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class BaseFinder(object):
local = True
# set to True if this finder shouldn't be used
disabled = False
# set to True if this finder supports seriesByTag
tags = False

def __init__(self):
"""Initialize the finder."""
Expand Down Expand Up @@ -147,3 +149,9 @@ def fetch(self, patterns, start_time, end_time, now=None, requestContext=None):
})

return result

def auto_complete_tags(self, exprs, tagPrefix=None, limit=None, requestContext=None):
return []

def auto_complete_values(self, exprs, tag, valuePrefix=None, limit=None, requestContext=None):
return []
4 changes: 2 additions & 2 deletions webapp/graphite/render/datalib.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@


class TimeSeries(list):
def __init__(self, name, start, end, step, values, consolidate='average', tags=None, xFilesFactor=None):
def __init__(self, name, start, end, step, values, consolidate='average', tags=None, xFilesFactor=None, pathExpression=None):
list.__init__(self, values)
self.name = name
self.start = start
Expand All @@ -34,7 +34,7 @@ def __init__(self, name, start, end, step, values, consolidate='average', tags=N
self.consolidationFunc = consolidate
self.valuesPerPoint = 1
self.options = {}
self.pathExpression = name
self.pathExpression = pathExpression or name
self.xFilesFactor = xFilesFactor if xFilesFactor is not None else settings.DEFAULT_XFILES_FACTOR

if tags:
Expand Down
22 changes: 9 additions & 13 deletions webapp/graphite/render/evaluator.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import re
import six

from graphite.render.grammar import grammar
from graphite.render.datalib import fetchData, TimeSeries, prefetchData
from graphite.storage import STORE
import six


def evaluateTarget(requestContext, targets, noPrefetch=False):
def evaluateTarget(requestContext, targets):
if not isinstance(targets, list):
targets = [targets]

if not noPrefetch:
pathExpressions = extractPathExpressions(requestContext, targets)
prefetchData(requestContext, pathExpressions)
pathExpressions = extractPathExpressions(requestContext, targets)
prefetchData(requestContext, pathExpressions)

seriesList = []

Expand Down Expand Up @@ -78,6 +76,9 @@ def evaluateTokens(requestContext, tokens, replacements=None, pipedArg=None):
# as tokens.template. this generally happens if you try to pass non-numeric/string args
raise ValueError("invalid template() syntax, only string/numeric arguments are allowed")

if tokens.call.funcname == 'seriesByTag':
return fetchData(requestContext, tokens.call.raw)

func = SeriesFunctions[tokens.call.funcname]
rawArgs = tokens.call.args or []
if pipedArg is not None:
Expand Down Expand Up @@ -145,14 +146,9 @@ def extractPathExpression(requestContext, tokens, replacements=None):
expression = expression.replace('$'+name, str(replacements[name]))
pathExpressions.add(expression)
elif tokens.call:
# if we're prefetching seriesByTag, look up the matching series and prefetch those
# if we're prefetching seriesByTag, pass the entire call back as a path expression
if tokens.call.funcname == 'seriesByTag':
if STORE.tagdb:
for series in STORE.tagdb.find_series(
tuple([t.string[1:-1] for t in tokens.call.args if t.string]),
requestContext=requestContext,
):
pathExpressions.add(series)
pathExpressions.add(tokens.call.raw)
else:
for a in tokens.call.args:
extractPathExpression(requestContext, a, replacements)
Expand Down
23 changes: 1 addition & 22 deletions webapp/graphite/render/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4303,28 +4303,7 @@ def seriesByTag(requestContext, *tagExpressions):
See :ref:`querying tagged series <querying-tagged-series>` for more detail.
"""

if STORE.tagdb is None:
log.info('seriesByTag called but no TagDB configured')
return []

taggedSeries = STORE.tagdb.find_series(tagExpressions, requestContext=requestContext)
if not taggedSeries:
return []

taggedSeriesQuery = 'group(' + ','.join(taggedSeries) + ')'

log.debug('taggedSeriesQuery %s' % taggedSeriesQuery)

seriesList = evaluateTarget(requestContext, taggedSeriesQuery, noPrefetch=True)

pathExpr = 'seriesByTag(%s)' % ','.join(['"%s"' % expr for expr in tagExpressions])
for series in seriesList:
series.pathExpression = pathExpr

log.debug('seriesByTag found [%s]' % ', '.join([series.name for series in seriesList]))

return seriesList
# the handling of seriesByTag is implemented in STORE.fetch

def groupByTags(requestContext, seriesList, callback, *tags):
"""
Expand Down
23 changes: 8 additions & 15 deletions webapp/graphite/render/glyph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
See the License for the specific language governing permissions and
limitations under the License."""

import math, itertools, re, sys
import math, itertools, re
from datetime import datetime, timedelta
from six.moves import range, zip
from six.moves.urllib.parse import unquote_plus
Expand All @@ -21,20 +21,13 @@
import pytz

from graphite.render.datalib import TimeSeries
from graphite.util import json
from graphite.util import json, BytesIO

try:
import cairocffi as cairo
except ImportError:
import cairo

# BytesIO is needed on py3 as StringIO does not operate on byte input anymore
# We could use BytesIO on py2 as well but it is slower than StringIO
if sys.version_info >= (3, 0):
from io import BytesIO as StringIO
else:
from cStringIO import StringIO

INFINITY = float('inf')

colorAliases = {
Expand Down Expand Up @@ -585,10 +578,10 @@ def setupCairo(self,outputFormat='png'):
if outputFormat == 'png':
self.surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, self.width, self.height)
elif outputFormat == 'svg':
self.surfaceData = StringIO.StringIO()
self.surfaceData = BytesIO()
self.surface = cairo.SVGSurface(self.surfaceData, self.width, self.height)
elif outputFormat == 'pdf':
self.surfaceData = StringIO.StringIO()
self.surfaceData = BytesIO()
self.surface = cairo.PDFSurface(self.surfaceData, self.width, self.height)
res_x, res_y = self.surface.get_fallback_resolution()
self.width = float(self.width / res_x) * 72
Expand Down Expand Up @@ -889,7 +882,7 @@ def output(self, fileObj):
metaData = { }

self.surface.finish()
svgData = self.surfaceData.getvalue()
svgData = str(self.surfaceData.getvalue())
self.surfaceData.close()

svgData = svgData.replace('pt"', 'px"', 2) # we expect height/width in pixels, not points
Expand All @@ -912,13 +905,13 @@ def onHeaderPath(match):
svgData += "</g>"
svgData = svgData.replace(' data-header="true"','')

fileObj.write(svgData)
fileObj.write("""<script>
fileObj.write(svgData.encode('utf-8'))
fileObj.write(("""<script>
<![CDATA[
metadata = %s
]]>
</script>
</svg>""" % json.dumps(metaData))
</svg>""" % json.dumps(metaData)).encode('utf-8'))


class LineGraph(Graph):
Expand Down
11 changes: 8 additions & 3 deletions webapp/graphite/render/grammar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Forward, Combine, Optional, Word, Literal, CaselessKeyword,
CaselessLiteral, Group, FollowedBy, LineEnd, OneOrMore, ZeroOrMore,
alphas, alphanums, printables, delimitedList, quotedString, Regex,
__version__, Suppress
__version__, Suppress, Empty
)

grammar = Forward()
Expand Down Expand Up @@ -71,14 +71,19 @@
args = delimitedList(~kwarg + arg) # lookahead to prevent failing on equals
kwargs = delimitedList(kwarg)

def setRaw(s, loc, toks):
toks[0].raw = s[toks[0].start:toks[0].end]

call = Group(
Empty().setParseAction(lambda s, l, t: l)('start') +
funcname + leftParen +
Optional(
args + Optional(
comma + kwargs
)
) + rightParen
)('call')
) + rightParen +
Empty().leaveWhitespace().setParseAction(lambda s, l, t: l)('end')
).setParseAction(setRaw)('call')

# Metric pattern (aka. pathExpression)
validMetricChars = ''.join((set(printables) - set(symbols)))
Expand Down
6 changes: 3 additions & 3 deletions webapp/graphite/render/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from graphite.compat import HttpResponse
from graphite.user_util import getProfileByUsername
from graphite.util import json, unpickle, pickle, msgpack, StringIO
from graphite.util import json, unpickle, pickle, msgpack, BytesIO
from graphite.storage import extractForwardHeaders
from graphite.logger import log
from graphite.render.evaluator import evaluateTarget
Expand Down Expand Up @@ -515,7 +515,7 @@ def delegateRendering(graphType, graphOptions, headers=None):
def renderLocalView(request):
try:
start = time()
reqParams = StringIO(request.body)
reqParams = BytesIO(request.body)
graphType = reqParams.readline().strip()
optionsPickle = reqParams.read()
reqParams.close()
Expand Down Expand Up @@ -569,7 +569,7 @@ def renderMyGraphView(request,username,graphName):


def doImageRender(graphClass, graphOptions):
pngData = StringIO()
pngData = BytesIO()
t = time()
img = graphClass(**graphOptions)
img.output(pngData)
Expand Down

0 comments on commit 4c5a7c4

Please sign in to comment.