Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: etcd-election support wait #245

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions providers/etcd-election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ type Interface interface {
ResignLeader() error
OnLeader(handler func(context.Context))
Watch(ctx context.Context, opts ...WatchOption) <-chan Event
ResetWait(wait bool) // `true` means waiting participate in election until wait become `false`
}

type config struct {
Prefix string `file:"root_path" default:"etcd-election"`
NodeID string `file:"node_id"`
Wait bool `file:"wait" default:"false"`
}

type provider struct {
Expand All @@ -87,6 +89,10 @@ type provider struct {
Client *clientv3.Client `autowired:"etcd-client"`
prefix string

wait bool
stopWaitChan chan struct{}
beginWaitChan chan struct{}

lock sync.RWMutex
leaderHandlers []func(ctx context.Context)
cancelHandler func()
Expand All @@ -102,18 +108,35 @@ func (p *provider) Init(ctx servicehub.Context) error {
if len(p.Cfg.NodeID) <= 0 {
p.Cfg.NodeID = uuid.NewV4().String()
}
p.wait = p.Cfg.Wait
p.beginWaitChan = make(chan struct{})
p.stopWaitChan = make(chan struct{})
p.Log.Info("my node id: ", p.Cfg.NodeID)
return nil
}

func (p *provider) reset(session *concurrency.Session) {
func (p *provider) resetSession(session *concurrency.Session) {
session.Close()
p.lock.Lock()
p.session, p.election = nil, nil
p.iAmLeader = false
p.lock.Unlock()
}

func (p *provider) ResetWait(wait bool) {
p.lock.Lock()
defer p.lock.Unlock()
if wait == p.wait {
return
}
p.wait = wait
if !p.wait {
p.stopWaitChan <- struct{}{}
} else {
p.beginWaitChan <- struct{}{}
}
}

func (p *provider) Run(ctx context.Context) error {
for {
select {
Expand All @@ -122,6 +145,15 @@ func (p *provider) Run(ctx context.Context) error {
default:
}

p.lock.Lock()
wait := p.wait
p.lock.Unlock()

if wait {
p.Log.Info("waiting participate in election until wait is false")
<-p.stopWaitChan
}

session, err := p.newSession(ctx, 5*time.Second)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand All @@ -140,7 +172,7 @@ func (p *provider) Run(ctx context.Context) error {
if errors.Is(err, context.Canceled) {
return nil
}
p.reset(session)
p.resetSession(session)
p.Log.Errorf("fail to Campaign: %s", err, reflect.TypeOf(err))
time.Sleep(1 * time.Second)
continue
Expand All @@ -151,7 +183,7 @@ func (p *provider) Run(ctx context.Context) error {
// The campaign of B exited with nil after connection was restored.
select {
case <-session.Done():
p.reset(session)
p.resetSession(session)
continue
default:
}
Expand All @@ -166,6 +198,10 @@ func (p *provider) Run(ctx context.Context) error {
case <-ctx.Done():
p.resignLeader()
return nil
case <-p.beginWaitChan:
p.Log.Info("begin wait, exit election")
p.resignLeader()
continue
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions providers/etcd-election/examples/examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ etcd:
cert_file: "etcd-client.pem"
cert_key_file: "etcd-client-key.pem"
ca_file: "etcd-ca.pem"

example:

etcd-election:
etcd-election:
wait: true
21 changes: 15 additions & 6 deletions providers/etcd-election/examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,22 @@ func (p *provider) leaderTask(ctx context.Context) {
}

func (p *provider) Run(ctx context.Context) error {
select {
case <-time.After(10 * time.Second):
p.Log.Info("resign leader")
p.Election.ResignLeader()
case <-ctx.Done():
wait := true
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-time.After(10 * time.Second):
p.Log.Info("resign leader")
p.Election.ResignLeader()
case <-ticker.C:
wait = !wait
p.Log.Info("reset wait: ", wait)
p.Election.ResetWait(wait)
case <-ctx.Done():
return nil
}
}
return nil
}

func init() {
Expand Down