/
dispatcher.go
370 lines (319 loc) · 12.2 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
package protocol
import (
"context"
"sync"
"time"
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)
// DispatcherGRPCBalancerName is the client-side dispatcher's registered gRPC
// balancer. To utilize client-side dispatching, the service endpoint should be
// dialed with grpc.WithBalancerName(protocol.DispatcherGRPCBalancerName).
// The balancer itself is registered under this name by RegisterGRPCDispatcher.
const DispatcherGRPCBalancerName = "protocolDispatcher"
// RegisterGRPCDispatcher registers the dispatcher balancer with gRPC. It should
// be called once at program startup. The supplied |localZone| is used to prefer
// intra-zone (over inter-zone) members where able.
func RegisterGRPCDispatcher(localZone string) {
balancer.Register(dispatcherBuilder{zone: localZone})
}
// WithDispatchRoute attaches a Route and optional ProcessSpec_ID to a Context
// passed to a gRPC RPC call. If ProcessSpec_ID is non-zero valued, the RPC is
// dispatched to the specified member. Otherwise, the RPC is dispatched to a
// Route member, preferring:
//   - A member not having a currently-broken network connection (eg, due to
//     a stale Route or network split).
//   - A member which is in the same zone as the caller (potentially reducing
//     network traffic costs.
//   - A member having a Ready connection (potentially reducing latency).
func WithDispatchRoute(ctx context.Context, rt Route, id ProcessSpec_ID) context.Context {
	var dr = dispatchRoute{route: rt, id: id}
	return context.WithValue(ctx, dispatchRouteCtxKey{}, dr)
}
// WithDispatchDefault attaches a Route and ProcessSpec_ID which indicate
// that the RPC should be dispatched to the default service address.
func WithDispatchDefault(ctx context.Context) context.Context {
return WithDispatchRoute(ctx, Route{Primary: -1}, ProcessSpec_ID{})
}
// WithDispatchItemRoute uses the DispatchRouter to resolve |item| to a Route
// and ProcessSpec_ID, which are in-turn attached to the Context and returned
// for dispatcher's use. If |requirePrimary| is set and the resolved Route has
// a primary member, the RPC is pinned to that member.
func WithDispatchItemRoute(ctx context.Context, dr DispatchRouter, item string, requirePrimary bool) context.Context {
	var (
		rt = dr.Route(ctx, item)
		id ProcessSpec_ID
	)
	if requirePrimary && rt.Primary != -1 {
		id = rt.Members[rt.Primary]
	}
	var val = dispatchRoute{route: rt, id: id, item: item, DispatchRouter: dr}
	return context.WithValue(ctx, dispatchRouteCtxKey{}, val)
}
// DispatchRouter routes item to Routes, and observes item Routes.
// Implementations are consulted by dispatcher.Pick (via dispatchRoute) to
// invalidate cached Routes when their SubConns break.
type DispatchRouter interface {
	// Route an |item| to a Route, which may be empty if the Route is unknown.
	Route(ctx context.Context, item string) Route
	// UpdateRoute for |item|. A nil |route| is treated as an invalidation.
	UpdateRoute(item string, route *Route)
	// IsNoopRouter returns true if Route is a no-op.
	IsNoopRouter() bool
}
// NoopDispatchRouter is a DispatcherRouter which doesn't route.
// NoopDispatchRouter is a DispatchRouter which doesn't route: Route always
// returns an empty Route (Primary: -1), and UpdateRoute is a no-op.
type NoopDispatchRouter struct{}

