Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new metrics earlier #888

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/carbon/cache.py
Expand Up @@ -190,6 +190,7 @@ def __init__(self, strategy=None):
self.lock = threading.Lock()
self.size = 0
self.strategy = None
self.new_metrics = []
if strategy:
self.strategy = strategy(self)
super(_MetricCache, self).__init__(dict)
Expand Down Expand Up @@ -247,6 +248,11 @@ def pop(self, metric):
def store(self, metric, datapoint):
timestamp, value = datapoint
with self.lock:
# Metric not in cache yet, push to new_metrics list so it
# can be checked if the db already exists
if metric not in self:
self.new_metrics.append(metric)

if timestamp not in self[metric]:
# Not a duplicate, hence process if cache is not full
if self.is_full:
Expand Down
101 changes: 58 additions & 43 deletions lib/carbon/writer.py
Expand Up @@ -31,6 +31,11 @@
except ImportError:
log.msg("Couldn't import signal module")

# Python 2 backwards compatibility
try:
FileNotFoundError
except NameError:
FileNotFoundError = IOError

SCHEMAS = loadStorageSchemas()
AGGREGATION_SCHEMAS = loadAggregationSchemas()
Expand Down Expand Up @@ -90,64 +95,70 @@ def getbatch(self, maxsize=1):
tagQueue = TagQueue(maxsize=settings.TAG_QUEUE_SIZE, update_interval=settings.TAG_UPDATE_INTERVAL)


def writeCachedDataPoints():
"Write datapoints until the MetricCache is completely empty"

cache = MetricCache()
while cache:
(metric, datapoints) = cache.drain_metric()
if metric is None:
# end the loop
break

dbFileExists = state.database.exists(metric)

if not dbFileExists:
if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
# If our tokenbucket doesn't have enough tokens available to create a new metric
# file then we'll just drop the metric on the ground and move on to the next
# metric.
# XXX This behavior should probably be configurable to no tdrop metrics
# when rate limitng unless our cache is too big or some other legit
# reason.
instrumentation.increment('droppedCreates')
continue
def create_database(metric):

archiveConfig = None
xFilesFactor, aggregationMethod = None, None
archiveConfig = None
xFilesFactor, aggregationMethod = None, None

for schema in SCHEMAS:
for schema in SCHEMAS:
if schema.matches(metric):
if settings.LOG_CREATES:
log.creates('new metric %s matched schema %s' % (metric, schema.name))
archiveConfig = [archive.getTuple() for archive in schema.archives]
break
if settings.LOG_CREATES:
log.creates('new metric %s matched schema %s' % (metric, schema.name))
archiveConfig = [archive.getTuple() for archive in schema.archives]
break

for schema in AGGREGATION_SCHEMAS:
for schema in AGGREGATION_SCHEMAS:
if schema.matches(metric):
if settings.LOG_CREATES:
log.creates('new metric %s matched aggregation schema %s'
% (metric, schema.name))
xFilesFactor, aggregationMethod = schema.archives
break
if settings.LOG_CREATES:
log.creates('new metric %s matched aggregation schema %s'
% (metric, schema.name))
xFilesFactor, aggregationMethod = schema.archives
break

if not archiveConfig:
if not archiveConfig:
raise Exception(("No storage schema matched the metric '%s',"
" check your storage-schemas.conf file.") % metric)

if settings.LOG_CREATES:
if settings.LOG_CREATES:
log.creates("creating database metric %s (archive=%s xff=%s agg=%s)" %
(metric, archiveConfig, xFilesFactor, aggregationMethod))
try:

try:
state.database.create(metric, archiveConfig, xFilesFactor, aggregationMethod)
if settings.ENABLE_TAGS:
tagQueue.add(metric)
tagQueue.add(metric)
instrumentation.increment('creates')
except Exception as e:
except Exception as e:
log.err()
log.msg("Error creating %s: %s" % (metric, e))
instrumentation.increment('errors')
continue


def writeCachedDataPoints():
"""Write datapoints until the MetricCache is completely empty"""

cache = MetricCache()
while cache:

