Skip to content

Commit

Permalink
[BEAM-12792] Install pipeline dependencies to temporary venv (#16658)
Browse files Browse the repository at this point in the history
  • Loading branch information
phoerious committed Nov 10, 2022
1 parent 74f87b0 commit 08b6a52
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 123 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
than requiring them to be passed separately via the `--extra_package` option
(Python) ([#23684](https://github.com/apache/beam/pull/23684)).
* Pipeline Resource Hints now supported via `--resource_hints` flag (Go) ([#23990](https://github.com/apache/beam/pull/23990)).
* Make Python SDK containers reusable on portable runners by installing dependencies to temporary venvs ([BEAM-12792](https://issues.apache.org/jira/browse/BEAM-12792)).

## Breaking Changes

Expand Down
36 changes: 23 additions & 13 deletions sdks/python/apache_beam/runners/worker/worker_pool_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@
_LOGGER = logging.getLogger(__name__)


def kill_process_gracefully(proc, timeout=10):
  """
  Shut down a worker process, preferring a clean exit over a forced one.

  The process is first asked to terminate (SIGTERM) and is given up to
  ``timeout`` seconds to exit. If it is still running after that, it is
  killed (SIGKILL) and then waited on so that no zombie is left behind.

  Args:
    proc: the worker process handle; it must provide ``terminate()``,
      ``wait()`` and ``kill()`` in the style of ``subprocess.Popen``.
    timeout: number of seconds to wait after the terminate request before
      escalating to a kill.
  """
  def _terminate_then_kill():
    # Polite request first; escalate only if the deadline passes.
    proc.terminate()
    try:
      proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
      _LOGGER.warning('Worker process did not respond, killing it.')
      proc.kill()
      # Reap the killed process so it does not linger as a zombie.
      proc.wait()

  # The shutdown sequence runs on a helper thread that is joined right away,
  # so this call returns only once that sequence has finished.
  reaper = threading.Thread(target=_terminate_then_kill)
  reaper.start()
  reaper.join()


class BeamFnExternalWorkerPoolServicer(
beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):

Expand Down Expand Up @@ -95,7 +115,7 @@ def start(
# Register to kill the subprocesses on exit.
def kill_worker_processes():
for worker_process in worker_pool._worker_processes.values():
worker_process.kill()
kill_process_gracefully(worker_process)

atexit.register(kill_worker_processes)

Expand Down Expand Up @@ -172,19 +192,9 @@ def StopWorker(self,
worker_process = self._worker_processes.pop(
stop_worker_request.worker_id, None)
if worker_process:

def kill_worker_process():
try:
worker_process.kill()
except OSError:
# ignore already terminated process
return

_LOGGER.info("Stopping worker %s" % stop_worker_request.worker_id)
# communicate is necessary to avoid zombie process
# time box communicate (it has no timeout parameter in Py2)
threading.Timer(1, kill_worker_process).start()
worker_process.communicate()
kill_process_gracefully(worker_process)

return beam_fn_api_pb2.StopWorkerResponse()


Expand Down
2 changes: 1 addition & 1 deletion sdks/python/container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ RUN ccache --set-config=sloppiness=file_macro && ccache --set-config=hash_dir=fa

####
# Install Apache Beam SDK. Use --no-deps and pip check to verify that all
# necessary dependencies are specified in base_image_requiremetns.txt.
# necessary dependencies are specified in base_image_requirements.txt.
####
COPY target/apache-beam.tar.gz /opt/apache/beam/tars/
RUN pip install --no-deps -v /opt/apache/beam/tars/apache-beam.tar.gz[gcp]
Expand Down
Loading

0 comments on commit 08b6a52

Please sign in to comment.