Merge pull request #218 from iain-buclaw-sociomantic/sync3
Sync graphite-api with upstream graphite-web 1.0.1
brutasse committed Jun 15, 2017
2 parents c056a8d + e9edbfe commit c57affe
Showing 17 changed files with 1,315 additions and 100 deletions.
7 changes: 7 additions & 0 deletions docs/configuration.rst
@@ -81,6 +81,13 @@ Extra sections
consistent hashing and a custom function, you need to point to the same
hashing function.

*hashing_type*
Type of metric hashing function. The default ``carbon_ch`` is Graphite's
traditional consistent-hashing implementation. Alternatively, you can use
``fnv1a_ch``, which uses the Fowler-Noll-Vo (FNV-1a) hash implementation
offered by the carbon-c-relay project.
Default: ``carbon_ch``

*carbon_prefix*
Prefix for carbon's internal metrics. When querying metrics starting with
this prefix, requests are made to all carbon-cache instances instead of
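For reference, the new option belongs in the carbon section of the graphite-api configuration file, next to the other keys documented in this section. A minimal sketch; the host/port and the choice of fnv1a_ch are illustrative only:

carbon:
  hosts:
    - 127.0.0.1:7002
  timeout: 1
  retry_delay: 15
  # Must match the hashing used by your relay: fnv1a_ch pairs with
  # carbon-c-relay, carbon_ch with Graphite's own consistent hashing.
  hashing_type: fnv1a_ch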
48 changes: 45 additions & 3 deletions graphite_api/app.py
@@ -116,7 +116,7 @@ def metrics_find():
until_time = None

format = RequestParams.get('format', 'treejson')
-    if format not in ['treejson', 'completer', 'nodelist']:
+    if format not in ['treejson', 'completer', 'nodelist', 'json']:
errors['format'] = 'unrecognized format: "{0}".'.format(format)

if 'query' not in RequestParams:
@@ -146,6 +146,9 @@ def metrics_find():
nodes = metric.path.split('.')
found.add(nodes[node_position])
return jsonify({'nodes': sorted(found)})
elif format == 'json':
content = json_nodes(matches)
return jsonify(content)

results = []
for node in matches:
@@ -245,6 +248,7 @@ def prune_datapoints(series, max_datapoints, start, end):

@app.route('/render', methods=methods)
def render():
start = time.time()
# Start with some defaults
errors = {}
graph_options = {
@@ -317,8 +321,9 @@ def render():
request_options['tzinfo'] = tzinfo

# Get the time interval for time-oriented graph types
-    until_time = parseATTime(RequestParams.get('until', 'now'), tzinfo)
-    from_time = parseATTime(RequestParams.get('from', '-1d'), tzinfo)
+    now = parseATTime(RequestParams.get('now', 'now'), tzinfo)
+    until_time = parseATTime(RequestParams.get('until', 'now'), tzinfo, now)
+    from_time = parseATTime(RequestParams.get('from', '-1d'), tzinfo, now)

start_time = min(from_time, until_time)
end_time = max(from_time, until_time)
@@ -327,6 +332,7 @@

request_options['startTime'] = start_time
request_options['endTime'] = end_time
request_options['now'] = now

template = dict()
for key in RequestParams.keys():
@@ -348,6 +354,8 @@
request_key = hash_request()
response = app.cache.get(request_key)
if response is not None:
logger.debug("cached response", time=(time.time() - start),
targets=targets)
return response

headers = {
@@ -362,12 +370,14 @@
context = {
'startTime': request_options['startTime'],
'endTime': request_options['endTime'],
'now': request_options['now'],
'tzinfo': request_options['tzinfo'],
'template': request_options['template'],
'data': [],
}

# Gather all data to take advantage of backends with fetch_multi
fdstart = time.time()
paths = []
for target in request_options['targets']:
if request_options['graphType'] == 'pie':
@@ -376,6 +386,7 @@
if target.strip():
paths += pathsFromTarget(context, target)
data_store = fetchData(context, paths)
logger.debug("fetched data", time=(time.time() - fdstart), paths=paths)

if request_options['graphType'] == 'pie':
for target in request_options['targets']:
@@ -401,7 +412,10 @@
for target in request_options['targets']:
if not target.strip():
continue
emstart = time.time()
series_list = evaluateTarget(context, target, data_store)
logger.debug("evaluated metric", time=(time.time() - emstart),
target=target)
context['data'].extend(series_list)

request_options['format'] = request_options.get('format')
@@ -454,6 +468,8 @@ def render():
response = jsonify(series_data, headers=headers)
if use_cache:
app.cache.add(request_key, response, cache_timeout)
logger.debug("rendered json", time=(time.time() - start),
targets=targets)
return response

if request_options['format'] == 'dygraph':
@@ -470,6 +486,8 @@ def render():
datapoints[i].append(point)
series_data = {'labels': labels, 'data': datapoints}

logger.debug("rendered dygraph", time=(time.time() - start),
targets=targets)
return jsonify(series_data, headers=headers)

if request_options['format'] == 'rickshaw':
@@ -480,6 +498,8 @@ def render():
for x, y in zip(timestamps, series)]
series_data.append(dict(target=series.name,
datapoints=datapoints))
logger.debug("rendered rickshaw", time=(time.time() - start),
targets=targets)
return jsonify(series_data, headers=headers)

if request_options['format'] == 'raw':
@@ -494,6 +514,8 @@ def render():
response = (response.read(), 200, headers)
if use_cache:
app.cache.add(request_key, response, cache_timeout)
logger.debug("rendered rawData", time=(time.time() - start),
targets=targets)
return response

if request_options['format'] == 'svg':
@@ -523,6 +545,7 @@ def render():

if use_cache:
app.cache.add(request_key, response, cache_timeout)
logger.debug("rendered graph", time=(time.time() - start), targets=targets)
return response


@@ -577,6 +600,25 @@ def tree_json(nodes, base_path, wildcards=False):
return results


def json_nodes(nodes):
nodes_info = []

for node in nodes:
info = {
'path': node.path,
'is_leaf': node.is_leaf,
'intervals': [],
}
if node.is_leaf:
for i in node.intervals:
interval = {'start': i.start, 'end': i.end}
info['intervals'].append(interval)

nodes_info.append(info)

return sorted(nodes_info, key=lambda item: item['path'])


def doImageRender(graphClass, graphOptions):
pngData = BytesIO()
img = graphClass(**graphOptions)
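With the json branch above, a query such as /metrics/find?query=collectd.*&format=json returns the sorted list built by json_nodes. A hedged sample response; the paths and epoch timestamps are invented for illustration:

[
  {"path": "collectd.host1", "is_leaf": false, "intervals": []},
  {"path": "collectd.host1.load", "is_leaf": true,
   "intervals": [{"start": 1497225600.0, "end": 1497484800.0}]}
]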
41 changes: 35 additions & 6 deletions graphite_api/carbonlink.py
@@ -25,6 +25,27 @@
}


try:
import pyhash
hasher = pyhash.fnv1a_32()

def fnv32a(string, seed=0x811c9dc5):
return hasher(string, seed=seed)
except ImportError:
def fnv32a(string, seed=0x811c9dc5):
"""
FNV-1a Hash (http://isthe.com/chongo/tech/comp/fnv/) in Python.
Taken from https://gist.github.com/vaiorabbit/5670985
"""
hval = seed
fnv_32_prime = 0x01000193
uint32_max = 2 ** 32
for s in string:
hval = hval ^ ord(s)
hval = (hval * fnv_32_prime) % uint32_max
return hval


def allowed_module(module, name):
if module not in pickle_safe:
raise pickle.UnpicklingError(
@@ -61,25 +82,33 @@ def loads(cls, s):


class ConsistentHashRing(object):
-    def __init__(self, nodes, replica_count=100):
+    def __init__(self, nodes, replica_count=100, hash_type='carbon_ch'):
self.ring = []
self.ring_len = len(self.ring)
self.nodes = set()
self.nodes_len = len(self.nodes)
self.replica_count = replica_count
self.hash_type = hash_type
for node in nodes:
self.add_node(node)

def compute_ring_position(self, key):
-        big_hash = md5(str(key).encode()).hexdigest()
-        small_hash = int(big_hash[:4], 16)
+        if self.hash_type == 'fnv1a_ch':
+            big_hash = '{0:x}'.format(int(fnv32a(str(key))))
+            small_hash = int(big_hash[:4], 16) ^ int(big_hash[4:], 16)
+        else:
+            big_hash = md5(str(key).encode()).hexdigest()
+            small_hash = int(big_hash[:4], 16)
return small_hash

def add_node(self, key):
self.nodes.add(key)
self.nodes_len = len(self.nodes)
for i in range(self.replica_count):
replica_key = "%s:%d" % (key, i)
if self.hash_type == 'fnv1a_ch':
replica_key = "%d-%s" % (i, key[1])
else:
replica_key = "%s:%d" % (key, i)
position = self.compute_ring_position(replica_key)
entry = position, key
bisect.insort(self.ring, entry)
@@ -118,7 +147,7 @@ def get_nodes(self, key):
class CarbonLinkPool(object):
def __init__(self, hosts, timeout=1, retry_delay=15,
carbon_prefix='carbon', replication_factor=1,
-                 hashing_keyfunc=lambda x: x):
+                 hashing_keyfunc=lambda x: x, hashing_type='carbon_ch'):
self.carbon_prefix = carbon_prefix
self.retry_delay = retry_delay
self.hosts = []
@@ -140,7 +169,7 @@ def __init__(self, hosts, timeout=1, retry_delay=15,
replication_factor, len(servers)))
self.replication_factor = replication_factor

-        self.hash_ring = ConsistentHashRing(self.hosts)
+        self.hash_ring = ConsistentHashRing(self.hosts, hash_type=hashing_type)
self.keyfunc = hashing_keyfunc
self.connections = {}
self.last_failure = {}
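To make the two hashing modes concrete, here is a self-contained sketch that mirrors the arithmetic of compute_ring_position above. It re-inlines the pure-Python fnv32a fallback rather than importing graphite_api, and the replica keys at the bottom are made-up examples:

from hashlib import md5


def fnv32a(string, seed=0x811c9dc5):
    # Pure-Python FNV-1a, same as the fallback defined in the hunk above.
    hval = seed
    fnv_32_prime = 0x01000193
    uint32_max = 2 ** 32
    for s in string:
        hval = hval ^ ord(s)
        hval = (hval * fnv_32_prime) % uint32_max
    return hval


def ring_position(key, hash_type='carbon_ch'):
    if hash_type == 'fnv1a_ch':
        # carbon-c-relay style: XOR-fold the 32-bit FNV-1a value into 16 bits.
        big_hash = '{0:x}'.format(fnv32a(str(key)))
        return int(big_hash[:4], 16) ^ int(big_hash[4:], 16)
    # Traditional carbon_ch: the first 16 bits of the hex MD5 digest.
    big_hash = md5(str(key).encode()).hexdigest()
    return int(big_hash[:4], 16)


# Replica keys as built by add_node(): "('host', 'instance'):replica" for
# carbon_ch, "replica-instance" for fnv1a_ch. Values here are invented.
print(ring_position("('127.0.0.1', 'a'):0", 'carbon_ch'))
print(ring_position("0-a", 'fnv1a_ch'))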
58 changes: 42 additions & 16 deletions graphite_api/finders/__init__.py
@@ -1,14 +1,18 @@
import fnmatch
import os.path
+import re
+
+EXPAND_BRACES_RE = re.compile(r'.*(\{.*?[^\\]?\})')


def get_real_metric_path(absolute_path, metric_path):
# Support symbolic links (real_metric_path ensures proper cache queries)
-    if os.path.islink(absolute_path):
-        real_fs_path = os.path.realpath(absolute_path)
+    real_fs_path = os.path.realpath(absolute_path)
+    if absolute_path != real_fs_path:
        relative_fs_path = metric_path.replace('.', os.sep)
-        base_fs_path = absolute_path[:-len(relative_fs_path)]
-        relative_real_fs_path = real_fs_path[len(base_fs_path):]
+        abs_fs_path = os.path.dirname(absolute_path[:-len(relative_fs_path)])
+        base_fs_path = os.path.realpath(abs_fs_path)
+        relative_real_fs_path = real_fs_path[len(base_fs_path):].lstrip('/')
return fs_to_metric(relative_real_fs_path)

return metric_path
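For concreteness, a worked trace of the rewritten logic under a hypothetical layout; fs_to_metric, defined in this module, converts separators back to dots and drops the file extension:

# Hypothetical storage root that is itself a symlink:
#   /opt/graphite/storage/whisper -> /data/whisper
#
# get_real_metric_path('/opt/graphite/storage/whisper/collectd/host1/load.wsp',
#                      'collectd.host1.load') proceeds as follows:
#   real_fs_path          = '/data/whisper/collectd/host1/load.wsp'
#   relative_fs_path      = 'collectd/host1/load'
#   abs_fs_path           = '/opt/graphite/storage/whisper'
#                           (dirname of absolute_path minus the relative part)
#   base_fs_path          = '/data/whisper'   (realpath of abs_fs_path)
#   relative_real_fs_path = 'collectd/host1/load.wsp'
# and fs_to_metric() returns 'collectd.host1.load', the same metric name the
# un-symlinked path would have produced.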
@@ -42,20 +46,42 @@ def extract_variants(pattern):
def match_entries(entries, pattern):
"""A drop-in replacement for fnmatch.filter that supports pattern
variants (ie. {foo,bar}baz = foobaz or barbaz)."""
-    v1, v2 = pattern.find('{'), pattern.find('}')
+    matching = []

-    if v1 > -1 and v2 > v1:
-        variations = pattern[v1+1:v2].split(',')
-        variants = [pattern[:v1] + v + pattern[v2+1:] for v in variations]
-        matching = []
+    for variant in expand_braces(pattern):
+        matching.extend(fnmatch.filter(entries, variant))

-        for variant in variants:
-            matching.extend(fnmatch.filter(entries, variant))
+    return list(_deduplicate(matching))

-        # remove dupes without changing order
-        return list(_deduplicate(matching))

+
+def expand_braces(pattern):
+    """Find the rightmost, innermost set of braces and, if it contains a
+    comma-separated list, expand its contents recursively (any of its items
+    may itself be a list enclosed in braces).
+    Return the full list of expanded strings.
+    """
+    res = set()
+
+    # Used instead of s.strip('{}') because strip is greedy.
+    # We want to remove only ONE leading { and ONE trailing }, if both exist
+    def remove_outer_braces(s):
+        if s[0] == '{' and s[-1] == '}':
+            return s[1:-1]
+        return s
+
+    match = EXPAND_BRACES_RE.search(pattern)
+    if match is not None:
+        sub = match.group(1)
+        v1, v2 = match.span(1)
+        if "," in sub:
+            for pat in sub.strip('{}').split(','):
+                subpattern = pattern[:v1] + pat + pattern[v2:]
+                res.update(expand_braces(subpattern))
+        else:
+            subpattern = pattern[:v1] + remove_outer_braces(sub) + pattern[v2:]
+            res.update(expand_braces(subpattern))
    else:
-        matching = fnmatch.filter(entries, pattern)
-        matching.sort()
-        return matching
+        res.add(pattern.replace('\\}', '}'))
+
+    return list(res)
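A quick usage sketch of the new helper; expand_braces returns an unordered list (it is built from a set), hence the sorted() call:

>>> from graphite_api.finders import expand_braces
>>> sorted(expand_braces('servers.{web,db{1,2}}.cpu'))
['servers.db1.cpu', 'servers.db2.cpu', 'servers.web.cpu']

match_entries() then runs fnmatch.filter over each expanded variant, so shell-style wildcards keep working inside braces (e.g. '{web*,db1}').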
