From fd8937556f95db4086ce095efa1e83041c896334 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Tue, 20 Jun 2017 23:57:07 +0000 Subject: [PATCH 1/6] Master save and load state from etcd --- go/cmd/master/master.go | 55 ++++++++++-- go/master/client_internal_test.go | 21 ++++- go/master/client_test.go | 21 ++++- go/master/etcd_store.go | 133 ++++++++++++++++++++++++++++ go/master/service.go | 142 +++++++++++++++++++++++------- go/pserver/cclient/cclient.go | 6 +- 6 files changed, 330 insertions(+), 48 deletions(-) create mode 100644 go/master/etcd_store.go diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index 25cd1cafcdf32..49ad0300b8315 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -5,41 +5,80 @@ import ( "net/http" "net/rpc" "strconv" + "strings" + "sync" "time" "github.com/namsral/flag" + log "github.com/sirupsen/logrus" "github.com/PaddlePaddle/Paddle/go/master" ) +type inMemStore struct { + mu sync.Mutex + buf []byte +} + +func (m *inMemStore) Save(b []byte) error { + m.mu.Lock() + defer m.mu.Unlock() + + m.buf = b + return nil +} + +func (m *inMemStore) Load() ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + + return m.buf, nil +} + func main() { port := flag.Int("port", 8080, "port of the master server.") - faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).") + ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.") + endpoints := flag.String("endpoints", "", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.") taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.") taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.") chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.") flag.Parse() - if *faultTolerance { - panic("fault tolernance not implemented.") + if *endpoints == "" { + log.Warningln("-endpoints not set, fault tolerance not be enabled.") + } + + var store master.Store + if *endpoints != "" { + eps := strings.Split(*endpoints, ",") + var err error + store, err = master.NewEtcdStore(eps, master.DefaultLockPath, master.DefaultStatePath, *ttlSec) + if err != nil { + log.Fatal(err) + } + } else { + store = &inMemStore{} + } + s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax) + if err != nil { + log.Fatal(err) } - s := master.NewService(*chunkPerTask, *taskTimeoutDur, *taskTimeoutMax) - err := rpc.Register(s) + err = rpc.Register(s) if err != nil { - panic(err) + log.Fatal(err) } rpc.HandleHTTP() l, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) if err != nil { - panic(err) + log.Fatal(err) } err = http.Serve(l, nil) if err != nil { - panic(err) + log.Fatal(err) } } diff --git a/go/master/client_internal_test.go b/go/master/client_internal_test.go index 00fcca0e2cf44..a5b76fe8530a1 100644 --- a/go/master/client_internal_test.go +++ b/go/master/client_internal_test.go @@ -32,6 +32,19 @@ func (a TestAddresser) Address() string { return string(a) } +type myStore struct { + buf []byte +} + +func (m *myStore) Save(b []byte) error { + m.buf = b + return nil +} + +func (m *myStore) Load() ([]byte, error) { + return m.buf, nil +} + func TestGetFinishTask(t *testing.T) { const path = "/tmp/master_client_test_0" @@ -47,9 +60,13 @@ func TestGetFinishTask(t *testing.T) { } go func(l net.Listener) { - s := NewService(chunkPerTask, time.Second, 1) + s, err := NewService(&myStore{}, chunkPerTask, time.Second, 1) + if err != nil { + panic(err) + } + server := rpc.NewServer() - err := server.Register(s) + err = server.Register(s) if err != nil { panic(err) } diff --git a/go/master/client_test.go b/go/master/client_test.go index 2b3f873ecf3a6..ae5f17c2d4901 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -15,6 +15,19 @@ import ( "github.com/PaddlePaddle/recordio" ) +type myStore struct { + buf []byte +} + +func (m *myStore) Save(b []byte) error { + m.buf = b + return nil +} + +func (m *myStore) Load() ([]byte, error) { + return m.buf, nil +} + func TestNextRecord(t *testing.T) { const ( path = "/tmp/master_client_TestFull" @@ -33,9 +46,13 @@ func TestNextRecord(t *testing.T) { } go func(l net.Listener) { - s := master.NewService(10, time.Second, 1) + s, err := master.NewService(&myStore{}, 10, time.Second, 1) + if err != nil { + panic(err) + } + server := rpc.NewServer() - err := server.Register(s) + err = server.Register(s) if err != nil { panic(err) } diff --git a/go/master/etcd_store.go b/go/master/etcd_store.go new file mode 100644 index 0000000000000..ce178370ff90d --- /dev/null +++ b/go/master/etcd_store.go @@ -0,0 +1,133 @@ +package master + +import ( + "context" + "sync" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/concurrency" + log "github.com/sirupsen/logrus" +) + +const ( + // DefaultLockPath is the default etcd master lock path. + DefaultLockPath = "/master/lock" + // DefaultStatePath is the default etcd key for master state. + DefaultStatePath = "/master/state" +) + +// EtcdStore is the Store implementation backed by etcd. +type EtcdStore struct { + lockPath string + statePath string + ttlSec int + client *clientv3.Client + + mu sync.Mutex + lock *concurrency.Mutex +} + +// NewEtcdStore creates a new EtcdStore. +func NewEtcdStore(endpoints []string, lockPath, statePath string, ttlSec int) (*EtcdStore, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: dialTimeout, + }) + if err != nil { + return nil, err + } + + sess, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlSec)) + if err != nil { + return nil, err + } + + lock := concurrency.NewMutex(sess, lockPath) + // It's fine for the lock to get stuck, in this case we have + // multiple master servers running (only configured to have + // one master running, but split-brain problem may cuase + // multiple master servers running), and the cluster management + // software will kill one of them. + log.Infof("Trying to acquire lock at %s.", lockPath) + err = lock.Lock(context.TODO()) + if err != nil { + return nil, err + } + log.Infof("Successfully acquired lock at %s.", lockPath) + + e := &EtcdStore{} + e.client = cli + e.lock = lock + e.lockPath = lockPath + e.statePath = statePath + e.ttlSec = ttlSec + return e, nil +} + +// Save saves the state into the etcd. +func (e *EtcdStore) Save(state []byte) error { + e.mu.Lock() + defer e.mu.Unlock() + + ctx := context.TODO() + put := clientv3.OpPut(e.statePath, string(state)) + resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(put).Commit() + if err != nil { + return err + } + + if !resp.Succeeded { + log.Errorln("No longer owns the lock, trying to lock and save again.") + sess, err := concurrency.NewSession(e.client, concurrency.WithTTL(e.ttlSec)) + if err != nil { + return err + } + + e.lock = concurrency.NewMutex(sess, e.lockPath) + log.Infof("Try to acquire lock at %s.", e.lockPath) + err = e.lock.Lock(context.TODO()) + if err != nil { + return err + } + log.Infof("Successfully acquired lock at %s.", e.lockPath) + return e.Save(state) + } + + return nil +} + +// Load loads the state from etcd. +func (e *EtcdStore) Load() ([]byte, error) { + e.mu.Lock() + ctx := context.TODO() + get := clientv3.OpGet(e.statePath) + + resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(get).Commit() + if err != nil { + return nil, err + } + + if !resp.Succeeded { + log.Errorln("No longer owns the lock, trying to lock and load again.") + sess, err := concurrency.NewSession(e.client) + if err != nil { + return nil, err + } + + e.lock = concurrency.NewMutex(sess, e.lockPath) + e.lock.Lock(context.TODO()) + e.mu.Unlock() + return e.Load() + } + + kvs := resp.Responses[0].GetResponseRange().Kvs + if len(kvs) == 0 { + // No state exists + e.mu.Unlock() + return nil, nil + } + + state := kvs[0].Value + e.mu.Unlock() + return state, nil +} diff --git a/go/master/service.go b/go/master/service.go index 55e1e2d1a4a5c..d453777b05555 100644 --- a/go/master/service.go +++ b/go/master/service.go @@ -1,6 +1,9 @@ package master import ( + "bytes" + "compress/gzip" + "encoding/gob" "errors" "os" "path/filepath" @@ -12,24 +15,54 @@ import ( "github.com/PaddlePaddle/recordio" ) +const ( + dialTimeout = 5 * time.Second +) + +// Store is the interface for save and load the master state. +type Store interface { + Save([]byte) error + Load() ([]byte, error) +} + +// Chunk is a chunk of data consisted of several data instances. +type Chunk struct { + Path string + Index recordio.Index // chunk index +} + +// Task is the basic unit of data instances assigned to trainers. +type Task struct { + ID int + Chunks []Chunk +} + +type taskEntry struct { + Epoch int + NumTimeout int + Task Task +} + +type taskQueues struct { + Todo []taskEntry + Pending map[int]taskEntry // map from task ID to task entry + Done []taskEntry + Failed []Task +} + // Service is the master server service. type Service struct { chunksPerTask int timeoutDur time.Duration timeoutMax int ready chan struct{} + store Store mu sync.Mutex initDone bool taskQueues taskQueues } -// Recover recovers service state from etcd. -func Recover() (*Service, error) { - // TODO(helin): recover from snapshot state from etcd. - return nil, nil -} - func partition(chunks []Chunk, chunksPerTask int) []taskEntry { id := 0 if chunksPerTask <= 0 { @@ -58,7 +91,7 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry { } // NewService creates a new service. -func NewService(chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Service { +func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) (*Service, error) { s := &Service{} s.chunksPerTask = chunksPerTask s.timeoutDur = timeoutDur @@ -66,38 +99,81 @@ func NewService(chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Se s.taskQueues = taskQueues{} s.taskQueues.Pending = make(map[int]taskEntry) s.ready = make(chan struct{}) - return s -} + s.store = store + recovered, err := s.recover() + if err != nil { + return nil, err + } -// Chunk is a chunk of data consisted of several data instances. -type Chunk struct { - Path string - Index recordio.Index // chunk index -} + if recovered { + // Recovered. Now the state is already initialized, + // and the master is ready. + s.initDone = true + close(s.ready) + } -// Task is the basic unit of data instances assigned to trainers. -type Task struct { - ID int - Chunks []Chunk + return s, nil } -type taskEntry struct { - Epoch int - NumTimeout int - Task Task -} +// recover recovers service state from etcd. +func (s *Service) recover() (bool, error) { + state, err := s.store.Load() + if err != nil { + return false, err + } -type taskQueues struct { - Todo []taskEntry - Pending map[int]taskEntry // map from task ID to task entry - Done []taskEntry - Failed []Task + if state == nil { + log.Infoln("No state exists, not recovered.") + return false, nil + } + + log.Infof("Loaded snapshot of size: %d bytes.", len(state)) + gr, err := gzip.NewReader(bytes.NewReader(state)) + if err != nil { + return false, err + } + + dec := gob.NewDecoder(gr) + var tqs taskQueues + err = dec.Decode(&tqs) + if err != nil { + return false, err + } + + err = gr.Close() + if err != nil { + // Only close failed, recover actually succeed, so + // just log error. + log.Errorln(err) + } + + s.taskQueues = tqs + return true, nil } -// *must* be called with s.mu being held. +// snapshot *must* be called with s.mu being held. func (s *Service) snapshot() error { - // TODO(helin): snapshot state on etcd. - return nil + // TOOD(helin): etcd request has a size limit, so the snapshot + // size is limited by the max request size. We should either + // divide the snapshot into smaller chunks and save under + // different keys, or configure the request size to be big + // enough: + // https://github.com/coreos/etcd/blob/2f84f3d8d8ed8f9537ab6ffa44a3a1c7eddfa9b1/embed/config.go#L44 + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + enc := gob.NewEncoder(gw) + err := enc.Encode(s.taskQueues) + if err != nil { + return err + } + err = gw.Close() + if err != nil { + return err + } + + state := buf.Bytes() + log.Infof("Saving snapshot of size: %d bytes.", len(state)) + return s.store.Save(state) } func readChunks(globPaths []string) ([]Chunk, error) { @@ -207,12 +283,12 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() { t.NumTimeout++ if t.NumTimeout > s.timeoutMax { - log.Warningf("Task %v timed out %d times, discard.\n", t.Task, t.NumTimeout) + log.Warningf("Task %v timed out %d times, discard.", t.Task, t.NumTimeout) s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task) return } - log.Warningf("Task %v timed out %d times, retry.\n", t.Task, t.NumTimeout) + log.Warningf("Task %v timed out %d times, retry.", t.Task, t.NumTimeout) s.taskQueues.Todo = append(s.taskQueues.Todo, t) } } diff --git a/go/pserver/cclient/cclient.go b/go/pserver/cclient/cclient.go index 92a41b7f54348..bbaf43d9f1434 100644 --- a/go/pserver/cclient/cclient.go +++ b/go/pserver/cclient/cclient.go @@ -133,7 +133,7 @@ func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter, if err != nil { if err.Error() == pserver.AlreadyInitialized { - log.Warningf("parameter %s already initialized, treat paddle_init_param as sucessful.\n", name) + log.Warningf("parameter %s already initialized, treat paddle_init_param as sucessful.", name) return C.PSERVER_OK } log.Errorln(err) @@ -200,7 +200,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, for i, p := range ps { pn[i] = p.Name } - log.Errorf("pserver returned wrong number of parameters. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", ")) + log.Errorf("pserver returned wrong number of parameters. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", ")) return C.PSERVER_ERROR } @@ -210,7 +210,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, for i, p := range ps { pn[i] = p.Name } - log.Errorf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", ")) + log.Errorf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", ")) return C.PSERVER_ERROR } } From 44226853029119e195530e78ff7d0ab883b72dff Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 21 Jun 2017 18:55:49 +0000 Subject: [PATCH 2/6] put InMemStore into master package --- go/cmd/master/master.go | 23 +---------------------- go/master/client_internal_test.go | 15 +-------------- go/master/client_test.go | 15 +-------------- go/master/inmem_store.go | 28 ++++++++++++++++++++++++++++ 4 files changed, 31 insertions(+), 50 deletions(-) create mode 100644 go/master/inmem_store.go diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index 49ad0300b8315..48fe2e6f75a8a 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -6,7 +6,6 @@ import ( "net/rpc" "strconv" "strings" - "sync" "time" "github.com/namsral/flag" @@ -15,26 +14,6 @@ import ( "github.com/PaddlePaddle/Paddle/go/master" ) -type inMemStore struct { - mu sync.Mutex - buf []byte -} - -func (m *inMemStore) Save(b []byte) error { - m.mu.Lock() - defer m.mu.Unlock() - - m.buf = b - return nil -} - -func (m *inMemStore) Load() ([]byte, error) { - m.mu.Lock() - defer m.mu.Unlock() - - return m.buf, nil -} - func main() { port := flag.Int("port", 8080, "port of the master server.") @@ -58,7 +37,7 @@ func main() { log.Fatal(err) } } else { - store = &inMemStore{} + store = &master.InMemStore{} } s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax) diff --git a/go/master/client_internal_test.go b/go/master/client_internal_test.go index a5b76fe8530a1..251225780ae30 100644 --- a/go/master/client_internal_test.go +++ b/go/master/client_internal_test.go @@ -32,19 +32,6 @@ func (a TestAddresser) Address() string { return string(a) } -type myStore struct { - buf []byte -} - -func (m *myStore) Save(b []byte) error { - m.buf = b - return nil -} - -func (m *myStore) Load() ([]byte, error) { - return m.buf, nil -} - func TestGetFinishTask(t *testing.T) { const path = "/tmp/master_client_test_0" @@ -60,7 +47,7 @@ func TestGetFinishTask(t *testing.T) { } go func(l net.Listener) { - s, err := NewService(&myStore{}, chunkPerTask, time.Second, 1) + s, err := NewService(&InMemStore{}, chunkPerTask, time.Second, 1) if err != nil { panic(err) } diff --git a/go/master/client_test.go b/go/master/client_test.go index ae5f17c2d4901..85a86761c2e58 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -15,19 +15,6 @@ import ( "github.com/PaddlePaddle/recordio" ) -type myStore struct { - buf []byte -} - -func (m *myStore) Save(b []byte) error { - m.buf = b - return nil -} - -func (m *myStore) Load() ([]byte, error) { - return m.buf, nil -} - func TestNextRecord(t *testing.T) { const ( path = "/tmp/master_client_TestFull" @@ -46,7 +33,7 @@ func TestNextRecord(t *testing.T) { } go func(l net.Listener) { - s, err := master.NewService(&myStore{}, 10, time.Second, 1) + s, err := master.NewService(&master.InMemStore{}, 10, time.Second, 1) if err != nil { panic(err) } diff --git a/go/master/inmem_store.go b/go/master/inmem_store.go new file mode 100644 index 0000000000000..bcd549b20e463 --- /dev/null +++ b/go/master/inmem_store.go @@ -0,0 +1,28 @@ +package master + +import "sync" + +// InMemStore is an in memory implementation of Store interface. +// +// It does not tolerate the fault that casues the program to crash. +type InMemStore struct { + mu sync.Mutex + buf []byte +} + +// Save saves the state into the in-memory store. +func (m *InMemStore) Save(state []byte) error { + m.mu.Lock() + defer m.mu.Unlock() + + m.buf = state + return nil +} + +// Load loads the state from the in-memory store. +func (m *InMemStore) Load() ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + + return m.buf, nil +} From a4ba403e792fc21b5e032ad6116f1fc00fb4ba8d Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 21 Jun 2017 19:00:25 +0000 Subject: [PATCH 3/6] add comment for gracefully stop etcd store --- go/master/etcd_store.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/master/etcd_store.go b/go/master/etcd_store.go index ce178370ff90d..d8e95056d5d5c 100644 --- a/go/master/etcd_store.go +++ b/go/master/etcd_store.go @@ -29,6 +29,10 @@ type EtcdStore struct { // NewEtcdStore creates a new EtcdStore. func NewEtcdStore(endpoints []string, lockPath, statePath string, ttlSec int) (*EtcdStore, error) { + // TODO(helin): gracefully shutdown etcd store. Becuase etcd + // store holds a etcd lock, even though the lock will expire + // when the lease timeout, we need to implement graceful + // shutdown to release the lock. cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, From bf79c9e5bba41dd9f1e122a779e27e3e8dca9ee3 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 21 Jun 2017 19:02:21 +0000 Subject: [PATCH 4/6] add log when master recovered from saved state. --- go/master/service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/master/service.go b/go/master/service.go index d453777b05555..58e68e7448599 100644 --- a/go/master/service.go +++ b/go/master/service.go @@ -110,6 +110,7 @@ func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, timeou // and the master is ready. s.initDone = true close(s.ready) + log.Info("Master recovered from saved state.") } return s, nil From 42313a3c35637b8d706aa4dbdef65c671e7d6665 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Fri, 23 Jun 2017 22:11:45 +0000 Subject: [PATCH 5/6] rename EtcdStore to Etcd --- go/cmd/master/master.go | 2 +- go/master/etcd_store.go | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index 48fe2e6f75a8a..a62bc4310e62e 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -32,7 +32,7 @@ func main() { if *endpoints != "" { eps := strings.Split(*endpoints, ",") var err error - store, err = master.NewEtcdStore(eps, master.DefaultLockPath, master.DefaultStatePath, *ttlSec) + store, err = master.NewEtcd(eps, master.DefaultLockPath, master.DefaultStatePath, *ttlSec) if err != nil { log.Fatal(err) } diff --git a/go/master/etcd_store.go b/go/master/etcd_store.go index d8e95056d5d5c..21b3e2cb0f539 100644 --- a/go/master/etcd_store.go +++ b/go/master/etcd_store.go @@ -16,8 +16,9 @@ const ( DefaultStatePath = "/master/state" ) -// EtcdStore is the Store implementation backed by etcd. -type EtcdStore struct { +// Etcd is the etcd abstraction that master uses for fault tolerance +// and service registry. +type Etcd struct { lockPath string statePath string ttlSec int @@ -27,8 +28,8 @@ type EtcdStore struct { lock *concurrency.Mutex } -// NewEtcdStore creates a new EtcdStore. -func NewEtcdStore(endpoints []string, lockPath, statePath string, ttlSec int) (*EtcdStore, error) { +// NewEtcd creates a new Etcd. +func NewEtcd(endpoints []string, lockPath, statePath string, ttlSec int) (*Etcd, error) { // TODO(helin): gracefully shutdown etcd store. Becuase etcd // store holds a etcd lock, even though the lock will expire // when the lease timeout, we need to implement graceful @@ -59,7 +60,7 @@ func NewEtcdStore(endpoints []string, lockPath, statePath string, ttlSec int) (* } log.Infof("Successfully acquired lock at %s.", lockPath) - e := &EtcdStore{} + e := &Etcd{} e.client = cli e.lock = lock e.lockPath = lockPath @@ -69,7 +70,7 @@ func NewEtcdStore(endpoints []string, lockPath, statePath string, ttlSec int) (* } // Save saves the state into the etcd. -func (e *EtcdStore) Save(state []byte) error { +func (e *Etcd) Save(state []byte) error { e.mu.Lock() defer e.mu.Unlock() @@ -101,7 +102,7 @@ func (e *EtcdStore) Save(state []byte) error { } // Load loads the state from etcd. -func (e *EtcdStore) Load() ([]byte, error) { +func (e *Etcd) Load() ([]byte, error) { e.mu.Lock() ctx := context.TODO() get := clientv3.OpGet(e.statePath) @@ -119,8 +120,12 @@ func (e *EtcdStore) Load() ([]byte, error) { } e.lock = concurrency.NewMutex(sess, e.lockPath) - e.lock.Lock(context.TODO()) + err = e.lock.Lock(context.TODO()) e.mu.Unlock() + if err != nil { + return nil, err + } + return e.Load() } From 7dad02661f1cd7406eac871354c94cebf4d38345 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Sat, 24 Jun 2017 00:04:26 +0000 Subject: [PATCH 6/6] Master server registers itself to etcd. --- go/cmd/master/master.go | 14 +++- go/master/{etcd_store.go => etcd_client.go} | 90 +++++++++++---------- 2 files changed, 56 insertions(+), 48 deletions(-) rename go/master/{etcd_store.go => etcd_client.go} (56%) diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index a62bc4310e62e..54fa254863156 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "net" "net/http" "net/rpc" @@ -12,13 +13,13 @@ import ( log "github.com/sirupsen/logrus" "github.com/PaddlePaddle/Paddle/go/master" + "github.com/PaddlePaddle/Paddle/go/utils/networkhelper" ) func main() { port := flag.Int("port", 8080, "port of the master server.") - ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.") - endpoints := flag.String("endpoints", "", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.") + endpoints := flag.String("endpoints", "http://127.0.0.1:2379", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.") taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.") taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.") chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.") @@ -31,8 +32,13 @@ func main() { var store master.Store if *endpoints != "" { eps := strings.Split(*endpoints, ",") - var err error - store, err = master.NewEtcd(eps, master.DefaultLockPath, master.DefaultStatePath, *ttlSec) + ip, err := networkhelper.GetExternalIP() + if err != nil { + log.Fatal(err) + } + + addr := fmt.Sprintf("%s:%d", ip, *port) + store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec) if err != nil { log.Fatal(err) } diff --git a/go/master/etcd_store.go b/go/master/etcd_client.go similarity index 56% rename from go/master/etcd_store.go rename to go/master/etcd_client.go index 21b3e2cb0f539..b7293a759896f 100644 --- a/go/master/etcd_store.go +++ b/go/master/etcd_client.go @@ -2,7 +2,7 @@ package master import ( "context" - "sync" + "time" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" @@ -14,22 +14,22 @@ const ( DefaultLockPath = "/master/lock" // DefaultStatePath is the default etcd key for master state. DefaultStatePath = "/master/state" + // DefaultAddrPath is the default etcd key for master address. + DefaultAddrPath = "/master/addr" ) -// Etcd is the etcd abstraction that master uses for fault tolerance +// EtcdClient is the etcd client that master uses for fault tolerance // and service registry. -type Etcd struct { +type EtcdClient struct { lockPath string statePath string - ttlSec int client *clientv3.Client - - mu sync.Mutex - lock *concurrency.Mutex + lock *concurrency.Mutex } -// NewEtcd creates a new Etcd. -func NewEtcd(endpoints []string, lockPath, statePath string, ttlSec int) (*Etcd, error) { +// NewEtcdClient creates a new EtcdClient. +func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) { + log.Debugf("Connecting to etcd at %v", endpoints) // TODO(helin): gracefully shutdown etcd store. Becuase etcd // store holds a etcd lock, even though the lock will expire // when the lease timeout, we need to implement graceful @@ -53,27 +53,35 @@ func NewEtcd(endpoints []string, lockPath, statePath string, ttlSec int) (*Etcd, // one master running, but split-brain problem may cuase // multiple master servers running), and the cluster management // software will kill one of them. - log.Infof("Trying to acquire lock at %s.", lockPath) + log.Debugf("Trying to acquire lock at %s.", lockPath) err = lock.Lock(context.TODO()) if err != nil { return nil, err } - log.Infof("Successfully acquired lock at %s.", lockPath) - - e := &Etcd{} - e.client = cli - e.lock = lock - e.lockPath = lockPath - e.statePath = statePath - e.ttlSec = ttlSec + log.Debugf("Successfully acquired lock at %s.", lockPath) + + put := clientv3.OpPut(addrPath, string(addr)) + resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit() + if err != nil { + return nil, err + } + + if !resp.Succeeded { + log.Fatal("No longer owns the master lock. Exiting.") + } + + e := &EtcdClient{ + lockPath: lockPath, + statePath: statePath, + client: cli, + lock: lock, + } + return e, nil } // Save saves the state into the etcd. -func (e *Etcd) Save(state []byte) error { - e.mu.Lock() - defer e.mu.Unlock() - +func (e *EtcdClient) Save(state []byte) error { ctx := context.TODO() put := clientv3.OpPut(e.statePath, string(state)) resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(put).Commit() @@ -82,17 +90,21 @@ func (e *Etcd) Save(state []byte) error { } if !resp.Succeeded { - log.Errorln("No longer owns the lock, trying to lock and save again.") - sess, err := concurrency.NewSession(e.client, concurrency.WithTTL(e.ttlSec)) - if err != nil { - return err - } - - e.lock = concurrency.NewMutex(sess, e.lockPath) - log.Infof("Try to acquire lock at %s.", e.lockPath) - err = e.lock.Lock(context.TODO()) + log.Errorln("No longer owns the lock, trying to lock again") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := e.lock.Lock(ctx) + cancel() if err != nil { - return err + // We lost the master lock and can not acquire + // it back, it means some other master is + // already started. We don't want cluster + // managment system to kill the master server + // who is holding the lock and running + // correctly. So the most feasible solution is + // to kill current master server. The current + // state is not saved, but the trainer's RPC + // call will fail, so the trainer will retry. + log.Fatalf("Could not acquire the lock at %s: %v. Exiting.", e.lockPath, err) } log.Infof("Successfully acquired lock at %s.", e.lockPath) return e.Save(state) @@ -102,8 +114,7 @@ func (e *Etcd) Save(state []byte) error { } // Load loads the state from etcd. -func (e *Etcd) Load() ([]byte, error) { - e.mu.Lock() +func (e *EtcdClient) Load() ([]byte, error) { ctx := context.TODO() get := clientv3.OpGet(e.statePath) @@ -114,14 +125,7 @@ func (e *Etcd) Load() ([]byte, error) { if !resp.Succeeded { log.Errorln("No longer owns the lock, trying to lock and load again.") - sess, err := concurrency.NewSession(e.client) - if err != nil { - return nil, err - } - - e.lock = concurrency.NewMutex(sess, e.lockPath) - err = e.lock.Lock(context.TODO()) - e.mu.Unlock() + err = e.lock.Lock(context.Background()) if err != nil { return nil, err } @@ -132,11 +136,9 @@ func (e *Etcd) Load() ([]byte, error) { kvs := resp.Responses[0].GetResponseRange().Kvs if len(kvs) == 0 { // No state exists - e.mu.Unlock() return nil, nil } state := kvs[0].Value - e.mu.Unlock() return state, nil }