/
announce.go
158 lines (136 loc) · 4.67 KB
/
announce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package allocator
import (
"context"
"fmt"
"os"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/client/v3"
"go.gazette.dev/core/task"
)
// Announcement is a unique key which has been "announced" to peers through
// Etcd. The key is written under a lease, and its value may be replaced over
// time via Update. It suits keys which at once convey existence,
// configuration, and processing live-ness (allocator member keys, for example).
type Announcement struct {
	// Key is the Etcd key which was announced.
	Key string
	// Revision is the Etcd header revision of the last successful write made
	// through this Announcement. Update requires that it equal the key's
	// current ModRevision, and advances it on success.
	Revision int64

	// etcd is the client through which Update applies its transaction.
	etcd *clientv3.Client
}
// Announce writes |key| with |value| to Etcd, attached to |lease|, within a
// transaction which requires that the key not presently exist. If the
// transaction errors, or the key does exist (eg, because a former lease has
// yet to time out), Announce logs a warning, sleeps for
// announceConflictRetryInterval, and tries again. It returns only once the key
// has been written, yielding an Announcement at the revision of that write.
func Announce(etcd *clientv3.Client, key, value string, lease clientv3.LeaseID) *Announcement {
	for {
		var (
			// A Version of zero means the key does not exist.
			keyIsAbsent   = clientv3.Compare(clientv3.Version(key), "=", 0)
			putUnderLease = clientv3.OpPut(key, value, clientv3.WithLease(lease))
		)
		var txnResp, err = etcd.Txn(context.Background()).
			If(keyIsAbsent).
			Then(putUnderLease).
			Commit()

		switch {
		case err != nil:
			// The transaction itself failed: log and retry below.
		case !txnResp.Succeeded:
			// The comparison failed, so some other writer holds the key.
			err = fmt.Errorf("key exists")
		default:
			return &Announcement{
				Key:      key,
				Revision: txnResp.Header.Revision,
				etcd:     etcd,
			}
		}

		log.WithFields(log.Fields{"err": err, "key": key}).
			Warn("failed to announce key (will retry)")
		time.Sleep(announceConflictRetryInterval)
	}
}
// Update replaces the value stored under the announced key, keeping the key's
// existing lease. The write is conditioned on the key's ModRevision still
// matching a.Revision: if the key was modified or deleted by another party,
// an error is returned and the Announcement is left unchanged. On success,
// a.Revision is advanced to the revision of the update.
func (a *Announcement) Update(value string) error {
	var (
		unmodified = clientv3.Compare(clientv3.ModRevision(a.Key), "=", a.Revision)
		putValue   = clientv3.OpPut(a.Key, value, clientv3.WithIgnoreLease())
	)
	var txnResp, err = a.etcd.Txn(context.Background()).
		If(unmodified).
		Then(putValue).
		Commit()

	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return fmt.Errorf("key modified or deleted externally (expected revision %d)", a.Revision)
	}
	a.Revision = txnResp.Header.Revision
	return nil
}
// SessionArgs bundles the arguments accepted by StartSession.
type SessionArgs struct {
	// Etcd is the client used to establish the member lease, announce the
	// member key, load the KeySpace, and run the allocation loop.
	Etcd *clientv3.Client
	// Tasks is the group to which StartSession queues its background tasks.
	Tasks *task.Group
	// Spec is the member specification. It's validated up front, its
	// marshaled form is the announced value, and its limit is zeroed when
	// SignalCh fires.
	Spec interface {
		Validate() error
		ZeroLimit()
		MarshalString() string
	}
	// State supplies the LocalKey to announce and the KS (KeySpace) to load,
	// and is handed to Allocate.
	State *State
	// LeaseTTL is the time-to-live of the member lease. It's converted to
	// whole seconds when the lease is established.
	LeaseTTL time.Duration
	// SignalCh, upon delivering a signal, causes the member's limit to be
	// zeroed and re-announced.
	SignalCh <-chan os.Signal
	// TestHook is passed through to AllocateArgs.
	TestHook func(round int, isIdle bool)
}
// StartSession begins an allocator session. In order, it:
//   - Validates args.Spec, returning an error if invalid.
//   - Establishes an Etcd lease session with a TTL of args.LeaseTTL, which
//     conveys "liveness" of this member to its peers.
//   - Queues a task which closes the lease when the *task.Group is cancelled,
//     or fails if the lease session ends first.
//   - Announces the marshaled Spec at args.State.LocalKey under the lease.
//     This blocks until the announcement succeeds.
//   - Loads args.State.KS as-of the announcement revision.
//   - Queues a task which awaits args.SignalCh and, on a signal, zeros the
//     Spec's limit and updates the announcement.
//   - Queues a task which runs Allocate, and cancels the *task.Group when
//     Allocate returns.
//
// NOTE(review): LeaseTTL is truncated to whole seconds, so a sub-second TTL
// becomes zero; confirm how concurrency.WithTTL treats a non-positive TTL.
func StartSession(args SessionArgs) error {
	if err := args.Spec.Validate(); err != nil {
		return errors.WithMessage(err, "spec.Validate")
	}

	var ttlSeconds = int(args.LeaseTTL.Seconds())
	var session, err = concurrency.NewSession(args.Etcd, concurrency.WithTTL(ttlSeconds))
	if err != nil {
		return errors.WithMessage(err, "establishing Etcd lease")
	}

	// Whichever happens first: the session ends on its own (it could not be
	// kept alive), which fails the task, or the task.Group is cancelled, in
	// which case we close the session ourselves.
	args.Tasks.Queue("lease.Close", func() error {
		select {
		case <-session.Done():
			return errors.New("unable to keep member lease alive")
		case <-args.Tasks.Context().Done():
			return session.Close()
		}
	})

	var announcement = Announce(
		args.Etcd,
		args.State.LocalKey,
		args.Spec.MarshalString(),
		session.Lease(),
	)

	// The KeySpace starts from the revision at which we announced ourselves.
	err = args.State.KS.Load(context.Background(), args.Etcd, announcement.Revision)
	if err != nil {
		return errors.WithMessage(err, "loading KeySpace")
	}

	// Await a signal (or group cancellation). On a signal, advertise a limit
	// of zero in Etcd.
	args.Tasks.Queue("zero member limit on signal", func() error {
		select {
		case <-args.Tasks.Context().Done():
			return nil
		case sig := <-args.SignalCh:
			log.WithField("signal", sig).Info("caught signal")
		}

		// Having seen our zeroed limit, the Allocator will work to discharge
		// all of our assigned items, and Allocate exits gracefully once none
		// remain.
		args.Spec.ZeroLimit()
		return announcement.Update(args.Spec.MarshalString())
	})

	// Run the allocation loop, and cancel the task.Group once it returns.
	args.Tasks.Queue("Allocate", func() error {
		defer args.Tasks.Cancel()

		var allocErr = Allocate(AllocateArgs{
			Context:  args.Tasks.Context(),
			Etcd:     args.Etcd,
			State:    args.State,
			TestHook: args.TestHook,
		})
		// Cancellation of the group's context is not reported as a failure.
		if errors.Cause(allocErr) == context.Canceled {
			return nil
		}
		return allocErr
	})

	return nil
}
// announceConflictRetryInterval is how long Announce sleeps between attempts
// after a failed announcement (including when the key already exists).
var announceConflictRetryInterval = 10 * time.Second