Rework shard snapshot workers #88209
Conversation
Hi @pxsalehi, I've created a changelog YAML for you.
Pinging @elastic/es-distributed (Team:Distributed)
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
I wonder if this approach is necessarily better. IIUC, we currently keep the one thread looping until the end. In the new approach, we risk the first (or an early) shard filling up the snapshot queue before the last tasks are added to the queue. This could mean that shards with no changes end up scheduled after large file uploads?
We figured this was somewhat unlikely (to happen a lot) and outweighed by the fact that we now multi-thread the starting of shard snapshots (which is the main motivation for this change, as we realized that running this single-threaded could take a long time on large data nodes).
(Apologies for using too many vague/relative terms like many/few/large! But I need to somehow ask my question!) For me, one missing piece for evaluating the worst-case impact of a file upload on unchanged shard snapshots is the following: could the size of single files (to be uploaded) be extremely large? What would be a realistic "average" case? Or is it hard to tell, since it could be wildly different depending on the settings/load? It seemed to me that many not-huge files to upload is the common case. (Also, another assumption from our talk: it seems that often during a snapshot most shards have no changes, and only a few of them need to upload files.)
We generally try to not fill the queue with all tasks, for instance in […]. If we do all shards in parallel, we risk getting […]. I think this would be true also if we queue up the shard-level execution and only do n jobs in parallel.
You're not directly missing anything actually. But I think there is a tradeoff here. This change makes it so that running a snapshot on a 10k shard warm-node may block the snapshot pool for ~15 min (assuming ~0.5s per shard snapshot and 5 snapshot threads max) vs. taking 75min to run a single snapshot without blocking the pool.
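For concreteness, here is the back-of-the-envelope arithmetic behind those ballpark figures (a hypothetical sketch using the inputs stated above: 10k shards, ~0.5 s per shard snapshot, 5 snapshot threads; real timings depend on hardware and shard contents):

```java
// Back-of-the-envelope check of the figures quoted above.
class SnapshotMath {

    // Minutes to snapshot `shards` shards at `secondsPerShard` each,
    // spread over `threads` snapshot threads.
    static double minutes(int shards, double secondsPerShard, int threads) {
        return shards * secondsPerShard / threads / 60.0;
    }

    public static void main(String[] args) {
        double parallel = minutes(10_000, 0.5, 5);       // ~16.7 min, i.e. the "~15 min" above
        double singleThreaded = minutes(10_000, 0.5, 1); // ~83 min, roughly the "75 min" above
        System.out.println(Math.round(parallel) + " vs " + Math.round(singleThreaded) + " minutes");
    }
}
```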
True, the advantage of that approach would be that we'd have a harder guarantee on running the metadata work right before the file uploads (and it's a little faster potentially, I guess).
I'm trying to understand how much of this discussion relates to the original issue (#83408). If blocking the snapshot threadpool during a snapshot is a concern, then what would be an acceptable approach that addresses the issue and avoids blocking the threadpool? One thread going through the shard snapshot tasks, calculating which snapshots do not upload a file, doing all of those in the same thread, and then, again in the same thread, going through the shard snapshots that do have a change, only forking off the file uploads? Basically, everything other than file uploads would be done on the thread going through the shards, in two passes, with no snapshotting of unchanged shards in parallel? Considering that many thousands of unchanged shard snapshots and only a few shards actually uploading files is a normal occurrence, as Armin mentioned, parallelizing the snapshotting of unchanged shards seems to reduce snapshot time, which I guess is an improvement. My question is: if we do not want to parallelize shard snapshot tasks, would the approach I mentioned above bring any real improvement for the issue? Currently, all unchanged snapshots happen in the same thread, which doesn't seem that different from the two-phase approach!
Armin and I had a brief conversation on this, and while the existing mechanism for limiting the number of upload threads is somewhat broken, running the outer-level tasks in parallel will slightly increase this brokenness. We prefer to instead fix this for good by maintaining enough data structures to fill the snapshot pool with just the right amount of work, without exhausting it for potentially hours. Something like two queues (the outer-level work and the actual file upload work) and a counter for how many jobs are active should do. When any job finishes, it checks the outer-level queue first, and once that is depleted it does the actual file upload jobs. This also ensures that all no-change shards are done prior to any file uploads. I think Armin intends to sync with you on this.
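To illustrate the two-queue idea (a hypothetical, deliberately single-threaded sketch with made-up names, not the eventual implementation; the real design would also track active jobs against the pool size, omitted here for brevity):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-threaded illustration of the ordering guarantee:
// the worker always drains outer-level (shard) jobs before upload jobs,
// so all no-change shards finish before any file upload starts.
class TwoQueueWorker {
    private final Queue<Runnable> shardJobs = new ArrayDeque<>();
    private final Queue<Runnable> uploadJobs = new ArrayDeque<>();

    void addShardJob(Runnable r) {
        shardJobs.add(r);
    }

    void addUploadJob(Runnable r) {
        uploadJobs.add(r);
    }

    void runAll() {
        Runnable next;
        while ((next = nextJob()) != null) {
            next.run(); // a shard job may enqueue upload jobs for changed shards
        }
    }

    // Check the outer-level queue first; only once it is depleted
    // do we hand out file-upload work.
    private Runnable nextJob() {
        Runnable job = shardJobs.poll();
        return job != null ? job : uploadJobs.poll();
    }
}
```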
@elasticmachine please run elasticsearch-ci/part-1 (I think the test failure was unrelated to the PR. I opened #88615.)
This reverts commit 88847d4183185f8ec4f5fec846ce13bd0cda5a61.
@henningandersen Thanks for the detailed feedback. I addressed all your comments. Please have another look. Meanwhile I'll look into that previous CI failure to see if it is related or I was just paranoid!
I am not able to reproduce that issue on the branch. For the record, it used to time out on different asserts in the test (not just a specific one) the couple of times that it happened, and that was on a pretty slow system. I think it is safe to dismiss that.
This looks good to me now, but I'd like @original-brownbear to do the final review on this.
```java
@Override
public int compareTo(TestTask o) {
    return Integer.compare(priority, o.priority);
```
Can we return `priority - o.priority` instead? `Integer.compare` normalizes to -1, 0, 1, thus we do not test that any other value has the right meaning.
@henningandersen the `compareTo` is only used by the priority queue, which doesn't care about the actual distance between the two priorities, I think! So I don't understand why this matters. Could you please elaborate on why this is important?
This is part of a test verifying that `PrioritizedThrottledTaskRunner` executes tasks in the right order when presented with any comparable object. But the test here is restricted to verifying only a subset of such comparable objects: those that use "normalized" return values in `compareTo`.
Arguing in terms of the implementation seems invalid; the purpose of the tests is to demonstrate that the implementation works under as many circumstances as possible. Given the simple change to demonstrate this under more circumstances here, I think we should make the change.
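To make the distinction concrete (a standalone illustration; `PriorityCompareDemo` is a made-up name, not code from this PR): both variants are valid `compareTo` results that order elements identically, but they exercise different return-value ranges.

```java
// Subtraction can return any int (and can overflow for extreme priorities),
// while Integer.compare only ever returns -1, 0, or 1. A consumer such as
// PriorityQueue must inspect only the sign, which is exactly what a test
// using raw (non-normalized) values verifies.
class PriorityCompareDemo {

    static int normalized(int a, int b) {
        return Integer.compare(a, b); // always -1, 0, or 1
    }

    static int raw(int a, int b) {
        return a - b; // e.g. 3 - 7 == -4: still "less than", different magnitude
    }

    public static void main(String[] args) {
        System.out.println(normalized(3, 7)); // -1
        System.out.println(raw(3, 7));        // -4
    }
}
```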
I've done the change already. Thanks for the explanation! :)
This looks good to me, just one point where I asked for some docs (because I'm having trouble understanding the code) and one rather trivial detail, and this should be good to go :)
```java
}

// visible for testing
protected void pollAndSpawn() {
```
Can we add some commentary on how this loop works. In particular, on why we need to peek the queue. This is somewhat hard to follow for me and will be even harder to follow for future readers of this code.
Good point!
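For the record, here is a hypothetical sketch of such a loop (not the PR's actual code; names like `maxRunning` are assumptions, and the task runs inline where the real code would fork to an executor), with the peek rationale spelled out in comments:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Why peek before claiming a worker slot: we only want to reserve a slot
// when there is definitely work, and after releasing a slot we must re-check
// the queue, because other threads may enqueue or finish work concurrently.
class PollAndSpawnSketch {
    private final int maxRunning;
    private final AtomicInteger running = new AtomicInteger();
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

    PollAndSpawnSketch(int maxRunning) {
        this.maxRunning = maxRunning;
    }

    void enqueue(Runnable task) {
        queue.add(task);
        pollAndSpawn();
    }

    // visible for testing
    protected void pollAndSpawn() {
        while (queue.peek() != null) {         // cheap check: any work at all?
            if (running.incrementAndGet() > maxRunning) {
                running.decrementAndGet();     // pool is full; a finishing task re-polls
                return;
            }
            Runnable task = queue.poll();
            if (task == null) {                // lost the race between peek and poll
                running.decrementAndGet();
                continue;
            }
            try {
                task.run();                    // real code would fork to an executor here
            } finally {
                running.decrementAndGet();
            }
        }
    }
}
```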
```java
    this.context = context;
}

public abstract short priority();
```
Should we just use `int` here? `short` is kind of pointless, isn't it; it might even be a net negative for comparison performance?
@original-brownbear @henningandersen All done! Please check again.
LGTM, I'm good with this one now as is. Performance seems alright too; I gave it a quick benchmarking test run. Unless @henningandersen has anything open, I think we're good to go here.
```java
for (int i = 0; i < enqueued; i++) {
    new Thread(() -> {
        try {
            threadBlocker.countDown();
```
NIT: could use `CyclicBarrier` here instead, which seems to be the correct primitive?
We could. But since we need only a one-time barrier, the latch is enough I think. I've seen it used like that in several places in the code.
I'd agree with Armin here, `CyclicBarrier` is good for a rendezvous interaction. And it saves a line of code too. But fine to leave as is...
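For illustration (a standalone sketch, not code from this PR): with a `CountDownLatch` the rendezvous takes two calls per thread (`countDown()` then `await()`), while a `CyclicBarrier` expresses it in a single `await()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

// Two ways to release N threads into their work at (roughly) the same time.
class Rendezvous {

    // CountDownLatch: every thread must both signal and wait.
    static void withLatch(int n, Runnable work) {
        CountDownLatch ready = new CountDownLatch(n);
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                ready.countDown();     // signal "I'm here" ...
                try {
                    ready.await();     // ... then wait for everyone else
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                work.run();
            });
            threads[i].start();
        }
        joinAll(threads);
    }

    // CyclicBarrier: the signal-and-wait is a single call.
    static void withBarrier(int n, Runnable work) {
        CyclicBarrier barrier = new CyclicBarrier(n);
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await(); // one call: signal and wait
                } catch (Exception e) {
                    return;
                }
                work.run();
            });
            threads[i].start();
        }
        joinAll(threads);
    }

    private static void joinAll(Thread[] threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```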
Thanks Henning and Armin!
Currently, when starting the shard snapshot tasks, we loop through the
shards to be snapshotted back-to-back. This calculates whether there are
changes in the shard to be snapshotted (and which ones), writes some
metadata, and, ONLY if there are changes and files to be uploaded, forks
off the file upload calls. This could be further improved by parallelizing
shard snapshot tasks (currently, only file uploads happen in parallel).
This change uses a worker pool and a queue to limit the number of
concurrently running snapshot tasks.
Closes #83408