Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

base fork: graphite-project/carbon
base: master
...
head fork: slackhappy/carbon
compare: foursquare_0.9.10
  • 16 commits
  • 13 files changed
  • 0 commit comments
  • 2 contributors
Commits on Jul 25, 2012
@slackhappy slackhappy add fallocate support (optional) to carbon 7cc6de4
Commits on Jul 26, 2012
@slackhappy slackhappy add aggregated-consistent-hashing relay rule dc5014e
@slackhappy slackhappy add an option to suppress unaggregated metrics
adds AGGREGATION_SUPPRESS_ORIGINAL (default False) to
prevent the original, unaggregated metric from passing
through the aggregator.  The alternative is to block the
unaggregated metric downstream.

Setting it to True reverts to the pre-0.9.9 behavior, which was
altered by change
http://bazaar.launchpad.net/~graphite-dev/graphite/0.9/revision/537

Related to question
https://answers.launchpad.net/graphite/+question/199769
f6fd84a
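
A hedged illustration of the option's effect, assuming a simple aggregation rule (the rule and metric names are illustrative; the option itself is documented in conf/carbon.conf.example below):

# aggregation-rules.conf (hypothetical rule)
<env>.applications.<app>.all.requests (60) = sum <env>.applications.<app>.*.requests

# carbon.conf
AGGREGATION_SUPPRESS_ORIGINAL = True
# True: only the aggregated "...all.requests" series leaves the aggregator
# False (default): the matching per-instance metric also passes through, after any rewrite rules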
@slackhappy slackhappy improvement to passthrough
allows any unmatched metrics to pass through, regardless of
the suppression setting
b2f43dc
Commits on Aug 02, 2012
@slackhappy slackhappy aggregated-consistent-hashing unmatched passthru 303496a
Commits on Aug 09, 2012
@slackhappy slackhappy add SIGQUIT handlers to the carbons 4aeafde
Commits on Aug 11, 2012
@slackhappy slackhappy improve CH routing performance when replfactor=1 98b5f13
Commits on Aug 12, 2012
@slackhappy slackhappy alternative caching scheme for aggregator
this uses less memory
33eb497
@slackhappy slackhappy use alternative hashing scheme in ACH router af5fcf8
Commits on Sep 18, 2012
@slackhappy slackhappy Add support for an aggregation rules LRU cache
A new option, AGGREGATION_RULES_CACHE_SIZE, can be
specified (the default of infinity turns off the LRU) to
limit the size of the aggregation rule resolution cache,
which stores the result of each rule regex as applied to
an incoming metric.

Set it to an integral value to limit the size of the cache.
Uses ordereddict, which is provided with Python 2.7 and
installable for older versions.  If the import fails,
the non-LRU cache is used.
48fbc12
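
A minimal sketch of the move-to-front LRU pattern this commit describes, assuming the ordereddict backport (collections.OrderedDict on 2.7 behaves the same); the real implementation lives in lib/carbon/aggregator/rules.py below:

try:
  from collections import OrderedDict  # stdlib on Python 2.7+
except ImportError:
  from ordereddict import OrderedDict  # backport for older Pythons

cache = OrderedDict()
max_size = 3  # illustrative; the real limit comes from AGGREGATION_RULES_CACHE_SIZE

def lookup(key, compute):
  try:
    value = cache.pop(key)   # hit: remove the entry...
    cache[key] = value       # ...and reinsert it as most recently used
    return value
  except KeyError:
    value = compute(key)     # miss: resolve and remember the result
    cache[key] = value
    while len(cache) > max_size:
      cache.popitem(False)   # last=False: evict the least recently used entry
    return value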
Commits on Sep 20, 2012
@slackhappy slackhappy max 1 destination for aggregated-CH 01dbbbd
Commits on Sep 04, 2013
@slackhappy slackhappy update util for pypy f254e7c
Commits on Mar 03, 2014
@jeffjenkins jeffjenkins Add hash type parameter with md5 and crc32 as values
crc32 is MUCH faster than md5 and its distribution over
the ring seems even enough.
1f29f08
@slackhappy slackhappy make HASH_TYPE configurable in carbon.conf 4ce986b
Commits on Mar 04, 2014
@slackhappy slackhappy add support for non-stable hash() HASH_TYPE 92ee7d6
@slackhappy slackhappy try not truncating hash f247530
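
A hedged sketch of how each HASH_TYPE value maps a metric key to a ring position, mirroring the lib/carbon/hashing.py changes below (the 16-bit variants keep md5's historical 0xffff range; the metric name is hypothetical):

from hashlib import md5
from binascii import crc32

key = 'servers.web01.cpu.user'  # hypothetical metric

md5_pos    = int(md5(key).hexdigest()[:4], 16)  # default: first 16 bits of the md5 digest
crc16_pos  = crc32(key) & 0xffff                # cheaper CPU-wise, same 0xffff range as md5
crc32_pos  = crc32(key) & 0xffffffff            # full 32-bit range, finer-grained ring
hash32_pos = hash(key) & 0xffffffff             # built-in hash(); fast but not stable across interpreters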
17 bin/carbon-aggregator.py
@@ -27,4 +27,21 @@
from carbon.util import run_twistd_plugin
+import threading
+import traceback
+def dumpstacks(signal, frame):
+  id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
+  code = []
+  for threadId, stack in sys._current_frames().items():
+    code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""), threadId))
+    for filename, lineno, name, line in traceback.extract_stack(stack):
+      code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
+      if line:
+        code.append("  %s" % (line.strip()))
+  print "\n".join(code)
+
+import signal
+signal.signal(signal.SIGQUIT, dumpstacks)
+
+
run_twistd_plugin(__file__)
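
With this handler installed, sending SIGQUIT to a running carbon daemon prints a stack trace for every thread to the daemon's stdout. A hedged usage sketch (the pid file path is hypothetical):

import os, signal

pid = int(open('/opt/graphite/storage/carbon-aggregator-a.pid').read())  # hypothetical pid file location
os.kill(pid, signal.SIGQUIT)  # dumpstacks() above writes all thread stacks to stdout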
17 bin/carbon-cache.py
@@ -27,4 +27,21 @@
from carbon.util import run_twistd_plugin
+import threading
+import traceback
+def dumpstacks(signal, frame):
+  id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
+  code = []
+  for threadId, stack in sys._current_frames().items():
+    code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""), threadId))
+    for filename, lineno, name, line in traceback.extract_stack(stack):
+      code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
+      if line:
+        code.append("  %s" % (line.strip()))
+  print "\n".join(code)
+
+import signal
+signal.signal(signal.SIGQUIT, dumpstacks)
+
+
run_twistd_plugin(__file__)
3  bin/carbon-client.py
@@ -44,6 +44,7 @@
option_parser = OptionParser(usage="%prog [options] <host:port:instance> <host:port:instance> ...")
option_parser.add_option('--debug', action='store_true', help="Log debug info to stdout")
+option_parser.add_option('--hashtype', default='md5', help="Type of hash")
option_parser.add_option('--keyfunc', help="Use a custom key function (path/to/module.py:myFunc)")
option_parser.add_option('--replication', type='int', default=1, help='Replication factor')
option_parser.add_option('--routing', default='consistent-hashing',
@@ -81,7 +82,7 @@
  defer.setDebugging(True)
if options.routing == 'consistent-hashing':
-  router = ConsistentHashingRouter(options.replication)
+  router = ConsistentHashingRouter(options.replication, hash_type=options.hashtype)
elif options.routing == 'relay':
  if exists(options.relayrules):
    router = RelayRulesRouter(options.relayrules)
17 bin/carbon-relay.py
@@ -27,4 +27,21 @@
from carbon.util import run_twistd_plugin
+import threading
+import traceback
+def dumpstacks(signal, frame):
+  id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
+  code = []
+  for threadId, stack in sys._current_frames().items():
+    code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""), threadId))
+    for filename, lineno, name, line in traceback.extract_stack(stack):
+      code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
+      if line:
+        code.append("  %s" % (line.strip()))
+  print "\n".join(code)
+
+import signal
+signal.signal(signal.SIGQUIT, dumpstacks)
+
+
run_twistd_plugin(__file__)
21 conf/carbon.conf.example
@@ -100,6 +100,15 @@ WHISPER_AUTOFLUSH = False
# depending on the underlying storage configuration.
# WHISPER_SPARSE_CREATE = False
+# By default new Whisper files are created pre-allocated with the data region
+# filled with zeros to prevent fragmentation and speed up contiguous reads and
+# writes (which are common). Enabling this option will cause Whisper to
+# fallocate the file instead, which is only beneficial on a filesystem that
+# supports fallocate. It maintains the benefits of contiguous reads/writes
+# but with a much faster creation speed. Enabling this option may allow a
+# large increase of MAX_CREATES_PER_MINUTE.
+# WHISPER_FALLOCATE_CREATE = False
+#
# Enabling this option will cause Whisper to lock each Whisper file it writes
# to with an exclusive lock (LOCK_EX, see: man 2 flock). This is useful when
# multiple carbon-cache daemons are writing to the same files
@@ -175,6 +184,11 @@ PICKLE_RECEIVER_PORT = 2014
# RELAY_METHOD = consistent-hashing
RELAY_METHOD = rules
+# If using consistent-hashing, you can choose the hashring hash function.
+# md5 is the default. crc32 uses less CPU, but will choose different
+# destinations for a given metric than md5 does.
+# HASH_TYPE = md5
+
# If you use consistent-hashing you may want to add redundancy
# of your data by replicating every datapoint to more than
# one machine.
@@ -234,7 +248,7 @@ PICKLE_RECEIVER_PORT = 2024
# use multiple carbon-cache instances then it would look like this:
#
# DESTINATIONS = 127.0.0.1:2004:a, 127.0.0.1:2104:b
-#
+#
# The format is comma-delimited IP:PORT:INSTANCE where the :INSTANCE part is
# optional and refers to the "None" instance if omitted.
#
@@ -268,6 +282,11 @@ MAX_DATAPOINTS_PER_MESSAGE = 500
# the past MAX_AGGREGATION_INTERVALS * intervalSize seconds.
MAX_AGGREGATION_INTERVALS = 5
+# Set this to True to prevent unaggregated metrics from passing through
+# the aggregator. The default is to allow the original metric to pass
+# through, after modification by rewrite rules, if there are any.
+# AGGREGATION_SUPPRESS_ORIGINAL = False
+
# Set this to True to enable whitelisting and blacklisting of metrics in
# CONF_DIR/whitelist and CONF_DIR/blacklist. If the whitelist is missing or
# empty, all metrics will pass through
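
For context on the WHISPER_FALLOCATE_CREATE comment above: a hedged sketch of what fallocate-style preallocation looks like on Linux via ctypes; whisper's own CAN_FALLOCATE / create() path may differ in detail:

import ctypes
import ctypes.util

libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)

def preallocate(path, length):
  # posix_fallocate(fd, offset, len) reserves contiguous blocks without writing zeros
  f = open(path, 'wb')
  try:
    err = libc.posix_fallocate(f.fileno(), ctypes.c_long(0), ctypes.c_long(length))
    if err != 0:
      raise OSError(err, 'posix_fallocate failed')
  finally:
    f.close()

preallocate('example.wsp', 1024 * 1024)  # hypothetical 1 MiB whisper file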
21 lib/carbon/aggregator/receiver.py
@@ -1,10 +1,10 @@
from carbon.instrumentation import increment
from carbon.aggregator.rules import RuleManager
from carbon.aggregator.buffers import BufferManager
+from carbon.conf import settings
from carbon.rewrite import RewriteRuleManager
from carbon import events
-
def process(metric, datapoint):
  increment('datapointsReceived')
@@ -13,14 +13,8 @@ def process(metric, datapoint):
  aggregate_metrics = []
-  for rule in RuleManager.rules:
-    aggregate_metric = rule.get_aggregate_metric(metric)
-
-    if aggregate_metric is None:
-      continue
-    else:
-      aggregate_metrics.append(aggregate_metric)
-
+  for (rule, aggregate_metric) in RuleManager.get_aggregate_metrics(metric):
+    aggregate_metrics.append(aggregate_metric)
    buffer = BufferManager.get_buffer(aggregate_metric)
    if not buffer.configured:
@@ -28,8 +22,9 @@ def process(metric, datapoint):
    buffer.input(datapoint)
-  for rule in RewriteRuleManager.postRules:
-    metric = rule.apply(metric)
+  if len(aggregate_metrics) == 0 or not settings['AGGREGATION_SUPPRESS_ORIGINAL']:
+    for rule in RewriteRuleManager.postRules:
+      metric = rule.apply(metric)
-  if metric not in aggregate_metrics:
-    events.metricGenerated(metric, datapoint)
+    if metric not in aggregate_metrics:
+      events.metricGenerated(metric, datapoint)
53 lib/carbon/aggregator/rules.py
@@ -3,17 +3,34 @@
from os.path import exists, getmtime
from twisted.internet.task import LoopingCall
from carbon import log
+from carbon.conf import settings
from carbon.aggregator.buffers import BufferManager
+if settings.AGGREGATION_RULES_CACHE_SIZE != float('inf'):
+  try:
+    import ordereddict
+    USE_LRU_RULES_CACHE = True
+  except ImportError:
+    log.err("Failed to import ordereddict (needed to limit AGGREGATION_RULES_CACHE_SIZE)")
+    USE_LRU_RULES_CACHE = False
+else:
+  USE_LRU_RULES_CACHE = False
+
+
class RuleManager:
  def __init__(self):
-    self.rules = []
    self.rules_file = None
    self.read_task = LoopingCall(self.read_rules)
    self.rules_last_read = 0.0
+    self.clear()
  def clear(self):
+    if USE_LRU_RULES_CACHE:
+      self.cache = ordereddict.OrderedDict()
+      self.cache_max_size = int(settings.AGGREGATION_RULES_CACHE_SIZE)
+    else:
+      self.cache = {}
    self.rules = []
  def read_from(self, rules_file):
@@ -63,6 +80,35 @@ def parse_definition(self, line):
log.err("Failed to parse line: %s" % line)
raise
+ # return a list of (rule, result) tuples for a metric
+ def get_aggregate_metrics(self, metric_path):
+ if not self.rules:
+ return ()
+ try:
+ if USE_LRU_RULES_CACHE:
+ results = self.cache.pop(metric_path)
+ self.cache[metric_path] = results
+ return results
+ else:
+ return self.cache[metric_path]
+ except KeyError:
+ results = []
+ for rule in self.rules:
+ result = rule.get_aggregate_metric(metric_path)
+ if result != None:
+ results.append((rule, result))
+ if results:
+ self.cache[metric_path] = results
+ else:
+ self.cache[metric_path] = () # use immutable tuple singleton
+ if USE_LRU_RULES_CACHE:
+ while len(self.cache) > self.cache_max_size:
+ try:
+ self.cache.popitem(False)
+ except KeyError:
+ pass
+
+ return results
class AggregationRule:
  def __init__(self, input_pattern, output_pattern, method, frequency):
@@ -77,12 +123,8 @@ def __init__(self, input_pattern, output_pattern, method, frequency):
    self.aggregation_func = AGGREGATION_METHODS[method]
    self.build_regex()
    self.build_template()
-    self.cache = {}
  def get_aggregate_metric(self, metric_path):
-    if metric_path in self.cache:
-      return self.cache[metric_path]
-
    match = self.regex.match(metric_path)
    result = None
@@ -93,7 +135,6 @@ def get_aggregate_metric(self, metric_path):
    except:
      log.err("Failed to interpolate template %s with fields %s" % (self.output_template, extracted_fields))
-    self.cache[metric_path] = result
    return result
def build_regex(self):
24 lib/carbon/conf.py
@@ -44,9 +44,12 @@
LOG_UPDATES=True,
WHISPER_AUTOFLUSH=False,
WHISPER_SPARSE_CREATE=False,
+ WHISPER_FALLOCATE_CREATE=False,
WHISPER_LOCK_WRITES=False,
MAX_DATAPOINTS_PER_MESSAGE=500,
MAX_AGGREGATION_INTERVALS=5,
+ AGGREGATION_SUPPRESS_ORIGINAL=False,
+ AGGREGATION_RULES_CACHE_SIZE=float('inf'),
MAX_QUEUE_SIZE=1000,
ENABLE_AMQP=False,
AMQP_VERBOSE=False,
@@ -57,6 +60,7 @@
MANHOLE_USER="",
MANHOLE_PUBLIC_KEY="",
RELAY_METHOD='rules',
+ HASH_TYPE='md5',
REPLICATION_FACTOR=1,
DESTINATIONS=[],
USE_FLOW_CONTROL=True,
@@ -208,6 +212,12 @@ def postOptions(self):
log.msg("Enabling Whisper autoflush")
whisper.AUTOFLUSH = True
+ if settings.WHISPER_FALLOCATE_CREATE:
+ if whisper.CAN_FALLOCATE:
+ log.msg("Enabling Whisper fallocate support")
+ else:
+ log.err("WHISPER_FALLOCATE_CREATE is enabled but linking failed.")
+
if settings.WHISPER_LOCK_WRITES:
if whisper.CAN_LOCK:
log.msg("Enabling Whisper file locking")
@@ -351,6 +361,7 @@ class CarbonRelayOptions(CarbonCacheOptions):
optParameters = [
["rules", "", None, "Use the given relay rules file."],
+ ["aggregation-rules", "", None, "Use the given aggregation rules file."],
] + CarbonCacheOptions.optParameters
def postOptions(self):
@@ -359,9 +370,18 @@ def postOptions(self):
self["rules"] = join(settings["CONF_DIR"], "relay-rules.conf")
settings["relay-rules"] = self["rules"]
- if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing"):
+ if settings["HASH_TYPE"] not in ("md5", "crc32", "crc16", "hash32", "hash16"):
+ print ("In carbon.conf, HASH_TYPE must be either 'md5' 'crc32'. Invalid value: '%s'" %
+ settings.HASH_TYPE)
+ sys.exit(1)
+
+ if self["aggregation-rules"] is None:
+ self["aggregation-rules"] = join(settings["CONF_DIR"], "aggregation-rules.conf")
+ settings["aggregation-rules"] = self["aggregation-rules"]
+
+ if settings["RELAY_METHOD"] not in ("rules", "consistent-hashing", "aggregated-consistent-hashing"):
print ("In carbon.conf, RELAY_METHOD must be either 'rules' or "
- "'consistent-hashing'. Invalid value: '%s'" %
+ "'consistent-hashing' or 'aggregated-consistent-hashing'. Invalid value: '%s'" %
settings.RELAY_METHOD)
sys.exit(1)
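
A hedged example of a relay section exercising the options validated above (hosts and values are illustrative):

[relay]
RELAY_METHOD = aggregated-consistent-hashing
HASH_TYPE = crc32
REPLICATION_FACTOR = 1
DESTINATIONS = 10.0.0.1:2004:a, 10.0.0.2:2004:a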
49 lib/carbon/hashing.py
@@ -3,21 +3,42 @@
except ImportError:
  from md5 import md5
import bisect
-from carbon.conf import settings
+from binascii import crc32
+def md5_key(key):
+  big_hash = md5( str(key) ).hexdigest()
+  return int(big_hash[:4], 16)
class ConsistentHashRing:
-  def __init__(self, nodes, replica_count=100):
+  def __init__(self, nodes, replica_count=100, hash_type=None):
    self.ring = []
    self.nodes = set()
    self.replica_count = replica_count
+    self.hash_type = hash_type
+
    for node in nodes:
      self.add_node(node)
+    # Precompute hash function
+    if self.hash_type in (None, 'md5'):
+      self.hash_function = md5_key
+    elif self.hash_type == 'crc16':
+      # The md5 is dropped into the range of 0xffff. Let's
+      # make sure crc is too
+      self.hash_function = lambda key : (crc32(key) & 0xffff)
+    elif self.hash_type == 'hash16':
+      # The md5 is dropped into the range of 0xffff. Let's
+      # make sure hash is too
+      self.hash_function = lambda key : (hash(key) & 0xffff)
+    elif self.hash_type == 'crc32':
+      self.hash_function = lambda key : (crc32(key) & 0xffffffff)
+    elif self.hash_type == 'hash32':
+      self.hash_function = lambda key : (hash(key) & 0xffffffff)
+    else:
+      raise Exception('Unsupported hash type: %s' % self.hash_type)
+
  def compute_ring_position(self, key):
-    big_hash = md5( str(key) ).hexdigest()
-    small_hash = int(big_hash[:4], 16)
-    return small_hash
+    return self.hash_function(key)
  def add_node(self, node):
    self.nodes.add(node)
@@ -33,14 +54,15 @@ def remove_node(self, node):
  def get_node(self, key):
    assert self.ring
-    position = self.compute_ring_position(key)
-    search_entry = (position, None)
-    index = bisect.bisect_left(self.ring, search_entry) % len(self.ring)
-    entry = self.ring[index]
-    return entry[1]
+    node = None
+    node_iter = self.get_nodes(key)
+    node = node_iter.next()
+    node_iter.close()
+    return node
  def get_nodes(self, key):
-    nodes = []
+    assert self.ring
+    nodes = set()
    position = self.compute_ring_position(key)
    search_entry = (position, None)
    index = bisect.bisect_left(self.ring, search_entry) % len(self.ring)
@@ -49,8 +71,7 @@ def get_nodes(self, key):
      next_entry = self.ring[index]
      (position, next_node) = next_entry
      if next_node not in nodes:
-        nodes.append(next_node)
+        nodes.add(next_node)
+        yield next_node
      index = (index + 1) % len(self.ring)
-
-    return nodes
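
A hedged usage sketch of the ring above; the (server, instance) node tuples mirror how ConsistentHashingRouter populates it, and the addresses are hypothetical:

ring = ConsistentHashRing([('10.0.0.1', 'a'), ('10.0.0.2', 'a')], hash_type='crc32')

print ring.get_node('servers.web01.cpu.user')  # first destination on the ring for this key
for node in ring.get_nodes('servers.web01.cpu.user'):
  print node  # the generator walks the ring and yields each distinct node once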
52 lib/carbon/routers.py
@@ -41,10 +41,10 @@ def getDestinations(self, key):
class ConsistentHashingRouter(DatapointRouter):
-  def __init__(self, replication_factor=1):
+  def __init__(self, replication_factor=1, hash_type=None):
    self.replication_factor = int(replication_factor)
    self.instance_ports = {} # { (server, instance) : port }
-    self.ring = ConsistentHashRing([])
+    self.ring = ConsistentHashRing([], hash_type=hash_type)
  def addDestination(self, destination):
    (server, port, instance) = destination
@@ -62,17 +62,15 @@ def removeDestination(self, destination):
  def getDestinations(self, metric):
    key = self.getKey(metric)
-
-    used_servers = set()
-    for (server, instance) in self.ring.get_nodes(key):
-      if server in used_servers:
-        continue
-      else:
-        used_servers.add(server)
+    count = 0
+    node_iter = self.ring.get_nodes(key)
+    while count < self.replication_factor:
+      try:
+        (server, instance) = node_iter.next()
        port = self.instance_ports[ (server, instance) ]
+        count += 1
        yield (server, port, instance)
-
-    if len(used_servers) >= self.replication_factor:
+      except StopIteration:
        return
def getKey(self, metric):
@@ -88,3 +86,35 @@ def setKeyFunctionFromModule(self, keyfunc_spec):
module = imp.load_module('keyfunc_module', module_file, module_path, description)
keyfunc = getattr(module, func_name)
self.setKeyFunction(keyfunc)
+
+class AggregatedConsistentHashingRouter(DatapointRouter):
+  def __init__(self, agg_rules_manager, replication_factor=1, hash_type=None):
+    self.hash_router = ConsistentHashingRouter(replication_factor, hash_type=hash_type)
+    self.agg_rules_manager = agg_rules_manager
+
+  def addDestination(self, destination):
+    self.hash_router.addDestination(destination)
+
+  def removeDestination(self, destination):
+    self.hash_router.removeDestination(destination)
+
+  def getDestinations(self, key):
+    # resolve metric to aggregate forms
+    resolved_metrics = []
+    for (rule, aggregate_metric) in self.agg_rules_manager.get_aggregate_metrics(key):
+      resolved_metrics.append(aggregate_metric)
+      break
+
+    # if the metric will not be aggregated, send it raw
+    # (will pass through aggregation)
+    if len(resolved_metrics) == 0:
+      resolved_metrics.append(key)
+
+    # get consistent hashing destinations based on aggregate forms
+    destinations = set()
+    for resolved_metric in resolved_metrics:
+      for destination in self.hash_router.getDestinations(resolved_metric):
+        destinations.add(destination)
+
+    for destination in destinations:
+      yield destination
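
A hedged sketch of wiring the new router up by hand, mirroring what createRelayService does in lib/carbon/service.py below (the path and destinations are hypothetical):

from carbon.aggregator.rules import RuleManager

RuleManager.read_from('/opt/graphite/conf/aggregation-rules.conf')  # hypothetical path
router = AggregatedConsistentHashingRouter(RuleManager, replication_factor=1, hash_type='crc32')
router.addDestination(('10.0.0.1', 2004, 'a'))
router.addDestination(('10.0.0.2', 2004, 'a'))

# raw metrics that resolve to the same aggregate hash to the same destination
print list(router.getDestinations('servers.web01.cpu.user'))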
11 lib/carbon/service.py
@@ -148,7 +148,7 @@ def createAggregatorService(config):
  root_service = createBaseService(config)
  # Configure application components
-  router = ConsistentHashingRouter()
+  router = ConsistentHashingRouter(hash_type=settings["hash-type"])
  client_manager = CarbonClientManager(router)
  client_manager.setServiceParent(root_service)
@@ -169,7 +169,7 @@ def createAggregatorService(config):
def createRelayService(config):
-  from carbon.routers import RelayRulesRouter, ConsistentHashingRouter
+  from carbon.routers import RelayRulesRouter, ConsistentHashingRouter, AggregatedConsistentHashingRouter
  from carbon.client import CarbonClientManager
  from carbon.conf import settings
  from carbon import events
@@ -177,10 +177,15 @@ def createRelayService(config):
  root_service = createBaseService(config)
  # Configure application components
+  hash_type = settings.HASH_TYPE
  if settings.RELAY_METHOD == 'rules':
    router = RelayRulesRouter(settings["relay-rules"])
  elif settings.RELAY_METHOD == 'consistent-hashing':
-    router = ConsistentHashingRouter(settings.REPLICATION_FACTOR)
+    router = ConsistentHashingRouter(settings.REPLICATION_FACTOR, hash_type=hash_type)
+  elif settings.RELAY_METHOD == 'aggregated-consistent-hashing':
+    from carbon.aggregator.rules import RuleManager
+    RuleManager.read_from(settings["aggregation-rules"])
+    router = AggregatedConsistentHashingRouter(RuleManager, settings.REPLICATION_FACTOR, hash_type=hash_type)
  client_manager = CarbonClientManager(router)
  client_manager.setServiceParent(root_service)
7 lib/carbon/util.py
@@ -1,6 +1,7 @@
import sys
import os
import pwd
+import __builtin__
from os.path import abspath, basename, dirname, join
try:
@@ -50,8 +51,10 @@ def run_twistd_plugin(filename):
    return
  # This isn't as evil as you might think
-  __builtins__["instance"] = options.instance
-  __builtins__["program"] = program
+  __builtin__.instance = options.instance
+  __builtin__.program = program
+  #__builtins__["instance"] = options.instance
+  #__builtins__["program"] = program
  # Then forward applicable options to either twistd or to the plugin itself.
  twistd_options = ["--no_save"]
2  lib/carbon/writer.py
@@ -115,7 +115,7 @@ def writeCachedDataPoints():
log.creates("creating database file %s (archive=%s xff=%s agg=%s)" %
(dbFilePath, archiveConfig, xFilesFactor, aggregationMethod))
- whisper.create(dbFilePath, archiveConfig, xFilesFactor, aggregationMethod, settings.WHISPER_SPARSE_CREATE)
+ whisper.create(dbFilePath, archiveConfig, xFilesFactor, aggregationMethod, settings.WHISPER_SPARSE_CREATE, settings.WHISPER_FALLOCATE_CREATE)
os.chmod(dbFilePath, 0755)
instrumentation.increment('creates')

No commit comments for this range
