Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pserver etcd client #2559

Merged
merged 25 commits into from
Jul 4, 2017
Merged

Conversation

jacquesqiao
Copy link
Member

@jacquesqiao jacquesqiao commented Jun 22, 2017

fix: #2515

if etcd_addr == "" {
return C.PSERVER_ERROR
}
etcd_addrs := strings.Split(etcd_addr, ",")
Copy link
Contributor

@helinwang helinwang Jun 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I should have mentioned this earlier, the cgo code are supposed to be just a thin layer. Because it's just a interface from C to Go. If it contains too much logic, then when we want to use Pserver from Go to Go, we need to duplicate the logic. Another reason is Go's test does not work well with cgo, it's supposed to be tested from C side. It's much easier to write test for Go.

You can create a new file in pserver package. E.g., etcd.go, and implement this interface: https://github.com/PaddlePaddle/Paddle/blob/develop/go/pserver/client.go#L26
And this interface:
https://github.com/PaddlePaddle/Paddle/blob/develop/go/pserver/client.go#L15

So cgo just need to do:

e := pserver.NewEtcd(addr) // we can call it NewEtcd, or some other name.
c := pserver.NewClient(e, len(as), e)
return add(c)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the comment! I will add a separate module that implement these interfaces.

return err
}
retryTimes := defaultRetryTimes
for retryTimes < 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to try infinitely because we don't know how long it will take til the etcd key is written.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will optimize this strategy

