From bff8d8d8617593ed37901586c0d09af24de562b2 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 22 Jun 2017 00:01:08 +0800 Subject: [PATCH 01/14] add trainer library --- go/.gitignore | 4 ++ go/trainer/c/CMakeLists.txt | 23 +++++++++ go/trainer/c/cclient.go | 39 +++++++++++++++ go/trainer/c/test/CMakeLists.txt | 20 ++++++++ go/trainer/c/test/main.c | 4 ++ go/trainer/client.go | 86 ++++++++++++++++++++++++++++++++ go/trainer/client_test.go | 14 ++++++ 7 files changed, 190 insertions(+) create mode 100644 go/.gitignore create mode 100644 go/trainer/c/CMakeLists.txt create mode 100644 go/trainer/c/cclient.go create mode 100644 go/trainer/c/test/CMakeLists.txt create mode 100644 go/trainer/c/test/main.c create mode 100644 go/trainer/client.go create mode 100644 go/trainer/client_test.go diff --git a/go/.gitignore b/go/.gitignore new file mode 100644 index 0000000000000..275619bb34441 --- /dev/null +++ b/go/.gitignore @@ -0,0 +1,4 @@ +CMakeCache.txt +cmake_install.cmake +CMakeFiles +go diff --git a/go/trainer/c/CMakeLists.txt b/go/trainer/c/CMakeLists.txt new file mode 100644 index 0000000000000..fbdd0d6f8cd39 --- /dev/null +++ b/go/trainer/c/CMakeLists.txt @@ -0,0 +1,23 @@ +cmake_minimum_required(VERSION 3.0) + +get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake") + +project(cxx_go C Go) + +include(golang) +include(flags) + +go_library(paddle_trainer STATIC) + +if(PROJ_ROOT) + add_custom_command(OUTPUT ${PROJ_ROOT}/paddle/trainer/libpaddle_trainer.a + COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/libpaddle_trainer.h ${PROJ_ROOT}/paddle/trainer/ + COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/libpaddle_trainer.a ${PROJ_ROOT}/paddle/trainer/ + WORKING_DIRECTORY ${PROJ_ROOT}/paddle + DEPENDS paddle_trainer) + add_custom_target(paddle_trainer_lib ALL DEPENDS ${PROJ_ROOT}/paddle/trainer/libpaddle_trainer.a) +endif(PROJ_ROOT) + +add_subdirectory(test) diff --git a/go/trainer/c/cclient.go b/go/trainer/c/cclient.go new file mode 100644 index 0000000000000..8547b1a7e64f7 --- /dev/null +++ b/go/trainer/c/cclient.go @@ -0,0 +1,39 @@ +package main + +/* +#include + +#define PADDLE_TRAINER_INIT_ERROR -1 +typedef int paddle_trainer; + +*/ +import "C" +import ( + "sync" + + "github.com/PaddlePaddle/Paddle/go/trainer" +) + +var mu sync.Mutex +var handleMap = make(map[C.paddle_trainer]*trainer.Trainer) +var curHandle C.paddle_pserver_client + +func add(t *trainer.Trainer) C.paddle_trainer { + mu.Lock() + defer mu.Unlock() + instance := curHandle + curHandle++ + handleMap[instance] = t + return trainer +} + +//export paddle_new_trainer +func paddle_new_trainer(endpoints *C.char) C.paddle_trainer { + t, err := trainer.NewTrainer(C.GoString(endpoints)) + if err != nil { + return C.PADDLE_TRAINER_ERROR + } + return add(t) +} + +func main() {} diff --git a/go/trainer/c/test/CMakeLists.txt b/go/trainer/c/test/CMakeLists.txt new file mode 100644 index 0000000000000..68525fa521458 --- /dev/null +++ b/go/trainer/c/test/CMakeLists.txt @@ -0,0 +1,20 @@ +cmake_minimum_required(VERSION 3.0) + +add_executable(main main.c) +add_dependencies(main paddle_trainer) + +if(APPLE) + set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security") +else() + set(CMAKE_EXE_LINKER_FLAGS "-pthread") +endif() + +if(PROJ_ROOT) + include_directories(${CMAKE_CURRENT_BINARY_DIR}/..) + target_link_libraries(main ${CMAKE_CURRENT_BINARY_DIR}/../libpaddle_trainer.a pthread) + #target_link_libraries(test_trainer ${CMAKE_CURRENT_BINARY_DIR}/../libpaddle_trainer.a pthread) +else(PROJ_ROOT) + include_directories(${CMAKE_BINARY_DIR}) + target_link_libraries(main ${CMAKE_BINARY_DIR}/libpaddle_trainer.a pthread) + #target_link_libraries(test_trainer ${CMAKE_BINARY_DIR}/libpaddle_trainer.a pthread) +endif(PROJ_ROOT) diff --git a/go/trainer/c/test/main.c b/go/trainer/c/test/main.c new file mode 100644 index 0000000000000..4d7922f493564 --- /dev/null +++ b/go/trainer/c/test/main.c @@ -0,0 +1,4 @@ +#include "libpaddle_trainer.h" +int main() { + char etcd_addrs[] = "localhost:6789" trainer = paddle_new_trainer(etcd_addrs) +} diff --git a/go/trainer/client.go b/go/trainer/client.go new file mode 100644 index 0000000000000..8d65c6d4a7fd8 --- /dev/null +++ b/go/trainer/client.go @@ -0,0 +1,86 @@ +package trainer + +import ( + "context" + "errors" + "strings" + "time" + + master "github.com/PaddlePaddle/Paddle/go/master" + "github.com/coreos/etcd/clientv3" + log "github.com/sirupsen/logrus" +) + +const ( + defaultMasterAddrPath = "/master" + defaultMasterBufferSize = 1 + defaultMasterRetryTimes = 10 +) + +// Trainer is the identification of a trianer +type Trainer struct { + etcdEndpoints []string + etcdTimeout time.Duration + masterClient *master.Client + etcdClient *clientv3.Client +} + +// MasterAddresser is the addresser for master +type MasterAddresser string + +// Address return address with string type +func (m MasterAddresser) Address() string { + return string(m) +} + +// NewTrainer create a trainer adapter +func NewTrainer(etcdEndpoints string, timeout time.Duration) *Trainer { + t := Trainer{ + etcdEndpoints: strings.Split(etcdEndpoints, ","), + etcdTimeout: timeout, + } + return &t +} + +// Init initialized a trainer adapter +func (t *Trainer) Init() error { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: t.etcdEndpoints, + DialTimeout: t.etcdTimeout, + }) + if err != nil { + log.Errorln(err) + return err + } + t.etcdClient = cli + err = t.initMasterClient(defaultMasterRetryTimes) + if err != nil { + log.Errorln(err) + return err + } + return nil +} +func (t *Trainer) initMasterClient(retryTimes int) error { + for retryTimes < 0 { + retryTimes-- + ctx, cancel := context.WithTimeout(context.Background(), t.etcdTimeout) + resp, err := t.etcdClient.Get(ctx, defaultMasterAddrPath) + cancel() + if err != nil { + log.Errorln(err) + return err + } + kvs := resp.Kvs + if len(kvs) == 0 { + log.Infoln("Waiting for master process ready, sleep 5 seconds...") + time.Sleep(5 * time.Second) + continue + } + mAddr := MasterAddresser(kvs[0].Value) + mCli := master.NewClient(mAddr, defaultMasterBufferSize) + t.masterClient = mCli + return nil + } + log.Errorln("Executed the max retry times: %d", retryTimes) + return errors.New("initialize master client failed") +} diff --git a/go/trainer/client_test.go b/go/trainer/client_test.go new file mode 100644 index 0000000000000..c8a9078396970 --- /dev/null +++ b/go/trainer/client_test.go @@ -0,0 +1,14 @@ +package trainer + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestInvaliedEtcdEndpoints(t *testing.T) { + trainer := NewTrainer("localhost:12345", 5*time.Second) + err := trainer.Init() + assert.NotNil(t, err, "Invalid etcd endpoints should be a nil client") +} From d80bd050c5b6e922c1003ce10b79d2ef4c898261 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 22 Jun 2017 00:38:06 +0800 Subject: [PATCH 02/14] modifty file name --- go/trainer/c/{cclient.go => ctrainer.go} | 13 +++++-------- go/trainer/c/test/main.c | 3 ++- go/trainer/{client.go => trainer.go} | 0 go/trainer/{client_test.go => trainer_test.go} | 0 4 files changed, 7 insertions(+), 9 deletions(-) rename go/trainer/c/{cclient.go => ctrainer.go} (62%) rename go/trainer/{client.go => trainer.go} (100%) rename go/trainer/{client_test.go => trainer_test.go} (100%) diff --git a/go/trainer/c/cclient.go b/go/trainer/c/ctrainer.go similarity index 62% rename from go/trainer/c/cclient.go rename to go/trainer/c/ctrainer.go index 8547b1a7e64f7..4156748af436f 100644 --- a/go/trainer/c/cclient.go +++ b/go/trainer/c/ctrainer.go @@ -3,20 +3,20 @@ package main /* #include -#define PADDLE_TRAINER_INIT_ERROR -1 typedef int paddle_trainer; */ import "C" import ( "sync" + "time" "github.com/PaddlePaddle/Paddle/go/trainer" ) var mu sync.Mutex var handleMap = make(map[C.paddle_trainer]*trainer.Trainer) -var curHandle C.paddle_pserver_client +var curHandle C.paddle_trainer func add(t *trainer.Trainer) C.paddle_trainer { mu.Lock() @@ -24,15 +24,12 @@ func add(t *trainer.Trainer) C.paddle_trainer { instance := curHandle curHandle++ handleMap[instance] = t - return trainer + return instance } //export paddle_new_trainer -func paddle_new_trainer(endpoints *C.char) C.paddle_trainer { - t, err := trainer.NewTrainer(C.GoString(endpoints)) - if err != nil { - return C.PADDLE_TRAINER_ERROR - } +func paddle_new_trainer(endpoints *C.char, timeout C.int) C.paddle_trainer { + t := trainer.NewTrainer(C.GoString(endpoints), time.Second*time.Duration(timeout)) return add(t) } diff --git a/go/trainer/c/test/main.c b/go/trainer/c/test/main.c index 4d7922f493564..d90938cb52871 100644 --- a/go/trainer/c/test/main.c +++ b/go/trainer/c/test/main.c @@ -1,4 +1,5 @@ #include "libpaddle_trainer.h" int main() { - char etcd_addrs[] = "localhost:6789" trainer = paddle_new_trainer(etcd_addrs) + char etcd_addrs[] = "localhost:6789"; + trainer = paddle_new_trainer(etcd_addrs, 5) } diff --git a/go/trainer/client.go b/go/trainer/trainer.go similarity index 100% rename from go/trainer/client.go rename to go/trainer/trainer.go diff --git a/go/trainer/client_test.go b/go/trainer/trainer_test.go similarity index 100% rename from go/trainer/client_test.go rename to go/trainer/trainer_test.go From 475f77d89af2fc73abd4a2dc3c028415471c2d33 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 22 Jun 2017 23:02:51 +0800 Subject: [PATCH 03/14] move trainer to master client --- go/master/c/client.go | 7 +++ go/master/client.go | 51 +++++++++++++++++++ go/master/client_test.go | 6 +++ go/trainer/c/CMakeLists.txt | 23 --------- go/trainer/c/ctrainer.go | 36 ------------- go/trainer/c/test/CMakeLists.txt | 20 -------- go/trainer/c/test/main.c | 5 -- go/trainer/trainer.go | 86 -------------------------------- go/trainer/trainer_test.go | 14 ------ 9 files changed, 64 insertions(+), 184 deletions(-) delete mode 100644 go/trainer/c/CMakeLists.txt delete mode 100644 go/trainer/c/ctrainer.go delete mode 100644 go/trainer/c/test/CMakeLists.txt delete mode 100644 go/trainer/c/test/main.c delete mode 100644 go/trainer/trainer.go delete mode 100644 go/trainer/trainer_test.go diff --git a/go/master/c/client.go b/go/master/c/client.go index b186474dc3313..0c77cea1984a6 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -54,6 +54,13 @@ func (a addresser) Address() string { return string(a) } +//export paddle_new_etcd_master_client +func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client { + p := C.GoString(etcdEndpoints) + c := master.NewEtcdClient(addresser(p), bufSize) + return add(c) +} + //export paddle_new_master_client func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client { a := C.GoString(addr) diff --git a/go/master/client.go b/go/master/client.go index 8451820c1963d..995bad70dda17 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -1,14 +1,19 @@ package master import ( + "context" "os" + "strings" "time" "github.com/PaddlePaddle/Paddle/go/connection" "github.com/PaddlePaddle/recordio" + "github.com/coreos/etcd/clientv3" log "github.com/sirupsen/logrus" ) +const masterAddrPath = "/master" + // Addresser provide the address of the master server. type Addresser interface { Address() string @@ -20,6 +25,12 @@ type Client struct { ch chan []byte } +type MasterAddresser string + +func (m MasterAddresser) Address() string { + return string(m) +} + // NewClient creates a new Client. // // bufSize is the record buffer size. NextRecord will read from this @@ -33,6 +44,46 @@ func NewClient(addr Addresser, bufSize int) *Client { return c } +// NewEtcdClient create a new master client by etcd +// +// etcdEndpoints is the endpoints for etcd, it's separated by "," such as +// "172.0.1.0:2379,172.0.1.1:2379" +// bufSize is the record buffer size. NextRecord will read from this buffer. +func NewEtcdClient(etcdEndpoints string, etcdTimeout int, bufSize int) *Client { + timeout := time.Second * time.Duration(etcdTimeout) + ep := strings.Split(etcdEndpoints, ",") + cli, err := clientv3.New(clientv3.Config{ + Endpoints: ep, + DialTimeout: timeout, + }) + if err != nil { + log.Errorf("Init etcd connection failed: %v", err) + panic(err) + } + log.Debugf("Connected to etcd: %s\n", etcdEndpoints) + for { + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + resp, err := cli.Get(ctx, masterAddrPath) + cancel() + if err != nil { + log.Errorf("Fetch master addr failed, %v\n", err) + time.Sleep(timeout) + continue + } + kvs := resp.Kvs + if len(kvs) == 0 { + log.Infoln("Waiting for master be ready ...\n") + time.Sleep(timeout) + continue + } + + mAddr := kvs[0].Value + log.Debugf("Fetched master address: %s\n", mAddr) + return NewClient(MasterAddresser(mAddr), bufSize) + } +} + func (c *Client) getRecords() { for { t, err := c.getTask() diff --git a/go/master/client_test.go b/go/master/client_test.go index 2b3f873ecf3a6..efd9f15d0da8a 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -13,6 +13,7 @@ import ( "github.com/PaddlePaddle/Paddle/go/master" "github.com/PaddlePaddle/recordio" + "github.com/stretchr/testify/assert" ) func TestNextRecord(t *testing.T) { @@ -77,3 +78,8 @@ func TestNextRecord(t *testing.T) { } } } + +func TestNewEtcdClientFailed(t *testing.T) { + assert.Panics(t, func() { master.NewEtcdClient("localhost:1235", 3, 1) }, + "Invalid etcd address should be panic.") +} diff --git a/go/trainer/c/CMakeLists.txt b/go/trainer/c/CMakeLists.txt deleted file mode 100644 index fbdd0d6f8cd39..0000000000000 --- a/go/trainer/c/CMakeLists.txt +++ /dev/null @@ -1,23 +0,0 @@ -cmake_minimum_required(VERSION 3.0) - -get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) -get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY) -set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake") - -project(cxx_go C Go) - -include(golang) -include(flags) - -go_library(paddle_trainer STATIC) - -if(PROJ_ROOT) - add_custom_command(OUTPUT ${PROJ_ROOT}/paddle/trainer/libpaddle_trainer.a - COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/libpaddle_trainer.h ${PROJ_ROOT}/paddle/trainer/ - COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/libpaddle_trainer.a ${PROJ_ROOT}/paddle/trainer/ - WORKING_DIRECTORY ${PROJ_ROOT}/paddle - DEPENDS paddle_trainer) - add_custom_target(paddle_trainer_lib ALL DEPENDS ${PROJ_ROOT}/paddle/trainer/libpaddle_trainer.a) -endif(PROJ_ROOT) - -add_subdirectory(test) diff --git a/go/trainer/c/ctrainer.go b/go/trainer/c/ctrainer.go deleted file mode 100644 index 4156748af436f..0000000000000 --- a/go/trainer/c/ctrainer.go +++ /dev/null @@ -1,36 +0,0 @@ -package main - -/* -#include - -typedef int paddle_trainer; - -*/ -import "C" -import ( - "sync" - "time" - - "github.com/PaddlePaddle/Paddle/go/trainer" -) - -var mu sync.Mutex -var handleMap = make(map[C.paddle_trainer]*trainer.Trainer) -var curHandle C.paddle_trainer - -func add(t *trainer.Trainer) C.paddle_trainer { - mu.Lock() - defer mu.Unlock() - instance := curHandle - curHandle++ - handleMap[instance] = t - return instance -} - -//export paddle_new_trainer -func paddle_new_trainer(endpoints *C.char, timeout C.int) C.paddle_trainer { - t := trainer.NewTrainer(C.GoString(endpoints), time.Second*time.Duration(timeout)) - return add(t) -} - -func main() {} diff --git a/go/trainer/c/test/CMakeLists.txt b/go/trainer/c/test/CMakeLists.txt deleted file mode 100644 index 68525fa521458..0000000000000 --- a/go/trainer/c/test/CMakeLists.txt +++ /dev/null @@ -1,20 +0,0 @@ -cmake_minimum_required(VERSION 3.0) - -add_executable(main main.c) -add_dependencies(main paddle_trainer) - -if(APPLE) - set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security") -else() - set(CMAKE_EXE_LINKER_FLAGS "-pthread") -endif() - -if(PROJ_ROOT) - include_directories(${CMAKE_CURRENT_BINARY_DIR}/..) - target_link_libraries(main ${CMAKE_CURRENT_BINARY_DIR}/../libpaddle_trainer.a pthread) - #target_link_libraries(test_trainer ${CMAKE_CURRENT_BINARY_DIR}/../libpaddle_trainer.a pthread) -else(PROJ_ROOT) - include_directories(${CMAKE_BINARY_DIR}) - target_link_libraries(main ${CMAKE_BINARY_DIR}/libpaddle_trainer.a pthread) - #target_link_libraries(test_trainer ${CMAKE_BINARY_DIR}/libpaddle_trainer.a pthread) -endif(PROJ_ROOT) diff --git a/go/trainer/c/test/main.c b/go/trainer/c/test/main.c deleted file mode 100644 index d90938cb52871..0000000000000 --- a/go/trainer/c/test/main.c +++ /dev/null @@ -1,5 +0,0 @@ -#include "libpaddle_trainer.h" -int main() { - char etcd_addrs[] = "localhost:6789"; - trainer = paddle_new_trainer(etcd_addrs, 5) -} diff --git a/go/trainer/trainer.go b/go/trainer/trainer.go deleted file mode 100644 index 8d65c6d4a7fd8..0000000000000 --- a/go/trainer/trainer.go +++ /dev/null @@ -1,86 +0,0 @@ -package trainer - -import ( - "context" - "errors" - "strings" - "time" - - master "github.com/PaddlePaddle/Paddle/go/master" - "github.com/coreos/etcd/clientv3" - log "github.com/sirupsen/logrus" -) - -const ( - defaultMasterAddrPath = "/master" - defaultMasterBufferSize = 1 - defaultMasterRetryTimes = 10 -) - -// Trainer is the identification of a trianer -type Trainer struct { - etcdEndpoints []string - etcdTimeout time.Duration - masterClient *master.Client - etcdClient *clientv3.Client -} - -// MasterAddresser is the addresser for master -type MasterAddresser string - -// Address return address with string type -func (m MasterAddresser) Address() string { - return string(m) -} - -// NewTrainer create a trainer adapter -func NewTrainer(etcdEndpoints string, timeout time.Duration) *Trainer { - t := Trainer{ - etcdEndpoints: strings.Split(etcdEndpoints, ","), - etcdTimeout: timeout, - } - return &t -} - -// Init initialized a trainer adapter -func (t *Trainer) Init() error { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: t.etcdEndpoints, - DialTimeout: t.etcdTimeout, - }) - if err != nil { - log.Errorln(err) - return err - } - t.etcdClient = cli - err = t.initMasterClient(defaultMasterRetryTimes) - if err != nil { - log.Errorln(err) - return err - } - return nil -} -func (t *Trainer) initMasterClient(retryTimes int) error { - for retryTimes < 0 { - retryTimes-- - ctx, cancel := context.WithTimeout(context.Background(), t.etcdTimeout) - resp, err := t.etcdClient.Get(ctx, defaultMasterAddrPath) - cancel() - if err != nil { - log.Errorln(err) - return err - } - kvs := resp.Kvs - if len(kvs) == 0 { - log.Infoln("Waiting for master process ready, sleep 5 seconds...") - time.Sleep(5 * time.Second) - continue - } - mAddr := MasterAddresser(kvs[0].Value) - mCli := master.NewClient(mAddr, defaultMasterBufferSize) - t.masterClient = mCli - return nil - } - log.Errorln("Executed the max retry times: %d", retryTimes) - return errors.New("initialize master client failed") -} diff --git a/go/trainer/trainer_test.go b/go/trainer/trainer_test.go deleted file mode 100644 index c8a9078396970..0000000000000 --- a/go/trainer/trainer_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package trainer - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func TestInvaliedEtcdEndpoints(t *testing.T) { - trainer := NewTrainer("localhost:12345", 5*time.Second) - err := trainer.Init() - assert.NotNil(t, err, "Invalid etcd endpoints should be a nil client") -} From c1cfcdc2aac98951a5cb3ed26856ce5382731d21 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 22 Jun 2017 23:07:39 +0800 Subject: [PATCH 04/14] update --- go/master/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/master/client.go b/go/master/client.go index 995bad70dda17..40dcbd6d42833 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -25,8 +25,10 @@ type Client struct { ch chan []byte } +// MasterAddresser provide master address type MasterAddresser string +// Address return the address func (m MasterAddresser) Address() string { return string(m) } From 3c88fd21259a1378dd679664f1905b5a9a6437ee Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Fri, 23 Jun 2017 15:17:55 +0800 Subject: [PATCH 05/14] update --- go/master/client.go | 81 +++++++++++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 29 deletions(-) diff --git a/go/master/client.go b/go/master/client.go index 40dcbd6d42833..96ea9e425aad1 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -26,11 +26,48 @@ type Client struct { } // MasterAddresser provide master address -type MasterAddresser string +type masterAddresser struct { + client *clientv3.Client + timeout time.Duration + endpoints []string +} // Address return the address -func (m MasterAddresser) Address() string { - return string(m) +func (m masterAddresser) Address() string { + for { + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + resp, err := m.client.Get(ctx, masterAddrPath) + cancel() + if err != nil { + log.Errorf("Fetch master addr failed, reconnecting to etcd, %v", err) + err := m.client.Close() + if err != nil { + log.Errorln(err) + time.Sleep(m.timeout) + continue + } + // reconnect to etcd server + m.client, err = clientv3.New(clientv3.Config{ + Endpoints: m.endpoints, + DialTimeout: m.timeout, + }) + if err != nil { + log.Errorf("Reconnecting etcd failed, sleep for %d seconds ...\n%v", m.timeout, err) + time.Sleep(m.timeout) + continue + } + continue + } + kvs := resp.Kvs + if len(kvs) == 0 { + log.Infoln("Waiting for master be ready ...") + time.Sleep(m.timeout) + continue + } + mAddr := kvs[0].Value + log.Debugf("Fetched master address: %s\n", mAddr) + return string(mAddr) + } } // NewClient creates a new Client. @@ -48,42 +85,28 @@ func NewClient(addr Addresser, bufSize int) *Client { // NewEtcdClient create a new master client by etcd // -// etcdEndpoints is the endpoints for etcd, it's separated by "," such as +// endpoints is the endpoints for etcd and separated by ",", such as // "172.0.1.0:2379,172.0.1.1:2379" +// timeout is the timeout for etcd calls // bufSize is the record buffer size. NextRecord will read from this buffer. -func NewEtcdClient(etcdEndpoints string, etcdTimeout int, bufSize int) *Client { - timeout := time.Second * time.Duration(etcdTimeout) - ep := strings.Split(etcdEndpoints, ",") +func NewEtcdClient(endpoints string, timeout int, bufSize int) *Client { + t := time.Second * time.Duration(timeout) + ep := strings.Split(endpoints, ",") cli, err := clientv3.New(clientv3.Config{ Endpoints: ep, - DialTimeout: timeout, + DialTimeout: t, }) if err != nil { log.Errorf("Init etcd connection failed: %v", err) panic(err) } - log.Debugf("Connected to etcd: %s\n", etcdEndpoints) - for { - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - resp, err := cli.Get(ctx, masterAddrPath) - cancel() - if err != nil { - log.Errorf("Fetch master addr failed, %v\n", err) - time.Sleep(timeout) - continue - } - kvs := resp.Kvs - if len(kvs) == 0 { - log.Infoln("Waiting for master be ready ...\n") - time.Sleep(timeout) - continue - } - - mAddr := kvs[0].Value - log.Debugf("Fetched master address: %s\n", mAddr) - return NewClient(MasterAddresser(mAddr), bufSize) + log.Debugf("Connected to etcd: %s\n", endpoints) + mAddresser := masterAddresser{ + client: cli, + timeout: t, + endpoints: ep, } + return NewClient(mAddresser, bufSize) } func (c *Client) getRecords() { From cfeac09a5907f5af28d5ac9e761156a06a9a2045 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Mon, 26 Jun 2017 14:51:21 +0800 Subject: [PATCH 06/14] modify monitor master to receive a chan --- go/master/client.go | 172 +++++++++++++++--------------- go/master/client_internal_test.go | 13 +-- go/master/client_test.go | 8 +- 3 files changed, 92 insertions(+), 101 deletions(-) diff --git a/go/master/client.go b/go/master/client.go index 96ea9e425aad1..3cb3b06133527 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -1,7 +1,7 @@ package master import ( - "context" + "fmt" "os" "strings" "time" @@ -10,105 +10,38 @@ import ( "github.com/PaddlePaddle/recordio" "github.com/coreos/etcd/clientv3" log "github.com/sirupsen/logrus" + "golang.org/x/net/context" ) const masterAddrPath = "/master" -// Addresser provide the address of the master server. -type Addresser interface { - Address() string -} - // Client is the client of the master server. type Client struct { conn *connection.Conn ch chan []byte } -// MasterAddresser provide master address -type masterAddresser struct { +// EtcdClient is the client of +type EtcdClient struct { client *clientv3.Client - timeout time.Duration endpoints []string -} - -// Address return the address -func (m masterAddresser) Address() string { - for { - ctx, cancel := context.WithTimeout(context.Background(), m.timeout) - resp, err := m.client.Get(ctx, masterAddrPath) - cancel() - if err != nil { - log.Errorf("Fetch master addr failed, reconnecting to etcd, %v", err) - err := m.client.Close() - if err != nil { - log.Errorln(err) - time.Sleep(m.timeout) - continue - } - // reconnect to etcd server - m.client, err = clientv3.New(clientv3.Config{ - Endpoints: m.endpoints, - DialTimeout: m.timeout, - }) - if err != nil { - log.Errorf("Reconnecting etcd failed, sleep for %d seconds ...\n%v", m.timeout, err) - time.Sleep(m.timeout) - continue - } - continue - } - kvs := resp.Kvs - if len(kvs) == 0 { - log.Infoln("Waiting for master be ready ...") - time.Sleep(m.timeout) - continue - } - mAddr := kvs[0].Value - log.Debugf("Fetched master address: %s\n", mAddr) - return string(mAddr) - } + ch chan string + timeout time.Duration } // NewClient creates a new Client. // // bufSize is the record buffer size. NextRecord will read from this // buffer. -func NewClient(addr Addresser, bufSize int) *Client { +func NewClient(addrCh <-chan string, bufSize int) *Client { c := &Client{} c.conn = connection.New() c.ch = make(chan []byte, bufSize) - go c.monitorMaster(addr) + go c.monitorMaster(addrCh) go c.getRecords() return c } -// NewEtcdClient create a new master client by etcd -// -// endpoints is the endpoints for etcd and separated by ",", such as -// "172.0.1.0:2379,172.0.1.1:2379" -// timeout is the timeout for etcd calls -// bufSize is the record buffer size. NextRecord will read from this buffer. -func NewEtcdClient(endpoints string, timeout int, bufSize int) *Client { - t := time.Second * time.Duration(timeout) - ep := strings.Split(endpoints, ",") - cli, err := clientv3.New(clientv3.Config{ - Endpoints: ep, - DialTimeout: t, - }) - if err != nil { - log.Errorf("Init etcd connection failed: %v", err) - panic(err) - } - log.Debugf("Connected to etcd: %s\n", endpoints) - mAddresser := masterAddresser{ - client: cli, - timeout: t, - endpoints: ep, - } - return NewClient(mAddresser, bufSize) -} - func (c *Client) getRecords() { for { t, err := c.getTask() @@ -148,15 +81,14 @@ func (c *Client) getRecords() { } } -func (c *Client) monitorMaster(addr Addresser) { +func (c *Client) monitorMaster(addrCh <-chan string) { lastMaster := "" - monitor := func() { - // get the lastest address of the master server, + for curMaster := range addrCh { // connect to the new address once address changed. - curMaster := addr.Address() if curMaster != lastMaster { if curMaster == "" { err := c.conn.Close() + fmt.Printf("close conn error: %s", err) if err != nil { log.Errorln(err) } @@ -170,18 +102,10 @@ func (c *Client) monitorMaster(addr Addresser) { // to retry next time. curMaster = lastMaster } - } } - lastMaster = curMaster } - - monitor() - ticker := time.NewTicker(10 * time.Second) - for _ = range ticker.C { - monitor() - } } // SetDataset set dataset for the master server to dispatch. @@ -211,3 +135,77 @@ func (c *Client) taskFinished(taskID int) error { func (c *Client) NextRecord() []byte { return <-c.ch } + +// NewEtcdClient create a new master client by etcd +// +// endpoints is the endpoints for etcd and separated by ",", such as +// "172.0.1.0:2379,172.0.1.1:2379" +// timeout is the timeout for etcd calls +// bufSize is the record buffer size. NextRecord will read from this buffer. +func NewEtcdClient(endpoints string, timeout int, bufSize int) *Client { + t := time.Second * time.Duration(timeout) + ep := strings.Split(endpoints, ",") + cli, err := clientv3.New(clientv3.Config{ + Endpoints: ep, + DialTimeout: t, + }) + if err != nil { + log.Errorf("Init etcd connection failed: %v", err) + panic(err) + } + log.Debugf("Connected to etcd: %s\n", endpoints) + etcdClient := EtcdClient{ + client: cli, + timeout: t, + endpoints: ep, + } + etcdClient.ch = make(chan string) + c := NewClient(etcdClient.ch, bufSize) + //go etcdClient.monitorMasterAddr() + etcdClient.initMasterAddr(masterAddrPath) + go etcdClient.monitorMasterAddr() + return c +} +func (e *EtcdClient) initMasterAddr(key string) { + for { + ctx, cancel := context.WithTimeout(context.Background(), e.timeout) + resp, err := e.client.Get(ctx, masterAddrPath) + cancel() + if err != nil { + log.Errorf("etcd get key: %s failed: %s, sleep for %d seconds and reconnect...", + key, err, e.timeout) + time.Sleep(e.timeout) + err = e.client.Close() + if err != nil { + log.Error(err) + } + e.client, err = clientv3.New(clientv3.Config{ + Endpoints: e.endpoints, + DialTimeout: e.timeout, + }) + if err != nil { + log.Error(err) + } + continue + } + if len(resp.Kvs) == 0 { + log.Errorf("etcd key: %s does not exists, sleep %d seconds...", key, e.timeout/time.Second) + time.Sleep(e.timeout) + continue + } + mAddr := string(resp.Kvs[0].Value) + e.ch <- mAddr + break + } + fmt.Println("init master addr finished.") +} +func (e *EtcdClient) monitorMasterAddr() { + rch := e.client.Watch(context.Background(), masterAddrPath) + for wresp := range rch { + for _, ev := range wresp.Events { + // if event type is DELETE, ev.Kv.Value will be a empty string and Client + // will close the connection + e.ch <- string(ev.Kv.Value) + } + } +} diff --git a/go/master/client_internal_test.go b/go/master/client_internal_test.go index 00fcca0e2cf44..29cffee69fd0c 100644 --- a/go/master/client_internal_test.go +++ b/go/master/client_internal_test.go @@ -26,12 +26,6 @@ func init() { log.SetLevel(log.ErrorLevel) } -type TestAddresser string - -func (a TestAddresser) Address() string { - return string(a) -} - func TestGetFinishTask(t *testing.T) { const path = "/tmp/master_client_test_0" @@ -45,7 +39,6 @@ func TestGetFinishTask(t *testing.T) { if err != nil { panic(err) } - go func(l net.Listener) { s := NewService(chunkPerTask, time.Second, 1) server := rpc.NewServer() @@ -78,9 +71,11 @@ func TestGetFinishTask(t *testing.T) { // Manually intialize client to avoid calling c.getRecords() c := &Client{} c.conn = connection.New() - go c.monitorMaster(TestAddresser(fmt.Sprintf(":%d", p))) + addr := fmt.Sprintf(":%d", p) + ch := make(chan string) + go c.monitorMaster(ch) + ch <- addr c.SetDataset([]string{path}) - checkOnePass := func(i int) { var tasks []Task for idx := 0; idx < totalTask; idx++ { diff --git a/go/master/client_test.go b/go/master/client_test.go index efd9f15d0da8a..056d3cf208372 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -21,7 +21,6 @@ func TestNextRecord(t *testing.T) { path = "/tmp/master_client_TestFull" total = 50 ) - l, err := net.Listen("tcp", ":0") if err != nil { panic(err) @@ -32,7 +31,6 @@ func TestNextRecord(t *testing.T) { if err != nil { panic(err) } - go func(l net.Listener) { s := master.NewService(10, time.Second, 1) server := rpc.NewServer() @@ -60,10 +58,10 @@ func TestNextRecord(t *testing.T) { } w.Close() f.Close() - - c := master.NewClient(master.TestAddresser(fmt.Sprintf(":%d", p)), 10) + curAddr := make(chan string) + c := master.NewClient(curAddr, 10) + curAddr <- fmt.Sprintf(":%d", p) c.SetDataset([]string{path}) - for pass := 0; pass < 50; pass++ { received := make(map[byte]bool) for i := 0; i < total; i++ { From ff3324cae0c3ef6e9b8dad12393c6345838f4026 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 27 Jun 2017 15:18:36 +0800 Subject: [PATCH 07/14] update --- go/.gitignore | 4 -- go/master/c/client.go | 18 +++--- go/master/client.go | 98 ++++--------------------------- go/master/client_internal_test.go | 2 +- go/master/client_test.go | 19 +++++- 5 files changed, 36 insertions(+), 105 deletions(-) delete mode 100644 go/.gitignore diff --git a/go/.gitignore b/go/.gitignore deleted file mode 100644 index 275619bb34441..0000000000000 --- a/go/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -CMakeCache.txt -cmake_install.cmake -CMakeFiles -go diff --git a/go/master/c/client.go b/go/master/c/client.go index 0c77cea1984a6..99e9a52fe0036 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -48,23 +48,23 @@ func remove(client C.paddle_master_client) *master.Client { return h } -type addresser string - -func (a addresser) Address() string { - return string(a) -} - //export paddle_new_etcd_master_client -func paddle_new_etcd_master_client(etcdEndpoints *C.char, bufSize int) C.paddle_master_client { +func paddle_new_etcd_master_client(etcdEndpoints *C.char, timeout int, bufSize int) C.paddle_master_client { p := C.GoString(etcdEndpoints) - c := master.NewEtcdClient(addresser(p), bufSize) + e, err := master.NewEtcdClient(p, timeout) + if err != nil { + panic(err) + } + c := master.NewEtcdMasterClient(e, bufSize) return add(c) } //export paddle_new_master_client func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client { a := C.GoString(addr) - c := master.NewClient(addresser(a), bufSize) + ch := make(chan string) + c := master.NewClient(ch, bufSize) + ch <- a return add(c) } diff --git a/go/master/client.go b/go/master/client.go index 3cb3b06133527..094d80f5e88eb 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -3,32 +3,18 @@ package master import ( "fmt" "os" - "strings" - "time" "github.com/PaddlePaddle/Paddle/go/connection" "github.com/PaddlePaddle/recordio" - "github.com/coreos/etcd/clientv3" log "github.com/sirupsen/logrus" - "golang.org/x/net/context" ) -const masterAddrPath = "/master" - // Client is the client of the master server. type Client struct { conn *connection.Conn ch chan []byte } -// EtcdClient is the client of -type EtcdClient struct { - client *clientv3.Client - endpoints []string - ch chan string - timeout time.Duration -} - // NewClient creates a new Client. // // bufSize is the record buffer size. NextRecord will read from this @@ -42,6 +28,16 @@ func NewClient(addrCh <-chan string, bufSize int) *Client { return c } +// NewEtcdMasterClient creates a new Client by etcd +func NewEtcdMasterClient(db DBOperator, bufSize int) *Client { + ch := make(chan string) + c := NewClient(ch, bufSize) + v := db.Get(MasterAddrKey) + go db.WatchWithKey(MasterAddrKey, ch) + ch <- v + return c +} + func (c *Client) getRecords() { for { t, err := c.getTask() @@ -135,77 +131,3 @@ func (c *Client) taskFinished(taskID int) error { func (c *Client) NextRecord() []byte { return <-c.ch } - -// NewEtcdClient create a new master client by etcd -// -// endpoints is the endpoints for etcd and separated by ",", such as -// "172.0.1.0:2379,172.0.1.1:2379" -// timeout is the timeout for etcd calls -// bufSize is the record buffer size. NextRecord will read from this buffer. -func NewEtcdClient(endpoints string, timeout int, bufSize int) *Client { - t := time.Second * time.Duration(timeout) - ep := strings.Split(endpoints, ",") - cli, err := clientv3.New(clientv3.Config{ - Endpoints: ep, - DialTimeout: t, - }) - if err != nil { - log.Errorf("Init etcd connection failed: %v", err) - panic(err) - } - log.Debugf("Connected to etcd: %s\n", endpoints) - etcdClient := EtcdClient{ - client: cli, - timeout: t, - endpoints: ep, - } - etcdClient.ch = make(chan string) - c := NewClient(etcdClient.ch, bufSize) - //go etcdClient.monitorMasterAddr() - etcdClient.initMasterAddr(masterAddrPath) - go etcdClient.monitorMasterAddr() - return c -} -func (e *EtcdClient) initMasterAddr(key string) { - for { - ctx, cancel := context.WithTimeout(context.Background(), e.timeout) - resp, err := e.client.Get(ctx, masterAddrPath) - cancel() - if err != nil { - log.Errorf("etcd get key: %s failed: %s, sleep for %d seconds and reconnect...", - key, err, e.timeout) - time.Sleep(e.timeout) - err = e.client.Close() - if err != nil { - log.Error(err) - } - e.client, err = clientv3.New(clientv3.Config{ - Endpoints: e.endpoints, - DialTimeout: e.timeout, - }) - if err != nil { - log.Error(err) - } - continue - } - if len(resp.Kvs) == 0 { - log.Errorf("etcd key: %s does not exists, sleep %d seconds...", key, e.timeout/time.Second) - time.Sleep(e.timeout) - continue - } - mAddr := string(resp.Kvs[0].Value) - e.ch <- mAddr - break - } - fmt.Println("init master addr finished.") -} -func (e *EtcdClient) monitorMasterAddr() { - rch := e.client.Watch(context.Background(), masterAddrPath) - for wresp := range rch { - for _, ev := range wresp.Events { - // if event type is DELETE, ev.Kv.Value will be a empty string and Client - // will close the connection - e.ch <- string(ev.Kv.Value) - } - } -} diff --git a/go/master/client_internal_test.go b/go/master/client_internal_test.go index 29cffee69fd0c..044fe44deefd1 100644 --- a/go/master/client_internal_test.go +++ b/go/master/client_internal_test.go @@ -42,7 +42,7 @@ func TestGetFinishTask(t *testing.T) { go func(l net.Listener) { s := NewService(chunkPerTask, time.Second, 1) server := rpc.NewServer() - err := server.Register(s) + err = server.Register(s) if err != nil { panic(err) } diff --git a/go/master/client_test.go b/go/master/client_test.go index 056d3cf208372..e60bb771c549d 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -16,6 +16,18 @@ import ( "github.com/stretchr/testify/assert" ) +type testDB struct{} + +func (testDB) Get(key string) string { + return "localhost:12345" +} +func (testDB) WatchWithKey(key string, ch chan string) { + for i := 1; i < 100; i++ { + ch <- fmt.Sprintf("localhost:%d", 12345+i) + time.Sleep(time.Second * time.Duration(1)) + } +} + func TestNextRecord(t *testing.T) { const ( path = "/tmp/master_client_TestFull" @@ -77,7 +89,8 @@ func TestNextRecord(t *testing.T) { } } -func TestNewEtcdClientFailed(t *testing.T) { - assert.Panics(t, func() { master.NewEtcdClient("localhost:1235", 3, 1) }, - "Invalid etcd address should be panic.") +func TestNewEtcdMasterClient(t *testing.T) { + db := testDB{} + c := master.NewEtcdMasterClient(db, 3) + assert.NotNil(t, c) } From 77cbc5507fb6ae864e60468142c7b276d28cf018 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 27 Jun 2017 18:10:15 +0800 Subject: [PATCH 08/14] use etcd client from etcd_client.go --- go/cmd/master/master.go | 6 +++- go/master/c/client.go | 3 +- go/master/client.go | 8 ++--- go/master/client_test.go | 11 +++--- go/master/etcd_client.go | 74 ++++++++++++++++++++++++++++++++++------ 5 files changed, 80 insertions(+), 22 deletions(-) diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index 54fa254863156..985c84e8b6005 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -38,7 +38,11 @@ func main() { } addr := fmt.Sprintf("%s:%d", ip, *port) - store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec) + store, err = master.NewEtcdClient(eps, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec) + if err != nil { + log.Fatal(err) + } + err = store.Put(master.DefaultAddrPath, addr) if err != nil { log.Fatal(err) } diff --git a/go/master/c/client.go b/go/master/c/client.go index 99e9a52fe0036..3724e48eaf91e 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -13,6 +13,7 @@ typedef int paddle_master_client; import "C" import ( + "strings" "sync" "unsafe" @@ -51,7 +52,7 @@ func remove(client C.paddle_master_client) *master.Client { //export paddle_new_etcd_master_client func paddle_new_etcd_master_client(etcdEndpoints *C.char, timeout int, bufSize int) C.paddle_master_client { p := C.GoString(etcdEndpoints) - e, err := master.NewEtcdClient(p, timeout) + e, err := master.NewEtcdClient(strings.Split(p, ","), master.DefaultAddrPath, master.DefaultLockPath, master.DefaultStatePath, 3) if err != nil { panic(err) } diff --git a/go/master/client.go b/go/master/client.go index 094d80f5e88eb..a070a3d518fb8 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -1,7 +1,6 @@ package master import ( - "fmt" "os" "github.com/PaddlePaddle/Paddle/go/connection" @@ -32,9 +31,9 @@ func NewClient(addrCh <-chan string, bufSize int) *Client { func NewEtcdMasterClient(db DBOperator, bufSize int) *Client { ch := make(chan string) c := NewClient(ch, bufSize) - v := db.Get(MasterAddrKey) - go db.WatchWithKey(MasterAddrKey, ch) - ch <- v + v := db.BlockedGet(DefaultAddrPath, 3) + ch <- string(v) + go db.WatchWithKey(DefaultAddrPath, ch) return c } @@ -84,7 +83,6 @@ func (c *Client) monitorMaster(addrCh <-chan string) { if curMaster != lastMaster { if curMaster == "" { err := c.conn.Close() - fmt.Printf("close conn error: %s", err) if err != nil { log.Errorln(err) } diff --git a/go/master/client_test.go b/go/master/client_test.go index f6916248d6b5b..aac9bfbc1843b 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -18,12 +18,15 @@ import ( type testDB struct{} -func (testDB) Get(key string) string { - return "localhost:12345" +func (testDB) Save(state []byte) error { return nil } +func (testDB) Load() ([]byte, error) { return nil, nil } +func (testDB) Put(key, value string) error { return nil } +func (testDB) BlockedGet(key string, interval int) []byte { + return []byte("localhost:12345") } -func (testDB) WatchWithKey(key string, ch chan string) { +func (testDB) WatchWithKey(key string, valCh chan string) { for i := 1; i < 100; i++ { - ch <- fmt.Sprintf("localhost:%d", 12345+i) + valCh <- fmt.Sprintf("localhost:%d", 12345+i) time.Sleep(time.Second * time.Duration(1)) } } diff --git a/go/master/etcd_client.go b/go/master/etcd_client.go index b7293a759896f..ceb73de7ca60d 100644 --- a/go/master/etcd_client.go +++ b/go/master/etcd_client.go @@ -18,6 +18,15 @@ const ( DefaultAddrPath = "/master/addr" ) +// DBOperator is an interface fo database operator, it's useful for test +type DBOperator interface { + Save(state []byte) error + Load() ([]byte, error) + Put(key, value string) error + BlockedGet(key string, interval int) []byte + WatchWithKey(key string, valChan chan string) +} + // EtcdClient is the etcd client that master uses for fault tolerance // and service registry. type EtcdClient struct { @@ -28,7 +37,7 @@ type EtcdClient struct { } // NewEtcdClient creates a new EtcdClient. -func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) { +func NewEtcdClient(endpoints []string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) { log.Debugf("Connecting to etcd at %v", endpoints) // TODO(helin): gracefully shutdown etcd store. Becuase etcd // store holds a etcd lock, even though the lock will expire @@ -60,16 +69,6 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat } log.Debugf("Successfully acquired lock at %s.", lockPath) - put := clientv3.OpPut(addrPath, string(addr)) - resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit() - if err != nil { - return nil, err - } - - if !resp.Succeeded { - log.Fatal("No longer owns the master lock. Exiting.") - } - e := &EtcdClient{ lockPath: lockPath, statePath: statePath, @@ -80,6 +79,26 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat return e, nil } +// Put puts the key and value +func (e *EtcdClient) Put(key, value string) error { + ctx := context.TODO() + put := clientv3.OpPut(key, value) + resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(put).Commit() + if err != nil { + return err + } + if !resp.Succeeded { + log.Errorln("No longer owns the lock, trying to lock and load again.") + err = e.lock.Lock(context.Background()) + if err != nil { + return err + } + return e.Put(key, value) + } + + return nil +} + // Save saves the state into the etcd. func (e *EtcdClient) Save(state []byte) error { ctx := context.TODO() @@ -142,3 +161,36 @@ func (e *EtcdClient) Load() ([]byte, error) { state := kvs[0].Value return state, nil } + +// BlockedGet gets value from the specify key, if the key does not exists, this method will be blocked +func (e *EtcdClient) BlockedGet(key string, timeout int) []byte { + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) + resp, err := e.client.Get(ctx, key) + cancel() + if err != nil { + log.Error(err) + time.Sleep(time.Second * time.Duration(3)) + continue + } + kvs := resp.Kvs + if len(kvs) == 0 { + log.Error(err) + continue + } + v := kvs[0].Value + return v + } +} + +// WatchWithKey watch the specify key and send to valChan +func (e *EtcdClient) WatchWithKey(key string, valChan chan string) { + rch := e.client.Watch(context.Background(), key) + for wresp := range rch { + for _, ev := range wresp.Events { + log.Infof("received event %s, %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) + valChan <- string(ev.Kv.Value) + } + } + +} From f691bf618c044f86884ec3cbd9a6f2b71f476f7e Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 27 Jun 2017 20:40:44 +0800 Subject: [PATCH 09/14] update --- go/cmd/master/master.go | 6 +--- go/master/c/client.go | 2 +- go/master/client.go | 4 +-- go/master/client_test.go | 5 +-- go/master/etcd_client.go | 66 ++++++++++++++++++++++------------------ 5 files changed, 41 insertions(+), 42 deletions(-) diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index 985c84e8b6005..54fa254863156 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -38,11 +38,7 @@ func main() { } addr := fmt.Sprintf("%s:%d", ip, *port) - store, err = master.NewEtcdClient(eps, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec) - if err != nil { - log.Fatal(err) - } - err = store.Put(master.DefaultAddrPath, addr) + store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec) if err != nil { log.Fatal(err) } diff --git a/go/master/c/client.go b/go/master/c/client.go index 3724e48eaf91e..9864cdb6c8b13 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -52,7 +52,7 @@ func remove(client C.paddle_master_client) *master.Client { //export paddle_new_etcd_master_client func paddle_new_etcd_master_client(etcdEndpoints *C.char, timeout int, bufSize int) C.paddle_master_client { p := C.GoString(etcdEndpoints) - e, err := master.NewEtcdClient(strings.Split(p, ","), master.DefaultAddrPath, master.DefaultLockPath, master.DefaultStatePath, 3) + e, err := master.NewEtcdClientWithoutLock(strings.Split(p, ",")) if err != nil { panic(err) } diff --git a/go/master/client.go b/go/master/client.go index a070a3d518fb8..e52270badf91c 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -28,10 +28,10 @@ func NewClient(addrCh <-chan string, bufSize int) *Client { } // NewEtcdMasterClient creates a new Client by etcd -func NewEtcdMasterClient(db DBOperator, bufSize int) *Client { +func NewEtcdMasterClient(db DatabaseOperator, bufSize int) *Client { ch := make(chan string) c := NewClient(ch, bufSize) - v := db.BlockedGet(DefaultAddrPath, 3) + v := db.WaitMasterReady(DefaultAddrPath, 3) ch <- string(v) go db.WatchWithKey(DefaultAddrPath, ch) return c diff --git a/go/master/client_test.go b/go/master/client_test.go index aac9bfbc1843b..f58f65451b14d 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -18,10 +18,7 @@ import ( type testDB struct{} -func (testDB) Save(state []byte) error { return nil } -func (testDB) Load() ([]byte, error) { return nil, nil } -func (testDB) Put(key, value string) error { return nil } -func (testDB) BlockedGet(key string, interval int) []byte { +func (testDB) WaitMasterReady(key string, interval int) []byte { return []byte("localhost:12345") } func (testDB) WatchWithKey(key string, valCh chan string) { diff --git a/go/master/etcd_client.go b/go/master/etcd_client.go index ceb73de7ca60d..7824e18f4d706 100644 --- a/go/master/etcd_client.go +++ b/go/master/etcd_client.go @@ -18,12 +18,9 @@ const ( DefaultAddrPath = "/master/addr" ) -// DBOperator is an interface fo database operator, it's useful for test -type DBOperator interface { - Save(state []byte) error - Load() ([]byte, error) - Put(key, value string) error - BlockedGet(key string, interval int) []byte +// DatabaseOperator is an interface fo database operator, it's useful for unittest +type DatabaseOperator interface { + WaitMasterReady(key string, interval int) []byte WatchWithKey(key string, valChan chan string) } @@ -36,8 +33,25 @@ type EtcdClient struct { lock *concurrency.Mutex } -// NewEtcdClient creates a new EtcdClient. -func NewEtcdClient(endpoints []string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) { +// NewEtcdClientWithoutLock creates a new EtcdClient without lock +func NewEtcdClientWithoutLock(endpoints []string) (*EtcdClient, error) { + log.Debugf("Connecting to etcd at %v", endpoints) + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: dialTimeout, + }) + if err != nil { + return nil, err + } + e := &EtcdClient{ + client: cli, + } + + return e, nil +} + +// NewEtcdClient creates a new EtcdClient +func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) { log.Debugf("Connecting to etcd at %v", endpoints) // TODO(helin): gracefully shutdown etcd store. Becuase etcd // store holds a etcd lock, even though the lock will expire @@ -69,6 +83,16 @@ func NewEtcdClient(endpoints []string, lockPath, addrPath, statePath string, ttl } log.Debugf("Successfully acquired lock at %s.", lockPath) + put := clientv3.OpPut(addrPath, string(addr)) + resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit() + if err != nil { + return nil, err + } + + if !resp.Succeeded { + log.Fatal("No longer owns the master lock. Exiting.") + } + e := &EtcdClient{ lockPath: lockPath, statePath: statePath, @@ -79,26 +103,6 @@ func NewEtcdClient(endpoints []string, lockPath, addrPath, statePath string, ttl return e, nil } -// Put puts the key and value -func (e *EtcdClient) Put(key, value string) error { - ctx := context.TODO() - put := clientv3.OpPut(key, value) - resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(put).Commit() - if err != nil { - return err - } - if !resp.Succeeded { - log.Errorln("No longer owns the lock, trying to lock and load again.") - err = e.lock.Lock(context.Background()) - if err != nil { - return err - } - return e.Put(key, value) - } - - return nil -} - // Save saves the state into the etcd. func (e *EtcdClient) Save(state []byte) error { ctx := context.TODO() @@ -162,8 +166,8 @@ func (e *EtcdClient) Load() ([]byte, error) { return state, nil } -// BlockedGet gets value from the specify key, if the key does not exists, this method will be blocked -func (e *EtcdClient) BlockedGet(key string, timeout int) []byte { +// WaitMasterReady will wait for master is ready +func (e *EtcdClient) WaitMasterReady(key string, timeout int) []byte { for { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) resp, err := e.client.Get(ctx, key) @@ -176,6 +180,7 @@ func (e *EtcdClient) BlockedGet(key string, timeout int) []byte { kvs := resp.Kvs if len(kvs) == 0 { log.Error(err) + time.Sleep(time.Second * time.Duration(3)) continue } v := kvs[0].Value @@ -188,6 +193,7 @@ func (e *EtcdClient) WatchWithKey(key string, valChan chan string) { rch := e.client.Watch(context.Background(), key) for wresp := range rch { for _, ev := range wresp.Events { + // if received event is DELETE, the value will be an empty string log.Infof("received event %s, %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) valChan <- string(ev.Kv.Value) } From e6492ff28ba91abb4a1119df3f76317ec4be24fc Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Tue, 27 Jun 2017 23:09:08 +0800 Subject: [PATCH 10/14] update --- go/master/etcd_client.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/go/master/etcd_client.go b/go/master/etcd_client.go index 7824e18f4d706..5fa712054f909 100644 --- a/go/master/etcd_client.go +++ b/go/master/etcd_client.go @@ -18,12 +18,19 @@ const ( DefaultAddrPath = "/master/addr" ) -// DatabaseOperator is an interface fo database operator, it's useful for unittest +// DatabaseOperator is an interface fo database operator, +// it's useful for unittest type DatabaseOperator interface { WaitMasterReady(key string, interval int) []byte WatchWithKey(key string, valChan chan string) } +// EtcdClientWithoutLock is the etcd client the master users +// for service discovery +type EtcdClientWithoutLock struct { + client *clientv3.Client +} + // EtcdClient is the etcd client that master uses for fault tolerance // and service registry. type EtcdClient struct { @@ -34,7 +41,7 @@ type EtcdClient struct { } // NewEtcdClientWithoutLock creates a new EtcdClient without lock -func NewEtcdClientWithoutLock(endpoints []string) (*EtcdClient, error) { +func NewEtcdClientWithoutLock(endpoints []string) (*EtcdClientWithoutLock, error) { log.Debugf("Connecting to etcd at %v", endpoints) cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, @@ -43,7 +50,7 @@ func NewEtcdClientWithoutLock(endpoints []string) (*EtcdClient, error) { if err != nil { return nil, err } - e := &EtcdClient{ + e := &EtcdClientWithoutLock{ client: cli, } @@ -167,7 +174,7 @@ func (e *EtcdClient) Load() ([]byte, error) { } // WaitMasterReady will wait for master is ready -func (e *EtcdClient) WaitMasterReady(key string, timeout int) []byte { +func (e *EtcdClientWithoutLock) WaitMasterReady(key string, timeout int) []byte { for { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) resp, err := e.client.Get(ctx, key) @@ -189,7 +196,7 @@ func (e *EtcdClient) WaitMasterReady(key string, timeout int) []byte { } // WatchWithKey watch the specify key and send to valChan -func (e *EtcdClient) WatchWithKey(key string, valChan chan string) { +func (e *EtcdClientWithoutLock) WatchWithKey(key string, valChan chan string) { rch := e.client.Watch(context.Background(), key) for wresp := range rch { for _, ev := range wresp.Events { From ca6a114726d9bfb0463e71ad53a89239f2b806b2 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 28 Jun 2017 11:55:32 +0800 Subject: [PATCH 11/14] remove etcd client without lock --- go/master/c/client.go | 16 ++++++++--- go/master/client.go | 10 ------- go/master/client_internal_test.go | 4 +-- go/master/client_test.go | 23 ++-------------- go/master/etcd_client.go | 44 +++++++++++-------------------- 5 files changed, 32 insertions(+), 65 deletions(-) diff --git a/go/master/c/client.go b/go/master/c/client.go index 9864cdb6c8b13..0f35304db2f19 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -15,9 +15,11 @@ import "C" import ( "strings" "sync" + "time" "unsafe" "github.com/PaddlePaddle/Paddle/go/master" + "github.com/coreos/etcd/clientv3" log "github.com/sirupsen/logrus" ) @@ -52,20 +54,26 @@ func remove(client C.paddle_master_client) *master.Client { //export paddle_new_etcd_master_client func paddle_new_etcd_master_client(etcdEndpoints *C.char, timeout int, bufSize int) C.paddle_master_client { p := C.GoString(etcdEndpoints) - e, err := master.NewEtcdClientWithoutLock(strings.Split(p, ",")) + cli, err := clientv3.New(clientv3.Config{ + Endpoints: strings.Split(p, ","), + DialTimeout: time.Second * time.Duration(timeout), + }) if err != nil { panic(err) } - c := master.NewEtcdMasterClient(e, bufSize) + ch := make(chan string, 1) + ch <- master.GetKey(cli, master.DefaultAddrPath) + go db.WatchKey(master.DefaultAddrPath) + c := master.NewClient(ch, bufSize) return add(c) } //export paddle_new_master_client func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client { a := C.GoString(addr) - ch := make(chan string) - c := master.NewClient(ch, bufSize) + ch := make(chan string, 1) ch <- a + c := master.NewClient(ch, bufSize) return add(c) } diff --git a/go/master/client.go b/go/master/client.go index e52270badf91c..d3bea49d0a816 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -27,16 +27,6 @@ func NewClient(addrCh <-chan string, bufSize int) *Client { return c } -// NewEtcdMasterClient creates a new Client by etcd -func NewEtcdMasterClient(db DatabaseOperator, bufSize int) *Client { - ch := make(chan string) - c := NewClient(ch, bufSize) - v := db.WaitMasterReady(DefaultAddrPath, 3) - ch <- string(v) - go db.WatchWithKey(DefaultAddrPath, ch) - return c -} - func (c *Client) getRecords() { for { t, err := c.getTask() diff --git a/go/master/client_internal_test.go b/go/master/client_internal_test.go index 55189fb3e9018..364dce7b58cf6 100644 --- a/go/master/client_internal_test.go +++ b/go/master/client_internal_test.go @@ -76,9 +76,9 @@ func TestGetFinishTask(t *testing.T) { c := &Client{} c.conn = connection.New() addr := fmt.Sprintf(":%d", p) - ch := make(chan string) - go c.monitorMaster(ch) + ch := make(chan string, 1) ch <- addr + go c.monitorMaster(ch) c.SetDataset([]string{path}) checkOnePass := func(i int) { var tasks []Task diff --git a/go/master/client_test.go b/go/master/client_test.go index f58f65451b14d..c00aeebfd5d1f 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -13,21 +13,8 @@ import ( "github.com/PaddlePaddle/Paddle/go/master" "github.com/PaddlePaddle/recordio" - "github.com/stretchr/testify/assert" ) -type testDB struct{} - -func (testDB) WaitMasterReady(key string, interval int) []byte { - return []byte("localhost:12345") -} -func (testDB) WatchWithKey(key string, valCh chan string) { - for i := 1; i < 100; i++ { - valCh <- fmt.Sprintf("localhost:%d", 12345+i) - time.Sleep(time.Second * time.Duration(1)) - } -} - func TestNextRecord(t *testing.T) { const ( path = "/tmp/master_client_TestFull" @@ -74,9 +61,9 @@ func TestNextRecord(t *testing.T) { } w.Close() f.Close() - curAddr := make(chan string) - c := master.NewClient(curAddr, 10) + curAddr := make(chan string, 1) curAddr <- fmt.Sprintf(":%d", p) + c := master.NewClient(curAddr, 10) c.SetDataset([]string{path}) for pass := 0; pass < 50; pass++ { received := make(map[byte]bool) @@ -92,9 +79,3 @@ func TestNextRecord(t *testing.T) { } } } - -func TestNewEtcdMasterClient(t *testing.T) { - db := testDB{} - c := master.NewEtcdMasterClient(db, 3) - assert.NotNil(t, c) -} diff --git a/go/master/etcd_client.go b/go/master/etcd_client.go index 7824e18f4d706..3f7e6d824a819 100644 --- a/go/master/etcd_client.go +++ b/go/master/etcd_client.go @@ -18,12 +18,6 @@ const ( DefaultAddrPath = "/master/addr" ) -// DatabaseOperator is an interface fo database operator, it's useful for unittest -type DatabaseOperator interface { - WaitMasterReady(key string, interval int) []byte - WatchWithKey(key string, valChan chan string) -} - // EtcdClient is the etcd client that master uses for fault tolerance // and service registry. type EtcdClient struct { @@ -166,31 +160,25 @@ func (e *EtcdClient) Load() ([]byte, error) { return state, nil } -// WaitMasterReady will wait for master is ready -func (e *EtcdClient) WaitMasterReady(key string, timeout int) []byte { - for { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) - resp, err := e.client.Get(ctx, key) - cancel() - if err != nil { - log.Error(err) - time.Sleep(time.Second * time.Duration(3)) - continue - } - kvs := resp.Kvs - if len(kvs) == 0 { - log.Error(err) - time.Sleep(time.Second * time.Duration(3)) - continue - } - v := kvs[0].Value - return v +// GetKey get the specify key +func GetKey(c *clientv3.Client, key string, timeout int) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) + resp, err := c.Get(ctx, key) + cancel() + if err != nil { + return "", err + } + kvs := resp.Kvs + if len(kvs) == 0 { + return "", nil } + v := kvs[0].Value + return string(v), nil } -// WatchWithKey watch the specify key and send to valChan -func (e *EtcdClient) WatchWithKey(key string, valChan chan string) { - rch := e.client.Watch(context.Background(), key) +// WatchKey watch the specify key and send to valChan +func WatchKey(c *clientv3.Client, key string, valChan chan<- string) { + rch := c.Watch(context.Background(), key) for wresp := range rch { for _, ev := range wresp.Events { // if received event is DELETE, the value will be an empty string From d4c06ed8808f41a5bf076faedf6b61f28d0f7c8a Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 28 Jun 2017 12:46:15 +0800 Subject: [PATCH 12/14] update --- go/master/c/client.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/master/c/client.go b/go/master/c/client.go index 0f35304db2f19..9e35e986002c0 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -62,8 +62,12 @@ func paddle_new_etcd_master_client(etcdEndpoints *C.char, timeout int, bufSize i panic(err) } ch := make(chan string, 1) - ch <- master.GetKey(cli, master.DefaultAddrPath) - go db.WatchKey(master.DefaultAddrPath) + a, err := master.GetKey(cli, master.DefaultAddrPath, timeout) + if err != nil { + panic(err) + } + ch <- a + go master.WatchKey(cli, master.DefaultAddrPath, ch) c := master.NewClient(ch, bufSize) return add(c) } From 3b0206742feaadf66589e1fe032b7cccffc5148b Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Wed, 28 Jun 2017 12:51:59 +0800 Subject: [PATCH 13/14] update the comment --- go/master/etcd_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/master/etcd_client.go b/go/master/etcd_client.go index 7fb4f1fed1b37..4c38d93ece7de 100644 --- a/go/master/etcd_client.go +++ b/go/master/etcd_client.go @@ -143,7 +143,7 @@ func (e *EtcdClient) Load() ([]byte, error) { return state, nil } -// GetKey get the specify key +// GetKey gets the value by the specify key func GetKey(c *clientv3.Client, key string, timeout int) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) resp, err := c.Get(ctx, key) @@ -159,7 +159,7 @@ func GetKey(c *clientv3.Client, key string, timeout int) (string, error) { return string(v), nil } -// WatchKey watch the specify key and send to valChan +// WatchKey watches the specify key and send to valChan if there is some event func WatchKey(c *clientv3.Client, key string, valChan chan<- string) { rch := c.Watch(context.Background(), key) for wresp := range rch { From 0d8a0ed5b0f4e5d381a1ced0fef7a11d68255383 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 29 Jun 2017 09:52:24 +0800 Subject: [PATCH 14/14] update commonts --- go/master/etcd_client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/master/etcd_client.go b/go/master/etcd_client.go index 4c38d93ece7de..34c0a8a9f756a 100644 --- a/go/master/etcd_client.go +++ b/go/master/etcd_client.go @@ -27,7 +27,7 @@ type EtcdClient struct { lock *concurrency.Mutex } -// NewEtcdClient creates a new EtcdClient +// NewEtcdClient creates a new EtcdClient. func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) { log.Debugf("Connecting to etcd at %v", endpoints) // TODO(helin): gracefully shutdown etcd store. Becuase etcd @@ -143,7 +143,7 @@ func (e *EtcdClient) Load() ([]byte, error) { return state, nil } -// GetKey gets the value by the specify key +// GetKey gets the value by the specify key. func GetKey(c *clientv3.Client, key string, timeout int) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) resp, err := c.Get(ctx, key) @@ -159,7 +159,7 @@ func GetKey(c *clientv3.Client, key string, timeout int) (string, error) { return string(v), nil } -// WatchKey watches the specify key and send to valChan if there is some event +// WatchKey watches the specify key and send to valChan if there is some event. func WatchKey(c *clientv3.Client, key string, valChan chan<- string) { rch := c.Watch(context.Background(), key) for wresp := range rch {