diff --git a/webapp/graphite/finders/__init__.py b/webapp/graphite/finders/__init__.py index cd7da44b4..cc7396d50 100644 --- a/webapp/graphite/finders/__init__.py +++ b/webapp/graphite/finders/__init__.py @@ -4,79 +4,85 @@ EXPAND_BRACES_RE = re.compile(r'.*(\{.*?[^\\]?\})') + def get_real_metric_path(absolute_path, metric_path): - # Support symbolic links (real_metric_path ensures proper cache queries) - real_fs_path = os.path.realpath(absolute_path) - if absolute_path != real_fs_path: - relative_fs_path = metric_path.replace('.', os.sep) - base_fs_path = os.path.dirname(absolute_path[:-len(relative_fs_path)]) - real_base_fs_path = os.path.realpath(base_fs_path) - relative_real_fs_path = real_fs_path[len(real_base_fs_path):].lstrip('/') - return fs_to_metric(relative_real_fs_path) + # Support symbolic links (real_metric_path ensures proper cache queries) + real_fs_path = os.path.realpath(absolute_path) + if absolute_path != real_fs_path: + relative_fs_path = metric_path.replace('.', os.sep) + base_fs_path = os.path.dirname(absolute_path[:-len(relative_fs_path)]) + real_base_fs_path = os.path.realpath(base_fs_path) + relative_real_fs_path = real_fs_path[len( + real_base_fs_path):].lstrip('/') + return fs_to_metric(relative_real_fs_path) - return metric_path + return metric_path def fs_to_metric(path): - dirpath = os.path.dirname(path) - filename = os.path.basename(path) - return os.path.join(dirpath, filename.split('.')[0]).replace(os.sep,'.') + dirpath = os.path.dirname(path) + filename = os.path.basename(path) + return os.path.join(dirpath, filename.split('.')[0]).replace(os.sep, '.') def _deduplicate(entries): - yielded = set() - for entry in entries: - if entry not in yielded: - yielded.add(entry) - yield entry + yielded = set() + for entry in entries: + if entry not in yielded: + yielded.add(entry) + yield entry def extract_variants(pattern): - """Extract the pattern variants (ie. {foo,bar}baz = foobaz or barbaz).""" - v1, v2 = pattern.find('{'), pattern.find('}') - if v1 > -1 and v2 > v1: - variations = pattern[v1+1:v2].split(',') - variants = [ pattern[:v1] + v + pattern[v2+1:] for v in variations ] + """Extract the pattern variants (ie. {foo,bar}baz = foobaz or barbaz).""" + v1, v2 = pattern.find('{'), pattern.find('}') + if v1 > -1 and v2 > v1: + variations = pattern[v1 + 1:v2].split(',') + variants = [pattern[:v1] + v + pattern[v2 + 1:] for v in variations] - else: - variants = [ pattern ] - return list( _deduplicate(variants) ) + else: + variants = [pattern] + return list(_deduplicate(variants)) def match_entries(entries, pattern): - # First we check for pattern variants (ie. {foo,bar}baz = foobaz or barbaz) - matching = [] + # First we check for pattern variants (ie. {foo,bar}baz = foobaz or barbaz) + matching = [] - for variant in expand_braces(pattern): - matching.extend(fnmatch.filter(entries, variant)) + for variant in expand_braces(pattern): + matching.extend(fnmatch.filter(entries, variant)) - return list(_deduplicate(matching)) + return list(_deduplicate(matching)) """ Brace expanding patch for python3 borrowed from: https://bugs.python.org/issue9584 """ + + def expand_braces(s): - res = list() - - # Used instead of s.strip('{}') because strip is greedy. 
- # We want to remove only ONE leading { and ONE trailing }, if both exist - def remove_outer_braces(s): - if s[0]== '{' and s[-1]=='}': - return s[1:-1] - return s - - m = EXPAND_BRACES_RE.search(s) - if m is not None: - sub = m.group(1) - open_brace, close_brace = m.span(1) - if ',' in sub: - for pat in sub.strip('{}').split(','): - res.extend(expand_braces(s[:open_brace] + pat + s[close_brace:])) + res = list() + + # Used instead of s.strip('{}') because strip is greedy. + # We want to remove only ONE leading { and ONE trailing }, if both exist + def remove_outer_braces(s): + if s[0] == '{' and s[-1] == '}': + return s[1:-1] + return s + + m = EXPAND_BRACES_RE.search(s) + if m is not None: + sub = m.group(1) + open_brace, close_brace = m.span(1) + if ',' in sub: + for pat in sub.strip('{}').split(','): + res.extend(expand_braces( + s[:open_brace] + pat + s[close_brace:])) + else: + res.extend(expand_braces( + s[:open_brace] + remove_outer_braces(sub) + s[close_brace:])) else: - res.extend(expand_braces(s[:open_brace] + remove_outer_braces(sub) + s[close_brace:])) - else: - res.append(s.replace('\\}', '}')) + res.append(s.replace('\\}', '}')) - return list(set(res)) + return list(set(res)) diff --git a/webapp/graphite/finders/ceres.py b/webapp/graphite/finders/ceres.py index d48c7443e..7f8a83377 100644 --- a/webapp/graphite/finders/ceres.py +++ b/webapp/graphite/finders/ceres.py @@ -12,26 +12,28 @@ class CeresFinder: - def __init__(self, directory=None): - directory = directory or settings.CERES_DIR - self.directory = directory - self.tree = CeresTree(directory) + def __init__(self, directory=None): + directory = directory or settings.CERES_DIR + self.directory = directory + self.tree = CeresTree(directory) - def find_nodes(self, query): + def find_nodes(self, query): - variants = extract_variants(query.pattern) + variants = extract_variants(query.pattern) - for variant in variants: - for fs_path in glob( self.tree.getFilesystemPath(variant)): - metric_path = self.tree.getNodePath(fs_path) + for variant in variants: + for fs_path in glob(self.tree.getFilesystemPath(variant)): + metric_path = self.tree.getNodePath(fs_path) - if CeresNode.isNodeDir(fs_path): - ceres_node = self.tree.getNode(metric_path) + if CeresNode.isNodeDir(fs_path): + ceres_node = self.tree.getNode(metric_path) - if ceres_node.hasDataForInterval(query.startTime, query.endTime): - real_metric_path = get_real_metric_path(fs_path, metric_path) - reader = CeresReader(ceres_node, real_metric_path) - yield LeafNode(metric_path, reader) + if ceres_node.hasDataForInterval( + query.startTime, query.endTime): + real_metric_path = get_real_metric_path( + fs_path, metric_path) + reader = CeresReader(ceres_node, real_metric_path) + yield LeafNode(metric_path, reader) - elif os.path.isdir(fs_path): - yield BranchNode(metric_path) + elif os.path.isdir(fs_path): + yield BranchNode(metric_path) diff --git a/webapp/graphite/finders/remote.py b/webapp/graphite/finders/remote.py new file mode 100644 index 000000000..6448c39f5 --- /dev/null +++ b/webapp/graphite/finders/remote.py @@ -0,0 +1,173 @@ +import time + +from urllib import urlencode +from threading import current_thread + +from django.conf import settings +from django.core.cache import cache + +from graphite.http_pool import http +from graphite.intervals import Interval, IntervalSet +from graphite.logger import log +from graphite.node import LeafNode, BranchNode +from graphite.render.hashing import compactHash +from graphite.util import unpickle, logtime, timebounds + +from 
graphite.readers.remote import RemoteReader + + +def prefetchRemoteData(remote_stores, requestContext, pathExpressions): + if requestContext['localOnly']: + return + + if requestContext is None: + requestContext = {} + + if pathExpressions is None: + return + + (startTime, endTime, now) = timebounds(requestContext) + log.info( + 'thread %s prefetchRemoteData:: Starting fetch_list on all backends' % + current_thread().name) + + # Go through all of the remote nodes, and launch a fetch for each one. + # Each fetch will take place in its own thread, since it's naturally + # parallel work. + for store in remote_stores: + reader = RemoteReader(store, + {'intervals': []}, + bulk_query=pathExpressions) + reader.fetch_list(startTime, endTime, now, requestContext) + + +class RemoteStore(object): + + def __init__(self, host): + self.host = host + self.last_failure = 0 + + @property + def available(self): + return time.time() - self.last_failure > settings.REMOTE_RETRY_DELAY + + def find(self, query, headers=None): + return list(FindRequest(self, query).send(headers)) + + def fail(self): + self.last_failure = time.time() + + +class FindRequest(object): + __slots__ = ('store', 'query', 'cacheKey') + + def __init__(self, store, query): + self.store = store + self.query = query + + if query.startTime: + start = query.startTime - \ + (query.startTime % settings.FIND_CACHE_DURATION) + else: + start = "" + + if query.endTime: + end = query.endTime - (query.endTime % + settings.FIND_CACHE_DURATION) + else: + end = "" + + self.cacheKey = "find:%s:%s:%s:%s" % ( + store.host, compactHash(query.pattern), start, end) + + @logtime(custom_msg=True) + def send(self, headers=None, msg_setter=None): + log.info( + "FindRequest.send(host=%s, query=%s) called" % + (self.store.host, self.query)) + + if headers is None: + headers = {} + + results = cache.get(self.cacheKey) + if results is not None: + log.info( + "FindRequest.send(host=%s, query=%s) using cached result" % + (self.store.host, self.query)) + else: + url = "%s://%s/metrics/find/" % ( + 'https' if settings.INTRACLUSTER_HTTPS else 'http', self.store.host) + + query_params = [ + ('local', '1'), + ('format', 'pickle'), + ('query', self.query.pattern), + ] + if self.query.startTime: + query_params.append(('from', self.query.startTime)) + + if self.query.endTime: + query_params.append(('until', self.query.endTime)) + + try: + result = http.request( + 'POST' if settings.REMOTE_STORE_USE_POST else 'GET', + url, + fields=query_params, + headers=headers, + timeout=settings.REMOTE_FIND_TIMEOUT) + except BaseException: + log.exception( + "FindRequest.send(host=%s, query=%s) exception during request" % + (self.store.host, self.query)) + self.store.fail() + return + + if result.status != 200: + log.exception( + "FindRequest.send(host=%s, query=%s) error response %d from %s?%s" % + (self.store.host, self.query, result.status, url, urlencode(query_params))) + self.store.fail() + return + + try: + results = unpickle.loads(result.data) + except BaseException: + log.exception( + "FindRequest.send(host=%s, query=%s) exception processing response" % + (self.store.host, self.query)) + self.store.fail() + return + + cache.set(self.cacheKey, results, settings.FIND_CACHE_DURATION) + + msg_setter( + 'host: {host}, query: {query}'.format( + host=self.store.host, + query=self.query)) + + for node_info in results: + # handle both 1.x and 0.9.x output + path = node_info.get('path') or node_info.get('metric_path') + is_leaf = node_info.get('is_leaf') or node_info.get('isLeaf') + intervals = 
node_info.get('intervals') or [] + if not isinstance(intervals, IntervalSet): + intervals = IntervalSet( + [Interval(interval[0], interval[1]) for interval in intervals]) + + node_info = { + 'is_leaf': is_leaf, + 'path': path, + 'intervals': intervals, + } + + if is_leaf: + reader = RemoteReader( + self.store, node_info, bulk_query=[ + self.query.pattern]) + node = LeafNode(path, reader) + else: + node = BranchNode(path) + + node.local = False + yield node diff --git a/webapp/graphite/finders/standard.py b/webapp/graphite/finders/standard.py index 8f327dc65..3121d2cc5 100644 --- a/webapp/graphite/finders/standard.py +++ b/webapp/graphite/finders/standard.py @@ -16,112 +16,138 @@ from . import fs_to_metric, get_real_metric_path, match_entries -class StandardFinder: - DATASOURCE_DELIMITER = '::RRD_DATASOURCE::' - - def __init__(self, directories=None): - directories = directories or settings.STANDARD_DIRS - self.directories = directories - - def find_nodes(self, query): - clean_pattern = query.pattern.replace('\\', '') - pattern_parts = clean_pattern.split('.') - - for root_dir in self.directories: - for absolute_path in self._find_paths(root_dir, pattern_parts): - if basename(absolute_path).startswith('.'): - continue +class StandardFinder(object): + DATASOURCE_DELIMITER = '::RRD_DATASOURCE::' + + def __init__(self, directories=None): + directories = directories or settings.STANDARD_DIRS + self.directories = directories + + def find_nodes(self, query): + clean_pattern = query.pattern.replace('\\', '') + pattern_parts = clean_pattern.split('.') + + for root_dir in self.directories: + for absolute_path in self._find_paths(root_dir, pattern_parts): + if basename(absolute_path).startswith('.'): + continue + + if self.DATASOURCE_DELIMITER in basename(absolute_path): + (absolute_path, datasource_pattern) = absolute_path.rsplit( + self.DATASOURCE_DELIMITER, 1) + else: + datasource_pattern = None + + relative_path = absolute_path[len(root_dir):].lstrip('/') + metric_path = fs_to_metric(relative_path) + real_metric_path = get_real_metric_path( + absolute_path, metric_path) + + metric_path_parts = metric_path.split('.') + for field_index in find_escaped_pattern_fields(query.pattern): + metric_path_parts[field_index] = pattern_parts[field_index].replace( + '\\', '') + metric_path = '.'.join(metric_path_parts) + + # Now we construct and yield an appropriate Node object + if isdir(absolute_path): + yield BranchNode(metric_path) + + elif isfile(absolute_path): + if absolute_path.endswith( + '.wsp') and WhisperReader.supported: + reader = WhisperReader(absolute_path, real_metric_path) + yield LeafNode(metric_path, reader) + + elif absolute_path.endswith('.wsp.gz') and GzippedWhisperReader.supported: + reader = GzippedWhisperReader( + absolute_path, real_metric_path) + yield LeafNode(metric_path, reader) + + elif absolute_path.endswith('.rrd') and RRDReader.supported: + if datasource_pattern is None: + yield BranchNode(metric_path) + + else: + for datasource_name in RRDReader.get_datasources( + absolute_path): + if match_entries( + [datasource_name], datasource_pattern): + reader = RRDReader( + absolute_path, datasource_name) + yield LeafNode(metric_path + "." 
+ datasource_name, reader) + + def _find_paths(self, current_dir, patterns): + """Recursively generates absolute paths whose components underneath current_dir + match the corresponding pattern in patterns""" + pattern = patterns[0] + patterns = patterns[1:] + + has_wildcard = pattern.find( + '{') > -1 or pattern.find('[') > -1 or pattern.find('*') > -1 or pattern.find('?') > -1 + using_globstar = pattern == "**" + + if has_wildcard: # this avoids os.listdir() for performance + try: + entries = [x.name for x in scandir(current_dir)] + except OSError as e: + log.exception(e) + entries = [] + else: + entries = [pattern] - if self.DATASOURCE_DELIMITER in basename(absolute_path): - (absolute_path, datasource_pattern) = absolute_path.rsplit(self.DATASOURCE_DELIMITER, 1) + if using_globstar: + matching_subdirs = map(operator.itemgetter(0), walk(current_dir)) else: - datasource_pattern = None - - relative_path = absolute_path[ len(root_dir): ].lstrip('/') - metric_path = fs_to_metric(relative_path) - real_metric_path = get_real_metric_path(absolute_path, metric_path) - - metric_path_parts = metric_path.split('.') - for field_index in find_escaped_pattern_fields(query.pattern): - metric_path_parts[field_index] = pattern_parts[field_index].replace('\\', '') - metric_path = '.'.join(metric_path_parts) - - # Now we construct and yield an appropriate Node object - if isdir(absolute_path): - yield BranchNode(metric_path) - - elif isfile(absolute_path): - if absolute_path.endswith('.wsp') and WhisperReader.supported: - reader = WhisperReader(absolute_path, real_metric_path) - yield LeafNode(metric_path, reader) - - elif absolute_path.endswith('.wsp.gz') and GzippedWhisperReader.supported: - reader = GzippedWhisperReader(absolute_path, real_metric_path) - yield LeafNode(metric_path, reader) - - elif absolute_path.endswith('.rrd') and RRDReader.supported: - if datasource_pattern is None: - yield BranchNode(metric_path) - - else: - for datasource_name in RRDReader.get_datasources(absolute_path): - if match_entries([datasource_name], datasource_pattern): - reader = RRDReader(absolute_path, datasource_name) - yield LeafNode(metric_path + "." 
+ datasource_name, reader) - - def _find_paths(self, current_dir, patterns): - """Recursively generates absolute paths whose components underneath current_dir - match the corresponding pattern in patterns""" - pattern = patterns[0] - patterns = patterns[1:] - - has_wildcard = pattern.find('{') > -1 or pattern.find('[') > -1 or pattern.find('*') > -1 or pattern.find('?') > -1 - using_globstar = pattern == "**" - - if has_wildcard: # this avoids os.listdir() for performance - try: - entries = [x.name for x in scandir(current_dir)] - except OSError as e: - log.exception(e) - entries = [] - else: - entries = [ pattern ] - - if using_globstar: - matching_subdirs = map(operator.itemgetter(0), walk(current_dir)) - else: - subdirs = [entry for entry in entries if isdir(join(current_dir, entry))] - matching_subdirs = match_entries(subdirs, pattern) - - # if this is a terminal globstar, add a pattern for all files in subdirs - if using_globstar and not patterns: - patterns = ["*"] - - if len(patterns) == 1 and RRDReader.supported: #the last pattern may apply to RRD data sources - if not has_wildcard: - entries = [ pattern + ".rrd" ] - files = [entry for entry in entries if isfile(join(current_dir, entry))] - rrd_files = match_entries(files, pattern + ".rrd") - - if rrd_files: #let's assume it does - datasource_pattern = patterns[0] - - for rrd_file in rrd_files: - absolute_path = join(current_dir, rrd_file) - yield absolute_path + self.DATASOURCE_DELIMITER + datasource_pattern - - if patterns: #we've still got more directories to traverse - for subdir in matching_subdirs: - - absolute_path = join(current_dir, subdir) - for match in self._find_paths(absolute_path, patterns): - yield match - - else: #we've got the last pattern - if not has_wildcard: - entries = [ pattern + '.wsp', pattern + '.wsp.gz', pattern + '.rrd' ] - files = [entry for entry in entries if isfile(join(current_dir, entry))] - matching_files = match_entries(files, pattern + '.*') - - for base_name in matching_files + matching_subdirs: - yield join(current_dir, base_name) + subdirs = [ + entry for entry in entries if isdir( + join( + current_dir, + entry))] + matching_subdirs = match_entries(subdirs, pattern) + + # if this is a terminal globstar, add a pattern for all files in + # subdirs + if using_globstar and not patterns: + patterns = ["*"] + + # the last pattern may apply to RRD data sources + if len(patterns) == 1 and RRDReader.supported: + if not has_wildcard: + entries = [pattern + ".rrd"] + files = [ + entry for entry in entries if isfile( + join( + current_dir, + entry))] + rrd_files = match_entries(files, pattern + ".rrd") + + if rrd_files: # let's assume it does + datasource_pattern = patterns[0] + + for rrd_file in rrd_files: + absolute_path = join(current_dir, rrd_file) + yield absolute_path + self.DATASOURCE_DELIMITER + datasource_pattern + + if patterns: # we've still got more directories to traverse + for subdir in matching_subdirs: + + absolute_path = join(current_dir, subdir) + for match in self._find_paths(absolute_path, patterns): + yield match + + else: # we've got the last pattern + if not has_wildcard: + entries = [ + pattern + '.wsp', + pattern + '.wsp.gz', + pattern + '.rrd'] + files = [ + entry for entry in entries if isfile( + join( + current_dir, + entry))] + matching_files = match_entries(files, pattern + '.*') + + for base_name in matching_files + matching_subdirs: + yield join(current_dir, base_name) diff --git a/webapp/graphite/finders/utils.py b/webapp/graphite/finders/utils.py new file mode 100644 
index 000000000..5937356c1 --- /dev/null +++ b/webapp/graphite/finders/utils.py @@ -0,0 +1,12 @@ +"""Utility functions for finders.""" + +from django.conf import settings + + +def extractForwardHeaders(request): + headers = {} + for name in settings.REMOTE_STORE_FORWARD_HEADERS: + value = request.META.get('HTTP_%s' % name.upper().replace('-', '_')) + if value is not None: + headers[name] = value + return headers diff --git a/webapp/graphite/http_pool.py b/webapp/graphite/http_pool.py new file mode 100644 index 000000000..3f913d101 --- /dev/null +++ b/webapp/graphite/http_pool.py @@ -0,0 +1,4 @@ +"""Shared urllib3 pool.""" +import urllib3 + +http = urllib3.PoolManager(num_pools=10, maxsize=5) diff --git a/webapp/graphite/local_settings.py.example b/webapp/graphite/local_settings.py.example index 907bfd69d..0c04ce604 100644 --- a/webapp/graphite/local_settings.py.example +++ b/webapp/graphite/local_settings.py.example @@ -115,6 +115,15 @@ # Data directories using the "Standard" metrics finder (i.e. not Ceres) #STANDARD_DIRS = [WHISPER_DIR, RRD_DIR] # Default: set from the above variables +## Data finders +# It is possible to use an alternate storage layer than the default, Whisper, +# in order to accommodate specific needs. +# See: http://graphite.readthedocs.io/en/latest/storage-backends.html +# +# STORAGE_FINDERS = ( +# 'graphite.finders.standard.StandardFinder', +# 'graphite.finders.ceres.CeresFinder', +# ) ##################################### # Email Configuration # diff --git a/webapp/graphite/readers.py b/webapp/graphite/readers.py deleted file mode 100644 index f75a8b8e4..000000000 --- a/webapp/graphite/readers.py +++ /dev/null @@ -1,364 +0,0 @@ -import os -import sys -import time -# Use the built-in version of scandir/stat if possible, otherwise -# use the scandir module version -try: - from os import scandir, stat # noqa # pylint: disable=unused-import -except ImportError: - from scandir import scandir, stat # noqa # pylint: disable=unused-import - -from graphite.intervals import Interval, IntervalSet -from graphite.logger import log -from django.conf import settings - -try: - import whisper -except ImportError: - whisper = False - -# The parser was repalcing __readHeader with the __readHeader -# which was not working. -if bool(whisper): - whisper__readHeader = whisper.__readHeader - -try: - import ceres -except ImportError: - ceres = False - -try: - import rrdtool -except ImportError: - rrdtool = False - -try: - import gzip -except ImportError: - gzip = False - - -def CarbonLink(): - """Return a carbonlink instance.""" - # Late import to avoid pulling out too many dependencies with - # readers.py which is usually imported by plugins. 
- from graphite.carbonlink import CarbonLink - return CarbonLink() - - -class FetchInProgress(object): - def __init__(self, wait_callback): - self.wait_callback = wait_callback - - def waitForResults(self): - return self.wait_callback() - - -class MultiReader(object): - __slots__ = ('nodes',) - - def __init__(self, nodes): - self.nodes = nodes - - def get_intervals(self): - interval_sets = [] - for node in self.nodes: - interval_sets.extend( node.intervals.intervals ) - return IntervalSet( sorted(interval_sets) ) - - def fetch(self, startTime, endTime, now=None, requestContext=None): - # Start the fetch on each node - fetches = [] - - for n in self.nodes: - try: - fetches.append(n.fetch(startTime, endTime, now, requestContext)) - except: - log.exception("Failed to initiate subfetch for %s" % str(n)) - - def merge_results(): - results = {} - - # Wait for any asynchronous operations to complete - for i, result in enumerate(fetches): - if isinstance(result, FetchInProgress): - try: - results[i] = result.waitForResults() - except: - log.exception("Failed to complete subfetch") - results[i] = None - else: - results[i] = result - - results = [r for r in results.values() if r is not None] - if not results: - raise Exception("All sub-fetches failed") - - return reduce(self.merge, results) - - return FetchInProgress(merge_results) - - def merge(self, results1, results2): - # Ensure results1 is finer than results2 - if results1[0][2] > results2[0][2]: - results1, results2 = results2, results1 - - time_info1, values1 = results1 - time_info2, values2 = results2 - start1, end1, step1 = time_info1 - start2, end2, step2 = time_info2 - - step = step1 # finest step - start = min(start1, start2) # earliest start - end = max(end1, end2) # latest end - time_info = (start, end, step) - values = [] - - t = start - while t < end: - # Look for the finer precision value first if available - i1 = (t - start1) / step1 - - if len(values1) > i1: - v1 = values1[i1] - else: - v1 = None - - if v1 is None: - i2 = (t - start2) / step2 - - if len(values2) > i2: - v2 = values2[i2] - else: - v2 = None - - values.append(v2) - else: - values.append(v1) - - t += step - - return (time_info, values) - - -class CeresReader(object): - __slots__ = ('ceres_node', 'real_metric_path') - supported = bool(ceres) - - def __init__(self, ceres_node, real_metric_path): - self.ceres_node = ceres_node - self.real_metric_path = real_metric_path - - def get_intervals(self): - intervals = [] - for info in self.ceres_node.slice_info: - (start, end, step) = info - intervals.append( Interval(start, end) ) - - return IntervalSet(intervals) - - def fetch(self, startTime, endTime): - data = self.ceres_node.read(startTime, endTime) - time_info = (data.startTime, data.endTime, data.timeStep) - values = list(data.values) - - # Merge in data from carbon's cache - try: - cached_datapoints = CarbonLink().query(self.real_metric_path) - except: - log.exception("Failed CarbonLink query '%s'" % self.real_metric_path) - cached_datapoints = [] - - values = merge_with_cache(cached_datapoints, - data.startTime, - data.timeStep, - values) - - return time_info, values - - -class WhisperReader(object): - __slots__ = ('fs_path', 'real_metric_path') - supported = bool(whisper) - - def __init__(self, fs_path, real_metric_path): - self.fs_path = fs_path - self.real_metric_path = real_metric_path - - def get_intervals(self): - start = time.time() - whisper.info(self.fs_path)['maxRetention'] - end = max( stat(self.fs_path).st_mtime, start ) - return IntervalSet( 
[Interval(start, end)] ) - - def fetch(self, startTime, endTime): - try: - data = whisper.fetch(self.fs_path, startTime, endTime) - except IOError: - log.exception("Failed fetch of whisper file '%s'" % self.fs_path) - return None - if not data: - return None - - time_info, values = data - (start,end,step) = time_info - - meta_info = whisper.info(self.fs_path) - aggregation_method = meta_info['aggregationMethod'] - lowest_step = min([i['secondsPerPoint'] for i in meta_info['archives']]) - # Merge in data from carbon's cache - cached_datapoints = [] - try: - cached_datapoints = CarbonLink().query(self.real_metric_path) - except: - log.exception("Failed CarbonLink query '%s'" % self.real_metric_path) - cached_datapoints = [] - - if isinstance(cached_datapoints, dict): - cached_datapoints = cached_datapoints.items() - - values = merge_with_cache(cached_datapoints, - start, - step, - values, - aggregation_method) - - return time_info, values - - -class GzippedWhisperReader(WhisperReader): - supported = bool(whisper and gzip) - - def get_intervals(self): - fh = gzip.GzipFile(self.fs_path, 'rb') - try: - info = whisper__readHeader(fh) # evil, but necessary. - finally: - fh.close() - - start = time.time() - info['maxRetention'] - end = max( stat(self.fs_path).st_mtime, start ) - return IntervalSet( [Interval(start, end)] ) - - def fetch(self, startTime, endTime): - fh = gzip.GzipFile(self.fs_path, 'rb') - try: - return whisper.file_fetch(fh, startTime, endTime) - finally: - fh.close() - - -class RRDReader: - supported = bool(rrdtool) - - @staticmethod - def _convert_fs_path(fs_path): - if isinstance(fs_path, unicode): - fs_path = fs_path.encode(sys.getfilesystemencoding()) - return os.path.realpath(fs_path) - - def __init__(self, fs_path, datasource_name): - self.fs_path = RRDReader._convert_fs_path(fs_path) - self.datasource_name = datasource_name - - def get_intervals(self): - start = time.time() - self.get_retention(self.fs_path) - end = max( stat(self.fs_path).st_mtime, start ) - return IntervalSet( [Interval(start, end)] ) - - def fetch(self, startTime, endTime): - startString = time.strftime("%H:%M_%Y%m%d+%Ss", time.localtime(startTime)) - endString = time.strftime("%H:%M_%Y%m%d+%Ss", time.localtime(endTime)) - - if settings.FLUSHRRDCACHED: - rrdtool.flushcached(self.fs_path, '--daemon', settings.FLUSHRRDCACHED) - - (timeInfo, columns, rows) = rrdtool.fetch(self.fs_path,settings.RRD_CF,'-s' + startString,'-e' + endString) - colIndex = list(columns).index(self.datasource_name) - rows.pop() #chop off the latest value because RRD returns crazy last values sometimes - values = (row[colIndex] for row in rows) - - return (timeInfo, values) - - @staticmethod - def get_datasources(fs_path): - info = rrdtool.info(RRDReader._convert_fs_path(fs_path)) - - if 'ds' in info: - return [datasource_name for datasource_name in info['ds']] - else: - ds_keys = [ key for key in info if key.startswith('ds[') ] - datasources = set( key[3:].split(']')[0] for key in ds_keys ) - return list(datasources) - - @staticmethod - def get_retention(fs_path): - info = rrdtool.info(RRDReader._convert_fs_path(fs_path)) - if 'rra' in info: - rras = info['rra'] - else: - # Ugh, I like the old python-rrdtool api better.. 
- rra_count = max([ int(key[4]) for key in info if key.startswith('rra[') ]) + 1 - rras = [{}] * rra_count - for i in range(rra_count): - rras[i]['pdp_per_row'] = info['rra[%d].pdp_per_row' % i] - rras[i]['rows'] = info['rra[%d].rows' % i] - - retention_points = 0 - for rra in rras: - points = rra['pdp_per_row'] * rra['rows'] - if points > retention_points: - retention_points = points - - return retention_points * info['step'] - - -def merge_with_cache(cached_datapoints, start, step, values, func=None): - - consolidated=[] - - # Similar to the function in render/datalib:TimeSeries - def consolidate(func, values): - usable = [v for v in values if v is not None] - if not usable: return None - if func == 'sum': - return sum(usable) - if func == 'average': - return float(sum(usable)) / len(usable) - if func == 'max': - return max(usable) - if func == 'min': - return min(usable) - if func == 'last': - return usable[-1] - raise Exception("Invalid consolidation function: '%s'" % func) - - if func: - consolidated_dict = {} - for (timestamp, value) in cached_datapoints: - interval = timestamp - (timestamp % step) - if interval in consolidated_dict: - consolidated_dict[interval].append(value) - else: - consolidated_dict[interval] = [value] - for interval in consolidated_dict: - value = consolidate(func, consolidated_dict[interval]) - consolidated.append((interval, value)) - - else: - consolidated = cached_datapoints - - for (interval, value) in consolidated: - try: - i = int(interval - start) / step - if i < 0: - # cached data point is earlier then the requested data point. - # meaning we can definitely ignore the cache result. - # note that we cannot rely on the 'except' - # in this case since 'values[-n]=' - # is equivalent to 'values[len(values) - n]=' - continue - values[i] = value - except: - pass - - return values diff --git a/webapp/graphite/readers/__init__.py b/webapp/graphite/readers/__init__.py new file mode 100644 index 000000000..4232e0d57 --- /dev/null +++ b/webapp/graphite/readers/__init__.py @@ -0,0 +1,5 @@ +# Import some symbols to avoid breaking compatibility. 
+from graphite.readers.utils import CarbonLink, merge_with_cache, FetchInProgress, MultiReader # noqa # pylint: disable=unused-import +from graphite.readers.whisper import WhisperReader, GzippedWhisperReader # noqa # pylint: disable=unused-import +from graphite.readers.ceres import CeresReader # noqa # pylint: disable=unused-import +from graphite.readers.rrd import RRDReader # noqa # pylint: disable=unused-import diff --git a/webapp/graphite/readers/ceres.py b/webapp/graphite/readers/ceres.py new file mode 100644 index 000000000..8cab42d1b --- /dev/null +++ b/webapp/graphite/readers/ceres.py @@ -0,0 +1,48 @@ +from __future__ import absolute_import + +from graphite.intervals import Interval, IntervalSet +from graphite.logger import log + +from graphite.readers.utils import CarbonLink, merge_with_cache + +try: + import ceres +except ImportError: + ceres = False + + +class CeresReader(object): + __slots__ = ('ceres_node', 'real_metric_path') + supported = bool(ceres) + + def __init__(self, ceres_node, real_metric_path): + self.ceres_node = ceres_node + self.real_metric_path = real_metric_path + + def get_intervals(self): + intervals = [] + for info in self.ceres_node.slice_info: + (start, end, step) = info + intervals.append(Interval(start, end)) + + return IntervalSet(intervals) + + def fetch(self, startTime, endTime): + data = self.ceres_node.read(startTime, endTime) + time_info = (data.startTime, data.endTime, data.timeStep) + values = list(data.values) + + # Merge in data from carbon's cache + try: + cached_datapoints = CarbonLink().query(self.real_metric_path) + except BaseException: + log.exception("Failed CarbonLink query '%s'" % + self.real_metric_path) + cached_datapoints = [] + + values = merge_with_cache(cached_datapoints, + data.startTime, + data.timeStep, + values) + + return time_info, values diff --git a/webapp/graphite/readers/multi.py b/webapp/graphite/readers/multi.py new file mode 100644 index 000000000..e69de29bb diff --git a/webapp/graphite/readers/remote.py b/webapp/graphite/readers/remote.py new file mode 100644 index 000000000..9846dbbde --- /dev/null +++ b/webapp/graphite/readers/remote.py @@ -0,0 +1,233 @@ +import time + +from Queue import Queue +from urllib import urlencode +from threading import Lock, current_thread + +from django.conf import settings + +from graphite.http_pool import http +from graphite.readers import FetchInProgress +from graphite.logger import log +from graphite.util import unpickle +from graphite.worker_pool.pool import get_pool + + +class RemoteReader(object): + __slots__ = ( + 'store', + 'metric_path', + 'intervals', + 'bulk_query', + 'connection') + inflight_lock = Lock() + + def __init__(self, store, node_info, bulk_query=None): + self.store = store + self.metric_path = node_info.get( + 'path') or node_info.get('metric_path') + self.intervals = node_info['intervals'] + self.bulk_query = bulk_query or ( + [self.metric_path] if self.metric_path else [] + ) + self.connection = None + + def __repr__(self): + return '' % (id(self), self.store.host) + + @staticmethod + def _log(msg, logger): + logger(('thread %s at %fs ' % + (current_thread().name, time.time())) + msg) + + @classmethod + def log_debug(cls, msg): + if settings.DEBUG: + cls._log(msg, log.info) + + @classmethod + def log_error(cls, msg): + cls._log(msg, log.exception) + + def get_intervals(self): + return self.intervals + + def fetch(self, startTime, endTime, now=None, requestContext=None): + seriesList = self.fetch_list(startTime, endTime, now, requestContext) + + def 
_fetch(seriesList): + if seriesList is None: + return None + + for series in seriesList: + if series['name'] == self.metric_path: + time_info = ( + series['start'], + series['end'], + series['step']) + return (time_info, series['values']) + + return None + + if isinstance(seriesList, FetchInProgress): + return FetchInProgress(lambda: _fetch(seriesList.waitForResults())) + + return _fetch(seriesList) + + def fetch_list(self, startTime, endTime, now=None, requestContext=None): + t = time.time() + + query_params = [ + ('format', 'pickle'), + ('local', '1'), + ('noCache', '1'), + ('from', str(int(startTime))), + ('until', str(int(endTime))) + ] + + if len(self.bulk_query) < 1: + return [] + + for target in self.bulk_query: + query_params.append(('target', target)) + + if now is not None: + query_params.append(('now', str(int(now)))) + + query_string = urlencode(query_params) + urlpath = '/render/' + url = "%s://%s%s" % ('https' if settings.INTRACLUSTER_HTTPS else 'http', + self.store.host, urlpath) + headers = requestContext.get( + 'forwardHeaders') if requestContext else None + + cacheKey = "%s?%s" % (url, query_string) + + if requestContext is not None and 'inflight_requests' in requestContext and cacheKey in requestContext[ + 'inflight_requests']: + self.log_debug( + "RemoteReader:: Returning cached FetchInProgress %s?%s" % + (url, query_string)) + return requestContext['inflight_requests'][cacheKey] + + if requestContext is None or 'inflight_locks' not in requestContext or cacheKey not in requestContext[ + 'inflight_locks']: + with self.inflight_lock: + self.log_debug( + "RemoteReader:: Got global lock %s?%s" % + (url, query_string)) + if requestContext is None: + requestContext = {} + if 'inflight_locks' not in requestContext: + requestContext['inflight_locks'] = {} + if 'inflight_requests' not in requestContext: + requestContext['inflight_requests'] = {} + if cacheKey not in requestContext['inflight_locks']: + self.log_debug( + "RemoteReader:: Creating lock %s?%s" % + (url, query_string)) + requestContext['inflight_locks'][cacheKey] = Lock() + self.log_debug( + "RemoteReader:: Released global lock %s?%s" % + (url, query_string)) + + cacheLock = requestContext['inflight_locks'][cacheKey] + + with cacheLock: + self.log_debug( + "RemoteReader:: got url lock %s?%s" % + (url, query_string)) + + if cacheKey in requestContext['inflight_requests']: + self.log_debug( + "RemoteReader:: Returning cached FetchInProgress %s?%s" % + (url, query_string)) + return requestContext['inflight_requests'][cacheKey] + + q = Queue() + if settings.USE_WORKER_POOL: + get_pool().apply_async( + func=self._fetch, + args=[url, query_string, query_params, headers], + callback=lambda x: q.put(x), + ) + else: + q.put( + self._fetch(url, query_string, query_params, headers), + ) + + def retrieve(): + with retrieve.lock: + # if the result is known we return it directly + if hasattr(retrieve, '_result'): + results = getattr(retrieve, '_result') + self.log_debug( + 'RemoteReader:: retrieve completed (cached) %s' % + (', '.join([result['path'] for result in results])), + ) + return results + + # otherwise we get it from the queue and keep it for later + results = q.get(block=True) + + for i in range(len(results)): + results[i]['path'] = results[i]['name'] + + if not results: + self.log_debug( + 'RemoteReader:: retrieve has received no results') + + setattr(retrieve, '_result', results) + self.log_debug( + 'RemoteReader:: retrieve completed %s' % + (', '.join([result['path'] for result in results])), + ) + return results + + 
self.log_debug( + 'RemoteReader:: Storing FetchInProgress with cacheKey {cacheKey}' .format( + cacheKey=cacheKey), ) + retrieve.lock = Lock() + data = FetchInProgress(retrieve) + requestContext['inflight_requests'][cacheKey] = data + + self.log_debug( + "RemoteReader:: Returning %s?%s in %fs" % + (url, query_string, time.time() - t)) + return data + + def _fetch(self, url, query_string, query_params, headers): + self.log_debug( + "RemoteReader:: Starting to execute _fetch %s?%s" % + (url, query_string)) + try: + self.log_debug( + "ReadResult:: Requesting %s?%s" % + (url, query_string)) + result = http.request( + 'POST' if settings.REMOTE_STORE_USE_POST else 'GET', + url, + fields=query_params, + headers=headers, + timeout=settings.REMOTE_FETCH_TIMEOUT, + ) + + if result.status != 200: + self.store.fail() + self.log_error( + "ReadResult:: Error response %d from %s?%s" % + (result.status, url, query_string)) + data = [] + else: + data = unpickle.loads(result.data) + except Exception as err: + self.store.fail() + self.log_error( + "ReadResult:: Error requesting %s?%s: %s" % + (url, query_string, err)) + data = [] + + self.log_debug( + "RemoteReader:: Completed _fetch %s?%s" % + (url, query_string)) + return data diff --git a/webapp/graphite/readers/rrd.py b/webapp/graphite/readers/rrd.py new file mode 100644 index 000000000..cea4cff09 --- /dev/null +++ b/webapp/graphite/readers/rrd.py @@ -0,0 +1,87 @@ +import sys +import os +import time + +# Use the built-in version of scandir/stat if possible, otherwise +# use the scandir module version +try: + from os import scandir, stat # noqa # pylint: disable=unused-import +except ImportError: + from scandir import scandir, stat # noqa # pylint: disable=unused-import + +try: + import rrdtool +except ImportError: + rrdtool = False + +from django.conf import settings +from graphite.intervals import Interval, IntervalSet + + +class RRDReader: + supported = bool(rrdtool) + + @staticmethod + def _convert_fs_path(fs_path): + if isinstance(fs_path, unicode): + fs_path = fs_path.encode(sys.getfilesystemencoding()) + return os.path.realpath(fs_path) + + def __init__(self, fs_path, datasource_name): + self.fs_path = RRDReader._convert_fs_path(fs_path) + self.datasource_name = datasource_name + + def get_intervals(self): + start = time.time() - self.get_retention(self.fs_path) + end = max(stat(self.fs_path).st_mtime, start) + return IntervalSet([Interval(start, end)]) + + def fetch(self, startTime, endTime): + startString = time.strftime( + "%H:%M_%Y%m%d+%Ss", time.localtime(startTime)) + endString = time.strftime("%H:%M_%Y%m%d+%Ss", time.localtime(endTime)) + + if settings.FLUSHRRDCACHED: + rrdtool.flushcached(self.fs_path, '--daemon', + settings.FLUSHRRDCACHED) + + (timeInfo, columns, rows) = rrdtool.fetch(self.fs_path, + settings.RRD_CF, '-s' + startString, '-e' + endString) + colIndex = list(columns).index(self.datasource_name) + rows.pop() # chop off the latest value because RRD returns crazy last values sometimes + values = (row[colIndex] for row in rows) + + return (timeInfo, values) + + @staticmethod + def get_datasources(fs_path): + info = rrdtool.info(RRDReader._convert_fs_path(fs_path)) + + if 'ds' in info: + return [datasource_name for datasource_name in info['ds']] + else: + ds_keys = [key for key in info if key.startswith('ds[')] + datasources = set(key[3:].split(']')[0] for key in ds_keys) + return list(datasources) + + @staticmethod + def get_retention(fs_path): + info = rrdtool.info(RRDReader._convert_fs_path(fs_path)) + if 'rra' in info: + rras = 
info['rra'] + else: + # Ugh, I like the old python-rrdtool api better.. + rra_count = max([int(key[4]) + for key in info if key.startswith('rra[')]) + 1 + rras = [{}] * rra_count + for i in range(rra_count): + rras[i]['pdp_per_row'] = info['rra[%d].pdp_per_row' % i] + rras[i]['rows'] = info['rra[%d].rows' % i] + + retention_points = 0 + for rra in rras: + points = rra['pdp_per_row'] * rra['rows'] + if points > retention_points: + retention_points = points + + return retention_points * info['step'] diff --git a/webapp/graphite/readers/utils.py b/webapp/graphite/readers/utils.py new file mode 100644 index 000000000..6afec83b5 --- /dev/null +++ b/webapp/graphite/readers/utils.py @@ -0,0 +1,161 @@ +import functools + +from graphite.intervals import IntervalSet +from graphite.logger import log + + +def CarbonLink(): + """Return a carbonlink instance.""" + # Late import to avoid pulling out too many dependencies with + # readers.py which is usually imported by plugins. + from graphite.carbonlink import CarbonLink + return CarbonLink() + + +class FetchInProgress(object): + def __init__(self, wait_callback): + self.wait_callback = wait_callback + + def waitForResults(self): + return self.wait_callback() + + +def merge_with_cache(cached_datapoints, start, step, values, func=None): + + consolidated = [] + + # Similar to the function in render/datalib:TimeSeries + def consolidate(func, values): + usable = [v for v in values if v is not None] + if not usable: + return None + if func == 'sum': + return sum(usable) + if func == 'average': + return float(sum(usable)) / len(usable) + if func == 'max': + return max(usable) + if func == 'min': + return min(usable) + if func == 'last': + return usable[-1] + raise Exception("Invalid consolidation function: '%s'" % func) + + if func: + consolidated_dict = {} + for (timestamp, value) in cached_datapoints: + interval = timestamp - (timestamp % step) + if interval in consolidated_dict: + consolidated_dict[interval].append(value) + else: + consolidated_dict[interval] = [value] + for interval in consolidated_dict: + value = consolidate(func, consolidated_dict[interval]) + consolidated.append((interval, value)) + + else: + consolidated = cached_datapoints + + for (interval, value) in consolidated: + try: + i = int(interval - start) / step + if i < 0: + # cached data point is earlier then the requested data point. + # meaning we can definitely ignore the cache result. 
+ # note that we cannot rely on the 'except' + # in this case since 'values[-n]=' + # is equivalent to 'values[len(values) - n]=' + continue + values[i] = value + except BaseException: + pass + + return values + + +class MultiReader(object): + __slots__ = ('nodes',) + + def __init__(self, nodes): + self.nodes = nodes + + def get_intervals(self): + interval_sets = [] + for node in self.nodes: + interval_sets.extend(node.intervals.intervals) + return IntervalSet(sorted(interval_sets)) + + def fetch(self, startTime, endTime, now=None, requestContext=None): + # Start the fetch on each node + fetches = [] + + for n in self.nodes: + try: + fetches.append( + n.fetch(startTime, endTime, now, requestContext)) + except BaseException: + log.exception("Failed to initiate subfetch for %s" % str(n)) + + def merge_results(): + results = {} + + # Wait for any asynchronous operations to complete + for i, result in enumerate(fetches): + if isinstance(result, FetchInProgress): + try: + results[i] = result.waitForResults() + except BaseException: + log.exception("Failed to complete subfetch") + results[i] = None + else: + results[i] = result + + results = [r for r in results.values() if r is not None] + if not results: + raise Exception("All sub-fetches failed") + + return functools.reduce(self.merge, results) + + return FetchInProgress(merge_results) + + def merge(self, results1, results2): + # Ensure results1 is finer than results2 + if results1[0][2] > results2[0][2]: + results1, results2 = results2, results1 + + time_info1, values1 = results1 + time_info2, values2 = results2 + start1, end1, step1 = time_info1 + start2, end2, step2 = time_info2 + + step = step1 # finest step + start = min(start1, start2) # earliest start + end = max(end1, end2) # latest end + time_info = (start, end, step) + values = [] + + t = start + while t < end: + # Look for the finer precision value first if available + i1 = (t - start1) / step1 + + if len(values1) > i1: + v1 = values1[i1] + else: + v1 = None + + if v1 is None: + i2 = (t - start2) / step2 + + if len(values2) > i2: + v2 = values2[i2] + else: + v2 = None + + values.append(v2) + else: + values.append(v1) + + t += step + + return (time_info, values) diff --git a/webapp/graphite/readers/whisper.py b/webapp/graphite/readers/whisper.py new file mode 100644 index 000000000..a64cd6314 --- /dev/null +++ b/webapp/graphite/readers/whisper.py @@ -0,0 +1,101 @@ +from __future__ import absolute_import +import time + +# Use the built-in version of scandir/stat if possible, otherwise +# use the scandir module version +try: + from os import scandir, stat # noqa # pylint: disable=unused-import +except ImportError: + from scandir import scandir, stat # noqa # pylint: disable=unused-import + +try: + import whisper +except ImportError: + whisper = False + +try: + import gzip +except ImportError: + gzip = False + + +from graphite.intervals import Interval, IntervalSet +from graphite.logger import log +from graphite.readers.utils import CarbonLink, merge_with_cache + +# The parser was replacing __readHeader with the __readHeader +# which was not working. 
+if bool(whisper): + whisper__readHeader = whisper.__readHeader + + +class WhisperReader(object): + __slots__ = ('fs_path', 'real_metric_path') + supported = bool(whisper) + + def __init__(self, fs_path, real_metric_path): + self.fs_path = fs_path + self.real_metric_path = real_metric_path + + def get_intervals(self): + start = time.time() - whisper.info(self.fs_path)['maxRetention'] + end = max(stat(self.fs_path).st_mtime, start) + return IntervalSet([Interval(start, end)]) + + def fetch(self, startTime, endTime): + try: + data = whisper.fetch(self.fs_path, startTime, endTime) + except IOError: + log.exception("Failed fetch of whisper file '%s'" % self.fs_path) + return None + if not data: + return None + + time_info, values = data + (start, end, step) = time_info + + meta_info = whisper.info(self.fs_path) + aggregation_method = meta_info['aggregationMethod'] + lowest_step = min([i['secondsPerPoint'] + for i in meta_info['archives']]) + # Merge in data from carbon's cache + cached_datapoints = [] + try: + cached_datapoints = CarbonLink().query(self.real_metric_path) + except BaseException: + log.exception("Failed CarbonLink query '%s'" % + self.real_metric_path) + cached_datapoints = [] + + if isinstance(cached_datapoints, dict): + cached_datapoints = cached_datapoints.items() + + values = merge_with_cache(cached_datapoints, + start, + step, + values, + aggregation_method) + + return time_info, values + + +class GzippedWhisperReader(WhisperReader): + supported = bool(whisper and gzip) + + def get_intervals(self): + fh = gzip.GzipFile(self.fs_path, 'rb') + try: + info = whisper__readHeader(fh) # evil, but necessary. + finally: + fh.close() + + start = time.time() - info['maxRetention'] + end = max(stat(self.fs_path).st_mtime, start) + return IntervalSet([Interval(start, end)]) + + def fetch(self, startTime, endTime): + fh = gzip.GzipFile(self.fs_path, 'rb') + try: + return whisper.file_fetch(fh, startTime, endTime) + finally: + fh.close() diff --git a/webapp/graphite/remote_storage.py b/webapp/graphite/remote_storage.py index 802901204..a7a720242 100755 --- a/webapp/graphite/remote_storage.py +++ b/webapp/graphite/remote_storage.py @@ -1,335 +1,4 @@ -import time -import urllib3 -from Queue import Queue -from urllib import urlencode -from threading import Lock, current_thread -from django.conf import settings -from django.core.cache import cache -from graphite.intervals import Interval, IntervalSet -from graphite.node import LeafNode, BranchNode -from graphite.readers import FetchInProgress -from graphite.logger import log -from graphite.util import unpickle, logtime, timebounds -from graphite.render.hashing import compactHash -from graphite.worker_pool.pool import get_pool +"""Import symbols for compatibility.""" -http = urllib3.PoolManager(num_pools=10, maxsize=5) - - -def prefetchRemoteData(remote_stores, requestContext, pathExpressions): - if requestContext['localOnly']: - return - - if requestContext is None: - requestContext = {} - - if pathExpressions is None: - return - - (startTime, endTime, now) = timebounds(requestContext) - log.info('thread %s prefetchRemoteData:: Starting fetch_list on all backends' % current_thread().name) - - # Go through all of the remote nodes, and launch a fetch for each one. - # Each fetch will take place in its own thread, since it's naturally parallel work. 
- for store in remote_stores: - reader = RemoteReader(store, {'intervals': []}, bulk_query=pathExpressions) - reader.fetch_list(startTime, endTime, now, requestContext) - - -class RemoteStore(object): - - def __init__(self, host): - self.host = host - self.last_failure = 0 - - @property - def available(self): - return time.time() - self.last_failure > settings.REMOTE_RETRY_DELAY - - def find(self, query, headers=None): - return list(FindRequest(self, query).send(headers)) - - def fail(self): - self.last_failure = time.time() - - -class FindRequest(object): - __slots__ = ('store', 'query', 'cacheKey') - - def __init__(self, store, query): - self.store = store - self.query = query - - if query.startTime: - start = query.startTime - (query.startTime % settings.FIND_CACHE_DURATION) - else: - start = "" - - if query.endTime: - end = query.endTime - (query.endTime % settings.FIND_CACHE_DURATION) - else: - end = "" - - self.cacheKey = "find:%s:%s:%s:%s" % (store.host, compactHash(query.pattern), start, end) - - @logtime(custom_msg=True) - def send(self, headers=None, msg_setter=None): - log.info("FindRequest.send(host=%s, query=%s) called" % (self.store.host, self.query)) - - if headers is None: - headers = {} - - results = cache.get(self.cacheKey) - if results is not None: - log.info("FindRequest.send(host=%s, query=%s) using cached result" % (self.store.host, self.query)) - else: - url = "%s://%s/metrics/find/" % ('https' if settings.INTRACLUSTER_HTTPS else 'http', self.store.host) - - query_params = [ - ('local', '1'), - ('format', 'pickle'), - ('query', self.query.pattern), - ] - if self.query.startTime: - query_params.append( ('from', self.query.startTime) ) - - if self.query.endTime: - query_params.append( ('until', self.query.endTime) ) - - try: - result = http.request('POST' if settings.REMOTE_STORE_USE_POST else 'GET', - url, fields=query_params, headers=headers, timeout=settings.REMOTE_FIND_TIMEOUT) - except: - log.exception("FindRequest.send(host=%s, query=%s) exception during request" % (self.store.host, self.query)) - self.store.fail() - return - - if result.status != 200: - log.exception("FindRequest.send(host=%s, query=%s) error response %d from %s?%s" % (self.store.host, self.query, result.status, url, urlencode(query_params))) - self.store.fail() - return - - try: - results = unpickle.loads(result.data) - except: - log.exception("FindRequest.send(host=%s, query=%s) exception processing response" % (self.store.host, self.query)) - self.store.fail() - return - - cache.set(self.cacheKey, results, settings.FIND_CACHE_DURATION) - - msg_setter('host: {host}, query: {query}'.format(host=self.store.host, query=self.query)) - - for node_info in results: - # handle both 1.x and 0.9.x output - path = node_info.get('path') or node_info.get('metric_path') - is_leaf = node_info.get('is_leaf') or node_info.get('isLeaf') - intervals = node_info.get('intervals') or [] - if not isinstance(intervals, IntervalSet): - intervals = IntervalSet([Interval(interval[0], interval[1]) for interval in intervals]) - - node_info = { - 'is_leaf': is_leaf, - 'path': path, - 'intervals': intervals, - } - - if is_leaf: - reader = RemoteReader(self.store, node_info, bulk_query=[self.query.pattern]) - node = LeafNode(path, reader) - else: - node = BranchNode(path) - - node.local = False - yield node - - -class RemoteReader(object): - __slots__ = ('store', 'metric_path', 'intervals', 'bulk_query', 'connection') - inflight_lock = Lock() - - def __init__(self, store, node_info, bulk_query=None): - self.store = store - 
self.metric_path = node_info.get('path') or node_info.get('metric_path') - self.intervals = node_info['intervals'] - self.bulk_query = bulk_query or ( - [self.metric_path] if self.metric_path else [] - ) - self.connection = None - - def __repr__(self): - return '' % (id(self), self.store.host) - - @staticmethod - def _log(msg, logger): - logger(('thread %s at %fs ' % (current_thread().name, time.time())) + msg) - - @classmethod - def log_debug(cls, msg): - if settings.DEBUG: - cls._log(msg, log.info) - - @classmethod - def log_error(cls, msg): - cls._log(msg, log.exception) - - def get_intervals(self): - return self.intervals - - def fetch(self, startTime, endTime, now=None, requestContext=None): - seriesList = self.fetch_list(startTime, endTime, now, requestContext) - - def _fetch(seriesList): - if seriesList is None: - return None - - for series in seriesList: - if series['name'] == self.metric_path: - time_info = (series['start'], series['end'], series['step']) - return (time_info, series['values']) - - return None - - if isinstance(seriesList, FetchInProgress): - return FetchInProgress(lambda: _fetch(seriesList.waitForResults())) - - return _fetch(seriesList) - - def fetch_list(self, startTime, endTime, now=None, requestContext=None): - t = time.time() - - query_params = [ - ('format', 'pickle'), - ('local', '1'), - ('noCache', '1'), - ('from', str( int(startTime) )), - ('until', str( int(endTime) )) - ] - - if len(self.bulk_query) < 1: - return [] - - for target in self.bulk_query: - query_params.append(('target', target)) - - if now is not None: - query_params.append(('now', str( int(now) ))) - - query_string = urlencode(query_params) - urlpath = '/render/' - url = "%s://%s%s" % ('https' if settings.INTRACLUSTER_HTTPS else 'http', self.store.host, urlpath) - headers = requestContext.get('forwardHeaders') if requestContext else None - - cacheKey = "%s?%s" % (url, query_string) - - if requestContext is not None and 'inflight_requests' in requestContext and cacheKey in requestContext['inflight_requests']: - self.log_debug("RemoteReader:: Returning cached FetchInProgress %s?%s" % (url, query_string)) - return requestContext['inflight_requests'][cacheKey] - - if requestContext is None or 'inflight_locks' not in requestContext or cacheKey not in requestContext['inflight_locks']: - with self.inflight_lock: - self.log_debug("RemoteReader:: Got global lock %s?%s" % (url, query_string)) - if requestContext is None: - requestContext = {} - if 'inflight_locks' not in requestContext: - requestContext['inflight_locks'] = {} - if 'inflight_requests' not in requestContext: - requestContext['inflight_requests'] = {} - if cacheKey not in requestContext['inflight_locks']: - self.log_debug("RemoteReader:: Creating lock %s?%s" % (url, query_string)) - requestContext['inflight_locks'][cacheKey] = Lock() - self.log_debug("RemoteReader:: Released global lock %s?%s" % (url, query_string)) - - cacheLock = requestContext['inflight_locks'][cacheKey] - - with cacheLock: - self.log_debug("RemoteReader:: got url lock %s?%s" % (url, query_string)) - - if cacheKey in requestContext['inflight_requests']: - self.log_debug("RemoteReader:: Returning cached FetchInProgress %s?%s" % (url, query_string)) - return requestContext['inflight_requests'][cacheKey] - - q = Queue() - if settings.USE_WORKER_POOL: - get_pool().apply_async( - func=self._fetch, - args=[url, query_string, query_params, headers], - callback=lambda x: q.put(x), - ) - else: - q.put( - self._fetch(url, query_string, query_params, headers), - ) - - def 
retrieve(): - with retrieve.lock: - # if the result is known we return it directly - if hasattr(retrieve, '_result'): - results = getattr(retrieve, '_result') - self.log_debug( - 'RemoteReader:: retrieve completed (cached) %s' % - (', '.join([result['path'] for result in results])), - ) - return results - - # otherwise we get it from the queue and keep it for later - results = q.get(block=True) - - for i in range(len(results)): - results[i]['path'] = results[i]['name'] - - if not results: - self.log_debug('RemoteReader:: retrieve has received no results') - - setattr(retrieve, '_result', results) - self.log_debug( - 'RemoteReader:: retrieve completed %s' % - (', '.join([result['path'] for result in results])), - ) - return results - - self.log_debug( - 'RemoteReader:: Storing FetchInProgress with cacheKey {cacheKey}' - .format(cacheKey=cacheKey), - ) - retrieve.lock = Lock() - data = FetchInProgress(retrieve) - requestContext['inflight_requests'][cacheKey] = data - - self.log_debug("RemoteReader:: Returning %s?%s in %fs" % (url, query_string, time.time() - t)) - return data - - def _fetch(self, url, query_string, query_params, headers): - self.log_debug("RemoteReader:: Starting to execute _fetch %s?%s" % (url, query_string)) - try: - self.log_debug("ReadResult:: Requesting %s?%s" % (url, query_string)) - result = http.request( - 'POST' if settings.REMOTE_STORE_USE_POST else 'GET', - url, - fields=query_params, - headers=headers, - timeout=settings.REMOTE_FETCH_TIMEOUT, - ) - - if result.status != 200: - self.store.fail() - self.log_error("ReadResult:: Error response %d from %s?%s" % (result.status, url, query_string)) - data = [] - else: - data = unpickle.loads(result.data) - except Exception as err: - self.store.fail() - self.log_error("ReadResult:: Error requesting %s?%s: %s" % (url, query_string, err)) - data = [] - - self.log_debug("RemoteReader:: Completed _fetch %s?%s" % (url, query_string)) - return data - - -def extractForwardHeaders(request): - headers = {} - for name in settings.REMOTE_STORE_FORWARD_HEADERS: - value = request.META.get('HTTP_%s' % name.upper().replace('-', '_')) - if value is not None: - headers[name] = value - return headers +from graphite.finders.remote import RemoteStore, prefetchRemoteData # noqa # pylint: disable=unused-import +from graphite.finders.utils import extractForwardHeaders # noqa # pylint: disable=unused-import diff --git a/webapp/graphite/storage.py b/webapp/graphite/storage.py index 3461010a4..56dc49f90 100755 --- a/webapp/graphite/storage.py +++ b/webapp/graphite/storage.py @@ -4,9 +4,9 @@ from collections import defaultdict try: - from importlib import import_module + from importlib import import_module except ImportError: # python < 2.7 compatibility - from django.utils.importlib import import_module + from django.utils.importlib import import_module from django.conf import settings @@ -20,199 +20,223 @@ def get_finder(finder_path): - module_name, class_name = finder_path.rsplit('.', 1) - module = import_module(module_name) - return getattr(module, class_name)() - - -class Store: - def __init__(self, finders=None, hosts=None): - if finders is None: - finders = [get_finder(finder_path) - for finder_path in settings.STORAGE_FINDERS] - self.finders = finders - - if hosts is None: - hosts = settings.CLUSTER_SERVERS - remote_hosts = [host for host in hosts if not settings.REMOTE_EXCLUDE_LOCAL or not is_local_interface(host)] - self.remote_stores = [ RemoteStore(host) for host in remote_hosts ] - - - def find(self, pattern, startTime=None, 
endTime=None, local=False, headers=None): - query = FindQuery(pattern, startTime, endTime, local) - - for match in self.find_all(query, headers): - yield match - - - def find_all(self, query, headers=None): - start = time.time() - result_queue = Queue.Queue() - jobs = [] - - # Start remote searches - if not query.local: - random.shuffle(self.remote_stores) - jobs.extend([ - (store.find, query, headers) - for store in self.remote_stores if store.available - ]) - - # Start local searches - for finder in self.finders: - jobs.append((finder.find_nodes, query)) - - if settings.USE_WORKER_POOL: - return_result = lambda x: result_queue.put(x) - for job in jobs: - get_pool().apply_async(func=job[0], args=job[1:], callback=return_result) - else: - for job in jobs: - result_queue.put(job[0](*job[1:])) - - # Group matching nodes by their path - nodes_by_path = defaultdict(list) - - deadline = start + settings.REMOTE_FIND_TIMEOUT - result_cnt = 0 - - while result_cnt < len(jobs): - wait_time = deadline - time.time() + module_name, class_name = finder_path.rsplit('.', 1) + module = import_module(module_name) + return getattr(module, class_name)() + + +class Store(object): + def __init__(self, finders=None, hosts=None): + if finders is None: + finders = [get_finder(finder_path) + for finder_path in settings.STORAGE_FINDERS] + self.finders = finders + + if hosts is None: + hosts = settings.CLUSTER_SERVERS + remote_hosts = [ + host for host in hosts if not settings.REMOTE_EXCLUDE_LOCAL or not is_local_interface(host)] + self.remote_stores = [RemoteStore(host) for host in remote_hosts] + + def find( + self, + pattern, + startTime=None, + endTime=None, + local=False, + headers=None): + query = FindQuery(pattern, startTime, endTime, local) + + for match in self.find_all(query, headers): + yield match + + def find_all(self, query, headers=None): + start = time.time() + result_queue = Queue.Queue() + jobs = [] + + # Start remote searches + if not query.local: + random.shuffle(self.remote_stores) + jobs.extend([ + (store.find, query, headers) + for store in self.remote_stores if store.available + ]) + + # Start local searches + for finder in self.finders: + jobs.append((finder.find_nodes, query)) + + if settings.USE_WORKER_POOL: + def return_result(x): + return result_queue.put(x) + for job in jobs: + get_pool().apply_async( + func=job[0], args=job[1:], callback=return_result) + else: + for job in jobs: + result_queue.put(job[0](*job[1:])) + + # Group matching nodes by their path + nodes_by_path = defaultdict(list) + + deadline = start + settings.REMOTE_FIND_TIMEOUT + result_cnt = 0 + + while result_cnt < len(jobs): + wait_time = deadline - time.time() + + try: + nodes = result_queue.get(True, wait_time) + + # ValueError could happen if due to really unlucky timing wait_time + # is negative + except (Queue.Empty, ValueError): + if time.time() > deadline: + log.info( + "Timed out in find_all after %fs" % + (settings.REMOTE_FIND_TIMEOUT)) + break + else: + continue + + log.info("Got a find result after %fs" % (time.time() - start)) + result_cnt += 1 + if nodes: + for node in nodes: + nodes_by_path[node.path].append(node) + + log.info("Got all find results in %fs" % (time.time() - start)) + + # Reduce matching nodes for each path to a minimal set + found_branch_nodes = set() + + items = list(nodes_by_path.iteritems()) + random.shuffle(items) + + for path, nodes in items: + leaf_nodes = [] + + # First we dispense with the BranchNodes + for node in nodes: + if node.is_leaf: + leaf_nodes.append(node) + # TODO need to 
filter branch nodes based on requested + # interval... how?!?!? + elif node.path not in found_branch_nodes: + yield node + found_branch_nodes.add(node.path) + + if not leaf_nodes: + continue + + # Fast-path when there is a single node. + if len(leaf_nodes) == 1: + yield leaf_nodes[0] + continue + + # Calculate best minimal node set + minimal_node_set = set() + covered_intervals = IntervalSet([]) + + # If the query doesn't fall entirely within the FIND_TOLERANCE window + # we disregard the window. This prevents unnecessary remote fetches + # caused when carbon's cache skews node.intervals, giving the appearance + # remote systems have data we don't have locally, which we probably + # do. + now = int(time.time()) + tolerance_window = now - settings.FIND_TOLERANCE + disregard_tolerance_window = query.interval.start < tolerance_window + prior_to_window = Interval(float('-inf'), tolerance_window) + + def measure_of_added_coverage( + node, drop_window=disregard_tolerance_window): + relevant_intervals = node.intervals.intersect_interval( + query.interval) + if drop_window: + relevant_intervals = relevant_intervals.intersect_interval( + prior_to_window) + return covered_intervals.union( + relevant_intervals).size - covered_intervals.size + + nodes_remaining = list(leaf_nodes) + + # Prefer local nodes first (and do *not* drop the tolerance window) + for node in leaf_nodes: + if node.local and measure_of_added_coverage(node, False) > 0: + nodes_remaining.remove(node) + minimal_node_set.add(node) + covered_intervals = covered_intervals.union(node.intervals) + + if settings.REMOTE_STORE_MERGE_RESULTS: + remote_nodes = [n for n in nodes_remaining if not n.local] + for node in remote_nodes: + nodes_remaining.remove(node) + minimal_node_set.add(node) + covered_intervals = covered_intervals.union(node.intervals) + else: + while nodes_remaining: + node_coverages = [(measure_of_added_coverage(n), n) + for n in nodes_remaining] + best_coverage, best_node = max(node_coverages) + + if best_coverage == 0: + break + + nodes_remaining.remove(best_node) + minimal_node_set.add(best_node) + covered_intervals = covered_intervals.union( + best_node.intervals) + + # Sometimes the requested interval falls within the caching window. + # We include the most likely node if the gap is within + # tolerance. 
+ if not minimal_node_set: + def distance_to_requested_interval(node): + if not node.intervals: + return float('inf') + latest = sorted( + node.intervals, key=lambda i: i.end)[-1] + distance = query.interval.start - latest.end + return distance if distance >= 0 else float('inf') + + best_candidate = min( + leaf_nodes, key=distance_to_requested_interval) + if distance_to_requested_interval( + best_candidate) <= settings.FIND_TOLERANCE: + minimal_node_set.add(best_candidate) + + if len(minimal_node_set) == 1: + yield minimal_node_set.pop() + elif len(minimal_node_set) > 1: + reader = MultiReader(minimal_node_set) + yield LeafNode(path, reader) - try: - nodes = result_queue.get(True, wait_time) - # ValueError could happen if due to really unlucky timing wait_time is negative - except (Queue.Empty, ValueError): - if time.time() > deadline: - log.info("Timed out in find_all after %fs" % (settings.REMOTE_FIND_TIMEOUT)) - break +class FindQuery: + def __init__(self, pattern, startTime, endTime, local=False): + self.pattern = pattern + self.startTime = startTime + self.endTime = endTime + self.isExact = is_pattern(pattern) + self.interval = Interval( + float('-inf') if startTime is None else startTime, + float('inf') if endTime is None else endTime) + self.local = local + + def __repr__(self): + if self.startTime is None: + startString = '*' else: - continue - - log.info("Got a find result after %fs" % (time.time() - start)) - result_cnt += 1 - if nodes: - for node in nodes: - nodes_by_path[node.path].append(node) - - log.info("Got all find results in %fs" % (time.time() - start)) - - # Reduce matching nodes for each path to a minimal set - found_branch_nodes = set() - - items = list(nodes_by_path.iteritems()) - random.shuffle(items) - - for path, nodes in items: - leaf_nodes = [] - - # First we dispense with the BranchNodes - for node in nodes: - if node.is_leaf: - leaf_nodes.append(node) - elif node.path not in found_branch_nodes: #TODO need to filter branch nodes based on requested interval... how?!?!? - yield node - found_branch_nodes.add(node.path) - - if not leaf_nodes: - continue - - # Fast-path when there is a single node. - if len(leaf_nodes) == 1: - yield leaf_nodes[0] - continue - - # Calculate best minimal node set - minimal_node_set = set() - covered_intervals = IntervalSet([]) - - # If the query doesn't fall entirely within the FIND_TOLERANCE window - # we disregard the window. This prevents unnecessary remote fetches - # caused when carbon's cache skews node.intervals, giving the appearance - # remote systems have data we don't have locally, which we probably do. 
- now = int( time.time() ) - tolerance_window = now - settings.FIND_TOLERANCE - disregard_tolerance_window = query.interval.start < tolerance_window - prior_to_window = Interval( float('-inf'), tolerance_window ) - - def measure_of_added_coverage(node, drop_window=disregard_tolerance_window): - relevant_intervals = node.intervals.intersect_interval(query.interval) - if drop_window: - relevant_intervals = relevant_intervals.intersect_interval(prior_to_window) - return covered_intervals.union(relevant_intervals).size - covered_intervals.size - - nodes_remaining = list(leaf_nodes) - - # Prefer local nodes first (and do *not* drop the tolerance window) - for node in leaf_nodes: - if node.local and measure_of_added_coverage(node, False) > 0: - nodes_remaining.remove(node) - minimal_node_set.add(node) - covered_intervals = covered_intervals.union(node.intervals) - - if settings.REMOTE_STORE_MERGE_RESULTS: - remote_nodes = [n for n in nodes_remaining if not n.local] - for node in remote_nodes: - nodes_remaining.remove(node) - minimal_node_set.add(node) - covered_intervals = covered_intervals.union(node.intervals) - else: - while nodes_remaining: - node_coverages = [ (measure_of_added_coverage(n), n) for n in nodes_remaining ] - best_coverage, best_node = max(node_coverages) - - if best_coverage == 0: - break - - nodes_remaining.remove(best_node) - minimal_node_set.add(best_node) - covered_intervals = covered_intervals.union(best_node.intervals) - - # Sometimes the requested interval falls within the caching window. - # We include the most likely node if the gap is within tolerance. - if not minimal_node_set: - def distance_to_requested_interval(node): - if not node.intervals: - return float('inf') - latest = sorted(node.intervals, key=lambda i: i.end)[-1] - distance = query.interval.start - latest.end - return distance if distance >= 0 else float('inf') - - best_candidate = min(leaf_nodes, key=distance_to_requested_interval) - if distance_to_requested_interval(best_candidate) <= settings.FIND_TOLERANCE: - minimal_node_set.add(best_candidate) - - if len(minimal_node_set) == 1: - yield minimal_node_set.pop() - elif len(minimal_node_set) > 1: - reader = MultiReader(minimal_node_set) - yield LeafNode(path, reader) + startString = time.ctime(self.startTime) + if self.endTime is None: + endString = '*' + else: + endString = time.ctime(self.endTime) -class FindQuery: - def __init__(self, pattern, startTime, endTime, local=False): - self.pattern = pattern - self.startTime = startTime - self.endTime = endTime - self.isExact = is_pattern(pattern) - self.interval = Interval(float('-inf') if startTime is None else startTime, - float('inf') if endTime is None else endTime) - self.local = local - - - def __repr__(self): - if self.startTime is None: - startString = '*' - else: - startString = time.ctime(self.startTime) - - if self.endTime is None: - endString = '*' - else: - endString = time.ctime(self.endTime) - - return '<FindQuery: %s from %s until %s>' % (self.pattern, startString, endString) + return '<FindQuery: %s from %s until %s>' % ( + self.pattern, startString, endString) STORE = Store()
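The find_all() logic above reduces the matching leaf nodes for a path to a minimal set: local nodes are preferred outright, and the remaining nodes are then added greedily, each step picking whichever node contributes the most not-yet-covered time inside the query interval and stopping once nothing adds coverage. The snippet below is only a rough, self-contained sketch of that greedy core; the Interval tuple, the clip/union helpers and the dict-shaped nodes are simplified stand-ins rather than the real graphite.intervals and graphite.node types, and the local-first pass, the REMOTE_STORE_MERGE_RESULTS branch and the FIND_TOLERANCE window are deliberately left out.

# A rough sketch of the greedy coverage selection in Store.find_all above.
# Interval handling is simplified; node objects are plain dicts here.
from collections import namedtuple

Interval = namedtuple('Interval', ['start', 'end'])


def clip(intervals, window):
    """Intersect a list of intervals with the query window."""
    out = []
    for i in intervals:
        lo, hi = max(i.start, window.start), min(i.end, window.end)
        if lo < hi:
            out.append(Interval(lo, hi))
    return out


def union(intervals):
    """Merge overlapping intervals; return (merged, covered_seconds)."""
    merged = []
    for i in sorted(intervals):
        if merged and i.start <= merged[-1].end:
            merged[-1] = Interval(merged[-1].start, max(merged[-1].end, i.end))
        else:
            merged.append(i)
    return merged, sum(i.end - i.start for i in merged)


def minimal_node_set(leaf_nodes, window):
    """Greedily keep the node adding the most uncovered time; stop at zero gain."""
    covered, covered_size, selected = [], 0, []
    remaining = list(leaf_nodes)
    while remaining:
        def gain(node):
            return union(covered + clip(node['intervals'], window))[1] - covered_size
        best = max(remaining, key=gain)
        if gain(best) <= 0:
            break
        remaining.remove(best)
        selected.append(best)
        covered, covered_size = union(covered + clip(best['intervals'], window))
    return selected


local = {'host': 'local', 'intervals': [Interval(0, 600)]}
remote = {'host': 'remote-1', 'intervals': [Interval(0, 900)]}
# Only 'remote-1' is kept: it covers the whole window, so 'local' adds nothing.
print([n['host'] for n in minimal_node_set([local, remote], Interval(0, 900))])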
diff --git a/webapp/tests/test_finders.py b/webapp/tests/test_finders.py index f0422d02a..ae391ea8e 100644 --- a/webapp/tests/test_finders.py +++ b/webapp/tests/test_finders.py @@ -1,24 +1,35 @@ +from __future__ import absolute_import + import gzip import os from os.path import join, dirname, isdir import random import shutil import time +import unittest try: from unittest.mock import patch except ImportError: from mock import patch -from .base import TestCase +try: + import ceres +except ImportError: + ceres = False +try: + import whisper +except ImportError: + whisper = False + from django.conf import settings from graphite.intervals import Interval, IntervalSet from graphite.node import LeafNode, BranchNode from graphite.storage import Store, FindQuery, get_finder from graphite.finders.standard import scandir -import ceres -import whisper +from tests.base import TestCase + class FinderTest(TestCase): def test_custom_finder(self): @@ -36,6 +47,7 @@ def test_custom_finder(self): self.assertEqual(time_info, (100, 200, 10)) self.assertEqual(len(series), 10) + class DummyReader(object): __slots__ = ('path',) @@ -243,6 +255,7 @@ class CeresFinderTest(TestCase): _listdir_counter = 0 _original_listdir = os.listdir + @unittest.skipIf(not ceres, 'ceres not installed') def test_ceres_finder(self): test_dir = join(settings.CERES_DIR)
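The test changes above make ceres and whisper optional: each backend is imported inside a try/except, and backend-specific tests are skipped when the import failed. A standalone sketch of the same pattern, using a hypothetical optional_backend module rather than a real dependency, might look like the following; note that unittest.skipIf only takes effect when applied as a decorator with a leading @, since a bare call merely builds a decorator object and discards it.

import unittest

try:
    import optional_backend  # hypothetical optional dependency
except ImportError:
    optional_backend = False


class OptionalBackendTest(unittest.TestCase):

    @unittest.skipIf(not optional_backend, 'optional_backend not installed')
    def test_backend_behaviour(self):
        # Runs only when the optional module imported successfully.
        self.assertTrue(hasattr(optional_backend, '__name__'))


if __name__ == '__main__':
    unittest.main()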