Add the concurrent programming design doc #6394

Merged: 2 commits merged into PaddlePaddle:develop on Jan 5, 2018

Conversation

wangkuiyi (Collaborator)

No description provided.


### The Worker Program

The worker program looks like
putcn (Contributor), Dec 8, 2017:

Can we also give the example protobuf of the worker program?

wangkuiyi (Author):

Adding that.

    x = fluid.listen_and_do(
        fluid.k8s.self_addr(),
        func(input Tensor) {
            output = fluid.mult(input, W)
Contributor:

Does listen_and_do both receive and send values?

wangkuiyi (Author):

Exactly. Let me make that clear.


The following table compares concepts in Fluid and Go

| Go | Fluid |
Contributor:

These concepts are not specific to Go; all programming languages have them. People may not be familiar with Go.

wangkuiyi (Author):

Other languages don't have goroutines.

Contributor:

I see. You could use the term "coroutine" instead. But never mind, either is OK.

wangkuiyi (Author):

No, a coroutine is completely different from a goroutine; the two have almost nothing in common.

wangkuiyi (Author):

Goroutines are preemptively scheduled, whereas a coroutine must give up its OS thread before other coroutines can run. Goroutines are implemented on top of a thread pool in the Go runtime library; a coroutine is a special data type -- a stack of stacks.
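
For illustration only, a minimal Go sketch (nothing Fluid-specific) of the "no explicit yield" point: the goroutines below run CPU-bound loops and never call any yield primitive, yet the runtime multiplexes them onto its pool of OS threads and all of them finish.

```go
package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    fmt.Println("OS threads in the runtime's pool:", runtime.GOMAXPROCS(0))

    var wg sync.WaitGroup
    for i := 0; i < 8; i++ { // typically more goroutines than OS threads
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sum := 0
            for j := 0; j < 10000000; j++ { // CPU-bound loop, no explicit yield
                sum += j
            }
            fmt.Printf("goroutine %d finished, sum = %d\n", id, sum)
        }(i)
    }
    wg.Wait() // every goroutine ran to completion without yielding by hand
}
```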


An explanation of the above program:

- `fluid.k8s` is a package that provides access to the Kubernetes API.
Contributor:

Agreed that we should have a fluid.k8s package!

    parent = 0,
    vars = [x, y, index],
    ops = [
        slice(input = [X, index], output = x)  # index is initialized by parallel_for
Contributor:

Shouldn't slice be under block 0, since it must run before the parallel operator? We would also need a merge operator in block 0 after parallel_for to merge the sliced data coming back from multiple threads/nodes.

gongweibao (Contributor), Dec 8, 2017:

I think slice only fetches the data fragment at index.

wangkuiyi (Author):

Following your suggestion.

wangkuiyi (Author):

I think @gongweibao is right. Because block 1 is a sub-block of block 0, it can refer to X defined in block 0. The variable index is created in block 1's scope.


### The Master Program

The master program could look like the following:
Contributor:

This is an example of a transpiler converting the program to a multi-node version; the multi-threaded version may be quite different.

wangkuiyi (Author):

Why would the output of a transpiler differ from that of a human programmer?

Contributor:

@wangkuiyi It should be the same. I mean:

- User Program => Transpiler (multi-thread) => multi-threaded single-node parallel program.
- User Program => Transpiler (multi-node) => multi-node single-threaded parallel program.
- User Program => Transpiler (multi-node) => Transpiler (multi-thread) => multi-node multi-threaded program.

wangkuiyi (Author):

This example is multi-threaded -- both ParallelFor and ListenAndDo in this example create threads.

This example doesn't use multiple GPUs on the same node, but it could easily be changed to do so -- the worker program can use ParallelFor to further split the x it receives across the multiple GPUs on a worker. I can add this part as an additional section.

Contributor:

I see, thanks!

gongweibao (Contributor) left a comment:

Are we creating a map-reduce model?

# Design Doc: Concurrent Programming with Fluid

Superjomn (Contributor), Dec 8, 2017:

I understand the parallel_do design, and it is great.

But out of curiosity: is this design a concurrent programming use case just for distributed computation (data distribution), or does it include the future design for full concurrent programming support?

If it is the whole design, it needs some other concepts in addition to parallel_for, such as waiting_for, or the low-level mutex and condition_variable.

I also wonder about the scope of Fluid's concurrent programming: full support for low-level concurrency concepts (very flexible), such as mutex, or following some classical framework (easy to use), such as map/reduce?

QiJune (Member), Dec 8, 2017:

A layer is a user-defined function, so it looks like we need a higher-order function map to do parallel training and inference.

wangkuiyi (Author):

@ChunweiYan and @PaddlePaddle/paddle Thanks for the question.

This design aims for the future. I completely understand that we are going to have a stack of concurrency abstractions. I will try to complete this document to include the following:

| Go        | Fluid                                    |
|-----------|------------------------------------------|
| parallel  | fluid.parallel.do and fluid.parallel.for |
| select    | SelectOp                                 |
| channel   | Channel, a new data type in VarDesc      |
| goroutine | GoOp                                     |

where GoOp creates a (green) thread in PaddlePaddle's thread pool.

SelectOp, Channel, and GoOp are the BSP primitives. It has been mathematically proven that, given these primitives, we no longer need semaphores and mutexes. In practice, using these primitives, it would be difficult to write concurrent programs that deadlock.
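
For concreteness, here is a minimal Go sketch (illustrative only, not the Fluid API) of how go, channel, and select compose without any mutex or semaphore: two producer goroutines write to their own channels, and a single select loop multiplexes them.

```go
package main

import "fmt"

func main() {
    odd := make(chan int)  // Channel: a typed, synchronized queue
    even := make(chan int)

    go func() { // goroutine: the unit a GoOp would create
        for i := 1; i < 10; i += 2 {
            odd <- i
        }
        close(odd)
    }()
    go func() {
        for i := 0; i < 10; i += 2 {
            even <- i
        }
        close(even)
    }()

    // select: wait on whichever channel is ready; no locks needed.
    for odd != nil || even != nil {
        select {
        case v, ok := <-odd:
            if !ok {
                odd = nil // a nil channel is never selected again
                continue
            }
            fmt.Println("odd:", v)
        case v, ok := <-even:
            if !ok {
                even = nil
                continue
            }
            fmt.Println("even:", v)
        }
    }
}
```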

Contributor:

It seems Go follows CSP.

Contributor:

Also, here is a comparison of parallel programming models: https://en.wikipedia.org/wiki/Parallel_programming_model.

By the way, "model parallelism" is a kind of "task parallelism" on that page.


- `fluid.k8s` is a package that provides access to the Kubernetes API.
- `fluid.k8s.get_worker_addrs` returns the list of IP and ports of all pods of the current job except for the current one (the master pod).
- `fluid.tensor_array` creates a [tensor array](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/lod_tensor_array.h). `fluid.parallel_for` creates a `ParallelFor` intrinsic, which, when executed,
Collaborator:

Maybe we need an operator like device_parallel.do()? Or is device_parallel.do a high-level wrapper of parallel_for?

Contributor:

The sub-graph differs between GPU parallelism and CPU parallelism. For example, GPU parallelism needs Broadcast/All-Reduce ops to distribute/sync the parameters, while CPU parallelism only needs to fetch different data indices in different threads.

So maybe we can let users describe the program themselves, and also offer a transpiler to convert the CPU sub-graph into a GPU sub-graph.

```go
func main() {
    W = Tensor(...)
    x = fluid.listen_and_do(
```
Collaborator:

Cool. listen_and_do makes serving easy.

How about serving on multiple devices? Do we just mix in a parallel_for and listen on many ports? Or do we have a pool of devices, so that each listen_and_do call gets a device from the pool and puts it back when it returns?

Contributor:

How will we support mixed devices? If one block contains ops on both GPU and CPU (e.g., a while op), will we take two pools of devices and create two executors?

    Y = tensor_array(len(L))
    parallel_for(input = X, output = Y,
                 attrs = {L, block_id(1)})  # referring to block 1
    ]
Contributor:

I'm wondering how to deal with variable synchronization. How do we notify the next op that a variable is ready, and how does the next op wait for that condition? Do we need a low-level synchronization mechanism, such as a condition variable or a mutex?

wangkuiyi (Author):

Do you mean how we make sure that parallel_for completes only after all the threads complete? If so, the answer is that the ParallelForOp should wait until all threads complete, just like the Go implementation of parallel.For always waits:

https://github.com/wangkuiyi/parallel/blob/92b5cec6e73ec1ed9af283f69070b42a63fb94da/parallel.go#L91-L93
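
For reference, the wait-for-all behavior can be sketched in plain Go with a sync.WaitGroup (a hedged illustration; the linked parallel.For achieves the same effect, though its details differ):

```go
package main

import (
    "fmt"
    "sync"
)

// parallelFor runs f(i) for i in [0, n) on separate goroutines and
// returns only after every iteration has finished.
func parallelFor(n int, f func(i int)) {
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            f(i)
        }(i)
    }
    wg.Wait() // the caller blocks here until all goroutines are done
}

func main() {
    parallelFor(4, func(i int) { fmt.Println("iteration", i) })
    fmt.Println("all iterations complete") // guaranteed to print last
}
```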

Contributor:

I see. Thanks!


2. once a connection is established,
   1. creates a scope of two parameters, "input" and "output",
   2. reads a [Fluid variable](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/variable.h) and saves it into "input",
   3. creates an Executor instance and calls `Executor.Run(block)`, where the block is generated by running the lambda specified as the second parameter of `fluid.listen_and_do`.
Contributor:

> In this case, we can write a transpiler that takes a ProgramDesc message that represents the above example program and outputs two ProgramDesc messages, one for running on the master process/node, and the other one for worker processes/nodes.

From the above description, one ProgramDesc runs on the master process/node and the other one runs on worker processes/nodes.

Here, does this worker program (or fluid.listen_and_do) need a recv op before the mult op to receive x (a piece of X), and a send op after the mult op to send y (the output of the current worker node)? Or, since this is a Fluid program, can we show the ProgramDesc for fluid.listen_and_do, like the protobuf ProgramDesc message in the Master Program part (lines 73 to 95)?

In addition, line 134 is x = fluid.listen_and_do(); what does x mean here? x is a piece of X in the Master Program part. Is the x returned by fluid.listen_and_do() also a piece of X?

If I have misunderstood it, please forgive me. :)

    slice(input = [X, index], output = x)  # index is initialized by parallel_for
    send(input = x, attrs = L[index])
    recv(outputs = y, attrs = L[index])
    assign(input = y, output = Y[index])
Contributor:

From my understanding, assign(input = y, output = Y[index]) copies the content of y into Y[index]. Do we need it here?

Maybe rather than:

    recv(outputs = y, attrs = L[index])
    assign(input = y, output = Y[index])

we could do:

    recv(outputs = Y[index], attrs = L[index])

wangkuiyi (Author):

Great idea! It would be great if we could do this -- assign directly into a tensor of a tensor array.

    read(output = X)
    kube_get_workers_addrs(output = L)
    Y = tensor_array(len(L))
    parallel_for(input = X, output = Y,
helinwang (Contributor), Dec 11, 2017:

- We need a merge step after parallel_for to merge Y (a list of tensor slices) into a single tensor.
- Is parallel_for a synchronous or an asynchronous call?
  - If asynchronous, we need a way to know when parallel_for is done.
  - If synchronous, then when there are two different parallel_for calls:

        parallel_for(...) // parallel_for A
        parallel_for(...) // parallel_for B

    how can we exploit the possible concurrency between parallel_for A and parallel_for B?

wangkuiyi (Author):

We can put these two parallel_for calls in a parallel_do call. In Go, it would look like:

    parallel.Do(
        func() { parallel_for(...) }, // parallel_for A
        func() { parallel_for(...) }, // parallel_for B
    )

In Fluid, it could be:

    pd = fluid.parallel.do()
    with pd.block():
        with fluid.parallel.for():
            # parallel_for A
    with pd.block():
        with fluid.parallel.for():
            # parallel_for B

```go
func main() {
    W = Tensor(...)
    x = fluid.listen_and_do(
```
helinwang (Contributor), Dec 11, 2017:

I think listen_and_do should be replaced by the recv op; in the master program example, the recv op is already used.

One program could send many variables to another program on a different node, and we should not open a new port for each send (which I assume is what listen_and_do does). Instead of listen_and_do, we can use the recv op, which would multiplex all receives on a single port and reuse the same TCP connection.
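
A rough Go sketch of that multiplexing idea (all names here are hypothetical, not an existing Fluid API): every variable travels over one shared connection as a (key, payload) pair, and a dispatcher hands each payload to the recv that registered that key.

```go
package main

import "fmt"

// message is what the single shared connection would carry:
// a key identifying the variable plus its serialized payload.
type message struct {
    key     string
    payload []byte
}

// dispatch routes every incoming message to the channel registered for
// its key, so many logical recvs share one physical connection.
func dispatch(incoming <-chan message, recvs map[string]chan []byte) {
    for m := range incoming {
        if ch, ok := recvs[m.key]; ok {
            ch <- m.payload
        }
    }
}

func main() {
    incoming := make(chan message) // stands in for the one TCP connection
    recvs := map[string]chan []byte{
        "x@worker0": make(chan []byte, 1),
        "y@worker0": make(chan []byte, 1),
    }
    go dispatch(incoming, recvs)

    // Two sends multiplexed on the same "connection".
    incoming <- message{key: "y@worker0", payload: []byte("value of y")}
    incoming <- message{key: "x@worker0", payload: []byte("value of x")}

    // Each recv op waits only on its own key.
    fmt.Println(string(<-recvs["x@worker0"]))
    fmt.Println(string(<-recvs["y@worker0"]))
}
```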

    kube_get_workers_addrs(output = L)
    Y = tensor_array(len(L))
    parallel_for(input = X, output = Y,
                 attrs = {L, block_id(1)})  # referring to block 1
Contributor:

To the best of my knowledge, attrs are written at compile time, and at compile time L doesn't have a value yet. Maybe the addresses should be passed in as a Var:

    parallel_for(input = [X, L], ...)

wangkuiyi (Author):

You are right! L should be an input, not an attribute. Thanks for pointing out this problem!

    fluid.parallel_for(X, L,
        func(index int) { //// block 1
            x = X[index]
            fluid.send(L[index], x)
Contributor:

I think send and recv need a key argument, because a program can issue multiple sends to a node, and without a key the send/recv pairs cannot be associated.

wangkuiyi (Author):

Yes, you are right. I am adding it.

3. `fluid.run` calls `Executor.Run` to run the first block in the `ProgramDesc` message.
4. `Executor.Run`'s implementation is extremely simple -- it doesn't plan the execution nor create threads; instead, it runs on the current thread and executes intrinsics/operators' `Run` methods sequentially, as they appear in the `Block.ops` array.
5. Intrinsics/operators' `Run` methods might create threads. For example, the `ListenAndDo` operator creates a thread to handle each incoming request.
6. Threads are not necessarily OS threads; instead, they could be [green threads](https://en.wikipedia.org/wiki/Green_threads) managed by a ThreadPool. Multiple green threads might run on the same OS thread. An example of green threads is Go's [goroutines](https://tour.golang.org/concurrency/1).
Contributor:

Maybe we can't call these green threads. I found some discussion here and here.

And here it is defined as hybrid threading.
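
Whatever we end up calling them, here is a small Go illustration (a sketch only) of point 6 above: with GOMAXPROCS set to 1, all the goroutines below are multiplexed onto a single OS thread by the runtime scheduler, which is the behavior a ThreadPool of lightweight threads would give Fluid.

```go
package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main() {
    runtime.GOMAXPROCS(1) // pin the scheduler to a single OS thread

    var wg sync.WaitGroup
    results := make(chan string, 100)
    for i := 0; i < 100; i++ { // 100 lightweight threads, 1 OS thread
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            results <- fmt.Sprintf("goroutine %d ran", id)
        }(i)
    }
    wg.Wait()
    close(results)
    fmt.Println("goroutines run on one OS thread:", len(results))
}
```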

typhoonzero (Contributor) left a comment:

We shall merge this so that further implementation PRs can go on.

typhoonzero merged commit 5911644 into PaddlePaddle:develop on Jan 5, 2018.

The "PaddlePaddle Distributed Refactoring (Due: 201802)" project automation moved this from FUTURE to DONE on Jan 5, 2018.