Skip to content

Commit

Permalink
[BEAM-8733] Handle the registration request synchronously in the Pyth…
Browse files Browse the repository at this point in the history
…on SDK harness.
  • Loading branch information
sunjincheng121 committed Nov 28, 2019
1 parent 2fc1b84 commit 26596c8
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions sdks/python/apache_beam/runners/worker/sdk_worker.py
Expand Up @@ -145,16 +145,12 @@ def _execute(self, task, request):
self._responses.put(response)

def _request_register(self, request):
self._request_execute(request)
# registration request is handled synchronously
self._execute(
lambda: self.create_worker().do_instruction(request), request)

def _request_process_bundle(self, request):

def task():
self._execute(
lambda: self.create_worker().do_instruction(request), request)
self._worker_thread_pool.submit(task)
_LOGGER.debug(
"Currently using %s threads." % len(self._worker_thread_pool._workers))
self._request_execute(request)

def _request_process_bundle_split(self, request):
self._request_process_bundle_action(request)
Expand Down Expand Up @@ -190,6 +186,8 @@ def task():
lambda: self.create_worker().do_instruction(request), request)

self._worker_thread_pool.submit(task)
_LOGGER.debug(
"Currently using %s threads." % len(self._worker_thread_pool._workers))

def create_worker(self):
return SdkWorker(
Expand Down

0 comments on commit 26596c8

Please sign in to comment.