From 08b4eb290fa4b51c85f95d6c3a47c6648eb11a0c Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Tue, 29 Aug 2017 12:32:38 -0700 Subject: [PATCH 1/2] Enable progress request handling in python SDK harness --- .../apache_beam/runners/worker/sdk_worker.py | 82 +++++++++++++------ 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 6a236802b9a6..40357a675391 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -21,10 +21,11 @@ from __future__ import division from __future__ import print_function +import functools import logging import Queue as queue -import threading import traceback +from concurrent import futures from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.runners.worker import bundle_processor @@ -36,6 +37,9 @@ class SdkHarness(object): def __init__(self, control_channel): self._control_channel = control_channel self._data_channel_factory = data_plane.GrpcClientDataChannelFactory() + # TODO: Ensure thread safety to run with more than 1 thread. + self._default_work_thread_pool = futures.ThreadPoolExecutor(max_workers=1) + self._progress_thread_pool = futures.ThreadPoolExecutor(max_workers=1) def run(self): contol_stub = beam_fn_api_pb2.BeamFnControlStub(self._control_channel) @@ -56,20 +60,43 @@ def get_responses(): def process_requests(): for work_request in contol_stub.Control(get_responses()): logging.info('Got work %s', work_request.instruction_id) - try: - response = self.worker.do_instruction(work_request) - except Exception: # pylint: disable=broad-except - logging.error( - 'Error processing instruction %s', - work_request.instruction_id, - exc_info=True) - response = beam_fn_api_pb2.InstructionResponse( - instruction_id=work_request.instruction_id, - error=traceback.format_exc()) - responses.put(response) - t = threading.Thread(target=process_requests) - t.start() - t.join() + request_type = work_request.WhichOneof('request') + if request_type == ['process_bundle_progress']: + thread_pool = self._progress_thread_pool + else: + thread_pool = self._default_work_thread_pool + + # Need this wrapper to capture the original stack trace. + def do_instruction(request): + try: + return self.worker.do_instruction(request) + except Exception as e: # pylint: disable=broad-except + traceback_str = traceback.format_exc(e) + raise StandardError("Error processing request. Original traceback " + "is\n%s\n" % traceback_str) + + def handle_response(request, response_future): + try: + response = response_future.result() + except Exception as e: # pylint: disable=broad-except + logging.error( + 'Error processing instruction %s', + request.instruction_id, + exc_info=True) + response = beam_fn_api_pb2.InstructionResponse( + instruction_id=request.instruction_id, + error=str(e)) + responses.put(response) + + thread_pool.submit(do_instruction, work_request).add_done_callback( + functools.partial(handle_response, work_request)) + + process_requests() + logging.info("No more requests from control plane") + logging.info("SDK Harness waiting for in-flight requests to complete") + # Wait until existing requests are processed. + self._progress_thread_pool.shutdown() + self._default_work_thread_pool.shutdown() # get_responses may be blocked on responses.get(), but we need to return # control to its caller. responses.put(no_more_work) @@ -87,20 +114,18 @@ def __init__(self, state_handler, data_channel_factory): def do_instruction(self, request): request_type = request.WhichOneof('request') if request_type: - # E.g. if register is set, this will construct - # InstructionResponse(register=self.register(request.register)) - return beam_fn_api_pb2.InstructionResponse(**{ - 'instruction_id': request.instruction_id, - request_type: getattr(self, request_type) - (getattr(request, request_type), request.instruction_id) - }) + # E.g. if register is set, this will call self.register(request.register)) + return getattr(self, request_type)( + getattr(request, request_type), request.instruction_id) else: raise NotImplementedError - def register(self, request, unused_instruction_id=None): + def register(self, request, instruction_id): for process_bundle_descriptor in request.process_bundle_descriptor: self.fns[process_bundle_descriptor.id] = process_bundle_descriptor - return beam_fn_api_pb2.RegisterResponse() + return beam_fn_api_pb2.InstructionResponse(**{ + 'instruction_id': instruction_id, + 'register': beam_fn_api_pb2.RegisterResponse()}) def process_bundle(self, request, instruction_id): bundle_processor.BundleProcessor( @@ -108,4 +133,11 @@ def process_bundle(self, request, instruction_id): self.state_handler, self.data_channel_factory).process_bundle(instruction_id) - return beam_fn_api_pb2.ProcessBundleResponse() + return beam_fn_api_pb2.InstructionResponse(**{ + 'instruction_id': instruction_id, + 'process_bundle': beam_fn_api_pb2.ProcessBundleResponse()}) + + def process_bundle_progress(self, request, instruction_id): + return beam_fn_api_pb2.InstructionResponse(**{ + 'instruction_id': instruction_id, + 'error': 'Not Supported'}) From 56f3e2748221cdea4203cebce6cd777e3faaf5bc Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Sat, 16 Sep 2017 16:58:10 -0700 Subject: [PATCH 2/2] Address comments --- .../apache_beam/runners/worker/sdk_worker.py | 68 +++++++++---------- 1 file changed, 33 insertions(+), 35 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 40357a675391..f2ca5667ab99 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -57,41 +57,39 @@ def get_responses(): return yield response - def process_requests(): - for work_request in contol_stub.Control(get_responses()): - logging.info('Got work %s', work_request.instruction_id) - request_type = work_request.WhichOneof('request') - if request_type == ['process_bundle_progress']: - thread_pool = self._progress_thread_pool - else: - thread_pool = self._default_work_thread_pool - - # Need this wrapper to capture the original stack trace. - def do_instruction(request): - try: - return self.worker.do_instruction(request) - except Exception as e: # pylint: disable=broad-except - traceback_str = traceback.format_exc(e) - raise StandardError("Error processing request. Original traceback " - "is\n%s\n" % traceback_str) - - def handle_response(request, response_future): - try: - response = response_future.result() - except Exception as e: # pylint: disable=broad-except - logging.error( - 'Error processing instruction %s', - request.instruction_id, - exc_info=True) - response = beam_fn_api_pb2.InstructionResponse( - instruction_id=request.instruction_id, - error=str(e)) - responses.put(response) - - thread_pool.submit(do_instruction, work_request).add_done_callback( - functools.partial(handle_response, work_request)) - - process_requests() + for work_request in contol_stub.Control(get_responses()): + logging.info('Got work %s', work_request.instruction_id) + request_type = work_request.WhichOneof('request') + if request_type == ['process_bundle_progress']: + thread_pool = self._progress_thread_pool + else: + thread_pool = self._default_work_thread_pool + + # Need this wrapper to capture the original stack trace. + def do_instruction(request): + try: + return self.worker.do_instruction(request) + except Exception as e: # pylint: disable=broad-except + traceback_str = traceback.format_exc(e) + raise StandardError("Error processing request. Original traceback " + "is\n%s\n" % traceback_str) + + def handle_response(request, response_future): + try: + response = response_future.result() + except Exception as e: # pylint: disable=broad-except + logging.error( + 'Error processing instruction %s', + request.instruction_id, + exc_info=True) + response = beam_fn_api_pb2.InstructionResponse( + instruction_id=request.instruction_id, + error=str(e)) + responses.put(response) + + thread_pool.submit(do_instruction, work_request).add_done_callback( + functools.partial(handle_response, work_request)) + logging.info("No more requests from control plane") logging.info("SDK Harness waiting for in-flight requests to complete") # Wait until existing requests are processed.