func (NoopDispatchRouter) Route(context.Context, string) Route { return Route{Primary: -1} }
func (NoopDispatchRouter) UpdateRoute(string, *Route)          {}
func (NoopDispatchRouter) IsNoopRouter() bool                  { return true }
// dispatcher manages the lifetime of SubConns to individual Endpoints, dialing
// Endpoints when needed and shutting them down when they are no longer used.
// SubConns creation and selection is driven by the Routes and ProcessSpec_IDs
// attached to RPC call Contexts via WithDispatchRoute or WithDispatchItemRoute.
type dispatcher struct {
	cc   balancer.ClientConn // Parent ClientConn; used to create SubConns and publish picker state.
	zone string              // Local zone, preferred by |less| when selecting members.

	// idConn indexes active SubConns (with sweep marks) by member ID.
	idConn map[ProcessSpec_ID]markedSubConn
	// connID is the inverse index of idConn: SubConn => member ID.
	connID map[balancer.SubConn]ProcessSpec_ID
	// connState tracks the last-observed connectivity state of each SubConn.
	connState map[balancer.SubConn]connectivity.State

	sweepDoneCh chan struct{} // Closed by Close() to halt servePeriodicSweeps.
	sweepMark   uint8         // Current mark; SubConns not re-marked between sweeps are closed.
	sweepTicker *time.Ticker  // Drives periodic sweep() invocations.

	mu sync.Mutex // Guards idConn, connID, connState, and sweepMark.
}
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes. We don't actually care about these, instead using the
// dialed service address directly when a dispatched Route isn't available. If
// that address is, for example, a headless DNS balancer, the `net` package
// implements its own resolution and selection of an appropriate A record.
func (d *dispatcher) UpdateClientConnState(_ balancer.ClientConnState) error {
	return nil
}
// ResolverError is called by gRPC when the name resolver reports an error.
// We don't actually care about these, instead using the
// dialed service address directly when a dispatched Route isn't available. If
// that address is, for example, a headless DNS balancer, the `net` package
// implements its own resolution and selection of an appropriate A record.
func (d *dispatcher) ResolverError(_ error) {}
// UpdateSubConnState is called by gRPC to notify of a SubConn connectivity
// state change. It updates the dispatcher's tracked state for the SubConn,
// de-indexes SubConns which have fully shut down, and re-publishes the picker
// so that any RPCs blocked in Pick may re-attempt selection.
func (d *dispatcher) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	d.mu.Lock()

	// Every tracked SubConn was created by Pick and indexed in connID;
	// an unknown SubConn indicates a bookkeeping bug.
	var id, ok = d.connID[sc]
	if !ok {
		panic("unexpected SubConn")
	}

	if state.ConnectivityState == connectivity.Connecting && d.connState[sc] == connectivity.TransientFailure {
		// gRPC will quickly transition failed connections back into a Connecting
		// state. In many cases, such as a remote-initiated close from a
		// shutting-down server, the SubConn may never return. Until we see a
		// successful re-connect, continue to consider the SubConn as broken
		// (and trigger invalidations of cached Routes which use it).
	} else {
		d.connState[sc] = state.ConnectivityState
	}

	if state.ConnectivityState == connectivity.Shutdown {
		// The SubConn has fully terminated (eg, after sweep() removed it);
		// drop all indexes referencing it.
		delete(d.idConn, id)
		delete(d.connID, sc)
		delete(d.connState, sc)
	}
	d.mu.Unlock()

	// Notify gRPC that block requests may now be able to proceed.
	d.cc.UpdateState(balancer.State{
		ConnectivityState: connectivity.Ready,
		Picker:            d,
	})
}
// markedSubConn tracks the last mark associated with a SubConn.
// SubConns not used for a complete sweep interval are closed.
type markedSubConn struct {
	subConn balancer.SubConn
	mark    uint8 // Last sweepMark observed by Pick; compared by sweep().
}
// Pick implements the Picker interface, used by gRPC to select a ready SubConn.
// It reads the dispatchRoute attached to the RPC Context, selects (or lazily
// dials) a SubConn for the most-preferred member, and maps the SubConn's
// connectivity state onto a gRPC pick outcome (proceed, block, or fail-fast).
func (d *dispatcher) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	var dr, ok = info.Ctx.Value(dispatchRouteCtxKey{}).(dispatchRoute)
	if !ok {
		panic("expected dispatchRoute on Context; check for missing WithDispatchRoute ?")
	}

	defer d.mu.Unlock()
	d.mu.Lock()

	var dispatchID = dr.id

	// If |dispatchID| is not prescribed, select our highest-preference member.
	if dispatchID == (ProcessSpec_ID{}) {
		for _, id := range dr.route.Members {
			if d.less(id, dispatchID) {
				dispatchID = id
			}
		}
	}

	msc, ok := d.idConn[dispatchID]
	if !ok {
		// Initiate a new SubConn to the ProcessSpec_ID.
		var err error
		if msc.subConn, err = d.cc.NewSubConn(
			[]resolver.Address{{
				Addr: d.idToAddr(dr.route, dispatchID),
				Type: resolver.Backend,
			}},
			balancer.NewSubConnOptions{},
		); err != nil {
			return balancer.PickResult{}, err
		}
		// Index the SubConn under the current sweep mark so it is not
		// immediately collected, and begin connecting.
		msc.mark = d.sweepMark
		d.idConn[dispatchID] = msc
		d.connID[msc.subConn] = dispatchID
		d.connState[msc.subConn] = connectivity.Idle
		msc.subConn.Connect()
	}

	// Update the mark of this markedSubConn to keep it alive.
	if msc.mark != d.sweepMark {
		msc.mark = d.sweepMark
		d.idConn[dispatchID] = msc
	}

	var state = d.connState[msc.subConn]

	if tr, ok := trace.FromContext(info.Ctx); ok {
		tr.LazyPrintf("Pick(Route: %s, ID: %s) => %s (%s)",
			&dr.route, &dr.id, &dispatchID, state)
	}

	switch state {
	case connectivity.Idle, connectivity.Connecting:
		// gRPC will block until connection becomes ready.
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	case connectivity.TransientFailure:
		// If we're dispatching to the default service SubConn, then return
		// ErrNoSubConnAvailable so that gRPC blocks RPCs until a SubConn is
		// re-established. Otherwise, we immediately fail RPCs which require
		// specific SubConns that are currently broken.
		if dispatchID == (ProcessSpec_ID{}) {
			return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
		}
		if dr.DispatchRouter != nil {
			dr.DispatchRouter.UpdateRoute(dr.item, nil) // Invalidate.
		}
		// gRPC will fail-fast RPCs having grpc.FailFast (the default), and block others.
		return balancer.PickResult{}, balancer.ErrTransientFailure
	case connectivity.Ready:
		return balancer.PickResult{
			SubConn: msc.subConn,
			Done:    makeDoneClosure(dr),
		}, nil
	default:
		panic(state) // Unexpected connectivity.State.
	}
}
// Close is notified by gRPC of a parent grpc.ClientConn closure,
// and terminates the period sweep channel.
func (d *dispatcher) Close() {
	close(d.sweepDoneCh)
}
// less defines an ordering over ProcessSpec_ID preferences used by dispatcher.
// It returns true if |lhs| is strictly preferred over |rhs|.
func (d *dispatcher) less(lhs, rhs ProcessSpec_ID) bool {
	// Always prefer a defined ProcessSpec_ID over the zero-valued one
	// (which is interpreted as "use the default service address").
	if (rhs == ProcessSpec_ID{}) && lhs != rhs {
		return true
	}
	// Then prefer a same-zone member over a cross-zone one,
	// as this can save substantial networking cost.
	if lZone, rZone := lhs.Zone == d.zone, rhs.Zone == d.zone; lZone != rZone {
		return lZone
	}
	// Then prefer a non-failed transport over a failed one. Note that state
	// orders on Idle => Connecting => Ready => TransientFailure, and |lState|
	// & |rState| will default to Idle if IDs are not actually in |idConn|.
	var lState = d.connState[d.idConn[lhs].subConn]
	var rState = d.connState[d.idConn[rhs].subConn]

	if lOK, rOK := lState < connectivity.TransientFailure, rState < connectivity.TransientFailure; lOK != rOK {
		return lOK
	}
	// Then prefer to use a Ready connection over building a new one.
	return lState > rState
}
// idToAddr returns a suitable dial address for the ID. The zero-valued ID maps
// to the default service address; any other ID must appear in |rt.Members|.
func (d *dispatcher) idToAddr(rt Route, id ProcessSpec_ID) string {
	if id == (ProcessSpec_ID{}) {
		// Use the default service address.
		return d.cc.Target()
	}
	for i, member := range rt.Members {
		if member == id {
			return rt.Endpoints[i].URL().Host
		}
	}
	panic("ProcessSpec_ID must be in Route.Members")
}
// sweep removes any SubConns not having their mark updated in the time between calls,
// with the exception of the default service SubConn (with ProcessSpec_ID{}).
// Removal is initiated outside the mutex: completion is observed asynchronously
// via UpdateSubConnState with connectivity.Shutdown.
func (d *dispatcher) sweep() {
	var toSweep []balancer.SubConn

	d.mu.Lock()
	// Collect SubConns whose mark was not refreshed by a Pick since the
	// previous sweep. The default service SubConn is always retained.
	for id, msc := range d.idConn {
		if msc.mark != d.sweepMark && id != (ProcessSpec_ID{}) {
			toSweep = append(toSweep, msc.subConn)
		}
	}
	d.sweepMark++ // Update for next iteration.
	d.mu.Unlock()

	for _, sc := range toSweep {
		// RemoveSubConn begins SubConn shutdown. We expect to see a
		// HandleSubConnStateChange with connectivity.Shutdown, at which
		// point we'll de-index it.
		d.cc.RemoveSubConn(sc)
	}
}
// servePeriodicSweeps runs sweep() on each ticker fire, until sweepDoneCh
// is closed (by Close), at which point it stops the ticker and exits.
func (d *dispatcher) servePeriodicSweeps() {
	for {
		select {
		case <-d.sweepTicker.C:
			d.sweep()
		case <-d.sweepDoneCh:
			d.sweepTicker.Stop()
			return
		}
	}
}
// makeDoneClosure builds a closure which invalidates the dispatchRoute's item
// if the RPC ended in an Unavailable error, which gRPC uses to signal various
// transport errors. It returns nil when there is no DispatchRouter to notify.
func makeDoneClosure(dr dispatchRoute) func(balancer.DoneInfo) {
	if dr.DispatchRouter == nil {
		return nil
	}
	return func(info balancer.DoneInfo) {
		if info.Err == nil {
			return
		}
		if s, ok := status.FromError(info.Err); ok && s.Code() == codes.Unavailable {
			dr.DispatchRouter.UpdateRoute(dr.item, nil) // Invalidate.
		}
	}
}
// dispatcherBuilder implements balancer.Builder, and builds dispatcher instances.
type dispatcherBuilder struct{ zone string }

