[HWKMETRICS-613] Add an index to keep track of metrics that need to be deleted #785
Conversation
Add an index to keep track of metrics that need to be deleted. A metric becomes eligible for deletion when all the data points for the metric expire.
Please do not merge. I am submitting an early PR to see the impact on write performance.
We removed the metrics_idx feature earlier and now we're reintroducing the double writes? This should not happen during the write path. It will effectively halve our write performance (you can measure this with the JMH benchmarks; ignore the perf-stability-test). If such a feature is necessary, a much better way would be a daily scan to see which metrics are no longer used and then delete them, the same way we do the compression job: you can scan them there once and not interrupt the write pipeline at all.
I think there are several options we can consider. Options 1 and 2 minimize the reads we have to do, which is generally a good thing. We never have to touch the data or data_compressed tables with these options. There is no need for the job to be fast, so if we do a daily scan we can certainly throttle the reads to avoid putting too much stress on Cassandra. I guess we need to look at some performance numbers for comparison.
The first option should be ruled out; I don't think a 50% performance loss in the backend is acceptable. The second option would require some sort of aggregation (so we don't update the same row too often), and if that state must be trusted, then it can't be lost. For those reasons, I think reading is a better choice. Just throttle it enough. It shouldn't take that much time to fetch the min/max timestamps from each metric, like we previously did when fetching the metric definitions. We really don't need to do this operation often.
There is a key difference in writing to metrics_idx with respect to compaction. metrics_idx uses LCS. The new table here uses STCS, which is less I/O intensive than LCS. While I agree that option 1 will slow down write performance, I do not necessarily expect a 50% performance hit. For option 2, the aggregation is nothing more than storing the MetricId. That's it. And we can tolerate write failures.
Here is an idea for a slightly different approach. The call to update the expiration index is now in three places: create metric, update tags, and add data points. Create metric and update tags would stay the same as in this PR. For the data points, use the compression job. The expiration index is updated only once, when a compressed block is written. This way we leave the index in place for easy removal, but leave the data point insertion path almost unchanged. And the number of writes for data points will be drastically reduced, since it is done once every 2 hours for a metric with data points. Any thoughts?
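In rough pseudocode the idea looks something like this (illustrative names only, assuming RxJava 1.x; the real compression pipeline has a different signature):

```java
// One expiration-index write per compressed block instead of one per data-point insert.
Observable<Void> compressAndTrackExpiration(Metric<Double> metric, long startOfSlice, long endOfSlice) {
    return compressBlock(metric.getMetricId(), startOfSlice, endOfSlice)
            .doOnCompleted(() -> updateMetricExpiration(metric));   // runs roughly once every 2 hours
}
```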
Doing it during the compression job makes sense since we have already queried the data.
LCS / STCS makes no difference in the write path, as the write goes to the WAL & memtable, not to SSTables yet. We do suffer a 50% reduction, and even the REST-test showed a 40% performance drop (although we didn't spend twice the time in the REST layer). The compression job could do the update; it doesn't hurt there.
If SSD is used or if the commit log is on its own, dedicated disk, then I agree that the compaction strategy should not matter as much; however, I think it is safest to assume that the commit log is on shared storage, in which case the additional I/O from LCS absolutely can make a significant difference. LCS is also more CPU intensive than STCS, which could also be a factor. All that aside, I like @stefannegrea's idea of doing the update during the compression job. We get the best of both worlds. We do not impact the ingest write path and we avoid performing extra reads.
Well, even with the same disk being used, we don't fsync on every write (only every 10 seconds), so the write initially goes only to memory. The memtable handling in Cassandra is just very slow.
The only problem with my idea is that String metrics will need to be excluded from the purge for now since they are not compressed.
I think we can handle string metrics separately. At some point, I think they ought to go in a separate table, and they are not even used in OpenShift. For v1, I say we do not worry about string metrics. For v2, let's get a separate table for string metrics, and we can decide what approach to take.
…ta points by updating the retention index only when a compressed block is persisted.
…re properly created when data points are compressed.
… daily at a configured interval; the default is every 7 days.
retest this please
    // TODO Optimization - new worker per token - use parallelism in Cassandra (with configured parallelism)
-   return metricsService.compressBlock(metricIds, startOfSlice, endOfSlice, pageSize)
+   return metricsService.compressBlock(metricIds, startOfSlice, endOfSlice, pageSize, subject)
            .doOnError(t -> logger.warn("Failed to compress data", t))
You need to call either subject.onCompleted() or subject.onError() here in the doOnError callback.
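For illustration, a minimal sketch of that fix (assuming RxJava 1.x; `subject` is the PublishSubject passed into the job, and the choice between onError and onCompleted is up for discussion):

```java
return metricsService.compressBlock(metricIds, startOfSlice, endOfSlice, pageSize, subject)
        .doOnError(t -> {
            logger.warn("Failed to compress data", t);
            // Terminate the subject so subscribers waiting on it are not left hanging.
            // subject.onCompleted() is the alternative if the error should not propagate.
            subject.onError(t);
        });
```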
    }

    @Override
    public <T> void updateMetricExpiration(Metric<T> metric) {
Let's keep things reactive and functional and return either Observable or (preferably) Completable instead of void.
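A minimal sketch of what that could look like (assuming RxJava 1.x; `computeExpiration` is a placeholder for the retention math, and the DataAccess method is assumed to return `Observable<ResultSet>` as suggested later in the review):

```java
@Override
public <T> Completable updateMetricExpiration(Metric<T> metric) {
    if (MetricType.STRING.equals(metric.getType())) {
        // String metrics are excluded from the expiration index for now
        return Completable.complete();
    }
    long expiration = computeExpiration(metric);   // placeholder for the retention calculation
    return Completable.fromObservable(
            dataAccess.updateMetricExpirationIndex(metric.getMetricId(), expiration));
}
```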
It would be good to have a test (or tests) in DeleteExpiredMetricsJobITest that execute the repeating job. Look at CompressDataJobITest.testCompressJob() for an example of how to do this, and/or hit me up with questions. You also probably want to change the test method name from DeleteExpiredMetricsJobITest.testCompressJob to something like DeleteExpiredMetricsJobITest.testDeleteExpiredMetricsJob or DeleteExpiredMetricsJobITest.runDeleteExpiredMetricsJob.
    @Override
    public <T> void updateMetricExpiration(Metric<T> metric) {
        if (!MetricType.STRING.equals(metric.getType())) {
            long expiration = 0;
Should we calculate expiration using the latest timestamp in the compressed block?
I would not do any extra work because that will just change the time by an hour or two; since the job runs in daily increments, it will not make much of a difference.
@stefannegrea, I just now thought about something we probably need to handle. What if the compression job is not running for some reason? The metrics expiration index won't get updated, and if the delete job is running, we could potentially wind up deleting metrics that should not be deleted. Considering we allow the compression job to be disabled, this scenario is entirely possible.
…ration index reactive. Also, complete the publish subject in case of a compression error.
…ion job via the scheduler.
…ion job are not flags.
…e if there is still unexpired data for a metric before purging it.
Can you change the return type of DataAccess.updateMetricExpirationIndex to be Observable so it is consistent with the rest of DataAccess?
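i.e. something along these lines (a sketch only; the exact parameters depend on the primary key of metrics_expiration_idx):

```java
// DataAccess -- sketch only, the parameter list is an assumption
Observable<ResultSet> updateMetricExpirationIndex(MetricId<?> metricId, long expirationTime);
```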
    }
    if (!compressJobEnabled) {
        expirationIndexResults = expirationIndexResults
                .flatMap(r -> session
I think there are a couple of problems here. First, I do not think that you can unconditionally use expirationIndexResults. What if compression has always been disabled? And there is no guarantee that metrics are explicitly created. I think we need some extra bookkeeping to know if/when metrics_expiration_idx has been updated. Secondly, you need to update metrics_expiration_idx in this call chain as you iterate through the data table.
If there is no entry in the expiration index then that is good; it means the data will automatically expire based on the TTL. We are only concerned with manually cleaning data that does not have a TTL (in general, that is the indexes). For that case, an entry in the expiration index is created every time a metric is explicitly created or when tags are inserted or updated.
All the compression job does is extend the expiration time. If the compression job is disabled, then we have no way of reliably extending the expiration time, hence we need to query and check whether there is unexpired data.
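A rough sketch of that check (assuming RxJava 1.x; `findUnexpiredDataPoint` is a made-up name for whatever single-row probe against the data table is used, and `metricId` stands for whatever the expiration-index stream emits):

```java
if (!compressJobEnabled) {
    // The compression job is not extending expirations, so do not trust the index alone:
    // keep a metric for deletion only if it has no unexpired data points left.
    expirationIndexResults = expirationIndexResults
            .flatMap(metricId -> findUnexpiredDataPoint(metricId)
                    .isEmpty()
                    .filter(noData -> noData)          // drop metrics that still have live data
                    .map(noData -> metricId));
}
```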
                        .map(empty -> r));
    }

    return expirationIndexResults
I would be inclined to use concatMap here instead of flatMap just as a way to throttle. I would also add in some failure handling. If deleting one metric fails for whatever reason, we want to continue with the stream.
There's a reusable retry mechanism in MetricsServiceImpl; use that one.
…urn Observable. The scheduler is now started after all the recurring jobs are scheduled. And log errors when metrics cannot be deleted.
    return expirationIndexResults
-           .flatMap(metricId -> metricsService.deleteMetric(metricId))
+           .concatMap(metricId -> metricsService.deleteMetric(metricId))
            .doOnError(e -> {
This logs the error which is good, but we still terminate the stream early. You need onErrorResumeNext or something similar.
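Putting those suggestions together, the loop might look roughly like this (a sketch; the reusable retry helper from MetricsServiceImpl would wrap the delete but is omitted here):

```java
return expirationIndexResults
        .concatMap(metricId -> metricsService.deleteMetric(metricId)
                .doOnError(t -> logger.warn("Failed to delete metric " + metricId, t))
                // Swallow the failure for this metric so the rest of the stream continues;
                // the entry stays in the index and will be retried on the next run.
                .onErrorResumeNext(Observable.empty()));
```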
…a delete operation fails, it will be retried the next time the expiration index is processed.