# First check if there are new metrics
for new_metric in cache.new_metrics:
if not state.database.exists(new_metric):
if CREATE_BUCKET and not CREATE_BUCKET.drain(1):
# If our tokenbucket doesn't have enough tokens available to create a new metric
# file then we'll just drop the metric on the ground and move on to the next
# metric.
# XXX This behavior should probably be configurable to not drop metrics
# when rate limiting unless our cache is too big or some other legit
# reason.
instrumentation.increment('droppedCreates')
break

create_database(new_metric)

(metric, datapoints) = cache.drain_metric()
if metric is None:
# end the loop
break
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will only create new files as long as there are also updates to process, is that what we want?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I am following, new files are created first, on every loop, break is later.

Notice that the diff is kinda screwed, the part with the break is old code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imagine if there is more to get from pop_new_metric() than there is to get from drain_metric(): then the loop would do create-new, drain-old, create-new, drain-old, alternating, and stop when drain-old was done, before all new metrics were handled ...

but that seems impossible, there can't be more new metrics than total cached metrics, right ... but I understand how one might be unsure, when looking at this code in isolation.

Here's another question: is it possible for drain_metric() to return a new metric before pop_new_metric()? Imagine there are two new cached metrics and nothing else, A and B. A is received first, but B gets more datapoints. pop_new_metric() returns A, then drain_metric() returns B (then pop_new_metric returns B on the next iteration). The FileNotFound will be hit below, and B datapoints will be lost, I think, needlessly.

I have an alternate design idea: keep the original logic here in this loop, and instead modify drain_metric() to preferentially return new metrics.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but that seems impossible, there can't be more new metrics than total cached metrics, right ... but I understand how one might be unsure, when looking at this code in isolation.

Yup, that should be impossible.

Here's another question: is it possible for drain_metric() to return a new metric before pop_new_metric()? Imagine there are two new cached metrics and nothing else, A and B. A is received first, but B gets more datapoints. pop_new_metric() returns A, then drain_metric() returns B (then pop_new_metric returns B on the next iteration). The FileNotFound will be hit below, and B datapoints will be lost, I think, needlessly.

Yes, that is possible when the MAX_CREATES_PER_MINUTE limiter is hit. This is on purpose, currently the metrics get dropped as well, see the old XXX TODO part.
I'm thinking if it wouldn't be better to drain the new_metrics first before continueing with drain_metric().

I have an alternate design idea: keep the original logic here in this loop, and instead modify drain_metric() to preferentially return new metrics.

That is exactly what the naive strategy does. That one is not efficient as it does not write the metric with most datapoints resulting in small writes.

The cache doesn't know if a metric is really new (as not on disk) it can just see that it is not in the cache dict. I didn't want to add IO stuff to the cache thread to check if it is on disk, no particular reason it just felt bad to mix it. Maybe it would be better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm you're exactly right ... my idea was equivalent to the existing naive strategy ...

That is an intriguing idea, doing the exists() check just before adding to new_metrics (only if the metric is not in the cache). That would give the best signal, but might be too expensive, as you worried.

I don't have any better ideas, maybe your current method will work well in practice :)


# If we've got a rate limit configured lets makes sure we enforce it
waitTime = 0
Expand All @@ -166,6 +177,10 @@ def writeCachedDataPoints():
if settings.ENABLE_TAGS:
tagQueue.update(metric)
updateTime = time.time() - t1
except FileNotFoundError: # don't log full stack trace when the db does not exist.
log.msg("Error writing %s: File does not exist (yet). " % metric +
"Increase MAX_CREATES_PER_MINUTE")
instrumentation.increment('errors')
except Exception as e:
log.err()
log.msg("Error writing to %s: %s" % (metric, e))
Expand All @@ -187,8 +202,8 @@ def writeForever():
while reactor.running:
try:
writeCachedDataPoints()
except Exception:
log.err()
except Exception as e:
log.err(e)
# Back-off on error to give the backend time to recover.
time.sleep(0.1)
else:
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
@@ -1,6 +1,6 @@
[tox]
envlist =
py{27,35,36,37,38,py}{,-pyhash},
py{27,35,36,37,38,py,py3}{,-pyhash},
lint,
benchmark

Expand Down