// Name returns the balancer name under which dispatcherBuilder is registered.
func (db dispatcherBuilder) Name() string { return DispatcherGRPCBalancerName }
// Build constructs a dispatcher for the ClientConn, starts its periodic sweep
// loop, and immediately publishes the dispatcher as a Ready picker.
func (db dispatcherBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	var d = &dispatcher{
		cc:          cc,
		zone:        db.zone,
		idConn:      make(map[ProcessSpec_ID]markedSubConn),
		connID:      make(map[balancer.SubConn]ProcessSpec_ID),
		connState:   make(map[balancer.SubConn]connectivity.State),
		sweepDoneCh: make(chan struct{}),
		sweepMark:   1,
		sweepTicker: time.NewTicker(dispatchSweepInterval),
	}
	// The sweep goroutine exits when Close closes sweepDoneCh.
	go d.servePeriodicSweeps()

	d.cc.UpdateState(balancer.State{
		ConnectivityState: connectivity.Ready,
		Picker:            d,
	}) // Signal as ready for RPCs.
	return d
}
type (
	// dispatchRoute is attached to Contexts by WithDispatchRoute, for dispatcher.Pick to inspect.
	dispatchRoute struct {
		route Route          // Candidate members to dispatch across.
		id    ProcessSpec_ID // If non-zero, the specific member to dispatch to.
		item  string         // Item which resolved to |route| (set by WithDispatchItemRoute).
		// Embedded DispatchRouter is notified of Route invalidations; may be nil.
		DispatchRouter
	}
	// dispatchRouteCtxKey keys dispatchRoute values attached to Contexts.
	dispatchRouteCtxKey struct{}
)
var dispatchSweepInterval = time.Second * 30