time.Sleep(time.Second)
continue
}
ps_addr := string(kvs[0].Value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From design doc: https://github.com/PaddlePaddle/Paddle/tree/develop/doc/design/cluster_train#trainer-process-1

ps_addr is not fetched from one etcd key. Addresses are read from /ps/<id> which stands for IP address for each pserver, so ps_addr need to concat all these addresses.

And each trainer must "watch" pserver nodes hand handle events of pserver creation and loss.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done the first part, but I have not implemented the watcher because now the pserver client cannot change to new pserver when running, I have added a TODO and will implment this function in the following pr. Thank you!

})
if err != nil {
log.Errorln(err)
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return type is C.paddle_pserver_client, I think the function would panic if client connect etcd failed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// read ps desired number from etcd.
func(p pserverEtcdLister) desired() int {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p pserverEtcdLister -> p *pserverEtcdLister same as func(p pserverEtcdLister) List() []Server

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// read ps desired number from etcd.
func(p pserverEtcdLister) desired() int {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -21,6 +21,10 @@ type ElementType int
const (
AlreadyInitialized = "pserver already initialized"
Uninitialized = "pserver not fully initialized"
// PsDesired is etcd path for store desired pserver count
DefaultPsDesiredPath = "/ps_desired"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note: etcdv3 uses only key-values, thought the key looks like a path.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok~

}
}

func(p pserverEtcdLister) List() []Server {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #2551 , trainers need to discover pservers and keep watch pservers using etcd's watch. I think you need to discuss with @Yancey1989 to see where wo implement the "etcd watcher"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding! I will discuss with your today~~

@@ -98,7 +98,7 @@ func (e *EtcdClient) Save(state []byte) error {
// We lost the master lock and can not acquire
// it back, it means some other master is
// already started. We don't want cluster
// managment system to kill the master server
// management system to kill the master server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have so many typos :p. Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

func paddle_new_etcd_pserver_client(etcd_addr *C.char) C.paddle_pserver_client {
// TODO(helin): fault tolerant pserver client using etcd.
panic("not implemented.")
func paddle_new_etcd_pserver_client(etcd_addr *C.char, selected int) C.paddle_pserver_client {
Copy link
Contributor

@helinwang helinwang Jun 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to use etcd lock to decide which trainer is selected to initialize the parameter servers. This probably should done in the pserver package (not here in cgo) by implementing the pserver.Selector interface. I think in etcd_client.go mentioned in #2559 (comment) could be a good place to hold such code.

Don't need to do in the PR, but can you add a TODO, and it would be awesome if you can send a follow up PR for it?

Please see more detail here: https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/cluster_train/pserver_client.md#trainer-selection

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a TODO to optimize this part and will learn how to use etcd lock to sync status.

ClientTest(t, c1)
}

func TestEtcdClient(t *testing.T) {
Copy link
Contributor

@helinwang helinwang Jun 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestEtcdClient is not a self-contained test, it requires an etcd server to be running when the test is running. So the CI (after we setup cd go && go test ./...) will not run correctly if etcd is not running on 127.0.0.1:2379.

Maybe for know let's disable the test be renaming TestEtcdClient to EtcdClient, and user can enable manually when necessary.

We can explore embedding etcd later: #2504

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

)

const (
DefaultEtcdTimeout time.Duration = time.Second * time.Duration(5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 * time.Second is fine. Constants in Go is very well designed: https://blog.golang.org/constants

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

servers := make([]Server, psDesired)
for {
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
for i := 0; i < psDesired; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be batched into a single transaction, here is an related example: https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/etcd_client.go#L121
Something like:

resp, err := e.client.Txn(ctx).If().Then(get0, get1, get2, get3).Commit()

Feel free to improve it later, not a merge blocker.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

// TODO(Longfei) check the ps address
if psAddr == "" {
cancel()
log.Infof("Get psKey = %s, psAddr is null illegal", psKey, psAddr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one "%s", but two variable. go vet is a good tool for detecting this kind of error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, done, thanx~

kvs := resp.Kvs
if len(kvs) == 0 {
log.Infof("Waiting for ps addr registered ...")
time.Sleep(p.timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancel() need to be called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

DefaultEtcdTimeout time.Duration = time.Second * time.Duration(5)
)

type pserverEtcdLister struct {
Copy link
Contributor

@helinwang helinwang Jun 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name could be more concise if it's something like etcdLister, the package is already pserver, people read it in their minds as: pserver etcd lister. If it's named as pserverEtcdLister, it's long, and does not read good: pserver pserver etcd lister.

This type does more than just Lister (tells desired number of pservers as well), maybe it's best to name it as EtcdClient (if rename, please change file name as well.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, thanks for this very valuable suggestion!

return servers
}

func NewEtcdAddrLister(endpoints string) (Lister, int) {
Copy link
Contributor

@helinwang helinwang Jun 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's less abstraction (interface is an abstraction) if the return type is not an interface:PserverEtcdLister (I think it's a good idea to make pserverEtcdLister public, perhaps rename to EtcdClient, and desired public), than interface Lister.

The benefit of no abstraction unless necessary is not very obvious in the beginning, maybe the article below could explain more. I will give a not-so-good-example: At least it's easier to jump to the definition when not returning an interface (editor jumps to the function definition rather than interface definition).

A good general rule in Go would be: accept interfaces, return structs

// TODO(helin): fault tolerant pserver client using etcd.
panic("not implemented.")
func paddle_new_etcd_pserver_client(etcd_addr *C.char, selected int) C.paddle_pserver_client {
addr := C.GoString(etcd_addr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

etcd_addr => etcd_endpoints. We always use etcd_endpoints to represent comma separated etcd addresses.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

//TODO(Qiao: tmperary disable etcdClient test for dependency of etcd)
func EtcdClient(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure to disable it before merging into develop branch. We need to make sure test always runs correctly, since other developers need to run test as well.

)

const (
DefaultEtcdTimeout time.Duration = 5 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI DefaultEtcdTimeout = 5 * time.Second could work as well.

DefaultEtcdTimeout time.Duration = 5 * time.Second
)

type EtcdCClient interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's hard to choose a name, since EtcdClient is already used ;)
However, EtcdCClient may not be a good name, because C Client means it's for C. But here is not really related to C (Go can use it as well). Maybe ClientEtcd?

// TODO(Longfei)
// 1. add watcher to watch the change state of pservers)
// 1. add etcd lock)
type EtcdCClientImpl struct {
Copy link
Contributor

@helinwang helinwang Jul 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a very common pattern in Java: create interface, and write the implementation, this is due to Java has explicit interface. It's not required for Go: Go has implicit interface. Please see: https://medium.com/@cep21/preemptive-interface-anti-pattern-in-go-54c18ac0668a Unnecessary interface only adds complication to the program.

I think we need to stick to "accept interface, return struct" unless there is good reason not to.
In this case we need to remove type EtcdCClient interface, rename EtcdCClientImpl to EtcdCClient, and make func NewEtcdCClient(endpoints string) (*EtcdCClient, error) return pointer to the struct.

for i := 0; i < psDesired; i++ {
psKey := PsPath + strconv.Itoa(i)
log.Debugf("checking %s", psKey)
resp, err := p.client.Get(ctx, psKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是不是改成

ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
resp, err := p.client.Get(ctx, psKey)
cancel()
if err != nil {
...

更清楚些?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的~

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方还不能这么改,因为是在一个循环里边,只能最后cancel。否则下次循环到这里就失败了

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jacquesqiao 下一个循环貌似又创建了一个新的ctx?

for ... {
  ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
  resp, err := p.client.Get(ctx, psKey)
  cancel()
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -64,11 +71,11 @@ func (e *EtcdClient) Register() (int, error) {
log.Debugf("inited client to %s", e.etcdEndpoints)
break
}
// init /ps_desired using transaction, for multiple pservers may want to write
// init /ps/desired using transaction, for multiple pservers may want to write
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is not updated

DefaultEtcdTimeout time.Duration = 5 * time.Second
)

// TODO
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Todo comment styles must be like // TODO: ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

)

// TODO
// 1. add watcher to watch the change state of pservers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment for exported structs and functions must be like // EtcdClient ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use gometalinter to check the code styles.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

continue
}

log.Debugf("Get psDesired number: %d\n", psDesired)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logrus formated log seems don't need a \n

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

}
}

func (p *EtcdClient) List() []Server {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exported function must have comments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

log.Debugf("Get psDesired number: %d\n", psDesired)
return psDesired
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break and return.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

log.Debugf("checking %s", psKey)
resp, err := p.client.Get(ctx, psKey)
if err != nil {
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how the cancel() should be used. Should always place behind Get?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for i := 0; i < psDesired; i++ {
psKey := pserver.PsPath + strconv.Itoa(i)
log.Debugf("checking %s", psKey)
resp, err := p.client.Get(ctx, psKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pserver clients or trainers may need to "watch" pserver node changes(like pserver down and restarted and recoverd on another node). But this may be in another PR, add some TODO here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will do this in next pr.

endpoints []string
}

// read ps desired number from etcd.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Desired ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for {
cli, err = clientv3.New(clientv3.Config{
Endpoints: ep,
DialTimeout: timeout,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we always use the DefaultEtcdTimeout, maybe we do not need the local variable timeout

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

ep := strings.Split(endpoints, ",")
timeout := DefaultEtcdTimeout
var cli *clientv3.Client
var err error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need the local variable err, and we can initialize this with cli, err := clientv3.New(...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cli will be used outside the for loop, so I have to declare it here, so I can't use cli, err := clientv3.New(...)..

return servers
}

func NewEtcd(endpoints string) (*EtcdClient, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the implementation of NewEtcd, it will never return the error with not nil, so I think we can return the error if we initialize the etc client failed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unused err msg

@@ -136,7 +143,7 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) {
_, err := concurrency.NewSTM(e.etcdClient, func(c concurrency.STM) error {
registered := false
for i := 0; i < e.desired; i++ {
psKey := "/ps/" + strconv.Itoa(i)
psKey := PsPath + strconv.Itoa(i)
log.Debugf("checking %s", psKey)
ps := c.Get(psKey)
log.Debugf("got value (%s) for key: %s", ps, psKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

151行建议改成:

if ps != "" {
   continue
}
....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a common style with other code in paddle, so I will not change it this time.

// TODO: when implementing extending or reducing pservers, /ps_desired is
// changed, then we need to watch /ps_desired node for events. For now, just
// TODO: when implementing extending or reducing pservers, /ps/desired is
// changed, then we need to watch /ps/desired node for events. For now, just
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is not upated

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix done

Copy link
Contributor

@typhoonzero typhoonzero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM++ after comments updated.

Copy link
Contributor

@Yancey1989 Yancey1989 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@jacquesqiao jacquesqiao merged commit 9045063 into PaddlePaddle:develop Jul 4, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

Trainer discover pservers by etcd.
5 participants