[BEAM-9474] Improve robustness of BundleFactory and ProcessEnvironment #11084
Conversation
retest this please
} else {
  LOG.info("Process for worker {} still running. Killing.", id);
  process.destroyForcibly();
  long maxTimeToWait = 500;
Why the timeout change?
The old timeout was too long.
How did that manifest? 500ms timeout for process termination seems too aggressive. I would prefer a longer timeout to allow for connections to be closed gracefully.
The shutdown code is synchronous, so I'd prefer a shorter timeout here. It would be a good improvement to make it async.
I've tested this on a cluster with many runs and I've not seen a single instance lingering. Also I did not notice any difference in the logs. The environment will be torn down last, after all connections have been closed. So failures would not be visible anymore.
If you want, I can restore the old timeout, but I would then also change the code to make the stopping async, or at least stop all the processes at once and then wait (instead of tearing down the processes one by one and waiting for each to quit).
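A rough sketch of the "stop all at once, then wait" variant mentioned here, assuming plain `java.lang.Process` handles (class and method names are hypothetical, not the actual ProcessManager code): killing every process up front makes the per-process timeouts overlap instead of accumulating.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ParallelShutdownSketch {
  // Hypothetical helper: issue destroyForcibly() for all workers first (it is
  // non-blocking), then wait on each one. In the happy path the total wall
  // time stays close to a single 500 ms timeout instead of 500 ms * N.
  static void stopAll(List<Process> processes) throws InterruptedException {
    for (Process p : processes) {
      p.destroyForcibly(); // signals every process before any waiting starts
    }
    for (Process p : processes) {
      if (!p.waitFor(500, TimeUnit.MILLISECONDS)) {
        // Process is still lingering; log and continue with the remaining
        // workers rather than aborting the cleanup procedure.
      }
    }
  }
}
```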
If you have verified that the graceful shutdown works (in the happy path), then we are good. Maybe add a comment to the code, since all of this isn't very obvious.
It's not always shutting down gracefully, but that's what the change is about: removing processes and ensuring a quick recovery time. It's a trade-off. Ideally we would want to allow more time, but if we wait 2 seconds with an SDK parallelism of 16, that's already more than half a minute of waiting time. We really want to do the process removal in parallel. I'll look into this.
I'm not sure the ProcessManager is a good place to document the shutdown behavior. If you have any suggestions though, I'll add them here.
Won't delay the PR for it!
retest this please
@@ -352,20 +407,18 @@ public RemoteBundle getBundle(
// The blocking queue of caches for serving multiple bundles concurrently.
currentCache = availableCaches.take();
client = currentCache.getUnchecked(executableStage.getEnvironment());
client.ref();
client.ref() needs to remain here; the lines below rely on that, and it is also more readable.
Please explain which lines rely on it.
Readability is highly subjective; I find it more readable not to duplicate the same code in two branches, as is currently the case (which is potentially error-prone).
I checked again and couldn't see how this change alters the behavior. In any case, I don't mind moving it back to unblock this PR.
Below, preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0); removes everything that isn't referenced. These two statements are logically one unit, hence I prefer not to scatter them:

client = currentCache.getUnchecked(executableStage.getEnvironment());
client.ref();
It's not true that a later ref() introduces a bug for preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0); because the refcount will be >0, otherwise we wouldn't be able to retrieve the client from the cache.
In any case, I will revert this change.
> It's not true that a later ref() introduces a bug for preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0); because the refcount will be >0, otherwise we wouldn't be able to retrieve the client from the cache.
Cache and environment are shared between executable stages, so the refcount can become 0 with concurrent eviction and release. That actually raises the question of whether these two statements should be atomic.
Yes, that makes sense. I've already reverted the change.
I suppose there is a race condition where we retrieve an environment X and, before we can call ref() on it, we evict environment X, close all its references, and shut it down. This will result in a job restart.
// Ensure client is referenced for this bundle, unref in close()
client.ref();
// Cleanup list of clients which were active during eviction but now do not hold references
evictedActiveClients.removeIf(c -> c.bundleRefCount.get() <= 0);
I'm not sure I like this in the path for every bundle. We could probably move it below line 416 (preparedClients.keySet().removeIf(c -> c.bundleRefCount.get() <= 0);).
This is a cheap operation. I don't think we can move it because we need to run this cleanup for all branches.
I've improved this to only remove when environment expiration is used.
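The gating described here can be sketched as follows. This is a hypothetical stand-in for the real DefaultJobBundleFactory (class and field names are illustrative): the removeIf scan over evicted-but-active clients only runs when environment expiration is enabled, so the common per-bundle path skips it entirely.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class EvictionCleanupSketch {
  static class ClientRef {
    final AtomicInteger bundleRefCount = new AtomicInteger(0);
  }

  // 0 means environment expiration is disabled.
  private final long environmentExpirationMillis;
  final ConcurrentLinkedQueue<ClientRef> evictedActiveClients =
      new ConcurrentLinkedQueue<>();

  EvictionCleanupSketch(long environmentExpirationMillis) {
    this.environmentExpirationMillis = environmentExpirationMillis;
  }

  void onBundleStart() {
    if (environmentExpirationMillis > 0) {
      // Only scan when expiration can actually evict environments:
      // drop clients that were active during eviction but have since
      // released all their bundle references.
      evictedActiveClients.removeIf(c -> c.bundleRefCount.get() <= 0);
    }
  }
}
```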
Thanks for your comments @tweise. Let me know if you have more questions.
Force-pushed from 3e305c7 to 5c04950
The cleanup code in DefaultJobBundleFactory and its RemoteEnvironments may leak resources. This is especially a concern when the execution engine reuses the same JVM or underlying machines for multiple runs of a pipeline. Exceptions encountered during cleanup should not lead to aborting the cleanup procedure; not all code handles this correctly. We should also ensure that the cleanup succeeds even if the runner does not properly close the bundle, e.g. when an exception occurs while closing the bundle.
-    int count = bundleRefCount.decrementAndGet();
-    if (count == 0) {
+    int refCount = bundleRefCount.decrementAndGet();
+    Preconditions.checkState(refCount >= 0, "Reference count must not be negative.");
FYI, I've added this check instead to enforce correct bounds.
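The value of this bounds check is that a double-release (e.g. close() called twice on the same bundle) drives the count negative and fails fast instead of silently corrupting the resource accounting. A minimal sketch, using a plain IllegalStateException in place of Guava's Preconditions.checkState (class and method names are hypothetical):

```java
import java.util.concurrent.atomic.AtomicInteger;

class RefCounted {
  final AtomicInteger bundleRefCount = new AtomicInteger(0);

  void ref() {
    bundleRefCount.incrementAndGet();
  }

  void unref() {
    int refCount = bundleRefCount.decrementAndGet();
    // Mirrors Preconditions.checkState(refCount >= 0, ...): a negative count
    // means more releases than acquisitions, i.e. a double-close bug.
    if (refCount < 0) {
      throw new IllegalStateException("Reference count must not be negative.");
    }
    if (refCount == 0) {
      // Last reference released: tear down the environment here.
    }
  }
}
```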