Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trainer library discover master by etcd #2551

Merged
merged 17 commits into from
Jun 29, 2017
Merged
4 changes: 4 additions & 0 deletions go/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CMakeCache.txt
Copy link
Member

@jacquesqiao jacquesqiao Jun 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this files? Since the build result is now moved to build dir

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done.

cmake_install.cmake
CMakeFiles
go
7 changes: 7 additions & 0 deletions go/master/c/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func (a addresser) Address() string {
return string(a)
}

//export paddle_new_etcd_master_client
func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client {
p := C.GoString(etcdEndpoints)
c := master.NewEtcdClient(addresser(p), bufSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just c := master.NewEtcdClient(p), bufSize) (remove addresser)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return add(c)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉把NewEtcdClient里面的c := NewClient(etcdClient.ch, bufSize)放在这里比较好。这样master.EtcdClient就和master.Client不耦合了。EtcdClient负责和etcd交互,Client只读chan string里的新地址。这样unit test也比较好写。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
直接用了etcd_clint.go中的EtcdClient,因为想要复用NewEtcdClient所以将原有初始化时写入master地址的逻辑独立到一个Put函数中。

}

//export paddle_new_master_client
func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client {
a := C.GoString(addr)
Expand Down
76 changes: 76 additions & 0 deletions go/master/client.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package master

import (
"context"
"os"
"strings"
"time"

"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
"github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
)

const masterAddrPath = "/master"

// Addresser provide the address of the master server.
type Addresser interface {
Address() string
Expand All @@ -20,6 +25,51 @@ type Client struct {
ch chan []byte
}

// MasterAddresser provide master address
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment function name must be the same to function name masterAddresser

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
Delete MasterAddresser.

type masterAddresser struct {
client *clientv3.Client
timeout time.Duration
endpoints []string
}

// Address return the address
func (m masterAddresser) Address() string {
for {
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
resp, err := m.client.Get(ctx, masterAddrPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clients should use watch to retrieve notification about master node events. masterMonitor should also change to handle these events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clients should use watch to retrieve notification about master node events
I think it's a good idea, it will reduce the count for GET operator to etcd.

masterMonitor should also change to handle these events

I do not think masterMonitor will handle the events, we can handle these in a new method named masterAddresser.init(), and masterAddresser.Addresser() only return the latest master address.

Copy link
Contributor

@helinwang helinwang Jun 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally thought client can poll etcd to get the latest master address. But I think @typhoonzero 's idea is better.
Because Addresser is an polling interface, to get notification about the change rather than polling to get the change. We need to change the interface. One possible solution could be:
Change:

func NewClient(addr Addresser, bufSize int)

To:

func NewClient(addrCh <-chan string, bufSize int)

And maybe change moniterMaster to:

func (c *Client) monitorMaster(addrCh <-chan string) {
        lastMaster := ""
        for curMaster := range addrCh {
                // connect to the new address once address changed.                                                                                                                                                                         
                if curMaster != lastMaster {
                        if curMaster == "" {
                                err := c.conn.Close()
                                if err != nil {
                                        log.Errorln(err)
                                }
                        } else {
                                err := c.conn.Connect(curMaster)
                                if err != nil {
                                        log.Errorln(err)

                                        // connect to addr failed, set                                                                                                                                                                      
                                        // to last known addr in order                                                                                                                                                                      
                                        // to retry next time.                                                                                                                                                                              
                                        curMaster = lastMaster
                                }

                        }
                }
	        lastMaster = curMaster
        }
}

Copy link
Contributor Author

@Yancey1989 Yancey1989 Jun 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
I think we also need an interface with etcd operator, so that we can do some unit test easier, I'll add this feature in the next PR.

Copy link
Contributor

@helinwang helinwang Jun 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Yancey1989 Thanks!
We can use interface to make unit test easier. For example, the master server is taking an interface Store as the parameter: https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/service.go#L94.
The definition of store is:

type Store interface {
	Save([]byte) error
	Load() ([]byte, error)
}

We have an implementation of Store using etcd. But to test the master server, it's easy, we don't to run a etcd cluster. We can just write a stub interface for Store.
However this only tests master service. Does not test the etcd Store implementation. But that's a good start.

cancel()
if err != nil {
log.Errorf("Fetch master addr failed, reconnecting to etcd, %v", err)
err := m.client.Close()
if err != nil {
log.Errorln(err)
time.Sleep(m.timeout)
continue
}
// reconnect to etcd server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seem that etcd have a some kind of retry method for grpc but don't know how to use yet, may be help here https://github.com/coreos/etcd/blob/master/clientv3/retry.go#L28

m.client, err = clientv3.New(clientv3.Config{
Endpoints: m.endpoints,
DialTimeout: m.timeout,
})
if err != nil {
log.Errorf("Reconnecting etcd failed, sleep for %d seconds ...\n%v", m.timeout, err)
time.Sleep(m.timeout)
continue
}
continue
}
kvs := resp.Kvs
if len(kvs) == 0 {
log.Infoln("Waiting for master be ready ...")
time.Sleep(m.timeout)
continue
}
mAddr := kvs[0].Value
log.Debugf("Fetched master address: %s\n", mAddr)
return string(mAddr)
}
}

// NewClient creates a new Client.
//
// bufSize is the record buffer size. NextRecord will read from this
Expand All @@ -33,6 +83,32 @@ func NewClient(addr Addresser, bufSize int) *Client {
return c
}

// NewEtcdClient create a new master client by etcd
//
// endpoints is the endpoints for etcd and separated by ",", such as
// "172.0.1.0:2379,172.0.1.1:2379"
// timeout is the timeout for etcd calls
// bufSize is the record buffer size. NextRecord will read from this buffer.
func NewEtcdClient(endpoints string, timeout int, bufSize int) *Client {
t := time.Second * time.Duration(timeout)
ep := strings.Split(endpoints, ",")
cli, err := clientv3.New(clientv3.Config{
Endpoints: ep,
DialTimeout: t,
})
if err != nil {
log.Errorf("Init etcd connection failed: %v", err)
panic(err)
}
log.Debugf("Connected to etcd: %s\n", endpoints)
mAddresser := masterAddresser{
client: cli,
timeout: t,
endpoints: ep,
}
return NewClient(mAddresser, bufSize)
}

func (c *Client) getRecords() {
for {
t, err := c.getTask()
Expand Down
6 changes: 6 additions & 0 deletions go/master/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/recordio"
"github.com/stretchr/testify/assert"
)

func TestNextRecord(t *testing.T) {
Expand Down Expand Up @@ -77,3 +78,8 @@ func TestNextRecord(t *testing.T) {
}
}
}

func TestNewEtcdClientFailed(t *testing.T) {
assert.Panics(t, func() { master.NewEtcdClient("localhost:1235", 3, 1) },
"Invalid etcd address should be panic.")
}