
Merge branch 'graphite_megacarbon' into ceres_support

Conflicts:
	carbon/MANIFEST.in
	carbon/bin/carbon-client.py
	carbon/bin/carbon-daemon.py
	carbon/conf/carbon-daemons/example/aggregation-rules.conf
	carbon/conf/carbon-daemons/example/relay-rules.conf
	carbon/lib/carbon/aggregator/buffers.py
	carbon/lib/carbon/amqp_listener.py
	carbon/lib/carbon/amqp_publisher.py
	carbon/lib/carbon/cache.py
	carbon/lib/carbon/client.py
	carbon/lib/carbon/conf.py
	carbon/lib/carbon/events.py
	carbon/lib/carbon/instrumentation.py
	carbon/lib/carbon/log.py
	carbon/lib/carbon/manhole.py
	carbon/lib/carbon/protocols.py
	carbon/lib/carbon/relayrules.py
	carbon/lib/carbon/rewrite.py
	carbon/lib/carbon/routers.py
	carbon/lib/carbon/service.py
	carbon/lib/carbon/state.py
	carbon/lib/carbon/storage.py
	carbon/lib/carbon/util.py
	carbon/lib/carbon/writer.py
	carbon/lib/twisted/plugins/carbon_daemon_plugin.py
	carbon/setup.py
	docs/config-carbon.rst
	docs/overview.rst
	docs/tools.rst
	setup.py
	webapp/content/js/composer_widgets.js
	webapp/content/js/dashboard.js
	webapp/graphite/dashboard/views.py
	webapp/graphite/local_settings.py.example
	webapp/graphite/metrics/views.py
	webapp/graphite/remote_storage.py
	webapp/graphite/render/functions.py
	webapp/graphite/render/glyph.py
	webapp/graphite/render/views.py
	webapp/graphite/settings.py
	webapp/graphite/storage.py
	whisper/whisper.py
1 parent df86969 commit f18bb3c711bd566debf3edcc6b895be9fb6210b0 by @mleinart, committed May 15, 2012
9 bin/build-index.sh
@@ -12,20 +12,23 @@ fi
WHISPER_DIR="${GRAPHITE_STORAGE_DIR}/whisper"
+CERES_DIR="${GRAPHITE_STORAGE_DIR}/ceres"
-if [ ! -d "$WHISPER_DIR" ]
+if [ ! -d "$WHISPER_DIR" ] || [ ! -d "$CERES_DIR" ]
then
- echo "Fatal Error: $WHISPER_DIR does not exist."
+ echo "Fatal Error: neither $WHISPER_DIR nor $CERES_DIR exist."
exit 1
fi
INDEX_FILE="${GRAPHITE_STORAGE_DIR}/index"
TMP_INDEX="${GRAPHITE_STORAGE_DIR}/.index.tmp"
rm -f $TMP_INDEX
-cd $WHISPER_DIR
touch $INDEX_FILE
echo "[`date`] building index..."
+cd $WHISPER_DIR
find -L . -name '*.wsp' | perl -pe 's!^[^/]+/(.+)\.wsp$!$1!; s!/!.!g' > $TMP_INDEX
+cd $CERES_DIR
+find -L . -name '.ceres-node' | perl -pe 's!^[^/]+/(.+)/\.ceres-node$!$1!; s!/!.!g;' >> $TMP_INDEX
echo "[`date`] complete, switching to new index file"
mv -f $TMP_INDEX $INDEX_FILE
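For reference, the perl one-liners above turn paths emitted by find into dotted metric names. A rough Python equivalent of the whisper transformation, for illustration only (the path is made up):

import re

path = './carbon/agents/host-a/cpuUsage.wsp'   # as emitted by `find -L .` (illustrative)
metric = re.sub(r'^[^/]+/(.+)\.wsp$', r'\1', path).replace('/', '.')
# metric == 'carbon.agents.host-a.cpuUsage'; the ceres variant strips a
# trailing '/.ceres-node' instead of the '.wsp' suffix.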
3 docs/overview.rst
@@ -1,5 +1,8 @@
Overview
=================
+
+What Graphite is and is not
+---------------------------
Graphite does two things:
1. Store numeric time-series data
4 setup.py
@@ -14,7 +14,7 @@
storage_dirs = []
-for subdir in ('whisper', 'rrd', 'log', 'log/webapp'):
+for subdir in ('whisper', 'ceres', 'rrd', 'log', 'log/webapp'):
storage_dirs.append( ('storage/%s' % subdir, []) )
webapp_content = {}
@@ -33,7 +33,7 @@
setup(
name='graphite-web',
- version='0.9.10-pre1',
+ version='0.10.0',
url='https://launchpad.net/graphite',
author='Chris Davis',
author_email='chrismd@gmail.com',
BIN webapp/content/img/favicon.ico
Binary file not shown.
1 webapp/content/js/browser.js
@@ -43,7 +43,6 @@ function createTreePanel(){
var node_id = node.id.replace(/^[A-Za-z]+Tree\.?/,"");
loader.baseParams.query = (node_id == "") ? "*" : (node_id + ".*");
loader.baseParams.format = 'treejson';
- loader.baseParams.contexts = '1';
loader.baseParams.path = node_id;
if (node.parentNode && node.parentNode.id == "UserGraphsTree") {
183 webapp/graphite/carbonlink.py
@@ -0,0 +1,183 @@
+import time
+import socket
+import struct
+import errno
+import random
+from select import select
+from django.conf import settings
+from graphite.render.hashing import ConsistentHashRing
+from graphite.logger import log
+from graphite.util import load_module
+
+try:
+ import cPickle as pickle
+except ImportError:
+ import pickle
+
+
+def load_keyfunc():
+ if settings.CARBONLINK_HASHING_KEYFUNC:
+ module_path, func_name = settings.CARBONLINK_HASHING_KEYFUNC.rsplit(':', 1)
+ return load_module(module_path, member=func_name)
+ else:
+ return lambda x: x
+
+
+class CarbonLinkPool:
+ def __init__(self, hosts, timeout):
+ self.hosts = [ (server, instance) for (server, port, instance) in hosts ]
+ self.ports = dict( ((server, instance), port) for (server, port, instance) in hosts )
+ self.timeout = float(timeout)
+ servers = set([server for (server, port, instance) in hosts])
+ if len(servers) < settings.REPLICATION_FACTOR:
+ raise Exception("REPLICATION_FACTOR=%d cannot exceed servers=%d" % (settings.REPLICATION_FACTOR, len(servers)))
+
+ self.hash_ring = ConsistentHashRing(self.hosts)
+ self.keyfunc = load_keyfunc()
+ self.connections = {}
+ self.last_failure = {}
+ # Create a connection pool for each host
+ for host in self.hosts:
+ self.connections[host] = set()
+
+ def select_host(self, metric):
+ "Returns the carbon host that has data for the given metric"
+ key = self.keyfunc(metric)
+ nodes = []
+ servers = set()
+ for node in self.hash_ring.get_nodes(key):
+ (server, instance) = node
+ if server in servers:
+ continue
+ servers.add(server)
+ nodes.append(node)
+ if len(servers) >= settings.REPLICATION_FACTOR:
+ break
+
+ available = [ n for n in nodes if self.is_available(n) ]
+ return random.choice(available or nodes)
+
+ def is_available(self, host):
+ now = time.time()
+ last_fail = self.last_failure.get(host, 0)
+ return (now - last_fail) < settings.CARBONLINK_RETRY_DELAY
+
+ def get_connection(self, host):
+ # First try to take one out of the pool for this host
+ (server, instance) = host
+ port = self.ports[host]
+ connectionPool = self.connections[host]
+ try:
+ return connectionPool.pop()
+ except KeyError:
+ pass #nothing left in the pool, gotta make a new connection
+
+ log.cache("CarbonLink creating a new socket for %s" % str(host))
+ connection = socket.socket()
+ connection.settimeout(self.timeout)
+ try:
+ connection.connect( (server, port) )
+ except:
+ self.last_failure[host] = time.time()
+ raise
+ else:
+ connection.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )
+ return connection
+
+ def query(self, metric):
+ request = dict(type='cache-query', metric=metric)
+ results = self.send_request(request)
+ log.cache("CarbonLink cache-query request for %s returned %d datapoints" % (metric, len(results)))
+ return results['datapoints']
+
+ def get_metadata(self, metric, key):
+ request = dict(type='get-metadata', metric=metric, key=key)
+ results = self.send_request(request)
+ log.cache("CarbonLink get-metadata request received for %s:%s" % (metric, key))
+ return results['value']
+
+ def set_metadata(self, metric, key, value):
+ request = dict(type='set-metadata', metric=metric, key=key, value=value)
+ results = self.send_request(request)
+ log.cache("CarbonLink set-metadata request received for %s:%s" % (metric, key))
+ return results
+
+ def send_request(self, request):
+ metric = request['metric']
+ serialized_request = pickle.dumps(request, protocol=-1)
+ len_prefix = struct.pack("!L", len(serialized_request))
+ request_packet = len_prefix + serialized_request
+
+ host = self.select_host(metric)
+ conn = self.get_connection(host)
+ try:
+ conn.sendall(request_packet)
+ result = self.recv_response(conn)
+ except:
+ self.last_failure[host] = time.time()
+ raise
+ else:
+ self.connections[host].add(conn)
+ if 'error' in result:
+ raise CarbonLinkRequestError(result['error'])
+ else:
+ return result
+
+ def recv_response(self, conn):
+ len_prefix = recv_exactly(conn, 4)
+ body_size = struct.unpack("!L", len_prefix)[0]
+ body = recv_exactly(conn, body_size)
+ return pickle.loads(body)
+
+
+class CarbonLinkRequestError(Exception):
+ pass
+
+
+# Socket helper functions
+def still_connected(sock):
+ is_readable = select([sock], [], [], 0)[0]
+ if is_readable:
+ try:
+ recv_buf = sock.recv(1, socket.MSG_DONTWAIT|socket.MSG_PEEK)
+
+ except socket.error, e:
+ if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
+ return True
+ else:
+ raise
+
+ else:
+ return bool(recv_buf)
+
+ else:
+ return True
+
+
+def recv_exactly(conn, num_bytes):
+ buf = ''
+ while len(buf) < num_bytes:
+ data = conn.recv( num_bytes - len(buf) )
+ if not data:
+ raise Exception("Connection lost")
+ buf += data
+
+ return buf
+
+
+#parse hosts from local_settings.py
+hosts = []
+for host in settings.CARBONLINK_HOSTS:
+ parts = host.split(':')
+ server = parts[0]
+ port = int( parts[1] )
+ if len(parts) > 2:
+ instance = parts[2]
+ else:
+ instance = None
+
+ hosts.append( (server, int(port), instance) )
+
+
+#A shared importable singleton
+CarbonLink = CarbonLinkPool(hosts, settings.CARBONLINK_TIMEOUT)
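A minimal usage sketch of the shared CarbonLink singleton defined above; the metric name is illustrative. This mirrors how CeresReader.fetch (see readers.py below) pulls in datapoints still sitting in carbon-cache's memory:

from graphite.carbonlink import CarbonLink

# cache-query returns a list of (timestamp, value) pairs for datapoints
# that carbon-cache holds in memory but has not yet written to disk.
cached = CarbonLink.query('carbon.agents.host-a.cpuUsage')   # illustrative metric
for timestamp, value in cached:
    pass  # e.g. overlay onto values read from disk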
6 webapp/graphite/cli/completer.py
@@ -43,11 +43,11 @@ def completePath(path, shortnames=False):
results = []
- for match in STORE.find(pattern):
+ for node in STORE.find(pattern):
if shortnames:
- results.append(match.name)
+ results.append(node.name)
else:
- results.append(match.metric_path)
+ results.append(node.path)
list_items = ["<li>%s</li>" % r for r in results]
list_element = "<ul>" + '\n'.join(list_items) + "</ul>"
165 webapp/graphite/finders.py
@@ -0,0 +1,165 @@
+import os
+import fnmatch
+from os.path import islink, isdir, isfile, realpath, join, dirname, basename
+from glob import glob
+from ceres import CeresTree, CeresNode, setDefaultSliceCachingBehavior
+from graphite.node import BranchNode, LeafNode
+from graphite.readers import CeresReader, WhisperReader, GzippedWhisperReader, RRDReader
+from graphite.util import find_escaped_pattern_fields
+
+
+#setDefaultSliceCachingBehavior('all')
+
+
+class CeresFinder:
+ def __init__(self, directory):
+ self.directory = directory
+ self.tree = CeresTree(directory)
+
+ def find_nodes(self, query):
+ for fs_path in glob( self.tree.getFilesystemPath(query.pattern) ):
+ metric_path = self.tree.getNodePath(fs_path)
+
+ if CeresNode.isNodeDir(fs_path):
+ ceres_node = self.tree.getNode(metric_path)
+
+ if ceres_node.hasDataForInterval(query.startTime, query.endTime):
+ real_metric_path = get_real_metric_path(fs_path, metric_path)
+ reader = CeresReader(ceres_node, real_metric_path)
+ yield LeafNode(metric_path, reader)
+
+ elif isdir(fs_path):
+ yield BranchNode(metric_path)
+
+
+class StandardFinder:
+ DATASOURCE_DELIMETER = '::RRD_DATASOURCE::'
+
+ def __init__(self, directories):
+ self.directories = directories
+
+ def find_nodes(self, query):
+ clean_pattern = query.pattern.replace('\\', '')
+ pattern_parts = clean_pattern.split('.')
+
+ for root_dir in self.directories:
+ for absolute_path in self._find_paths(root_dir, pattern_parts):
+ if basename(absolute_path).startswith('.'):
+ continue
+
+ if self.DATASOURCE_DELIMETER in basename(absolute_path):
+ (absolute_path, datasource_pattern) = absolute_path.rsplit(self.DATASOURCE_DELIMETER, 1)
+ else:
+ datasource_pattern = None
+
+ relative_path = absolute_path[ len(root_dir): ].lstrip('/')
+ metric_path = fs_to_metric(relative_path)
+ real_metric_path = get_real_metric_path(absolute_path, metric_path)
+
+ metric_path_parts = metric_path.split('.')
+ for field_index in find_escaped_pattern_fields(query.pattern):
+ metric_path_parts[field_index] = pattern_parts[field_index].replace('\\', '')
+ metric_path = '.'.join(metric_path_parts)
+
+ # Now we construct and yield an appropriate Node object
+ if isdir(absolute_path):
+ yield BranchNode(metric_path)
+
+ elif isfile(absolute_path):
+ if absolute_path.endswith('.wsp') and WhisperReader.supported:
+ reader = WhisperReader(absolute_path)
+ yield LeafNode(metric_path, reader)
+
+ elif absolute_path.endswith('.wsp.gz') and GzippedWhisperReader.supported:
+ reader = GzippedWhisperReader(absolute_path)
+ yield LeafNode(metric_path, reader)
+
+ elif absolute_path.endswith('.rrd') and RRDReader.supported:
+ if datasource_pattern is None:
+ yield BranchNode(metric_path)
+
+ else:
+ for datasource_name in RRDReader.get_datasources(absolute_path):
+ if match_entries([datasource_name], datasource_pattern):
+ reader = RRDReader(absolute_path, datasource_name)
+ yield LeafNode(metric_path, reader)
+
+ def _find_paths(self, current_dir, patterns):
+ """Recursively generates absolute paths whose components underneath current_dir
+ match the corresponding pattern in patterns"""
+ pattern = patterns[0]
+ patterns = patterns[1:]
+ entries = os.listdir(current_dir)
+
+ subdirs = [e for e in entries if isdir( join(current_dir,e) )]
+ matching_subdirs = match_entries(subdirs, pattern)
+
+ if len(patterns) == 1 and RRDReader.supported: #the last pattern may apply to RRD data sources
+ files = [e for e in entries if isfile( join(current_dir,e) )]
+ rrd_files = match_entries(files, pattern + ".rrd")
+
+ if rrd_files: #let's assume it does
+ datasource_pattern = patterns[0]
+
+ for rrd_file in rrd_files:
+ absolute_path = join(current_dir, rrd_file)
+ yield absolute_path + self.DATASOURCE_DELIMETER + datasource_pattern
+
+ if patterns: #we've still got more directories to traverse
+ for subdir in matching_subdirs:
+
+ absolute_path = join(current_dir, subdir)
+ for match in self._find_paths(absolute_path, patterns):
+ yield match
+
+ else: #we've got the last pattern
+ files = [e for e in entries if isfile( join(current_dir,e) )]
+ matching_files = match_entries(files, pattern + '.*')
+
+ for basename in matching_subdirs + matching_files:
+ yield join(current_dir, basename)
+
+
+def fs_to_metric(path):
+ dirpath = dirname(path)
+ filename = basename(path)
+ return join(dirpath, filename.split('.')[0]).replace('/','.')
+
+
+def get_real_metric_path(absolute_path, metric_path):
+ # Support symbolic links (real_metric_path ensures proper cache queries)
+ if islink(absolute_path):
+ real_fs_path = realpath(absolute_path)
+ relative_fs_path = metric_path.replace('.', '/')
+ base_fs_path = absolute_path[ :-len(relative_fs_path) ]
+ relative_real_fs_path = real_fs_path[ len(base_fs_path): ]
+ return fs_to_metric( relative_real_fs_path )
+
+ return metric_path
+
+def _deduplicate(entries):
+ yielded = set()
+ for entry in entries:
+ if entry not in yielded:
+ yielded.add(entry)
+ yield entry
+
+def match_entries(entries, pattern):
+ """A drop-in replacement for fnmatch.filter that supports pattern
+ variants (ie. {foo,bar}baz = foobaz or barbaz)."""
+ v1, v2 = pattern.find('{'), pattern.find('}')
+
+ if v1 > -1 and v2 > v1:
+ variations = pattern[v1+1:v2].split(',')
+ variants = [ pattern[:v1] + v + pattern[v2+1:] for v in variations ]
+ matching = []
+
+ for variant in variants:
+ matching.extend( fnmatch.filter(entries, variant) )
+
+ return list( _deduplicate(matching) ) #remove dupes without changing order
+
+ else:
+ matching = fnmatch.filter(entries, pattern)
+ matching.sort()
+ return matching
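A quick illustration of the brace-variant matching performed by match_entries above (the entry names are made up):

from graphite.finders import match_entries

entries = ['cpuUsage', 'memUsage', 'diskUsage', 'uptime']

match_entries(entries, '{cpu,mem}Usage')   # ['cpuUsage', 'memUsage'], variant order preserved
match_entries(entries, '*Usage')           # plain fnmatch path, sorted: ['cpuUsage', 'diskUsage', 'memUsage']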
132 webapp/graphite/intervals.py
@@ -0,0 +1,132 @@
+INFINITY = float('inf')
+NEGATIVE_INFINITY = -INFINITY
+
+
+class IntervalSet:
+ __slots__ = ('intervals', 'size')
+
+ def __init__(self, intervals, disjoint=False):
+ self.intervals = intervals
+
+ if not disjoint:
+ self.intervals = union_overlapping(self.intervals)
+
+ self.size = sum(i.size for i in self.intervals)
+
+ def __repr__(self):
+ return repr(self.intervals)
+
+ def __iter__(self):
+ return iter(self.intervals)
+
+ def __nonzero__(self):
+ return self.size != 0
+
+ def __sub__(self, other):
+ return self.intersect( other.complement() )
+
+ def complement(self):
+ complementary = []
+ cursor = NEGATIVE_INFINITY
+
+ for interval in self.intervals:
+ if cursor < interval.start:
+ complementary.append( Interval(cursor, interval.start) )
+ cursor = interval.end
+
+ if cursor < INFINITY:
+ complementary.append( Interval(cursor, INFINITY) )
+
+ return IntervalSet(complementary, disjoint=True)
+
+ def intersect(self, other): #XXX The last major bottleneck. Factorial-time hell.
+ # Then again, this function is entirely unused...
+ if (not self) or (not other):
+ return IntervalSet([])
+
+ #earliest = max(self.intervals[0].start, other.intervals[0].start)
+ #latest = min(self.intervals[-1].end, other.intervals[-1].end)
+
+ #mine = [i for i in self.intervals if i.start >= earliest and i.end <= latest]
+ #theirs = [i for i in other.intervals if i.start >= earliest and i.end <= latest]
+
+ intersections = [x for x in (i.intersect(j)
+ for i in self.intervals
+ for j in other.intervals)
+ if x]
+
+ return IntervalSet(intersections, disjoint=True)
+
+ def intersect_interval(self, interval):
+ intersections = [x for x in (i.intersect(interval)
+ for i in self.intervals)
+ if x]
+ return IntervalSet(intersections, disjoint=True)
+
+ def union(self, other):
+ return IntervalSet( sorted(self.intervals + other.intervals) )
+
+
+
+class Interval:
+ __slots__ = ('start', 'end', 'tuple', 'size')
+
+ def __init__(self, start, end):
+ if end - start < 0:
+ raise ValueError("Invalid interval start=%s end=%s" % (start, end))
+
+ self.start = start
+ self.end = end
+ self.tuple = (start, end)
+ self.size = self.end - self.start
+
+ def __eq__(self, other):
+ return self.tuple == other.tuple
+
+ def __hash__(self):
+ return hash( self.tuple )
+
+ def __cmp__(self, other):
+ return cmp(self.start, other.start)
+
+ def __len__(self):
+ raise TypeError("len() doesn't support infinite values, use the 'size' attribute instead")
+
+ def __nonzero__(self):
+ return self.size != 0
+
+ def __repr__(self):
+ return '<Interval: %s>' % str(self.tuple)
+
+ def intersect(self, other):
+ start = max(self.start, other.start)
+ end = min(self.end, other.end)
+
+ if end > start:
+ return Interval(start, end)
+
+ def overlaps(self, other):
+ earlier = self if self.start <= other.start else other
+ later = self if earlier is other else other
+ return earlier.end >= later.start
+
+ def union(self, other):
+ if not self.overlaps(other):
+ raise TypeError("Union of disjoint intervals is not an interval")
+
+ start = min(self.start, other.start)
+ end = max(self.end, other.end)
+ return Interval(start, end)
+
+
+def union_overlapping(intervals):
+ """Union any overlapping intervals in the given set."""
+ disjoint_intervals = []
+
+ for interval in intervals:
+ if not disjoint_intervals or interval.start > disjoint_intervals[-1].end:
+ disjoint_intervals.append(interval)
+ elif interval.end > disjoint_intervals[-1].end:
+ disjoint_intervals[-1] = Interval(disjoint_intervals[-1].start, interval.end)
+
+ return disjoint_intervals
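A small sketch of the interval semantics introduced above (timestamps are arbitrary):

from graphite.intervals import Interval, IntervalSet, union_overlapping

a = Interval(100, 200)
b = Interval(150, 300)

a.intersect(b)                                   # Interval (150, 200)
a.union(b)                                       # Interval (100, 300)
union_overlapping([a, b, Interval(400, 500)])    # [(100, 300), (400, 500)]

s = IntervalSet([a, Interval(400, 500)])
s.intersect_interval(Interval(180, 450)).size    # 70  (20 from 180-200, 50 from 400-450)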
33 webapp/graphite/local_settings.py.example
@@ -21,6 +21,9 @@ except NameError:
# Override this to provide documentation specific to your Graphite deployment
#DOCUMENTATION_URL = "http://graphite.readthedocs.org/"
+# Metric data and graphs are cached for one minute by default
+#DEFAULT_CACHE_DURATION = 60
+
# Logging
#LOG_RENDERING_PERFORMANCE = True
#LOG_CACHE_PERFORMANCE = True
@@ -55,10 +58,12 @@ except NameError:
#GRAPHTEMPLATES_CONF = '/opt/graphite/conf/graphTemplates.conf'
## Data directories
-# NOTE: If any directory is unreadable in DATA_DIRS it will break metric browsing
+# NOTE: If any directory is unreadable in STANDARD_DIRS it will break metric browsing
+#CERES_DIR = '/opt/graphite/storage/ceres'
#WHISPER_DIR = '/opt/graphite/storage/whisper'
#RRD_DIR = '/opt/graphite/storage/rrd'
-#DATA_DIRS = [WHISPER_DIR, RRD_DIR] # Default: set from the above variables
+# Data directories using the "Standard" finder (i.e. not Ceres)
+#STANDARD_DIRS = [WHISPER_DIR, RRD_DIR] # Default: set from the above variables
#LOG_DIR = '/opt/graphite/storage/log/webapp'
#INDEX_FILE = '/opt/graphite/storage/index' # Search index file
@@ -165,10 +170,16 @@ except NameError:
#CLUSTER_SERVERS = ["10.0.2.2:80", "10.0.2.3:80"]
## These are timeout values (in seconds) for requests to remote webapps
-#REMOTE_STORE_FETCH_TIMEOUT = 6 # Timeout to fetch series data
-#REMOTE_STORE_FIND_TIMEOUT = 2.5 # Timeout for metric find requests
-#REMOTE_STORE_RETRY_DELAY = 60 # Time before retrying a failed remote webapp
-#REMOTE_FIND_CACHE_DURATION = 300 # Time to cache remote metric find results
+#REMOTE_FIND_TIMEOUT = 3.0 # Timeout for metric find requests
+#REMOTE_FETCH_TIMEOUT = 6.0 # Timeout to fetch series data
+#REMOTE_RETRY_DELAY = 60.0 # Time before retrying a failed remote webapp
+#REMOTE_READER_CACHE_SIZE_LIMIT = 1000 # Maximum number of remote URL queries to cache
+#FIND_CACHE_DURATION = 300 # Time to cache remote metric find results
+# If the query doesn't fall entirely within the FIND_TOLERANCE window
+# we disregard the window. This prevents unnecessary remote fetches
+# caused when carbon's cache skews node.intervals, giving the appearance
+# that remote systems have data we don't have locally, when in fact we probably do.
+#FIND_TOLERANCE = 2 * FIND_CACHE_DURATION
## Remote rendering settings
# Set to True to enable rendering of Graphs on a remote webapp
@@ -187,6 +198,16 @@ except NameError:
# You *should* use 127.0.0.1 here in most cases
#CARBONLINK_HOSTS = ["127.0.0.1:7002:a", "127.0.0.1:7102:b", "127.0.0.1:7202:c"]
#CARBONLINK_TIMEOUT = 1.0
+#CARBONLINK_RETRY_DELAY = 15 # Seconds to blacklist a failed remote server
+
+# A "keyfunc" is a user-defined python function that is given a metric name
+# and returns a string that should be used when hashing the metric name.
+# This is important when your hashing has to respect certain metric groupings.
+#CARBONLINK_HASHING_KEYFUNC = "/opt/graphite/bin/keyfuncs.py:my_keyfunc"
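For illustration, a keyfunc matching the example setting above might look like the following sketch; the grouping rule is purely hypothetical and it would live at the path named in the setting:

# keyfuncs.py (hypothetical example matching the setting above)
def my_keyfunc(metric):
    # Hash on the first two name components so that, e.g.,
    # 'servers.host-a.cpu.user' and 'servers.host-a.cpu.system'
    # map to the same carbon-cache instance. (Illustrative rule.)
    return '.'.join(metric.split('.')[:2])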
+
+# The replication factor to use with consistent hashing
+# This should usually match the value configured in Carbon
+#REPLICATION_FACTOR = 1
# This lists the memcached servers that will be used by this webapp.
# If you have a cluster of webapps you should ensure all of them
96 webapp/graphite/metrics/views.py
@@ -18,9 +18,9 @@
from graphite.account.models import Profile
from graphite.util import getProfile, getProfileByUsername, defaultUser, json
from graphite.logger import log
-from graphite.storage import STORE, LOCAL_STORE
+from graphite.storage import STORE
from graphite.metrics.search import searcher
-from graphite.render.datalib import CarbonLink
+from graphite.carbonlink import CarbonLink
import fnmatch, os
try:
@@ -61,47 +61,20 @@ def search_view(request):
return HttpResponse(result_data, mimetype='application/json')
-def context_view(request):
- if request.method == 'GET':
- contexts = []
-
- if not 'metric' not in request.GET:
- return HttpResponse('{ "error" : "missing required parameter \"metric\"" }', mimetype='application/json')
-
- for metric in request.GET.getlist('metric'):
- try:
- context = STORE.get(metric).context
- except:
- contexts.append({ 'metric' : metric, 'error' : 'failed to retrieve context', 'traceback' : traceback.format_exc() })
- else:
- contexts.append({ 'metric' : metric, 'context' : context })
-
- content = json.dumps( { 'contexts' : contexts } )
- return HttpResponse(content, mimetype='application/json')
-
- elif request.method == 'POST':
-
- if 'metric' not in request.POST:
- return HttpResponse('{ "error" : "missing required parameter \"metric\"" }', mimetype='application/json')
-
- newContext = dict( item for item in request.POST.items() if item[0] != 'metric' )
-
- for metric in request.POST.getlist('metric'):
- STORE.get(metric).updateContext(newContext)
-
- return HttpResponse('{ "success" : true }', mimetype='application/json')
-
- else:
- return HttpResponseBadRequest("invalid method, must be GET or POST")
-
-
def find_view(request):
"View for finding metrics matching a given pattern"
profile = getProfile(request)
format = request.REQUEST.get('format', 'treejson')
local_only = int( request.REQUEST.get('local', 0) )
- contexts = int( request.REQUEST.get('contexts', 0) )
wildcards = int( request.REQUEST.get('wildcards', 0) )
+ fromTime = int( request.REQUEST.get('from', -1) )
+ untilTime = int( request.REQUEST.get('until', -1) )
+
+ if fromTime == -1:
+ fromTime = None
+ if untilTime == -1:
+ untilTime = None
+
automatic_variants = int( request.REQUEST.get('automatic_variants', 0) )
try:
@@ -114,11 +87,6 @@ def find_view(request):
else:
base_path = ''
- if local_only:
- store = LOCAL_STORE
- else:
- store = STORE
-
if format == 'completer':
query = query.replace('..', '*.')
if not query.endswith('*'):
@@ -132,28 +100,27 @@ def find_view(request):
query = '.'.join(query_parts)
try:
- matches = list( store.find(query) )
+ matches = list( STORE.find(query, fromTime, untilTime, local=local_only) )
except:
log.exception()
raise
log.info('find_view query=%s local_only=%s matches=%d' % (query, local_only, len(matches)))
matches.sort(key=lambda node: node.name)
+ log.info("received remote find request: pattern=%s from=%s until=%s local_only=%s format=%s matches=%d" % (query, fromTime, untilTime, local_only, format, len(matches)))
if format == 'treejson':
- content = tree_json(matches, base_path, wildcards=profile.advancedUI or wildcards, contexts=contexts)
+ content = tree_json(matches, base_path, wildcards=profile.advancedUI or wildcards)
response = HttpResponse(content, mimetype='application/json')
elif format == 'pickle':
- content = pickle_nodes(matches, contexts=contexts)
+ content = pickle_nodes(matches)
response = HttpResponse(content, mimetype='application/pickle')
elif format == 'completer':
- #if len(matches) == 1 and (not matches[0].isLeaf()) and query == matches[0].metric_path + '*': # auto-complete children
- # matches = list( store.find(query + '.*') )
results = []
for node in matches:
- node_info = dict(path=node.metric_path, name=node.name, is_leaf=str(int(node.isLeaf())))
+ node_info = dict(path=node.path, name=node.name, is_leaf=str(int(node.is_leaf)))
if not node.isLeaf():
node_info['path'] += '.'
results.append(node_info)
@@ -179,15 +146,10 @@ def expand_view(request):
group_by_expr = int( request.REQUEST.get('groupByExpr', 0) )
leaves_only = int( request.REQUEST.get('leavesOnly', 0) )
- if local_only:
- store = LOCAL_STORE
- else:
- store = STORE
-
results = {}
for query in request.REQUEST.getlist('query'):
results[query] = set()
- for node in store.find(query):
+ for node in STORE.find(query, local=local_only):
if node.isLeaf() or not leaves_only:
results[query].add( node.metric_path )
@@ -257,7 +219,7 @@ def set_metadata_view(request):
return HttpResponse(json.dumps(results), mimetype='application/json')
-def tree_json(nodes, base_path, wildcards=False, contexts=False):
+def tree_json(nodes, base_path, wildcards=False):
results = []
branchNode = {
@@ -275,7 +237,7 @@ def tree_json(nodes, base_path, wildcards=False, contexts=False):
if len(nodes) > 1 and wildcards:
wildcardNode = {'text' : '*', 'id' : base_path + '*'}
- if any(not n.isLeaf() for n in nodes):
+ if any(not n.is_leaf for n in nodes):
wildcardNode.update(branchNode)
else:
@@ -296,12 +258,7 @@ def tree_json(nodes, base_path, wildcards=False, contexts=False):
'id' : base_path + str(node.name),
}
- if contexts:
- resultNode['context'] = node.context
- else:
- resultNode['context'] = {}
-
- if node.isLeaf():
+ if node.is_leaf:
resultNode.update(leafNode)
results_leaf.append(resultNode)
else:
@@ -313,12 +270,17 @@ def tree_json(nodes, base_path, wildcards=False, contexts=False):
return json.dumps(results)
-def pickle_nodes(nodes, contexts=False):
- if contexts:
- return pickle.dumps([ { 'metric_path' : n.metric_path, 'isLeaf' : n.isLeaf(), 'intervals' : n.getIntervals(), 'context' : n.context } for n in nodes ])
+def pickle_nodes(nodes):
+ nodes_info = []
- else:
- return pickle.dumps([ { 'metric_path' : n.metric_path, 'isLeaf' : n.isLeaf(), 'intervals' : n.getIntervals()} for n in nodes ])
+ for node in nodes:
+ info = dict(path=node.path, is_leaf=node.is_leaf)
+ if node.is_leaf:
+ info['intervals'] = node.intervals
+
+ nodes_info.append(info)
+
+ return pickle.dumps(nodes_info, protocol=-1)
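The pickled list built by pickle_nodes above is what a peer webapp's FindRequest.get_results (see remote_storage.py below) unpickles. Roughly, the payload is a list of dicts like this (paths and timestamps are illustrative):

from graphite.intervals import Interval, IntervalSet

nodes_info = [
    {'path': 'carbon.agents', 'is_leaf': False},
    {'path': 'carbon.agents.host-a.cpuUsage',          # illustrative leaf
     'is_leaf': True,
     'intervals': IntervalSet([Interval(1334000000, 1337000000)])},
]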
def any(iterable): #python2.4 compatibility
33 webapp/graphite/node.py
@@ -0,0 +1,33 @@
+
+
+class Node(object):
+ __slots__ = ('name', 'path', 'local', 'is_leaf')
+
+ def __init__(self, path):
+ self.path = path
+ self.name = path.split('.')[-1]
+ self.local = True
+ self.is_leaf = False
+
+ def __repr__(self):
+ return '<%s[%x]: %s>' % (self.__class__.__name__, id(self), self.path)
+
+
+class BranchNode(Node):
+ pass
+
+
+class LeafNode(Node):
+ __slots__ = ('reader', 'intervals')
+
+ def __init__(self, path, reader):
+ Node.__init__(self, path)
+ self.reader = reader
+ self.intervals = reader.get_intervals()
+ self.is_leaf = True
+
+ def fetch(self, startTime, endTime):
+ return self.reader.fetch(startTime, endTime)
+
+ def __repr__(self):
+ return '<LeafNode[%x]: %s (%s)>' % (id(self), self.path, self.reader)
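LeafNode delegates all data access to its reader, so any object exposing get_intervals() and fetch(startTime, endTime) can back a leaf. A hypothetical reader sketch (not part of this commit):

from graphite.node import LeafNode
from graphite.intervals import Interval, IntervalSet

class ConstantReader(object):
    "Hypothetical reader returning a flat line of zeros at 60-second resolution."
    def get_intervals(self):
        return IntervalSet([ Interval(0, float('inf')) ])

    def fetch(self, startTime, endTime):
        step = 60
        values = [0.0] * ((int(endTime) - int(startTime)) // step)
        return ((startTime, endTime, step), values)

node = LeafNode('example.constant.zero', ConstantReader())   # illustrative path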
240 webapp/graphite/readers.py
@@ -0,0 +1,240 @@
+import os
+import time
+from django.conf import settings  # used by RRDReader.fetch for settings.FLUSHRRDCACHED
+from graphite.node import LeafNode, BranchNode
+from graphite.intervals import Interval, IntervalSet
+from graphite.carbonlink import CarbonLink
+from graphite.logger import log
+
+
+try:
+ import whisper
+except ImportError:
+ whisper = False
+
+try:
+ import rrdtool
+except ImportError:
+ rrdtool = False
+
+try:
+ import gzip
+except ImportError:
+ gzip = False
+
+
+class FetchInProgress(object):
+ def __init__(self, wait_callback):
+ self.wait_callback = wait_callback
+
+ def waitForResults(self):
+ return self.wait_callback()
+
+
+class MultiReader(object):
+ __slots__ = ('nodes',)
+
+ def __init__(self, nodes):
+ self.nodes = nodes
+
+ def get_intervals(self):
+ interval_sets = []
+ for node in self.nodes:
+ interval_sets.extend( node.intervals.intervals )
+ return IntervalSet( sorted(interval_sets) )
+
+ def fetch(self, startTime, endTime):
+ # Start the fetch on each node
+ results = [ n.fetch(startTime, endTime) for n in self.nodes ]
+
+ # Wait for any asynchronous operations to complete
+ for i, result in enumerate(results):
+ if isinstance(result, FetchInProgress):
+ try:
+ results[i] = result.waitForResults()
+ except:
+ log.exception("Failed to complete subfetch")
+ results[i] = None
+
+ results = [r for r in results if r is not None]
+ if not results:
+ raise Exception("All sub-fetches failed")
+
+ return reduce(self.merge, results)
+
+ def merge(self, results1, results2):
+ # Ensure results1 is finer than results2
+ if results1[0][2] > results2[0][2]:
+ results1, results2 = results2, results1
+
+ time_info1, values1 = results1
+ time_info2, values2 = results2
+ start1, end1, step1 = time_info1
+ start2, end2, step2 = time_info2
+
+ step = step1 # finest step
+ start = min(start1, start2) # earliest start
+ end = max(end1, end2) # latest end
+ time_info = (start, end, step)
+ values = []
+
+ t = start
+ while t < end:
+ # Look for the finer precision value first if available
+ i1 = (t - start1) / step1
+
+ if len(values1) > i1:
+ v1 = values1[i1]
+ else:
+ v1 = None
+
+ if v1 is None:
+ i2 = (t - start2) / step2
+
+ if len(values2) > i2:
+ v2 = values2[i2]
+ else:
+ v2 = None
+
+ values.append(v2)
+ else:
+ values.append(v1)
+
+ t += step
+
+ return (time_info, values)
+
+
+class CeresReader(object):
+ __slots__ = ('ceres_node', 'real_metric_path')
+ supported = True
+
+ def __init__(self, ceres_node, real_metric_path):
+ self.ceres_node = ceres_node
+ self.real_metric_path = real_metric_path
+
+ def get_intervals(self):
+ intervals = []
+ for info in self.ceres_node.slice_info:
+ (start, end, step) = info
+ intervals.append( Interval(start, end) )
+
+ return IntervalSet(intervals)
+
+ def fetch(self, startTime, endTime):
+ data = self.ceres_node.read(startTime, endTime)
+ time_info = (data.startTime, data.endTime, data.timeStep)
+ values = list(data.values)
+
+ # Merge in data from carbon's cache
+ try:
+ cached_datapoints = CarbonLink.query(self.real_metric_path)
+ except:
+ log.exception("Failed CarbonLink query '%s'" % self.real_metric_path)
+ cached_datapoints = []
+
+ for (timestamp, value) in cached_datapoints:
+ interval = timestamp - (timestamp % data.timeStep)
+
+ try:
+ i = int(interval - data.startTime) / data.timeStep
+ values[i] = value
+ except:
+ pass
+
+ return (time_info, values)
+
+
+class WhisperReader(object):
+ __slots__ = ('fs_path',)
+ supported = bool(whisper)
+
+ def __init__(self, fs_path):
+ self.fs_path = fs_path
+
+ def get_intervals(self):
+ start = time.time() - whisper.info(self.fs_path)['maxRetention']
+ end = max( os.stat(self.fs_path).st_mtime, start )
+ return IntervalSet( [Interval(start, end)] )
+
+ def fetch(self, startTime, endTime):
+ return whisper.fetch(self.fs_path, startTime, endTime)
+
+
+class GzippedWhisperReader(WhisperReader):
+ supported = bool(whisper and gzip)
+
+ def get_intervals(self):
+ fh = gzip.GzipFile(self.fs_path, 'rb')
+ try:
+ info = whisper.__readHeader(fh) # evil, but necessary.
+ finally:
+ fh.close()
+
+ start = time.time() - info['maxRetention']
+ end = max( os.stat(self.fs_path).st_mtime, start )
+ return IntervalSet( [Interval(start, end)] )
+
+ def fetch(self, startTime, endTime):
+ fh = gzip.GzipFile(self.fs_path, 'rb')
+ try:
+ return whisper.file_fetch(fh, startTime, endTime)
+ finally:
+ fh.close()
+
+
+class RRDReader:
+ supported = bool(rrdtool)
+
+ def __init__(self, fs_path, datasource_name):
+ self.fs_path = fs_path
+ self.datasource_name = datasource_name
+
+ def get_intervals(self):
+ start = time.time() - self.get_retention(self.fs_path)
+ end = max( os.stat(self.fs_path).st_mtime, start )
+ return IntervalSet( [Interval(start, end)] )
+
+ def fetch(self, startTime, endTime):
+ startString = time.strftime("%H:%M_%Y%m%d+%Ss", time.localtime(startTime))
+ endString = time.strftime("%H:%M_%Y%m%d+%Ss", time.localtime(endTime))
+
+ if settings.FLUSHRRDCACHED:
+ rrdtool.flushcached(self.fs_path, '--daemon', settings.FLUSHRRDCACHED)
+
+ (timeInfo, columns, rows) = rrdtool.fetch(self.fs_path,'AVERAGE','-s' + startString,'-e' + endString)
+ colIndex = list(columns).index(self.datasource_name)
+ rows.pop() #chop off the latest value because RRD returns crazy last values sometimes
+ values = (row[colIndex] for row in rows)
+
+ return (timeInfo, values)
+
+ @staticmethod
+ def get_datasources(fs_path):
+ info = rrdtool.info(fs_path)
+
+ if 'ds' in info:
+ return [datasource_name for datasource_name in info['ds']]
+ else:
+ ds_keys = [ key for key in info if key.startswith('ds[') ]
+ datasources = set( key[3:].split(']')[0] for key in ds_keys )
+ return list(datasources)
+
+ @staticmethod
+ def get_retention(fs_path):
+ info = rrdtool.info(fs_path)
+ if 'rra' in info:
+ rras = info['rra']
+ else:
+ # Ugh, I like the old python-rrdtool api better..
+ rra_count = max([ int(key[4]) for key in info if key.startswith('rra[') ]) + 1
+ rras = [{} for i in range(rra_count)] # one dict per RRA, not shared references
+ for i in range(rra_count):
+ rras[i]['pdp_per_row'] = info['rra[%d].pdp_per_row' % i]
+ rras[i]['rows'] = info['rra[%d].rows' % i]
+
+ retention_points = 0
+ for rra in rras:
+ points = rra['pdp_per_row'] * rra['rows']
+ if points > retention_points:
+ retention_points = points
+
+ return retention_points * info['step']
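MultiReader.merge above keeps the finer-resolution series and fills its gaps from the coarser one. A worked example with synthetic results in the usual (timeInfo, values) form (MultiReader([]) is used only to reach the method):

from graphite.readers import MultiReader

fine   = ((0, 180, 60),  [1.0, None, 3.0])   # 60s step, middle point missing
coarse = ((0, 180, 180), [2.5])              # 180s step over the same window

MultiReader([]).merge(fine, coarse)
# -> ((0, 180, 60), [1.0, 2.5, 3.0])  the gap is filled from the coarse series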
246 webapp/graphite/remote_storage.py
@@ -2,141 +2,249 @@
import time
import httplib
from urllib import urlencode
-from django.core.cache import cache
+from threading import Lock, Event
from django.conf import settings
-from graphite.render.hashing import compactHash
+from django.core.cache import cache
+from graphite.node import LeafNode, BranchNode
+from graphite.intervals import Interval, IntervalSet
+from graphite.readers import FetchInProgress
+from graphite.logger import log
try:
import cPickle as pickle
except ImportError:
import pickle
-
class RemoteStore(object):
lastFailure = 0.0
- retryDelay = settings.REMOTE_STORE_RETRY_DELAY
- available = property(lambda self: time.time() - self.lastFailure > self.retryDelay)
+ available = property(lambda self: time.time() - self.lastFailure > settings.REMOTE_RETRY_DELAY)
def __init__(self, host):
self.host = host
-
def find(self, query):
request = FindRequest(self, query)
request.send()
return request
-
def fail(self):
self.lastFailure = time.time()
-
-class FindRequest:
- suppressErrors = True
+class FindRequest(object):
+ __slots__ = ('store', 'query', 'connection',
+ 'failed', 'cacheKey', 'cachedResult')
def __init__(self, store, query):
self.store = store
self.query = query
self.connection = None
- self.cacheKey = compactHash('find:%s:%s' % (self.store.host, query))
- self.cachedResults = None
+ self.failed = False
+
+ if query.startTime:
+ start = query.startTime - (query.startTime % settings.FIND_CACHE_DURATION)
+ else:
+ start = ""
+ if query.endTime:
+ end = query.endTime - (query.endTime % settings.FIND_CACHE_DURATION)
+ else:
+ end = ""
+
+ self.cacheKey = "find:%s:%s:%s:%s" % (store.host, query.pattern, start, end)
+ self.cachedResult = None
def send(self):
- self.cachedResults = cache.get(self.cacheKey)
+ log.info("FindRequest.send(host=%s, query=%s) called" % (self.store.host, self.query))
- if self.cachedResults:
+ self.cachedResult = cache.get(self.cacheKey)
+ if self.cachedResult is not None:
+ log.info("FindRequest(host=%s, query=%s) using cached result" % (self.store.host, self.query))
return
self.connection = HTTPConnectionWithTimeout(self.store.host)
- self.connection.timeout = settings.REMOTE_STORE_FIND_TIMEOUT
+ self.connection.timeout = settings.REMOTE_FIND_TIMEOUT
query_params = [
('local', '1'),
('format', 'pickle'),
- ('query', self.query),
+ ('query', self.query.pattern),
]
+ if self.query.startTime:
+ query_params.append( ('from', self.query.startTime) )
+
+ if self.query.endTime:
+ query_params.append( ('until', self.query.endTime) )
+
query_string = urlencode(query_params)
try:
self.connection.request('GET', '/metrics/find/?' + query_string)
except:
+ log.exception("FindRequest.send(host=%s, query=%s) exception during request" % (self.store.host, self.query))
self.store.fail()
- if not self.suppressErrors:
- raise
-
+ self.failed = True
def get_results(self):
- if self.cachedResults:
- return self.cachedResults
-
- if not self.connection:
- self.send()
+ if self.failed:
+ return
- try:
- response = self.connection.getresponse()
- assert response.status == 200, "received error response %s - %s" % (response.status, response.reason)
- result_data = response.read()
- results = pickle.loads(result_data)
+ if self.cachedResult is not None:
+ results = self.cachedResult
+ else:
+ if self.connection is None:
+ self.send()
- except:
- self.store.fail()
- if not self.suppressErrors:
- raise
+ try:
+ response = self.connection.getresponse()
+ assert response.status == 200, "received error response %s - %s" % (response.status, response.reason)
+ result_data = response.read()
+ results = pickle.loads(result_data)
+
+ except:
+ log.exception("FindRequest.get_results(host=%s, query=%s) exception processing response" % (self.store.host, self.query))
+ self.store.fail()
+ return
+
+ cache.set(self.cacheKey, results, settings.FIND_CACHE_DURATION)
+
+ for node_info in results:
+ if 'isLeaf' in node_info: # 0.9.7 hacks
+ node_info['is_leaf'] = node_info.pop('isLeaf')
+ node_info['path'] = node_info.pop('metric_path')
+ node_info['intervals'] = IntervalSet([ Interval(*args) for args in node_info['intervals'] ])
+
+ if node_info.get('is_leaf'):
+ reader = RemoteReader(self.store, node_info, bulk_query=self.query.pattern)
+ node = LeafNode(node_info['path'], reader)
else:
- results = []
-
- resultNodes = [ RemoteNode(self.store, node['metric_path'], node['isLeaf']) for node in results ]
- cache.set(self.cacheKey, resultNodes, settings.REMOTE_FIND_CACHE_DURATION)
- self.cachedResults = resultNodes
- return resultNodes
+ node = BranchNode(node_info['path'])
+ node.local = False
+ yield node
-class RemoteNode:
- context = {}
+class RemoteReader(object):
+ __slots__ = ('store', 'metric_path', 'intervals', 'query')
+ cache_lock = Lock()
+ request_cache = {}
+ request_locks = {}
+ request_times = {}
- def __init__(self, store, metric_path, isLeaf):
+ def __init__(self, store, node_info, bulk_query=None):
self.store = store
- self.fs_path = None
- self.metric_path = metric_path
- self.real_metric = metric_path
- self.name = metric_path.split('.')[-1]
- self.__isLeaf = isLeaf
+ self.metric_path = node_info['path']
+ self.intervals = node_info['intervals']
+ self.query = bulk_query or node_info['path']
+ def __repr__(self):
+ return '<RemoteReader[%x]: %s>' % (id(self), self.store.host)
- def fetch(self, startTime, endTime):
- if not self.__isLeaf:
- return []
+ def get_intervals(self):
+ return self.intervals
+ def fetch(self, startTime, endTime):
query_params = [
- ('target', self.metric_path),
+ ('target', self.query),
('format', 'pickle'),
+ ('local', '1'),
+ ('noCache', '1'),
('from', str( int(startTime) )),
('until', str( int(endTime) ))
]
query_string = urlencode(query_params)
+ urlpath = '/render/?' + query_string
+ url = "http://%s%s" % (self.store.host, urlpath)
+
+ # Quick cache check up front
+ self.clean_cache()
+ cached_results = self.request_cache.get(url)
+ if cached_results:
+ for series in cached_results:
+ if series['name'] == self.metric_path:
+ time_info = (series['start'], series['end'], series['step'])
+ return (time_info, series['values'])
+
+ # Synchronize with other RemoteReaders using the same bulk query.
+ # Despite our use of thread synchronization primitives, the common
+ # case is for synchronizing asynchronous fetch operations within
+ # a single thread.
+ (request_lock, wait_lock, completion_event) = self.get_request_locks(url)
+
+ if request_lock.acquire(False): # we only send the request the first time we're called
+ try:
+ log.info("RemoteReader.request_data :: requesting %s" % url)
+ connection = HTTPConnectionWithTimeout(self.store.host)
+ connection.timeout = settings.REMOTE_FETCH_TIMEOUT
+ connection.request('GET', urlpath)
+ except:
+ completion_event.set()
+ self.store.fail()
+ log.exception("Error requesting %s" % url)
+ raise
- connection = HTTPConnectionWithTimeout(self.store.host)
- connection.timeout = settings.REMOTE_STORE_FETCH_TIMEOUT
- connection.request('GET', '/render/?' + query_string)
- response = connection.getresponse()
- assert response.status == 200, "Failed to retrieve remote data: %d %s" % (response.status, response.reason)
- rawData = response.read()
-
- seriesList = pickle.loads(rawData)
- assert len(seriesList) == 1, "Invalid result: seriesList=%s" % str(seriesList)
- series = seriesList[0]
-
- timeInfo = (series['start'], series['end'], series['step'])
- return (timeInfo, series['values'])
-
-
- def isLeaf(self):
- return self.__isLeaf
-
+ def wait_for_results():
+ if wait_lock.acquire(False): # the FetchInProgress that gets waited on waits for the actual completion
+ try:
+ response = connection.getresponse()
+ if response.status != 200:
+ raise Exception("Error response %d %s from %s" % (response.status, response.reason, url))
+
+ pickled_response = response.read()
+ results = pickle.loads(pickled_response)
+ self.cache_lock.acquire()
+ self.request_cache[url] = results
+ self.cache_lock.release()
+ completion_event.set()
+ return results
+ except:
+ completion_event.set()
+ self.store.fail()
+ log.exception("Error requesting %s" % url)
+ raise
+
+ else: # otherwise we just wait on the completion_event
+ completion_event.wait(settings.REMOTE_FETCH_TIMEOUT)
+ cached_results = self.request_cache.get(url)
+ if cached_results is None:
+ raise Exception("Passive remote fetch failed to find cached results")
+ else:
+ return cached_results
+
+ def extract_my_results():
+ for series in wait_for_results():
+ if series['name'] == self.metric_path:
+ time_info = (series['start'], series['end'], series['step'])
+ return (time_info, series['values'])
+
+ return FetchInProgress(extract_my_results)
+
+ def clean_cache(self):
+ self.cache_lock.acquire()
+ try:
+ if len(self.request_locks) >= settings.REMOTE_READER_CACHE_SIZE_LIMIT:
+ log.info("RemoteReader.request_data :: clearing old from request_cache and request_locks")
+ now = time.time()
+ for url, timestamp in self.request_times.items():
+ age = now - timestamp
+ if age >= (2 * settings.REMOTE_FETCH_TIMEOUT):
+ del self.request_locks[url]
+ del self.request_times[url]
+ if url in self.request_cache:
+ del self.request_cache[url]
+ finally:
+ self.cache_lock.release()
+
+ def get_request_locks(self, url):
+ self.cache_lock.acquire()
+ try:
+ if url not in self.request_locks:
+ self.request_locks[url] = (Lock(), Lock(), Event())
+ self.request_times[url] = time.time()
+ return self.request_locks[url]
+ finally:
+ self.cache_lock.release()
# This is a hack to put a timeout in the connect() of an HTTP request.
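FindRequest's cache key above buckets the query's time range to FIND_CACHE_DURATION boundaries, so near-identical find requests within the same window share one cache entry. For example (host, pattern and timestamp are illustrative):

FIND_CACHE_DURATION = 300      # default, see settings.py

startTime = 1337100123
start_bucket = startTime - (startTime % FIND_CACHE_DURATION)   # 1337100000

cacheKey = "find:%s:%s:%s:%s" % ("10.0.2.2:80", "carbon.agents.*", start_bucket, "")
# any find for the same host/pattern in the same 300s bucket hits this key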
204 webapp/graphite/render/datalib.py
@@ -12,27 +12,19 @@
See the License for the specific language governing permissions and
limitations under the License."""
-import socket
-import struct
import time
-from django.conf import settings
from graphite.logger import log
-from graphite.storage import STORE, LOCAL_STORE
-from graphite.render.hashing import ConsistentHashRing
-
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
+from graphite.storage import STORE
+from graphite.readers import FetchInProgress
class TimeSeries(list):
def __init__(self, name, start, end, step, values, consolidate='average'):
+ list.__init__(self, values)
self.name = name
self.start = start
self.end = end
self.step = step
- list.__init__(self,values)
self.consolidationFunc = consolidate
self.valuesPerPoint = 1
self.options = {}
@@ -91,179 +83,49 @@ def getInfo(self):
}
-
-class CarbonLinkPool:
- def __init__(self, hosts, timeout):
- self.hosts = [ (server, instance) for (server, port, instance) in hosts ]
- self.ports = dict( ((server, instance), port) for (server, port, instance) in hosts )
- self.timeout = float(timeout)
- self.hash_ring = ConsistentHashRing(self.hosts)
- self.connections = {}
- self.last_failure = {}
- # Create a connection pool for each host
- for host in self.hosts:
- self.connections[host] = set()
-
- def select_host(self, metric):
- "Returns the carbon host that has data for the given metric"
- return self.hash_ring.get_node(metric)
-
- def get_connection(self, host):
- # First try to take one out of the pool for this host
- (server, instance) = host
- port = self.ports[host]
- connectionPool = self.connections[host]
- try:
- return connectionPool.pop()
- except KeyError:
- pass #nothing left in the pool, gotta make a new connection
-
- log.cache("CarbonLink creating a new socket for %s" % str(host))
- connection = socket.socket()
- connection.settimeout(self.timeout)
- try:
- connection.connect( (server, port) )
- except:
- self.last_failure[host] = time.time()
- raise
- else:
- connection.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1 )
- return connection
-
- def query(self, metric):
- request = dict(type='cache-query', metric=metric)
- results = self.send_request(request)
- log.cache("CarbonLink cache-query request for %s returned %d datapoints" % (metric, len(results)))
- return results['datapoints']
-
- def get_metadata(self, metric, key):
- request = dict(type='get-metadata', metric=metric, key=key)
- results = self.send_request(request)
- log.cache("CarbonLink get-metadata request received for %s:%s" % (metric, key))
- return results['value']
-
- def set_metadata(self, metric, key, value):
- request = dict(type='set-metadata', metric=metric, key=key, value=value)
- results = self.send_request(request)
- log.cache("CarbonLink set-metadata request received for %s:%s" % (metric, key))
- return results
-
- def send_request(self, request):
- metric = request['metric']
- serialized_request = pickle.dumps(request, protocol=-1)
- len_prefix = struct.pack("!L", len(serialized_request))
- request_packet = len_prefix + serialized_request
-
- host = self.select_host(metric)
- conn = self.get_connection(host)
- try:
- conn.sendall(request_packet)
- result = self.recv_response(conn)
- except:
- self.last_failure[host] = time.time()
- raise
- else:
- self.connections[host].add(conn)
- if 'error' in result:
- raise CarbonLinkRequestError(result['error'])
- else:
- return result
-
- def recv_response(self, conn):
- len_prefix = recv_exactly(conn, 4)
- body_size = struct.unpack("!L", len_prefix)[0]
- body = recv_exactly(conn, body_size)
- return pickle.loads(body)
-
-
-# Utilities
-class CarbonLinkRequestError(Exception):
- pass
-
-def recv_exactly(conn, num_bytes):
- buf = ''
- while len(buf) < num_bytes:
- data = conn.recv( num_bytes - len(buf) )
- if not data:
- raise Exception("Connection lost")
- buf += data
-
- return buf
-
-#parse hosts from local_settings.py
-hosts = []
-for host in settings.CARBONLINK_HOSTS:
- parts = host.split(':')
- server = parts[0]
- port = int( parts[1] )
- if len(parts) > 2:
- instance = parts[2]
- else:
- instance = None
-
- hosts.append( (server, int(port), instance) )
-
-
-#A shared importable singleton
-CarbonLink = CarbonLinkPool(hosts, settings.CARBONLINK_TIMEOUT)
-
-
# Data retrieval API
def fetchData(requestContext, pathExpr):
+
seriesList = []
- startTime = requestContext['startTime']
- endTime = requestContext['endTime']
-
- if requestContext['localOnly']:
- store = LOCAL_STORE
- else:
- store = STORE
-
- for dbFile in store.find(pathExpr):
- log.metric_access(dbFile.metric_path)
- dbResults = dbFile.fetch( timestamp(startTime), timestamp(endTime) )
- try:
- cachedResults = CarbonLink.query(dbFile.real_metric)
- results = mergeResults(dbResults, cachedResults)
- except:
- log.exception()
- results = dbResults
+ startTime = int( time.mktime( requestContext['startTime'].timetuple() ) )
+ endTime = int( time.mktime( requestContext['endTime'].timetuple() ) )
+
+ matching_nodes = STORE.find(pathExpr, startTime, endTime, local=requestContext['localOnly'])
+ fetches = [(node, node.fetch(startTime, endTime)) for node in matching_nodes if node.is_leaf]
+
+ for node, results in fetches:
+ if isinstance(results, FetchInProgress):
+ results = results.waitForResults()
if not results:
+ log.info("render.datalib.fetchData :: no results for %s.fetch(%s, %s)" % (node, startTime, endTime))
continue
- (timeInfo,values) = results
- (start,end,step) = timeInfo
- series = TimeSeries(dbFile.metric_path, start, end, step, values)
+ (timeInfo, values) = results
+ (start, end, step) = timeInfo
+
+ series = TimeSeries(node.path, start, end, step, values)
series.pathExpression = pathExpr #hack to pass expressions through to render functions
seriesList.append(series)
- return seriesList
-
+ # Prune empty series with duplicate metric paths to avoid showing empty graph elements for old whisper data
+ names = set([ series.name for series in seriesList ])
+ for name in names:
+ series_with_duplicate_names = [ series for series in seriesList if series.name == name ]
+ empty_duplicates = [ series for series in series_with_duplicate_names if not nonempty(series) ]
-def mergeResults(dbResults, cacheResults):
- cacheResults = list(cacheResults)
+ if series_with_duplicate_names == empty_duplicates and len(empty_duplicates) > 0: # if they're all empty
+ empty_duplicates.pop() # make sure we leave one in seriesList
- if not dbResults:
- return cacheResults
- elif not cacheResults:
- return dbResults
+ for series in empty_duplicates:
+ seriesList.remove(series)
- (timeInfo,values) = dbResults
- (start,end,step) = timeInfo
-
- for (timestamp, value) in cacheResults:
- interval = timestamp - (timestamp % step)
-
- try:
- i = int(interval - start) / step
- values[i] = value
- except:
- pass
+ return seriesList
- return (timeInfo,values)
+def nonempty(series):
+ for value in series:
+ if value is not None:
+ return True
-def timestamp(datetime):
- "Convert a datetime object into epoch time"
- return time.mktime( datetime.timetuple() )
+ return False
7 webapp/graphite/render/functions.py
@@ -21,8 +21,9 @@
import time
from graphite.logger import log
-from graphite.render.datalib import fetchData, TimeSeries, timestamp
+from graphite.render.datalib import TimeSeries
from graphite.render.attime import parseTimeOffset
+from graphite.util import timestamp
from graphite.events import models
#XXX format_units() should go somewhere else
@@ -38,6 +39,9 @@
HOUR = 3600
MINUTE = 60
+NAN = float('NaN')
+INF = float('inf')
+
#Utility functions
def safeSum(values):
safeValues = [v for v in values if v is not None]
@@ -2370,6 +2374,7 @@ def pieMinimum(requestContext, series):
'minSeries' : minSeries,
'maxSeries' : maxSeries,
'rangeOfSeries': rangeOfSeries,
+ 'percentileOfSeries': percentileOfSeries,
# Transform functions
'scale' : scale,
4 webapp/graphite/render/glyph.py
@@ -97,6 +97,8 @@
dict(seconds=16000, minorGridUnit=DAY, minorGridStep=7, majorGridUnit=DAY, majorGridStep=14, labelUnit=DAY, labelStep=14, format="%m/%d", maxInterval=365*DAY),
dict(seconds=32000, minorGridUnit=DAY, minorGridStep=15, majorGridUnit=DAY, majorGridStep=30, labelUnit=DAY, labelStep=30, format="%m/%d", maxInterval=365*DAY),
dict(seconds=64000, minorGridUnit=DAY, minorGridStep=30, majorGridUnit=DAY, majorGridStep=60, labelUnit=DAY, labelStep=60, format="%m/%d %Y"),
+ dict(seconds=100000,minorGridUnit=DAY, minorGridStep=60, majorGridUnit=DAY, majorGridStep=120,labelUnit=DAY, labelStep=120, format="%m/%d %Y"),
+ dict(seconds=120000,minorGridUnit=DAY, minorGridStep=120,majorGridUnit=DAY, majorGridStep=240,labelUnit=DAY, labelStep=240, format="%m/%d %Y"),
)
UnitSystems = {
@@ -1655,7 +1657,7 @@ def logrange(base, scale_min, scale_max):
if scale_min > 0:
current = math.floor(math.log(scale_min, base))
factor = current
- while current <= scale_max:
+ while current < scale_max:
current = math.pow(base, factor)
yield current
factor += 1
26 webapp/graphite/render/hashing.py
@@ -53,33 +53,51 @@ def compactHash(string):
return hash.hexdigest()
-
class ConsistentHashRing:
def __init__(self, nodes, replica_count=100):
self.ring = []
+ self.nodes = set()
self.replica_count = replica_count
for node in nodes:
self.add_node(node)
def compute_ring_position(self, key):
big_hash = md5( str(key) ).hexdigest()
- small_hash = int(big_hash[:4], 16)
+ small_hash = int(big_hash[:4], 16)
return small_hash
def add_node(self, key):
+ self.nodes.add(key)
for i in range(self.replica_count):
replica_key = "%s:%d" % (key, i)
position = self.compute_ring_position(replica_key)
entry = (position, key)
bisect.insort(self.ring, entry)
def remove_node(self, key):
+ self.nodes.discard(key)
self.ring = [entry for entry in self.ring if entry[1] != key]
def get_node(self, key):
+ assert self.ring
position = self.compute_ring_position(key)
search_entry = (position, None)
- index = bisect.bisect_left(self.ring, search_entry)
- index %= len(self.ring)
+ index = bisect.bisect_left(self.ring, search_entry) % len(self.ring)
entry = self.ring[index]
return entry[1]
+
+ def get_nodes(self, key):
+ nodes = []
+ position = self.compute_ring_position(key)
+ search_entry = (position, None)
+ index = bisect.bisect_left(self.ring, search_entry) % len(self.ring)
+ last_index = (index - 1) % len(self.ring)
+ while len(nodes) < len(self.nodes) and index != last_index:
+ next_entry = self.ring[index]
+ (position, next_node) = next_entry
+ if next_node not in nodes:
+ nodes.append(next_node)
+
+ index = (index + 1) % len(self.ring)
+
+ return nodes
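A brief sketch of the new get_nodes() method; CarbonLinkPool.select_host (carbonlink.py above) walks this list and stops once it has REPLICATION_FACTOR distinct servers. Hosts are illustrative:

from graphite.render.hashing import ConsistentHashRing

hosts = [('127.0.0.1', 'a'), ('127.0.0.1', 'b'), ('127.0.0.1', 'c')]   # (server, instance) pairs
ring = ConsistentHashRing(hosts)

ring.get_node('carbon.agents.host-a.cpuUsage')    # the single owning (server, instance)
ring.get_nodes('carbon.agents.host-a.cpuUsage')   # all nodes in ring order from that position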
3 webapp/graphite/render/views.py
@@ -100,6 +100,8 @@ def renderView(request):
requestContext['data'] = data = cachedData
else: # Have to actually retrieve the data now
for target in requestOptions['targets']:
+ if not target.strip():
+ continue
t = time()
seriesList = evaluateTarget(requestContext, target)
log.rendering("Retrieval of %s took %.6f" % (target, time() - t))
@@ -108,6 +110,7 @@ def renderView(request):
if useCache:
cache.set(dataKey, data, cacheTimeout)
+ # If data is all we needed, we're done
format = requestOptions.get('format')
if format == 'csv':
response = HttpResponse(mimetype='text/csv')
51 webapp/graphite/settings.py
@@ -17,11 +17,6 @@
from django import VERSION as DJANGO_VERSION
from os.path import join, dirname, abspath
-try:
- import rrdtool
-except ImportError:
- rrdtool = False
-
WEBAPP_VERSION = '0.9.10-pre1'
DEBUG = False
JAVASCRIPT_DEBUG = False
@@ -42,36 +37,41 @@
WHITELIST_FILE = ''
INDEX_FILE = ''
LOG_DIR = ''
+CERES_DIR = ''
WHISPER_DIR = ''
RRD_DIR = ''
-DATA_DIRS = []
+STANDARD_DIRS = []
CLUSTER_SERVERS = []
sys.path.insert(0, WEBAPP_DIR)
# Allow local versions of the libs shipped in thirdparty to take precedence
sys.path.append(THIRDPARTY_DIR)
-# Memcache settings
+# Cluster settings
+CLUSTER_SERVERS = []
+REMOTE_FIND_TIMEOUT = 3.0
+REMOTE_FETCH_TIMEOUT = 6.0
+REMOTE_RETRY_DELAY = 60.0
+REMOTE_READER_CACHE_SIZE_LIMIT = 1000
+CARBONLINK_HOSTS = ["127.0.0.1:7002"]
+CARBONLINK_TIMEOUT = 1.0
+CARBONLINK_HASHING_KEYFUNC = None
+CARBONLINK_RETRY_DELAY = 15
+REPLICATION_FACTOR = 1
MEMCACHE_HOSTS = []
+FIND_CACHE_DURATION = 300
+FIND_TOLERANCE = 2 * FIND_CACHE_DURATION
DEFAULT_CACHE_DURATION = 60 #metric data and graphs are cached for one minute by default
LOG_CACHE_PERFORMANCE = False
-# Remote store settings
-REMOTE_STORE_FETCH_TIMEOUT = 6
-REMOTE_STORE_FIND_TIMEOUT = 2.5
-REMOTE_STORE_RETRY_DELAY = 60
-REMOTE_FIND_CACHE_DURATION = 300
-
#Remote rendering settings
REMOTE_RENDERING = False #if True, rendering is delegated to RENDERING_HOSTS
RENDERING_HOSTS = []
REMOTE_RENDER_CONNECT_TIMEOUT = 1.0
LOG_RENDERING_PERFORMANCE = False
#Miscellaneous settings
-CARBONLINK_HOSTS = ["127.0.0.1:7002"]
-CARBONLINK_TIMEOUT = 1.0
SMTP_SERVER = "localhost"
DOCUMENTATION_URL = "http://graphite.readthedocs.org/"
ALLOW_ANONYMOUS_CLI = True
@@ -216,13 +216,23 @@
LOG_DIR = join(STORAGE_DIR, 'log', 'webapp')
if not WHISPER_DIR:
WHISPER_DIR = join(STORAGE_DIR, 'whisper/')
+if not CERES_DIR:
+ CERES_DIR = join(STORAGE_DIR, 'ceres/')
if not RRD_DIR:
RRD_DIR = join(STORAGE_DIR, 'rrd/')
-if not DATA_DIRS:
- if rrdtool and os.path.exists(RRD_DIR):
- DATA_DIRS = [WHISPER_DIR, RRD_DIR]
- else:
- DATA_DIRS = [WHISPER_DIR]
+if not STANDARD_DIRS:
+ try:
+ import whisper
+ if os.path.exists(WHISPER_DIR):
+ STANDARD_DIRS.append(WHISPER_DIR)
+ except ImportError:
+ print >> sys.stderr, "WARNING: whisper module could not be loaded, whisper support disabled"
+ try:
+ import rrdtool
+ if os.path.exists(RRD_DIR):
+ STANDARD_DIRS.append(RRD_DIR)
+ except ImportError:
+ pass
# Default sqlite db file
# This is set here so that a user-set STORAGE_DIR is available
@@ -244,4 +254,3 @@
if USE_LDAP_AUTH:
AUTHENTICATION_BACKENDS.insert(0,'graphite.account.ldapBackend.LDAPBackend')
-
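As a rough illustration of the new cluster settings above, a hedged local_settings.py sketch. The host names and values are examples only, and the host:port:instance form for CARBONLINK_HOSTS is assumed from existing Graphite conventions rather than verified against this branch.

    # Example overrides only -- adjust to your own deployment.
    CLUSTER_SERVERS = ['graphite-web-2:80', 'graphite-web-3:80']  # peer webapps queried on find/fetch
    CARBONLINK_HOSTS = ['127.0.0.1:7002:a', '127.0.0.1:7102:b']   # carbon-cache query ports (with instance names)
    CARBONLINK_TIMEOUT = 1.0
    REPLICATION_FACTOR = 1          # should match the relay's replication factor
    REMOTE_FIND_TIMEOUT = 3.0       # seconds to wait for remote /metrics/find responses
    FIND_CACHE_DURATION = 300
    FIND_TOLERANCE = 2 * FIND_CACHE_DURATION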
View
523 webapp/graphite/storage.py
@@ -1,411 +1,154 @@
-import os, time, fnmatch, socket, errno
-from os.path import isdir, isfile, join, exists, splitext, basename, realpath
-import whisper
-from graphite.remote_storage import RemoteStore
+import time
from django.conf import settings
-
-try:
- import rrdtool
-except ImportError:
- rrdtool = False
-
-try:
- import gzip
-except ImportError:
- gzip = False
-
-try:
- import cPickle as pickle
-except ImportError:
- import pickle
-
-
-DATASOURCE_DELIMETER = '::RRD_DATASOURCE::'
-
+from graphite.logger import log
+from graphite.util import is_local_interface, is_pattern
+from graphite.remote_storage import RemoteStore
+from graphite.node import LeafNode
+from graphite.intervals import Interval, IntervalSet
+from graphite.readers import MultiReader
+from graphite.finders import CeresFinder, StandardFinder
class Store:
- def __init__(self, directories=[], remote_hosts=[]):
- self.directories = directories
- self.remote_hosts = remote_hosts
- self.remote_stores = [ RemoteStore(host) for host in remote_hosts if not is_local_interface(host) ]
-
- if not (directories or remote_hosts):
- raise valueError("directories and remote_hosts cannot both be empty")
-
-
- def get(self, metric_path): #Deprecated
- for directory in self.directories:
- relative_fs_path = metric_path.replace('.', '/') + '.wsp'
- absolute_fs_path = join(directory, relative_fs_path)
-
- if exists(absolute_fs_path):
- return WhisperFile(absolute_fs_path, metric_path)
-
+ def __init__(self, finders, hosts=[]):
+ self.finders = finders
+ remote_hosts = [host for host in hosts if not is_local_interface(host)]
+ self.remote_stores = [ RemoteStore(host) for host in remote_hosts ]
- def find(self, query):
- if is_pattern(query):
- for match in self.find_all(query):
- yield match
+ def find(self, pattern, startTime=None, endTime=None, local=False):
+ query = FindQuery(pattern, startTime, endTime)
- else:
- match = self.find_first(query)
-
- if match is not None:
- yield match
-
-
- def find_first(self, query):
- # Search locally first
- for directory in self.directories:
- for match in find(directory, query):
- return match
-
- # If nothing found earch remotely
- remote_requests = [ r.find(query) for r in self.remote_stores if r.available ]
-
- for request in remote_requests:
- for match in request.get_results():
- return match
-
-
- def find_all(self, query):
# Start remote searches
- found = set()
- remote_requests = [ r.find(query) for r in self.remote_stores if r.available ]
+ if not local:
+ remote_requests = [ r.find(query) for r in self.remote_stores if r.available ]
+
+ matching_nodes = set()
# Search locally
- for directory in self.directories:
- for match in find(directory, query):
- if match.metric_path not in found:
- yield match
- found.add(match.metric_path)
+ for finder in self.finders:
+ for node in finder.find_nodes(query):
+ #log.info("find() :: local :: %s" % node)
+ matching_nodes.add(node)
# Gather remote search results
- for request in remote_requests:
- for match in request.get_results():
-
- if match.metric_path not in found:
- yield match
- found.add(match.metric_path)
-
-
-def is_local_interface(host):
- if ':' in host:
- host = host.split(':',1)[0]
-
- for port in xrange(1025, 65535):
- try:
- sock = socket.socket()
- sock.bind( (host,port) )
- sock.close()
-
- except socket.error, e:
- if e.args[0] == errno.EADDRNOTAVAIL:
- return False
- else:
+ if not local:
+ for request in remote_requests:
+ for node in request.get_results():
+ #log.info("find() :: remote :: %s from %s" % (node,request.store.host))
+ matching_nodes.add(node)
+
+ # Group matching nodes by their path
+ nodes_by_path = {}
+ for node in matching_nodes:
+ if node.path not in nodes_by_path:
+ nodes_by_path[node.path] = []
+
+ nodes_by_path[node.path].append(node)
+
+ # Reduce matching nodes for each path to a minimal set
+ found_branch_nodes = set()
+
+ for path, nodes in nodes_by_path.iteritems():
+ leaf_nodes = []
+
+ # First we dispense with the BranchNodes
+ for node in nodes:
+ if node.is_leaf:
+ leaf_nodes.append(node)
+ elif node.path not in found_branch_nodes: #TODO need to filter branch nodes based on requested interval... how?!?!?
+ yield node
+ found_branch_nodes.add(node.path)
+
+ if not leaf_nodes:
continue
+ # Calculate best minimal node set
+ minimal_node_set = set()
+ covered_intervals = IntervalSet([])
+
+ # If the query doesn't fall entirely within the FIND_TOLERANCE window
+ # we disregard the window. This prevents unnecessary remote fetches
+ # caused when carbon's cache skews node.intervals, giving the appearance
+ # remote systems have data we don't have locally, which we probably do.
+ now = int( time.time() )
+ tolerance_window = now - settings.FIND_TOLERANCE
+ disregard_tolerance_window = query.interval.start < tolerance_window
+ prior_to_window = Interval( float('-inf'), tolerance_window )
+
+ def measure_of_added_coverage(node, drop_window=disregard_tolerance_window):
+ relevant_intervals = node.intervals.intersect_interval(query.interval)
+ if drop_window:
+ relevant_intervals = relevant_intervals.intersect_interval(prior_to_window)
+ return covered_intervals.union(relevant_intervals).size - covered_intervals.size
+
+ nodes_remaining = list(leaf_nodes)
+
+ # Prefer local nodes first (and do *not* drop the tolerance window)
+ for node in leaf_nodes:
+ if node.local and measure_of_added_coverage(node, False) > 0:
+ nodes_remaining.remove(node)
+ minimal_node_set.add(node)
+ covered_intervals = covered_intervals.union(node.intervals)
+
+ while nodes_remaining:
+ node_coverages = [ (measure_of_added_coverage(n), n) for n in nodes_remaining ]
+ best_coverage, best_node = max(node_coverages)
+
+ if best_coverage == 0:
+ break
+
+ nodes_remaining.remove(best_node)
+ minimal_node_set.add(best_node)
+ covered_intervals = covered_intervals.union(best_node.intervals)
+
+ # Sometimes the requested interval falls within the caching window.
+ # We include the most likely node if the gap is within tolerance.
+ if not minimal_node_set:
+ def distance_to_requested_interval(node):
+ latest = sorted(node.intervals, key=lambda i: i.end)[-1]
+ distance = query.interval.start - latest.end
+ return distance if distance >= 0 else float('inf')
+
+ best_candidate = min(leaf_nodes, key=distance_to_requested_interval)
+ if distance_to_requested_interval(best_candidate) <= settings.FIND_TOLERANCE:
+ minimal_node_set.add(best_candidate)
+
+ if len(minimal_node_set) == 1:
+ yield minimal_node_set.pop()
+ elif len(minimal_node_set) > 1:
+ reader = MultiReader(minimal_node_set)
@dkulikovsky added a note on Jul 15, 2014:

I'm trying to get MultiReader to work for my custom storage and I have failed. I've added some debug logging, but I can't catch any invocations of this function; even with the standard CeresReader, no MultiReader invocations happen. Can you explain how I can use MultiReader?
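Regarding the question above: in this hunk MultiReader is only constructed by Store.find() itself, when more than one leaf node matches the same metric path (for example, the same metric present locally and on a CLUSTER_SERVERS peer). With a single local finder and no remote stores, every path yields exactly one node, so MultiReader never runs. A rough sketch follows; the import paths, constructor arguments, and data directory are assumptions based on this branch, not verified API.

    import time
    from graphite.storage import Store
    from graphite.finders import CeresFinder

    store = Store([CeresFinder('/opt/graphite/storage/ceres')])   # hypothetical data directory
    for node in store.find('carbon.agents.*.metricsReceived'):
        if node.is_leaf:
            # node.fetch() delegates to the underlying reader -- a MultiReader when several
            # sources covered this path, a single reader otherwise
            print(node.path, node.fetch(time.time() - 3600, time.time()))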
+ yield LeafNode(path, reader)
+
+
+
+class FindQuery:
+ def __init__(self, pattern, startTime, endTime):
+ self.pattern = pattern
+ self.startTime = startTime
+ self.endTime = endTime
+ self.isExact = is_pattern(pattern)
+ self.interval = Interval(float('-inf') if startTime is None else startTime,
+ float('inf') if endTime is None else endTime)
+
+
+ def __repr__(self):
+ if self.startTime is None:
+ startString = '*'
else:
- return True
-
- raise Exception("Failed all attempts at binding to interface %s, last exception was %s" % (host, e))
-
-
-def is_pattern(s):
- return '*' in s or '?' in s or '[' in s or '{' in s
-
-def is_escaped_pattern(s):
- for symbol in '*?[{':
- i = s.find(symbol)
- if i > 0:
- if s[i-1] == '\\':
- return True
- return False
-
-def find_escaped_pattern_fields(pattern_string):
- pattern_parts = pattern_string.split('.')
- for index,part in enumerate(pattern_parts):
- if is_escaped_pattern(part):
- yield index
-
+ startString = time.ctime(self.startTime)
-def find(root_dir, pattern):
- "Generates nodes beneath root_dir matching the given pattern"
- clean_pattern = pattern.replace('\\', '