[WIP] [BEAM-3645] support multi processes for Python FnApiRunner with EmbeddedGrpcWorkerHandler #8769

Conversation

Hannah-Jiang
Contributor

@Hannah-Jiang Hannah-Jiang commented Jun 5, 2019

Support multiple processes for the Python FnApiRunner with EmbeddedGrpcWorkerHandler

  • This PR covers the multiplexing part only. The ParallelBundleManager part will be worked on in a separate PR or added to this PR later.

@Hannah-Jiang
Contributor Author

R: @robertwb, @aaltay, @pabloem, @lukecwik
I created a working version (not final). I tested it with simple pipelines and confirmed that tasks and data are multiplexed to multiple processes. Can we do a short review to make sure I am good to move in this direction?

Thanks,
Hannah

@lukecwik lukecwik changed the title BEAM-3645 support multi processes for Python FnApiRunner with EmbeddedGrpcWorkerHandler [WIP] [BEAM-3645] support multi processes for Python FnApiRunner with EmbeddedGrpcWorkerHandler Jun 5, 2019
@Hannah-Jiang Hannah-Jiang force-pushed the fnapirunner-multi-processes-python branch from 0e3fc71 to fab22c5 on June 6, 2019 00:16
@Hannah-Jiang Hannah-Jiang force-pushed the fnapirunner-multi-processes-python branch from 5bf0aee to 303134d on June 6, 2019 22:24
@Hannah-Jiang
Contributor Author

Hannah-Jiang commented Jun 6, 2019

Here is some explanation to make it easier to understand what I am doing.

Assumptions/Preconditions:

  1. Since this is a local runner, workers are not added or removed dynamically during a job, so we have a fixed number of workers. Instead of reading signals from workers to dynamically change the worker list, we create a fixed list of workers. A worker is added to the list when it is started, and its liveness is not monitored.

  2. Since this is a local runner, the number of workers is not large (it would be <= 32). I used a dict to count the current workload of each worker.

Load balancing algorithm:
A <worker_id : task_count> map records the current workload of each worker. When a new task is added to a worker's queue, its load is increased by 1; when the response for a task is received, its load is decreased by 1. A new process_bundle request is assigned to the worker with the minimum workload. At the end, the task_count of every worker should be 0.
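
A minimal sketch of this bookkeeping (illustrative only, not the PR's exact code; the name _worker_load mirrors the snippets reviewed below):

import threading

class LoadTracker(object):
  """Tracks <worker_id : task_count> and picks the least-loaded worker."""

  def __init__(self, worker_ids):
    self._lock = threading.Lock()
    self._worker_load = {worker_id: 0 for worker_id in worker_ids}

  def assign(self):
    # Choose the worker with the fewest in-flight tasks and bump its count.
    with self._lock:
      worker_id = min(self._worker_load, key=self._worker_load.get)
      self._worker_load[worker_id] += 1
      return worker_id

  def complete(self, worker_id):
    # Called when the response for a task is received.
    with self._lock:
      self._worker_load[worker_id] -= 1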

Task multiplexing:
A gRPC server with one control service, one data service, one state service, and one logging service is used for communication between the runner and the workers. The control handler multiplexes tasks to the workers. A <worker_id : task_queue> map is maintained to store tasks. The runner identifies the worker_id from the gRPC context and yields from that worker's task queue. An <instruction_id : worker_id> mapping is maintained for multiplexing data and other tasks (process_bundle_progress, process_bundle_split).
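
A rough sketch of the routing (illustrative; not the actual BeamFnControlServicer changes, and it assumes the worker_id for a new bundle has already been chosen, e.g. by a load tracker like the one above):

import queue

class ControlRouter(object):
  """Per-worker task queues plus an <instruction_id : worker_id> map."""

  def __init__(self, worker_ids):
    self.queue_per_worker = {w: queue.Queue() for w in worker_ids}
    self.task_worker_mapping = {}

  def push(self, instruction_id, request, worker_id=None):
    if worker_id is not None:
      # A new process_bundle request: remember which worker owns it.
      self.task_worker_mapping[instruction_id] = worker_id
    # Follow-up requests (progress, split) reuse the recorded owner.
    owner = self.task_worker_mapping[instruction_id]
    self.queue_per_worker[owner].put(request)

  def next_for(self, worker_id):
    # Each worker's control stream, identified from the gRPC context,
    # yields requests from its own queue.
    return self.queue_per_worker[worker_id].get()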

Data multiplexing:
A server-side round-robin data channel is implemented for multiplexing. Data is stored per worker and sent to a worker when it asks for data. The implementation is quite similar to task multiplexing. The client-side data channel is unchanged, because there is still a single runner.

@Hannah-Jiang
Contributor Author

I listed many reviewers, so it is unclear whom exactly I am asking for a review. @robertwb, is it possible for you to do a short review? Thank you.

Contributor

@robertwb robertwb left a comment

This seems to be going down the path of writing a single, multiplexing controller, but that adds complexity, and in the end I don't think it's actually going to be the easiest interface to use when writing a ParallelBundleManager; it also pushes the multiplexing complexity fairly low into the stack.

Instead, I think we'll want multiple BundleProcessors, each owning its own set of connections to a worker (called the "controller" in the code, though this could be named better, as it includes a data, state, and logging channel as well) and a portion of the input data. We'd need to update worker_handler_factory to create and cache multiple workers (up to a bounded limit) rather than just one, but we could get the whole thing working while sharing the single worker (which can process multiple work items in parallel) before actually spinning up multiple processes (which should be a simple extension).


def _get_available_worker(self):
candidate = None
min_load = -1
Contributor

Initialize to float('inf') rather than giving -1 a special meaning. Alternatively, check if candidate is not None rather than if min_load is -1.

candidate = None
min_load = -1
for worker, load in self._worker_load.items():
# if found a worker without any task, return it.
Contributor

Wouldn't this fall out of looking for the one with the minimum load?
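
Concretely, the suggestion could look something like this (an illustrative rewrite of the method above, not code from the PR):

  def _get_available_worker(self):
    # A worker with no tasks in flight already has the minimum load, so no
    # special case (and no -1 sentinel) is needed.
    return min(self._worker_load.items(), key=lambda kv: kv[1])[0]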

name='run_worker', target=self.worker.run)
self.worker_thread.daemon = True
self.worker_thread.start()
_work_commend_line = b'%s -m apache_beam.runners.worker.sdk_worker_main' \
Contributor

We shouldn't be changing EmbeddedGrpcWorkerHandler to start subprocesses. Instead, we should be re-using SubprocessSdkWorkerHandler.

Contributor

(Actually, the ability to manage multiple workers shouldn't be tied to a particular type of worker; we should be able to handle multiple docker workers, multiple in-process workers, multiple subprocess workers, etc., which indicates this should be a new type of class that delegates to a set of WorkerHandlers. It is an optimization (that can come later) to share a single control and data service rather than starting one for each worker.)
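
For illustration, the kind of delegating class being suggested might look like the sketch below; the class name, the handler_factory callable, and the close() method are assumptions, not existing Beam APIs:

import itertools

class WorkerHandlerManager(object):
  """Owns a bounded pool of WorkerHandlers of any kind (embedded,
  subprocess, docker, ...) and hands them out to bundle processors."""

  def __init__(self, handler_factory, max_workers):
    self._handler_factory = handler_factory  # () -> a started WorkerHandler
    self._max_workers = max_workers
    self._handlers = []
    self._round_robin = None

  def get_worker_handler(self):
    # Start handlers lazily up to the bound, then reuse them round-robin.
    if len(self._handlers) < self._max_workers:
      handler = self._handler_factory()
      self._handlers.append(handler)
      self._round_robin = itertools.cycle(self._handlers)
      return handler
    return next(self._round_robin)

  def close_all(self):
    for handler in self._handlers:
      handler.close()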

@@ -1238,6 +1312,19 @@ def process_bundle(self, inputs, expected_outputs):
process_bundle_descriptor_reference=self._bundle_descriptor.id))
result_future = self._controller.control_handler.push(process_bundle)

# send process bundle request first, then send data because we need to know
Contributor

This won't work for the (non-threaded) direct case, as the request to process the bundle will block until all the data is available.

else:
_worker_id = None

if 'WORKER_COUNT' in os.environ:
Contributor

Do these have the same meaning? I'd rather avoid passing this by environment, especially if the environment overrides the more explicit setting.

self.task_worker_mapping[item.instruction_id] = worker_id
else:
worker_id = self.task_worker_mapping[item.instruction_reference]
self._worker_load[worker_id] += 1
Contributor

Should non-process-bundle tasks add to the load? (Similarly with register above.)

@@ -305,6 +306,69 @@ def Data(self, elements_iterator, context):
for elements in self._write_outputs():
yield elements

class GrpcServerRoundRobinDataChannel(GrpcServerDataChannel):
Contributor

This is a multiplexing channel, not a round-robin channel, right?

worker.start()
self._worker_list.append(worker)
# add worker to control_handler
self.control_handler.queue_per_worker[worker_id] = queue.Queue()
Contributor

Feels odd to be reaching into control_handler and populating queue_per_worker here.

self._uid_counter = 0
self._state = self.UNSTARTED_STATE
self._lock = threading.Lock()
self._inputs = collections.defaultdict(iter)
Contributor

It looks like you're mapping most members of this class to dicts. To me this indicates that it would be better to have a separate class that maintains a dict of worker ids to instances of this class (or possibly break this class up into the per-worker and not-per-worker portions).
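
One way to read that suggestion, as a hypothetical sketch (both class names are invented for illustration):

import collections

class PerWorkerState(object):
  """The portion of the channel state that exists once per worker."""

  def __init__(self):
    self.uid_counter = 0
    self.inputs = collections.deque()

class WorkerStateMap(object):
  """Maps worker ids to PerWorkerState instead of keeping per-field dicts."""

  def __init__(self):
    self._states = {}

  def for_worker(self, worker_id):
    return self._states.setdefault(worker_id, PerWorkerState())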

@@ -90,33 +91,90 @@ class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):

_DONE_MARKER = object()

task_worker_mapping = collections.defaultdict(str)
Contributor

Why are these (class-level) globals?

class ParallelBundleManager(BundleManager):
_uid_counter = 0
def process_bundle(self, inputs, expected_outputs):
input_value = list(inputs.values())[0]
Contributor

inputs is a dict {input : buffer}. We'll want to split all of them up, not just the first one.

Contributor Author

data_input[transform.unique_name] = pcoll_buffers[pcoll_id]
pcoll_buffers[buffer_id] = _GroupingBuffer(
              pre_gbk_coder, post_gbk_coder, windowing_strategy, self._num_workers)

We put only one buffer into each input, so taking only the first one is the same as taking all of them.

Contributor

This may not be the case in the future. If we want to make this assumption, we should at least assert it. But it's preferable, IMHO, to be general, as that's not too hard.

Contributor Author

Today I found a case where we shouldn't split inputs: a transform with a timer, when using the gRPC handler. I would like your advice on how to know when we should split inputs and when we shouldn't. I added a comment with examples to the new PR.

Contributor Author

I found a workaround by moving the type check before entering the for loop. Fixed in the new PR.

_uid_counter = 0
def process_bundle(self, inputs, expected_outputs):
input_value = list(inputs.values())[0]
if isinstance(input_value, list):
Contributor

These if statements suggest to me that we should have a base Buffer class that has append() and __iter__ methods, rather than doing a type check here. (Actually, rather than just an __iter__ method that does the splitting, we could also have a partition(n) method that returns multiple pieces of itself.) Then we could write

partitioned_inputs = [{} for _ in range(num_workers)]
for name, input in inputs.items():
  for ix, part in enumerate(input.partition(num_workers)):
    partitioned_inputs[ix][name] = part

and then partitioned_inputs would be a list of dicts, one for each worker.

Contributor Author

I changed the iterator to return a list in the new PR, which is the same as before.
Previously, we returned an iterator with one element; now it returns an iterator with N elements.
We can apply the same interface whether we read from a list or from a GroupingBuffer and write to the output stream.

for i, byte_stream in enumerate(byte_streams):
    if idx is None or i == idx:
        data_out.write(byte_stream)
        if idx is not None:
          break

If we introduce a partition() function on GroupingBuffer, don't we need to check the type every time we write to the output_stream, because the list type doesn't have this function?
I don't have a clear idea of how to avoid the type check here at the moment, though.

Contributor

I was suggesting that we create a new class (which could subclass list if we want, but has a partition method) to accomplish this. This way, the fact that there is parallelism doesn't leak up and down through the stack (e.g., the BundleManager code can be used unchanged, rather than being passed all the inputs and then an easy-to-forget flag of which ones to ignore). It also simplifies the Buffer class, which no longer needs an extra attribute redundantly remembering the parallelism of the context it must be used in (plus all the locking, state tracking, sleeping, etc.).
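
A minimal sketch of such a class, assuming a list-backed buffer (illustrative; the actual class in the follow-up PR may differ in detail):

class ListBuffer(list):
  """A buffer of encoded elements that knows how to split itself."""

  def partition(self, n):
    # Return n sub-buffers; element i goes to bucket i % n. Some buckets
    # may be empty when there are fewer elements than partitions.
    parts = [ListBuffer() for _ in range(n)]
    for i, element in enumerate(self):
      parts[i % n].append(element)
    return parts

With a partition() method on every buffer type, the loop sketched earlier works unchanged whether the input is list-like or a grouping buffer, and no isinstance checks are needed.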

Contributor Author

Thanks for the explanation. Your suggestion brings many advantages. By writing a new class with a partition() method, I was able to get rid of handling threads and reuse BundleManager as it is. It is changed in the new PR.

with futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
for i in range(num_workers):
ParallelBundleManager._uid_counter += 1
future = executor.submit(
Contributor

executor.map would probably be easier
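
For example (illustrative; process_partition is a hypothetical per-partition helper, and partitioned_inputs follows the earlier sketch):

from concurrent import futures

with futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
  results = list(executor.map(process_partition, partitioned_inputs))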

Contributor Author

It is fixed in the new PR now.


elif isinstance(input_value, _GroupingBuffer):
#TODO: read it more general way
num_workers = min(EmbeddedGrpcWorkerHandler.num_workers,
Contributor

Get this from a pipeline option, not EmbeddedGrpcWorkerHandler.num_workers.
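
For example, a hedged sketch of reading the worker count from pipeline options (the num_workers option name here is illustrative, defined via Beam's custom-options mechanism rather than an existing flag):

from apache_beam.options.pipeline_options import PipelineOptions

class LocalParallelismOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--num_workers', type=int, default=1,
                        help='Number of local SDK worker processes.')

options = PipelineOptions(['--num_workers', '4'])
num_workers = options.view_as(LocalParallelismOptions).num_workers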

Contributor Author

This is fixed in the new PR.

for i in range(num_workers):
ParallelBundleManager._uid_counter += 1
future = executor.submit(
super(ParallelBundleManager, self).process_bundle, inputs,
Contributor

I think it'd be cleaner to have several separate (ordinary) BundleManagers rather than calling the super method, especially as they'll probably end up having different state.
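
Sketched loosely (the BundleManager constructor arguments here are placeholders, not its real signature):

from concurrent import futures

# Hypothetical: one ordinary BundleManager per partition instead of
# re-entering the parent class's process_bundle via super().
bundle_managers = [
    BundleManager(controller, get_buffer, bundle_descriptor, progress_frequency)
    for _ in range(num_workers)]

with futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
  results = list(executor.map(
      lambda pair: pair[0].process_bundle(pair[1], expected_outputs),
      zip(bundle_managers, partitioned_inputs)))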

Contributor Author

It is fixed in the new PR now.

@Hannah-Jiang
Contributor Author
Contributor Author

Closing this PR because the work is continuing in #8872 and #8979.
