Skip to content

Commit

Permalink
Merge pull request #2539 from helinwang/master_etcd_1
Browse files Browse the repository at this point in the history
Master save and load state from etcd, and register on etcd.
  • Loading branch information
helinwang committed Jun 24, 2017
2 parents f277726 + 7dad026 commit 93019df
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 49 deletions.
42 changes: 33 additions & 9 deletions go/cmd/master/master.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,69 @@
package main

import (
"fmt"
"net"
"net/http"
"net/rpc"
"strconv"
"strings"
"time"

"github.com/namsral/flag"
log "github.com/sirupsen/logrus"

"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
)

func main() {
port := flag.Int("port", 8080, "port of the master server.")

faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).")
ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.")
endpoints := flag.String("endpoints", "http://127.0.0.1:2379", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.")
taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.")
taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.")
chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
flag.Parse()

if *faultTolerance {
panic("fault tolernance not implemented.")
if *endpoints == "" {
log.Warningln("-endpoints not set, fault tolerance not be enabled.")
}

var store master.Store
if *endpoints != "" {
eps := strings.Split(*endpoints, ",")
ip, err := networkhelper.GetExternalIP()
if err != nil {
log.Fatal(err)
}

addr := fmt.Sprintf("%s:%d", ip, *port)
store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec)
if err != nil {
log.Fatal(err)
}
} else {
store = &master.InMemStore{}
}

s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
if err != nil {
log.Fatal(err)
}

s := master.NewService(*chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
err := rpc.Register(s)
err = rpc.Register(s)
if err != nil {
panic(err)
log.Fatal(err)
}

rpc.HandleHTTP()
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
if err != nil {
panic(err)
log.Fatal(err)
}

err = http.Serve(l, nil)
if err != nil {
panic(err)
log.Fatal(err)
}
}
8 changes: 6 additions & 2 deletions go/master/client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ func TestGetFinishTask(t *testing.T) {
}

go func(l net.Listener) {
s := NewService(chunkPerTask, time.Second, 1)
s, err := NewService(&InMemStore{}, chunkPerTask, time.Second, 1)
if err != nil {
panic(err)
}

server := rpc.NewServer()
err := server.Register(s)
err = server.Register(s)
if err != nil {
panic(err)
}
Expand Down
8 changes: 6 additions & 2 deletions go/master/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ func TestNextRecord(t *testing.T) {
}

go func(l net.Listener) {
s := master.NewService(10, time.Second, 1)
s, err := master.NewService(&master.InMemStore{}, 10, time.Second, 1)
if err != nil {
panic(err)
}

server := rpc.NewServer()
err := server.Register(s)
err = server.Register(s)
if err != nil {
panic(err)
}
Expand Down
144 changes: 144 additions & 0 deletions go/master/etcd_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package master

import (
"context"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
)

const (
// DefaultLockPath is the default etcd master lock path.
DefaultLockPath = "/master/lock"
// DefaultStatePath is the default etcd key for master state.
DefaultStatePath = "/master/state"
// DefaultAddrPath is the default etcd key for master address.
DefaultAddrPath = "/master/addr"
)

// EtcdClient is the etcd client that master uses for fault tolerance
// and service registry.
type EtcdClient struct {
lockPath string
statePath string
client *clientv3.Client
lock *concurrency.Mutex
}

// NewEtcdClient creates a new EtcdClient.
func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) {
log.Debugf("Connecting to etcd at %v", endpoints)
// TODO(helin): gracefully shutdown etcd store. Becuase etcd
// store holds a etcd lock, even though the lock will expire
// when the lease timeout, we need to implement graceful
// shutdown to release the lock.
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
return nil, err
}

sess, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlSec))
if err != nil {
return nil, err
}

lock := concurrency.NewMutex(sess, lockPath)
// It's fine for the lock to get stuck, in this case we have
// multiple master servers running (only configured to have
// one master running, but split-brain problem may cuase
// multiple master servers running), and the cluster management
// software will kill one of them.
log.Debugf("Trying to acquire lock at %s.", lockPath)
err = lock.Lock(context.TODO())
if err != nil {
return nil, err
}
log.Debugf("Successfully acquired lock at %s.", lockPath)

put := clientv3.OpPut(addrPath, string(addr))
resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit()
if err != nil {
return nil, err
}

if !resp.Succeeded {
log.Fatal("No longer owns the master lock. Exiting.")
}

e := &EtcdClient{
lockPath: lockPath,
statePath: statePath,
client: cli,
lock: lock,
}

return e, nil
}

// Save saves the state into the etcd.
func (e *EtcdClient) Save(state []byte) error {
ctx := context.TODO()
put := clientv3.OpPut(e.statePath, string(state))
resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(put).Commit()
if err != nil {
return err
}

if !resp.Succeeded {
log.Errorln("No longer owns the lock, trying to lock again")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := e.lock.Lock(ctx)
cancel()
if err != nil {
// We lost the master lock and can not acquire
// it back, it means some other master is
// already started. We don't want cluster
// managment system to kill the master server
// who is holding the lock and running
// correctly. So the most feasible solution is
// to kill current master server. The current
// state is not saved, but the trainer's RPC
// call will fail, so the trainer will retry.
log.Fatalf("Could not acquire the lock at %s: %v. Exiting.", e.lockPath, err)
}
log.Infof("Successfully acquired lock at %s.", e.lockPath)
return e.Save(state)
}

return nil
}

// Load loads the state from etcd.
func (e *EtcdClient) Load() ([]byte, error) {
ctx := context.TODO()
get := clientv3.OpGet(e.statePath)

resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(get).Commit()
if err != nil {
return nil, err
}

if !resp.Succeeded {
log.Errorln("No longer owns the lock, trying to lock and load again.")
err = e.lock.Lock(context.Background())
if err != nil {
return nil, err
}

return e.Load()
}

kvs := resp.Responses[0].GetResponseRange().Kvs
if len(kvs) == 0 {
// No state exists
return nil, nil
}

state := kvs[0].Value
return state, nil
}
28 changes: 28 additions & 0 deletions go/master/inmem_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package master

import "sync"

// InMemStore is an in memory implementation of Store interface.
//
// It does not tolerate the fault that casues the program to crash.
type InMemStore struct {
mu sync.Mutex
buf []byte
}

// Save saves the state into the in-memory store.
func (m *InMemStore) Save(state []byte) error {
m.mu.Lock()
defer m.mu.Unlock()

m.buf = state
return nil
}

// Load loads the state from the in-memory store.
func (m *InMemStore) Load() ([]byte, error) {
m.mu.Lock()
defer m.mu.Unlock()

return m.buf, nil
}
Loading

0 comments on commit 93019df

Please sign in to comment.