-
Notifications
You must be signed in to change notification settings - Fork 445
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable background refresh for the scan server tablet metadata cache #4551
Conversation
This adds a property to configure the scan server tablet metadata Caffeine cache to refresh cached tablet metadata in the background on cache hits after the refresh time has passed. The refresh time is expressed as a percentage of the expiration time. This allows the cached entries to refresh before expiration if they are frequently used so that scans will not be blocked waiting on a refresh on expiration. Entries still expire if no cache hits come after the refresh time and expiration time passes. See: https://github.com/ben-manes/caffeine/wiki/Refresh This closes apache#4544
// Add a sleep that is long enough that the configured refresh interval passes if | ||
// the test has been set to use one. | ||
Thread.sleep(1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fwiw, if you can mock time to avoid flaky tests then Caffeine's time source is configurable (docs). It may not be practical if the code wasn't designed for that, of course. If it is then Guava's FakeTicker
is very handy, as is using a caller-runs executor (Runnable::run
) or Awaitility
for coordinating concurrent calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the tip for using the mock time, I don't think it would be easy to set in this case but would be useful for other cases for sure. We also have a Wait.waitFor() utility which could be used here instead of sleep by scanning in a loop probably, at least to check for the refresh case (wouldn't help so much for checking it didn't refresh)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh neat. What are the benefits of that utility versus Awaitility?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awaitility
looks to be a lot better but I didn't even know about it until you pointed it out. The Wait
utility class is just a simple utility to wait for a condition to turn true that we added for help with testing but it appears like we could replace it with Awaitility
and get a lot more features.
long cacheRefresh = (long) (cacheExpiration * cacheRefreshPercentage); | ||
LOG.debug("Tablet metadata refresh percentage set to {}, refresh time set to {} ms", | ||
cacheRefreshPercentage, cacheRefresh); | ||
builder.refreshAfterWrite(cacheRefresh, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What thread will run the refresh operation? Based on comments by @ben-manes on #4544 it seems like the JVM wide thread pool may be used, which may not be best for I/O heavy refresh that scan server will do. If that is the case may want to use a thread pool created in the scan server for refresh.
Looking at the javadocs, when an error happens in a background refresh it seems like Caffeine will log this somewhere possibly using JVM logging APIs. Not sure where that would go. If these will not go to slf4j then we may want to have these background refresh task try/catch/log into a slf4j logger we know about.
Was unsure if this code should be builder = builder.refreshAfterWrite(...)
. Looked at the javadoc and it mentions this
is returned, so do not need to do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If these will not go to slf4j then we may want to have these background refresh task try/catch/log into a slf4j logger we know about.
By default the platform logger redirects to java.util.logging
(jul), which if not configured would go to the console. You can redirect the platform logger using slf4j-jdk-platform-logging
, jul (with small overhead) using jul-to-slf4j
, and console using sysout-over-slf4j. It is usually good to configure the redirects in an application to ensure everything goes to your preferred logger (log4j2).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is usually good to configure the redirects in an application to ensure everything goes to your preferred logger (log4j2).
Thanks for the tip @ben-manes. Need to look into the default configs for Accumulo and see if any handling is being setup for jul->log4j2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
np! I really like having the console logger setup too just as a failsafe from someone naively using System.out
or e.printStackTrace()
. There are linters for that, but still nice to have as a failsafe for 3rd party code.
"Tablet metadata cache refresh percentage is '%s' but must be less than 1", | ||
cacheRefreshPercentage); | ||
|
||
var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may be able to supply an executor with:
var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
.scheduler(Scheduler.systemScheduler()).recordStats().executor(context.getScheduledExecutor());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a scheduled thread pool is not the correct type, then something like
Executor refresher = context.threadPools().getPoolBuilder("scan.server.cache.refresh")
.numCoreThreads(1).numMaxThreads(4).build();
var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
.scheduler(Scheduler.systemScheduler()).recordStats().executor(refresher);
would create a thread pool - not sure what the numbers should be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would you want refresh to be included in this doc snippet's limit?
accumulo/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
Lines 229 to 231 in e16976c
// Note: The way to control the number of concurrent scans that a ScanServer will | |
// perform is by using Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS or the number | |
// of threads in Property.SSERV_SCAN_EXECUTORS_PREFIX. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would create a thread pool - not sure what the numbers should be.
One factor that I do not fully understand ATM is bulk updates w.r.t. to refresh operation in Caffeine. A comment was made on #4544 stating that caffeine refresh may be able to batch refreshes. If so, then that could work nicely with using an Accumulo batch scanner to refresh lots of tablets metadata in one go and would also generally mean we probably only need few threads for refresh.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The expiration should be staggered based on access - so that could point to using a small number of threads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is to aggregate individual refreshes over a time window, perform the bulk operation, and complete the respective CompletableFuture
with the value for the given key. Caffeine doesn't know it is being batched, which wouldn't help since most refreshes are triggered from distinct lookups (unlike, for instance, getAll). Since it is an async task the cache merely thinks it took longer to execute, the worst case is slightly skewed stats (but custom stats fixes that). It is a little bit of code wrangling if using java.util.concurrent, but very trivial if you can glue together with a reactive stream library.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is to aggregate individual refreshes over a time window, perform the bulk operation, and complete the respective CompletableFuture with the value for the given key.
I see. So we could do something like have single thread that continually pulls refresh work off a queue, then does an Accumulo batch scan to read all data, and then completes futures related to work it pulled off the queue. The reload pluging to caffeine could push uncompleted futures onto the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, that would work, though you don't need a dedicated thread to poll. If you lock within the asyncReload method then you can detect if the request is the start of a new batch and schedule a task to run at the end of the time window. That task would lock, accumulate the work, unlock, submit it for processing, and complete the pending futures. The lock hold times are a few nanoseconds to coordinate and you can defer to a processing thread pool to allow for parallel batches if helpful (e.g. if a batch size limit).
I should point out that this is targeting 2.1 even though there's a new property because the Scan server properties are all marked experimental anyways. We could wait for 3.1 but this seems beneficial enough to put into 2.1 and is low risk. |
Reminder to wire up to the Caches class in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good
cacheRefreshPercentage); | ||
|
||
var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS) | ||
.scheduler(Scheduler.systemScheduler()).executor(context.getScheduledExecutor()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A dedicated thread pool for the cache would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created a dedicated executor with a fixed pool size of 8 for now to keep it simple. It's a sync cache and we are not using it for loading just for other things like refresh, expiration tasks, etc. If we are not keeping up or need more control we could add a property to make the size configurable or bump up the executor size.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed one more update that enables metrics so we can monitor the thread pool to see if we need to make adjustments to the size
This was done in merge commit 23ff646 |
This adds a property to configure the scan server tablet metadata Caffeine cache to refresh cached tablet metadata in the background on cache hits after the refresh time has passed. The refresh time is expressed as a percentage of the expiration time. This allows the cached entries to refresh before expiration if they are frequently used so that scans will not be blocked waiting on a refresh on expiration. Entries still expire if no cache hits come after the refresh time and expiration time passes.
See: https://github.com/ben-manes/caffeine/wiki/Refresh
This closes #4544