From 1026ef0479425cf6726a7d99caa94321220444bc Mon Sep 17 00:00:00 2001
From: Piotr
Date: Thu, 16 Apr 2020 21:48:10 +0200
Subject: [PATCH 1/5] Create new metrics earlier

In large Graphite installations the queues can get really long, and it can
take an hour for Graphite to write all metrics in the queue. New db files
are only created when a metric is first written, so creation can be delayed
far too long.

This separates the creation of metrics from writing data to them and moves
the creation to an earlier moment. Whenever a new metric is received, its
name is pushed onto a new_metrics list. The first step in the writer loop
is to check whether any new metrics have been received, and to create them
if they don't exist on disk yet. After the creation the writer continues as
usual, writing metrics from the queue, but it no longer checks whether the
file already exists, so that the check does not happen twice and cost extra
IO. If the file does not exist at this point, it is logged.

Fixes: https://github.com/graphite-project/graphite-web/issues/629
---
 lib/carbon/cache.py  | 10 ++++++
 lib/carbon/writer.py | 79 ++++++++++++++++++++++++--------------------
 2 files changed, 53 insertions(+), 36 deletions(-)

diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py
index cf8b7d1e2..51a7fa911 100644
--- a/lib/carbon/cache.py
+++ b/lib/carbon/cache.py
@@ -190,6 +190,7 @@ def __init__(self, strategy=None):
     self.lock = threading.Lock()
     self.size = 0
     self.strategy = None
+    self.new_metrics = []
     if strategy:
       self.strategy = strategy(self)
     super(_MetricCache, self).__init__(dict)
@@ -236,6 +237,10 @@ def get_datapoints(self, metric):
     """Return a list of currently cached datapoints sorted by timestamp"""
     return sorted(self.get(metric, {}).items(), key=by_timestamp)
 
+  def pop_new_metric(self):
+    # return first seen metric
+    return self.new_metrics.pop(0)
+
   def pop(self, metric):
     with self.lock:
       datapoint_index = defaultdict.pop(self, metric)
@@ -247,6 +252,11 @@ def pop(self, metric):
   def store(self, metric, datapoint):
     timestamp, value = datapoint
     with self.lock:
+      # Metric not in cache yet, push to new_metrics list so it
+      # can be checked if the db already exists
+      if metric not in self:
+        self.new_metrics.append(metric)
+
       if timestamp not in self[metric]:
         # Not a duplicate, hence process if cache is not full
         if self.is_full:

diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py
index 7b63cba0a..951837762 100644
--- a/lib/carbon/writer.py
+++ b/lib/carbon/writer.py
@@ -90,64 +90,67 @@ def getbatch(self, maxsize=1):
 tagQueue = TagQueue(maxsize=settings.TAG_QUEUE_SIZE,
                     update_interval=settings.TAG_UPDATE_INTERVAL)
 
-def writeCachedDataPoints():
-  "Write datapoints until the MetricCache is completely empty"
-
-  cache = MetricCache()
-  while cache:
-    (metric, datapoints) = cache.drain_metric()
-    if metric is None:
-      # end the loop
-      break
-
-    dbFileExists = state.database.exists(metric)
-
-    if not dbFileExists:
-      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
+def create_database(metric):
+  if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
     # If our tokenbucket doesn't have enough tokens available to create a new metric
     # file then we'll just drop the metric on the ground and move on to the next
     # metric.
-    # XXX This behavior should probably be configurable to no tdrop metrics
-    # when rate limitng unless our cache is too big or some other legit
+    # XXX This behavior should probably be configurable to not drop metrics
+    # when rate limiting unless our cache is too big or some other legit
     # reason.
     instrumentation.increment('droppedCreates')
-    continue
+    return
 
-      archiveConfig = None
-      xFilesFactor, aggregationMethod = None, None
+  archiveConfig = None
+  xFilesFactor, aggregationMethod = None, None
 
-      for schema in SCHEMAS:
+  for schema in SCHEMAS:
     if schema.matches(metric):
-      if settings.LOG_CREATES:
-        log.creates('new metric %s matched schema %s' % (metric, schema.name))
-      archiveConfig = [archive.getTuple() for archive in schema.archives]
-      break
+      if settings.LOG_CREATES:
+        log.creates('new metric %s matched schema %s' % (metric, schema.name))
+      archiveConfig = [archive.getTuple() for archive in schema.archives]
+      break
 
-      for schema in AGGREGATION_SCHEMAS:
+  for schema in AGGREGATION_SCHEMAS:
     if schema.matches(metric):
-      if settings.LOG_CREATES:
-        log.creates('new metric %s matched aggregation schema %s'
-                    % (metric, schema.name))
-      xFilesFactor, aggregationMethod = schema.archives
-      break
+      if settings.LOG_CREATES:
+        log.creates('new metric %s matched aggregation schema %s'
+                    % (metric, schema.name))
+      xFilesFactor, aggregationMethod = schema.archives
+      break
 
-      if not archiveConfig:
+  if not archiveConfig:
     raise Exception(("No storage schema matched the metric '%s',"
                      " check your storage-schemas.conf file.") % metric)
 
-      if settings.LOG_CREATES:
+  if settings.LOG_CREATES:
     log.creates("creating database metric %s (archive=%s xff=%s agg=%s)" %
                 (metric, archiveConfig, xFilesFactor, aggregationMethod))
-      try:
+
+  try:
     state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod)
     if settings.ENABLE_TAGS:
-      tagQueue.add(metric)
+      tagQueue.add(metric)
     instrumentation.increment('creates')
-      except Exception as e:
+  except Exception as e:
     log.err()
     log.msg("Error creating %s: %s" % (metric, e))
     instrumentation.increment('errors')
-        continue
+
+
+def writeCachedDataPoints():
+  "Write datapoints until the MetricCache is completely empty"
+
+  cache = MetricCache()
+  while cache:
+    new_metric = cache.pop_new_metric()
+    if not state.database.exists(new_metric):
+      create_database(new_metric)
+
+    (metric, datapoints) = cache.drain_metric()
+    if metric is None:
+      # end the loop
+      break
 
     # If we've got a rate limit configured lets makes sure we enforce it
     waitTime = 0
@@ -166,6 +169,10 @@ def writeCachedDataPoints():
       if settings.ENABLE_TAGS:
         tagQueue.update(metric)
       updateTime = time.time() - t1
+    except FileNotFoundError:  # don't log full stack trace when the db does not exist.
+      log.msg("Error writing %s: File does not exist (yet). " % metric +
+              "Increase MAX_CREATES_PER_MINUTE")
+      instrumentation.increment('errors')
     except Exception as e:
       log.err()
       log.msg("Error writing to %s: %s" % (metric, e))
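
To make the mechanism above concrete, here is a minimal standalone sketch of
what patch 1 sets up, with everything carbon-specific stripped away.
SketchCache and FakeDatabase are hypothetical stand-ins for carbon's
_MetricCache and state.database, reduced to the parts that matter here; the
pop_new_metric guard is shown in the final form it gains in patch 2 below.

import threading
from collections import defaultdict


class SketchCache(object):
  """Hypothetical, reduced stand-in for carbon's _MetricCache."""

  def __init__(self):
    self.lock = threading.Lock()
    self.data = defaultdict(dict)
    self.new_metrics = []

  def store(self, metric, timestamp, value):
    with self.lock:
      # First time we see this metric: remember it, so the writer
      # can create its db file ahead of the long write queue.
      if metric not in self.data:
        self.new_metrics.append(metric)
      self.data[metric][timestamp] = value

  def pop_new_metric(self):
    # Return the first seen new metric, or None when the list is empty.
    try:
      return self.new_metrics.pop(0)
    except IndexError:
      return None


class FakeDatabase(object):
  """Hypothetical stand-in for carbon's state.database."""

  def __init__(self):
    self.files = set()

  def exists(self, metric):
    return metric in self.files

  def create(self, metric):
    self.files.add(metric)


if __name__ == '__main__':
  cache, db = SketchCache(), FakeDatabase()
  cache.store('servers.web1.cpu', 1587067200, 0.42)

  # Writer loop, step 1: create db files for newly seen metrics early,
  # before the (potentially hours-long) write queue reaches them.
  new_metric = cache.pop_new_metric()
  if new_metric and not db.exists(new_metric):
    db.create(new_metric)

  assert db.exists('servers.web1.cpu')

The point of the split is that creating a db file is cheap compared to
waiting for the write queue to drain, so doing it as soon as the metric is
first seen removes the hours-long delay reported in graphite-web#629.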
" % metric + + "Increase MAX_CREATES_PER_MINUTE") + instrumentation.increment('errors') except Exception as e: log.err() log.msg("Error writing to %s: %s" % (metric, e)) From 1adf10ccbc7ddadf6ee494af1475eb2ff80306dc Mon Sep 17 00:00:00 2001 From: Piotr Date: Fri, 17 Apr 2020 08:55:13 +0200 Subject: [PATCH 2/5] fix py2 FileNotFoundError handle indexexception log exceptions in writer --- lib/carbon/cache.py | 7 +++++-- lib/carbon/writer.py | 11 ++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py index 51a7fa911..7bb932e5f 100644 --- a/lib/carbon/cache.py +++ b/lib/carbon/cache.py @@ -238,8 +238,11 @@ def get_datapoints(self, metric): return sorted(self.get(metric, {}).items(), key=by_timestamp) def pop_new_metric(self): - # return first seen metric - return self.new_metrics.pop(0) + # return first seen new metric or None if empty + try: + return self.new_metrics.pop(0) + except IndexError: + return None def pop(self, metric): with self.lock: diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py index 951837762..6c25fa11a 100644 --- a/lib/carbon/writer.py +++ b/lib/carbon/writer.py @@ -31,6 +31,11 @@ except ImportError: log.msg("Couldn't import signal module") +# Python 2 backwards compatibility +try: + FileNotFoundError +except NameError: + FileNotFoundError = IOError SCHEMAS = loadStorageSchemas() AGGREGATION_SCHEMAS = loadAggregationSchemas() @@ -144,7 +149,7 @@ def writeCachedDataPoints(): cache = MetricCache() while cache: new_metric = cache.pop_new_metric() - if not state.database.exists(new_metric): + if new_metric and not state.database.exists(new_metric): create_database(new_metric) (metric, datapoints) = cache.drain_metric() @@ -194,8 +199,8 @@ def writeForever(): while reactor.running: try: writeCachedDataPoints() - except Exception: - log.err() + except Exception as e: + log.err(e) # Back-off on error to give the backend time to recover. 
time.sleep(0.1) else: From 9de42464a74bde281737aac5cd23b4751949ec1a Mon Sep 17 00:00:00 2001 From: Piotr Date: Wed, 22 Apr 2020 09:09:04 +0200 Subject: [PATCH 3/5] test pypy3 --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index f03581dd9..282e9dc98 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] envlist = - py{27,35,36,37,38,py}{,-pyhash}, + py{27,35,36,37,38,py,py3}{,-pyhash}, lint, benchmark From b9dbd1b461d8e0e09bde847e7f24d7a31e01dd16 Mon Sep 17 00:00:00 2001 From: Piotr Date: Wed, 22 Apr 2020 09:09:39 +0200 Subject: [PATCH 4/5] loop through all new_metrics before moving on to drain_metric() --- lib/carbon/cache.py | 12 ++++++++---- lib/carbon/exceptions.py | 4 ++++ lib/carbon/writer.py | 30 +++++++++++++++++------------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py index 7bb932e5f..568760849 100644 --- a/lib/carbon/cache.py +++ b/lib/carbon/cache.py @@ -237,12 +237,16 @@ def get_datapoints(self, metric): """Return a list of currently cached datapoints sorted by timestamp""" return sorted(self.get(metric, {}).items(), key=by_timestamp) + def get_new_metrics(self): + yield self.new_metrics.pop() + def pop_new_metric(self): # return first seen new metric or None if empty - try: - return self.new_metrics.pop(0) - except IndexError: - return None + #try: + # return self.new_metrics.pop(0) + #except IndexError: + # return None + return next(self.get_new_metrics()) def pop(self, metric): with self.lock: diff --git a/lib/carbon/exceptions.py b/lib/carbon/exceptions.py index 4e184277e..768f751ca 100644 --- a/lib/carbon/exceptions.py +++ b/lib/carbon/exceptions.py @@ -1,2 +1,6 @@ class CarbonConfigException(Exception): """Raised when a carbon daemon is improperly configured""" + +class CarbonCreatesLimiterException(Exception): + """Raised when limitor is hit""" + pass diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py index 6c25fa11a..9d49cda1a 100644 --- a/lib/carbon/writer.py +++ b/lib/carbon/writer.py @@ -21,6 +21,7 @@ from carbon.conf import settings from carbon import log, instrumentation from carbon.util import TokenBucket +from carbon.exceptions import CarbonCreatesLimiterException from twisted.internet import reactor from twisted.internet.task import LoopingCall @@ -96,15 +97,6 @@ def getbatch(self, maxsize=1): def create_database(metric): - if CREATE_BUCKET and not CREATE_BUCKET.drain(1): - # If our tokenbucket doesn't have enough tokens available to create a new metric - # file then we'll just drop the metric on the ground and move on to the next - # metric. - # XXX This behavior should probably be configurable to not drop metrics - # when rate limiting unless our cache is too big or some other legit - # reason. 
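
One detail of patch 2 worth illustrating before the next patch: Python 2 has
no FileNotFoundError, so the shim aliases the name to IOError. A small
self-contained sketch of how the same except clause then works on both
interpreters (the file path is made up for the example):

# Python 2 backwards compatibility: alias the missing py3 exception name.
try:
  FileNotFoundError
except NameError:  # Python 2: the name does not exist
  FileNotFoundError = IOError

try:
  open('/nonexistent/whisper/file.wsp')
except FileNotFoundError as e:
  # Catches FileNotFoundError on py3 and IOError on py2 alike.
  print('caught missing db file: %s' % e)

Note the py2 side is broader than the py3 original: IOError covers any I/O
failure, not just a missing file, so other I/O errors land in the same
branch there.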
From b9dbd1b461d8e0e09bde847e7f24d7a31e01dd16 Mon Sep 17 00:00:00 2001
From: Piotr
Date: Wed, 22 Apr 2020 09:09:39 +0200
Subject: [PATCH 4/5] loop through all new_metrics before moving on to
 drain_metric()

---
 lib/carbon/cache.py      | 12 ++++++++----
 lib/carbon/exceptions.py |  4 ++++
 lib/carbon/writer.py     | 30 +++++++++++++++++-------------
 3 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py
index 7bb932e5f..568760849 100644
--- a/lib/carbon/cache.py
+++ b/lib/carbon/cache.py
@@ -237,12 +237,16 @@ def get_datapoints(self, metric):
     """Return a list of currently cached datapoints sorted by timestamp"""
     return sorted(self.get(metric, {}).items(), key=by_timestamp)
 
+  def get_new_metrics(self):
+    yield self.new_metrics.pop()
+
   def pop_new_metric(self):
     # return first seen new metric or None if empty
-    try:
-      return self.new_metrics.pop(0)
-    except IndexError:
-      return None
+    #try:
+    #  return self.new_metrics.pop(0)
+    #except IndexError:
+    #  return None
+    return next(self.get_new_metrics())
 
   def pop(self, metric):
     with self.lock:

diff --git a/lib/carbon/exceptions.py b/lib/carbon/exceptions.py
index 4e184277e..768f751ca 100644
--- a/lib/carbon/exceptions.py
+++ b/lib/carbon/exceptions.py
@@ -1,2 +1,6 @@
 class CarbonConfigException(Exception):
   """Raised when a carbon daemon is improperly configured"""
+
+class CarbonCreatesLimiterException(Exception):
+  """Raised when limiter is hit"""
+  pass

diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py
index 6c25fa11a..9d49cda1a 100644
--- a/lib/carbon/writer.py
+++ b/lib/carbon/writer.py
@@ -21,6 +21,7 @@
 from carbon.conf import settings
 from carbon import log, instrumentation
 from carbon.util import TokenBucket
+from carbon.exceptions import CarbonCreatesLimiterException
 
 from twisted.internet import reactor
 from twisted.internet.task import LoopingCall
@@ -96,15 +97,6 @@ def getbatch(self, maxsize=1):
 
 def create_database(metric):
-  if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
-    # If our tokenbucket doesn't have enough tokens available to create a new metric
-    # file then we'll just drop the metric on the ground and move on to the next
-    # metric.
-    # XXX This behavior should probably be configurable to not drop metrics
-    # when rate limiting unless our cache is too big or some other legit
-    # reason.
-    instrumentation.increment('droppedCreates')
-    return
 
   archiveConfig = None
   xFilesFactor, aggregationMethod = None, None
@@ -144,13 +136,25 @@ def create_database(metric):
 
 def writeCachedDataPoints():
-  "Write datapoints until the MetricCache is completely empty"
+  """Write datapoints until the MetricCache is completely empty"""
 
   cache = MetricCache()
   while cache:
-    new_metric = cache.pop_new_metric()
-    if new_metric and not state.database.exists(new_metric):
-      create_database(new_metric)
+
+    # First check if there are new metrics
+    for new_metric in cache.new_metrics:
+      if not state.database.exists(new_metric):
+        if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
+          # If our tokenbucket doesn't have enough tokens available to create a new metric
+          # file then we'll just drop the metric on the ground and move on to the next
+          # metric.
+          # XXX This behavior should probably be configurable to not drop metrics
+          # when rate limiting unless our cache is too big or some other legit
+          # reason.
+          instrumentation.increment('droppedCreates')
+          break
+
+        create_database(new_metric)
 
     (metric, datapoints) = cache.drain_metric()
     if metric is None:

From 896b0a6b4788a92fac54d0298246c2207a5ba869 Mon Sep 17 00:00:00 2001
From: Piotr
Date: Mon, 27 Apr 2020 19:27:13 +0200
Subject: [PATCH 5/5] clean up

---
 lib/carbon/cache.py      | 11 -----------
 lib/carbon/exceptions.py |  4 ----
 lib/carbon/writer.py     |  1 -
 3 files changed, 16 deletions(-)

diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py
index 568760849..b97a02048 100644
--- a/lib/carbon/cache.py
+++ b/lib/carbon/cache.py
@@ -237,17 +237,6 @@ def get_datapoints(self, metric):
     """Return a list of currently cached datapoints sorted by timestamp"""
     return sorted(self.get(metric, {}).items(), key=by_timestamp)
 
-  def get_new_metrics(self):
-    yield self.new_metrics.pop()
-
-  def pop_new_metric(self):
-    # return first seen new metric or None if empty
-    #try:
-    #  return self.new_metrics.pop(0)
-    #except IndexError:
-    #  return None
-    return next(self.get_new_metrics())
-
   def pop(self, metric):
     with self.lock:
       datapoint_index = defaultdict.pop(self, metric)

diff --git a/lib/carbon/exceptions.py b/lib/carbon/exceptions.py
index 768f751ca..4e184277e 100644
--- a/lib/carbon/exceptions.py
+++ b/lib/carbon/exceptions.py
@@ -1,6 +1,2 @@
 class CarbonConfigException(Exception):
   """Raised when a carbon daemon is improperly configured"""
-
-class CarbonCreatesLimiterException(Exception):
-  """Raised when limiter is hit"""
-  pass

diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py
index 9d49cda1a..4f354d2dd 100644
--- a/lib/carbon/writer.py
+++ b/lib/carbon/writer.py
@@ -21,7 +21,6 @@
 from carbon.conf import settings
 from carbon import log, instrumentation
 from carbon.util import TokenBucket
-from carbon.exceptions import CarbonCreatesLimiterException
 
 from twisted.internet import reactor
 from twisted.internet.task import LoopingCall
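
Taken together, the series leaves the writer loop shaped roughly like the
sketch below. This is a condensed approximation, not the actual carbon code:
settings, tagging, instrumentation and the Twisted plumbing are stripped
out, and the database, create_bucket and cache parameters only loosely
mirror state.database, the TokenBucket and the MetricCache.

def write_cached_datapoints(cache, database, create_bucket):
  """Write datapoints until the cache is completely empty."""
  while cache:
    # Step 1: walk the newly seen metrics and create their db files
    # first, so creation no longer waits for the write queue.
    for new_metric in cache.new_metrics:
      if not database.exists(new_metric):
        if create_bucket and not create_bucket.drain(1):
          # Out of create tokens: stop creating for this pass
          # and let a later pass retry.
          break
        database.create(new_metric)

    # Step 2: drain and write, without re-checking existence.
    metric, datapoints = cache.drain_metric()
    if metric is None:
      break
    try:
      database.write(metric, datapoints)
    except FileNotFoundError:
      # The db file is still missing, i.e. creation was rate limited;
      # the real code logs this and suggests raising
      # MAX_CREATES_PER_MINUTE.
      print("Error writing %s: file does not exist (yet)" % metric)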