Regular Polling #874
Conversation
This is ready for review but may need a few tweaks before we actually merge it. Like the Java version updates, we should merge this after all the simpler/safer PRs, right before testing begins, so it can easily be backed out if we encounter any complex problems.
* The maximum number of tasks to deliver to a worker at a time. Workers may request fewer tasks than this, and the broker should
* never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB.
*/
public final int MAX_TASKS_PER_WORKER = 100;
What is the justification for increasing to 100 from 16? Are there ramifications other than response size that we should be concerned about?
Previously this determined the number of tasks that were regularly delivered to a worker. The worker couldn't apply nuanced backpressure to the stream, so we'd usually hand out the maximum number to any worker, as long as the job had that many tasks left unfinished. With the workers now stating how many tasks they want, this value functions more as a true upper limit than a typical value. The worker has its own internal task buffer sized according to how many processor cores it has. It only requests as many tasks as it needs to top up that buffer, and it does so often. So most of the time it is expected to request fewer than 100, and maybe even fewer than the 16 that were delivered in the past. But if it's really burning through tasks quickly and has 90 free slots, it can ask for 90 and get them. This value then serves as a guard rail to keep the response size reasonable.
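As a rough sketch of that request/cap interplay (illustrative only, not code from this PR; names like taskQueueCapacity and tasksToSend are hypothetical):

    public class TaskRequestSketch {

        public static final int MAX_TASKS_PER_WORKER = 100;

        /** Worker side: ask only for enough tasks to top up the internal buffer. */
        static int tasksToRequest (int taskQueueCapacity, int tasksCurrentlyQueued) {
            return Math.max(0, taskQueueCapacity - tasksCurrentlyQueued);
        }

        /** Broker side: never send more than the smaller of the worker's request and the hard cap. */
        static int tasksToSend (int tasksRequested, int tasksRemainingInJob) {
            return Math.min(Math.min(tasksRequested, MAX_TASKS_PER_WORKER), tasksRemainingInJob);
        }

    }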
Thanks for the explanation! 🙏
I think the revised approach here will be much better for users and ourselves. For testing, we'll want to make sure we try large and small grid and freeform origins/destinations, including some path analyses.
Should the comments about long-polling (around https://github.com/conveyal/r5/pull/874/files#diff-841a03f379ef45c1154d2d2f56614d6acbda5a40d57b214f6b2d2b74a3795247R55) be updated?
// "[The] core pool size is the threshold beyond which [an] executor service prefers to queue up the task than | ||
// spawn a new thread." |
Yes, I think I intended to replace this with an extract from the original Javadoc, but with a few grammar tweaks this sentence sums up the situation well. I'll add an attribution link to the comment.
private static final int POLL_INTERVAL_MIN_SECONDS = 1;
private static final int POLL_INTERVAL_MAX_SECONDS = 15;
private static final int POLL_JITTER_SECONDS = 5;
Could any problems arise from POLL_JITTER_SECONDS exceeding POLL_INTERVAL_MIN_SECONDS?
The random delay is now only added before a worker's first time polling the backend. Once it starts polling, the min and max are respected with no additional random delays added in. I can't think of any reason the workers would drift into sync over time, so this single initial jitter delay should be sufficient.
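A minimal sketch of that schedule, assuming the constants above (the class, method, and field names are hypothetical, not the actual r5 code): an extra random delay is added only before the first poll, after which delays are simply clamped between the min and max with no further randomness.

    import java.util.Random;

    class PollScheduleSketch {
        private static final int POLL_INTERVAL_MIN_SECONDS = 1;
        private static final int POLL_INTERVAL_MAX_SECONDS = 15;
        private static final int POLL_JITTER_SECONDS = 5;

        private final Random random = new Random();
        private boolean firstPoll = true;

        /** Seconds to wait before the next poll, given the delay the worker would otherwise choose. */
        int nextDelaySeconds (int desiredDelaySeconds) {
            int clamped = Math.max(POLL_INTERVAL_MIN_SECONDS,
                          Math.min(POLL_INTERVAL_MAX_SECONDS, desiredDelaySeconds));
            if (firstPoll) {
                firstPoll = false;
                // The random jitter is applied only once, before the very first poll,
                // to spread out workers that all start at the same moment.
                return clamped + random.nextInt(POLL_JITTER_SECONDS + 1);
            }
            return clamped;
        }
    }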
return JsonUtilities.lenientObjectMapper.readValue(responseEntity.getContent(),
List<RegionalTask> tasks = JsonUtilities.lenientObjectMapper.readValue(responseEntity.getContent(),
        new TypeReference<List<RegionalTask>>() {});
return tasks;
For my own understanding, what's the reason for this change?
Hmm, the generic type parameter of the function allows Java to properly infer the return type from that TypeReference parameter. It should compile fine without assigning to a variable, so I probably did this just to make it easier to inspect the list of tasks received, by placing a breakpoint on the final line with the return statement. Many debuggers, including IntelliJ, allow you to create function breakpoints, pause on function exit, and watch the return values. In addition to being a little convoluted to set up, at some time in the past this would often make the debugger really slow or fail, so I learned to avoid it. I should try using function breakpoints more often; maybe it works smoothly now. Here I also could have watched the value in the caller, where the function's return value is assigned to a variable, so I may have been rearranging some code when I added this assignment. There are other related cases where it can make sense to break operations out onto several lines, even using clearly named (final) variables that are expected to be optimized out by the JVM, just to facilitate debugging.
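A generic illustration of that debugging pattern (not r5 code; the class and values are made up): an intermediate result is assigned to a clearly named local so a plain line breakpoint can inspect it, with the expectation that the JIT eliminates the extra variable.

    import java.util.List;

    class DebugFriendlyReturnSketch {
        static int totalTaskCount (List<Integer> countsPerJob) {
            int total = countsPerJob.stream().mapToInt(Integer::intValue).sum();
            // A breakpoint on the next line shows 'total' without needing a method-exit breakpoint.
            return total;
        }
    }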
It looks like if you break anywhere inside a method and then hit the "step out" button/key, IntelliJ now displays the return value among the local variables. And in any case, here the return value is assigned on return. I'll refactor this back to the original combined method call / return statement.
@@ -24,6 +24,8 @@
public class WorkerStatus {

private static final Logger LOG = LoggerFactory.getLogger(WorkerStatus.class);
private static final int LEGACY_WORKER_MAX_TASKS = 16;
public static final int LEGACY_WORKER_MAX_POLL_INTERVAL_SECONDS = 2 * 60;
Same as legacy WORKER_RECORD_DURATION_MSEC
Right, when the workers poll, they send a value pollIntervalSeconds, which is the maximum amount of time until the worker will poll again. This is not hard-wired in case we need to change the value in future workers, or even dynamically adjust it on a single worker. If the backend is not polled by that same worker within pollIntervalSeconds (plus some safety factor to account for contention or requests piling up), the worker must have shut down or become otherwise incapacitated. In the past, the workers did not supply this value, and the backend would assume any worker was shut down if it didn't poll every WORKER_RECORD_DURATION_MSEC (creating problems when workers were too busy crunching numbers for difficult tasks).
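A hedged sketch of that liveness check (illustrative only; the class and field names, the 1.5 safety factor, and the fallback handling are assumptions, not the actual backend code):

    import java.time.Duration;
    import java.time.Instant;

    class WorkerLivenessSketch {
        // Old workers don't report a poll interval, so fall back to a legacy maximum.
        static final int LEGACY_WORKER_MAX_POLL_INTERVAL_SECONDS = 2 * 60;
        static final double SAFETY_FACTOR = 1.5; // allows for contention or requests piling up

        static boolean isPresumedShutDown (Instant lastPollTime, Integer pollIntervalSeconds, Instant now) {
            int interval = (pollIntervalSeconds != null)
                    ? pollIntervalSeconds
                    : LEGACY_WORKER_MAX_POLL_INTERVAL_SECONDS;
            long allowedSeconds = (long) (interval * SAFETY_FACTOR);
            return Duration.between(lastPollTime, now).getSeconds() > allowedSeconds;
        }
    }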
I'll make some updates to these comments. They remain generally correct but could stand to be refreshed with some new details. Once I push that commit, this should be ready for final review and merge.
Looking this over again, I thought of one more consideration. We need to gracefully handle two opposite situations: one where the tasks are trivial (no transit or even no roads at all around the origin point, so they finish instantly) and one where the tasks are difficult (car routing, huge destination grids, etc., that are very slow). If the new code allows one or more workers to grab a large number of tasks at once, the whole job could suffer from a slow-joiner problem if those tasks are very slow. For example, if there are 200 workers active, just one of them might grab the last 40 tasks in a job, leaving all the other workers idle while it slowly works through those last 40 tasks. If each task takes 30 seconds, the last few percent of the job will take 20 minutes. In principle this problem can already occur with the previous system, but we don't want to make it worse (and should think over ways to alleviate it in the future).

In the old code we have:

The new system tries to maintain a full queue with 10c slots, so 40 for 4 cores and 20 for 2 cores. So in most circumstances I don't expect the slow-joiner problem to be worse with the new code. However, looking at some specific cases:

Behavior might be better with the new system where the tasks are really slow to complete but polling is more regular and frequent and there are a large number of tasks relative to the number of queue slots per worker. We would expect the last N tasks to be evenly distributed to a large number of workers, as they all poll in rapid succession trying to fill the few slots in their queues that are empty.

Behavior might be bad with both systems (and possibly worse with the new system) when we have extremely slow tasks and a small number of origins in a job. For example, 80 freeform origin points with car routing and a huge number of destinations. Assuming 4-core workers, under the new system the first two workers will take all the tasks and slowly crunch through them. Under the old system, the first worker will take 40 tasks, and a batch of workers will start up, each taking 16 tasks, but that batch will be very small (maybe only one or two additional workers). In either case at least one slow joiner has 40 tasks to work through, so the unnecessary delay in finishing the job may be equal. This problem may show up in either system when the total number of tasks is significantly less than the total number of slots in all workers, closer to the total number of slots in a single worker.

Considering all this, we may want to set the max tasks delivered on each polling operation lower, closer to its previous value of 16. Workers would adapt by polling more than once in rapid succession, still filling their queues but spreading tasks across all workers. Nonetheless, the first worker might still fill up its queue before any additional workers start, making it the slow joiner. So it may also be a good idea to make the queues shorter. This will have the negative effect of slowing down huge jobs with lots of trivial tasks. In short, there are several parameters to balance, and I'll try to just set them similar to the old system until we can carry out some tuning experiments.

The best solution long term may be active shaping of how many tasks are distributed to each worker by the broker. This should not be too hard to accomplish given an accurate catalog of active workers and their current workloads. The catalog was not very accurate before due to sporadic polling, but its accuracy should now be high after this PR.

So my current thinking is to replicate the old behavior (distribute small blocks of tasks to workers), then more fairly distribute them in larger blocks in a future update to worker pool management.
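As a back-of-the-envelope illustration of that slow-joiner tail (illustrative only, not r5 code; it assumes the remaining tasks are processed serially and mirrors the 200-worker / 40-task / 30-second example above):

    class SlowJoinerSketch {

        /** Tail time if a single worker grabs all remaining tasks and works through them alone. */
        static long tailSecondsOneWorker (int remainingTasks, int secondsPerTask) {
            return (long) remainingTasks * secondsPerTask; // 40 * 30 = 1200 s, about 20 minutes
        }

        /** Tail time if the same tasks are spread evenly across the idle workers. */
        static long tailSecondsSpread (int remainingTasks, int secondsPerTask, int idleWorkers) {
            long tasksPerWorker = (long) Math.ceil(remainingTasks / (double) idleWorkers);
            return tasksPerWorker * secondsPerTask; // ceil(40 / 200) * 30 = 30 s
        }
    }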
Approving for merge and testing.
Addresses #596. Workers poll every 15 seconds even when busy, ask for a specific number of tasks, and report how soon they will poll again. Backend becomes aware of worker shutdown within 20 seconds. Designed to be backward compatible with old workers.
This is a relatively straightforward change that has been tested locally, but it is created as a draft PR because it touches core regional analysis functionality and so needs careful review and testing.