diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py
index cf8b7d1e2..b97a02048 100644
--- a/lib/carbon/cache.py
+++ b/lib/carbon/cache.py
@@ -190,6 +190,7 @@ def __init__(self, strategy=None):
     self.lock = threading.Lock()
     self.size = 0
     self.strategy = None
+    self.new_metrics = []
     if strategy:
       self.strategy = strategy(self)
     super(_MetricCache, self).__init__(dict)
@@ -247,6 +248,11 @@ def pop(self, metric):
   def store(self, metric, datapoint):
     timestamp, value = datapoint
     with self.lock:
+      # Metric not in cache yet, push to new_metrics list so it
+      # can be checked if the db already exists
+      if metric not in self:
+        self.new_metrics.append(metric)
+
       if timestamp not in self[metric]:
         # Not a duplicate, hence process if cache is not full
         if self.is_full:
diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py
index 7b63cba0a..4f354d2dd 100644
--- a/lib/carbon/writer.py
+++ b/lib/carbon/writer.py
@@ -31,6 +31,11 @@
 except ImportError:
   log.msg("Couldn't import signal module")
 
+# Python 2 backwards compatibility
+try:
+  FileNotFoundError
+except NameError:
+  FileNotFoundError = IOError
 
 SCHEMAS = loadStorageSchemas()
 AGGREGATION_SCHEMAS = loadAggregationSchemas()
@@ -90,64 +95,70 @@ def getbatch(self, maxsize=1):
 tagQueue = TagQueue(maxsize=settings.TAG_QUEUE_SIZE,
                     update_interval=settings.TAG_UPDATE_INTERVAL)
 
-def writeCachedDataPoints():
-  "Write datapoints until the MetricCache is completely empty"
-
-  cache = MetricCache()
-  while cache:
-    (metric, datapoints) = cache.drain_metric()
-    if metric is None:
-      # end the loop
-      break
-
-    dbFileExists = state.database.exists(metric)
-
-    if not dbFileExists:
-      if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
-        # If our tokenbucket doesn't have enough tokens available to create a new metric
-        # file then we'll just drop the metric on the ground and move on to the next
-        # metric.
-        # XXX This behavior should probably be configurable to no tdrop metrics
-        # when rate limitng unless our cache is too big or some other legit
-        # reason.
-        instrumentation.increment('droppedCreates')
-        continue
+def create_database(metric):
 
-      archiveConfig = None
-      xFilesFactor, aggregationMethod = None, None
+  archiveConfig = None
+  xFilesFactor, aggregationMethod = None, None
 
-      for schema in SCHEMAS:
+  for schema in SCHEMAS:
     if schema.matches(metric):
-          if settings.LOG_CREATES:
-            log.creates('new metric %s matched schema %s' % (metric, schema.name))
-          archiveConfig = [archive.getTuple() for archive in schema.archives]
-          break
+      if settings.LOG_CREATES:
+        log.creates('new metric %s matched schema %s' % (metric, schema.name))
+      archiveConfig = [archive.getTuple() for archive in schema.archives]
+      break
 
-      for schema in AGGREGATION_SCHEMAS:
+  for schema in AGGREGATION_SCHEMAS:
     if schema.matches(metric):
-          if settings.LOG_CREATES:
-            log.creates('new metric %s matched aggregation schema %s'
-                        % (metric, schema.name))
-          xFilesFactor, aggregationMethod = schema.archives
-          break
+      if settings.LOG_CREATES:
+        log.creates('new metric %s matched aggregation schema %s'
+                    % (metric, schema.name))
+      xFilesFactor, aggregationMethod = schema.archives
+      break
 
-      if not archiveConfig:
+  if not archiveConfig:
     raise Exception(("No storage schema matched the metric '%s',"
                      " check your storage-schemas.conf file.") % metric)
 
-      if settings.LOG_CREATES:
+  if settings.LOG_CREATES:
     log.creates("creating database metric %s (archive=%s xff=%s agg=%s)" %
                 (metric, archiveConfig, xFilesFactor, aggregationMethod))
-      try:
+
+  try:
     state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod)
     if settings.ENABLE_TAGS:
-        tagQueue.add(metric)
+      tagQueue.add(metric)
     instrumentation.increment('creates')
-      except Exception as e:
+  except Exception as e:
     log.err()
     log.msg("Error creating %s: %s" % (metric, e))
     instrumentation.increment('errors')
-        continue
+
+
+def writeCachedDataPoints():
+  """Write datapoints until the MetricCache is completely empty"""
+
+  cache = MetricCache()
+  while cache:
+
+    # First check if there are new metrics
+    for new_metric in cache.new_metrics:
+      if not state.database.exists(new_metric):
+        if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
+          # If our tokenbucket doesn't have enough tokens available to create a new metric
+          # file then we'll just drop the metric on the ground and move on to the next
+          # metric.
+          # XXX This behavior should probably be configurable to not drop metrics
+          # when rate limiting unless our cache is too big or some other legit
+          # reason.
+          instrumentation.increment('droppedCreates')
+          break
+
+        create_database(new_metric)
+
+    (metric, datapoints) = cache.drain_metric()
+    if metric is None:
+      # end the loop
+      break
 
     # If we've got a rate limit configured lets makes sure we enforce it
     waitTime = 0
@@ -166,6 +177,10 @@ def writeCachedDataPoints():
       if settings.ENABLE_TAGS:
         tagQueue.update(metric)
       updateTime = time.time() - t1
+    except FileNotFoundError:  # don't log full stack trace when the db does not exist.
+      log.msg("Error writing %s: File does not exist (yet). " % metric +
+              "Increase MAX_CREATES_PER_MINUTE")
+      instrumentation.increment('errors')
     except Exception as e:
       log.err()
       log.msg("Error writing to %s: %s" % (metric, e))
@@ -187,8 +202,8 @@ def writeForever():
   while reactor.running:
     try:
       writeCachedDataPoints()
-    except Exception:
-      log.err()
+    except Exception as e:
+      log.err(e)
       # Back-off on error to give the backend time to recover.
       time.sleep(0.1)
     else:
diff --git a/tox.ini b/tox.ini
index f03581dd9..282e9dc98 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,6 @@
 [tox]
 envlist =
-    py{27,35,36,37,38,py}{,-pyhash},
+    py{27,35,36,37,38,py,py3}{,-pyhash},
     lint,
     benchmark
 
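
A note on the FileNotFoundError shim added at the top of writer.py: Python 2 has no FileNotFoundError and raises IOError when a file is missing, while Python 3 raises FileNotFoundError (a subclass of OSError). The sketch below is illustration only, not part of the patch, and the file path in it is made up; it shows why the aliasing lets the single except clause in writeCachedDataPoints() work on both interpreters:

    # On Python 3 the name already exists, so the lookup succeeds and
    # nothing is rebound. On Python 2 the lookup raises NameError and
    # the name is aliased to IOError instead.
    try:
        FileNotFoundError
    except NameError:  # Python 2
        FileNotFoundError = IOError

    try:
        open('/no/such/whisper/file.wsp')
    except FileNotFoundError as e:
        print('caught missing file: %s' % e)

One caveat of the alias: on Python 2 it widens the clause to every IOError (for example permission errors), not just missing files, so the "Increase MAX_CREATES_PER_MINUTE" hint can occasionally be logged for unrelated I/O failures.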