Skip to content

Commit

Permalink
etcd-election support wait
Browse files Browse the repository at this point in the history
  • Loading branch information
sfwn committed Feb 21, 2022
1 parent 14fb5e1 commit 786f4ff
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
38 changes: 35 additions & 3 deletions providers/etcd-election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ type Interface interface {
ResignLeader() error
OnLeader(handler func(context.Context))
Watch(ctx context.Context, opts ...WatchOption) <-chan Event
ResetWait(wait bool) // `true` means waiting participate in election until wait become `false`
}

type config struct {
Prefix string `file:"root_path" default:"etcd-election"`
NodeID string `file:"node_id"`
Wait bool `file:"wait" default:"false"`
}

type provider struct {
Expand All @@ -87,6 +89,10 @@ type provider struct {
Client *clientv3.Client `autowired:"etcd-client"`
prefix string

wait bool
stopWaitChan chan struct{}
beginWaitChan chan struct{}

lock sync.RWMutex
leaderHandlers []func(ctx context.Context)
cancelHandler func()
Expand All @@ -102,18 +108,35 @@ func (p *provider) Init(ctx servicehub.Context) error {
if len(p.Cfg.NodeID) <= 0 {
p.Cfg.NodeID = uuid.NewV4().String()
}
p.wait = p.Cfg.Wait
p.beginWaitChan = make(chan struct{})
p.stopWaitChan = make(chan struct{})
p.Log.Info("my node id: ", p.Cfg.NodeID)
return nil
}

func (p *provider) reset(session *concurrency.Session) {
func (p *provider) resetSession(session *concurrency.Session) {
session.Close()
p.lock.Lock()
p.session, p.election = nil, nil
p.iAmLeader = false
p.lock.Unlock()
}

func (p *provider) ResetWait(wait bool) {
p.lock.Lock()
defer p.lock.Unlock()
if wait == p.wait {
return
}
p.wait = wait
if !p.wait {
p.stopWaitChan <- struct{}{}
} else {
p.beginWaitChan <- struct{}{}
}
}

func (p *provider) Run(ctx context.Context) error {
for {
select {
Expand All @@ -122,6 +145,11 @@ func (p *provider) Run(ctx context.Context) error {
default:
}

if p.wait {
p.Log.Info("waiting participate in election until wait is false")
<-p.stopWaitChan
}

session, err := p.newSession(ctx, 5*time.Second)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -140,7 +168,7 @@ func (p *provider) Run(ctx context.Context) error {
if errors.Is(err, context.Canceled) {
return nil
}
p.reset(session)
p.resetSession(session)
p.Log.Errorf("fail to Campaign: %s", err, reflect.TypeOf(err))
time.Sleep(1 * time.Second)
continue
Expand All @@ -151,7 +179,7 @@ func (p *provider) Run(ctx context.Context) error {
// The campaign of B exited with nil after connection was restored.
select {
case <-session.Done():
p.reset(session)
p.resetSession(session)
continue
default:
}
Expand All @@ -166,6 +194,10 @@ func (p *provider) Run(ctx context.Context) error {
case <-ctx.Done():
p.resignLeader()
return nil
case <-p.beginWaitChan:
p.Log.Info("begin wait, exit election")
p.resignLeader()
continue
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions providers/etcd-election/examples/examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ etcd:
cert_file: "etcd-client.pem"
cert_key_file: "etcd-client-key.pem"
ca_file: "etcd-ca.pem"

example:

etcd-election:
etcd-election:
wait: true
20 changes: 14 additions & 6 deletions providers/etcd-election/examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,21 @@ func (p *provider) leaderTask(ctx context.Context) {
}

func (p *provider) Run(ctx context.Context) error {
select {
case <-time.After(10 * time.Second):
p.Log.Info("resign leader")
p.Election.ResignLeader()
case <-ctx.Done():
wait := true
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-time.After(10 * time.Second):
p.Log.Info("resign leader")
p.Election.ResignLeader()
case <-ticker.C:
wait = !wait
p.Log.Info("reset wait: ", wait)
p.Election.ResetWait(wait)
case <-ctx.Done():
}
}
return nil
}

func init() {
Expand Down

0 comments on commit 786f4ff

Please sign in to comment.