Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed ProcessManager not reaping child processes, causing zombie process accumulation on long-running Flink deployments (Java) ([#37930](https://github.com/apache/beam/issues/37930)).

## Known Issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,14 @@ private void stopProcess(String id, Process process) {
}
}
}
// Reap the child process to prevent zombie accumulation. destroy()/destroyForcibly() send
// signals but do not call waitpid(), so the terminated process remains in the kernel process
// table as a zombie until waitFor() collects its exit status.
try {
process.waitFor();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

/** Returns true if the process exists within maxWaitTimeMillis. */
Expand Down Expand Up @@ -249,8 +257,16 @@ private void stopAllProcesses() {
processes.forEach((id, process) -> process.destroy());
}

/** Kill all remaining processes forcibly, i.e. upon JVM shutdown */
/** Kill all remaining processes forcibly and reap them, i.e. upon JVM shutdown. */
private void killAllProcesses() {
processes.forEach((id, process) -> process.destroyForcibly());
processes.forEach(
(id, process) -> {
process.destroyForcibly();
try {
process.waitFor();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
Loading