/
LoadBalancer.java
453 lines (421 loc) · 19.9 KB
/
LoadBalancer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.base.Preconditions;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
/**
* A pluggable component that receives resolved addresses from {@link NameResolver} and provides the
* channel a usable subchannel when asked.
*
* <h3>Overview</h3>
*
* <p>A LoadBalancer typically implements three interfaces:
* <ol>
* <li>{@link LoadBalancer} is the main interface. All methods on it are invoked sequentially
* from the Channel Executor. It receives the results from the {@link NameResolver}, updates
* of subchannels' connectivity states, and the channel's request for the LoadBalancer to
* shutdown.</li>
* <li>{@link SubchannelPicker SubchannelPicker} does the actual load-balancing work. It selects
* a {@link Subchannel Subchannel} for each new RPC.</li>
* <li>{@link Factory Factory} creates a new {@link LoadBalancer} instance.</li>
* </ol>
*
* <p>{@link Helper Helper} is implemented by the gRPC library and provided to {@link Factory
* Factory}. It provides functionality that a {@code LoadBalancer} implementation would typically
* need.
*
* <h3>Channel Executor</h3>
*
* <p>Channel Executor is an internal executor of the channel, which is used to serialize all the
* callback methods on the {@link LoadBalancer} interface, thus the balancer implementation doesn't
* need to worry about synchronization among them. However, the actual thread of the Channel
* Executor is typically the network thread, thus the following rules must be followed to prevent
* blocking, or even deadlocking, the network thread:
*
* <ol>
*
* <li><strong>Never block in Channel Executor</strong>. The callback methods must return
* quickly. Examples of work that must be avoided: CPU-intensive calculation, waiting on
* synchronization primitives, blocking I/O, blocking RPCs, etc.</li>
*
* <li><strong>Avoid calling into other components with lock held</strong>. Channel Executor may
* run callbacks under a lock, e.g., the transport lock of OkHttp. If your LoadBalancer has a
* lock and holds it in a callback method (e.g., {@link #handleSubchannelState
* handleSubchannelState()}) while calling into another class that may involve locks, be cautious
* of deadlock. Generally you wouldn't need any locking in the LoadBalancer.</li>
*
* </ol>
*
* <p>{@link Helper#runSerialized Helper.runSerialized()} allows you to schedule a task to be run in
* the Channel Executor.
*
* <h3>The canonical implementation pattern</h3>
*
* <p>A {@link LoadBalancer} keeps states like the latest addresses from NameResolver, the
* Subchannel(s) and their latest connectivity states. These states are mutated within the Channel
* Executor.
*
* <p>A typical {@link SubchannelPicker SubchannelPicker} holds a snapshot of these states. It may
* have its own states, e.g., a picker from a round-robin load-balancer may keep a pointer to the
* next Subchannel, which is typically mutated by multiple threads. The picker should only mutate
* its own state, and should not mutate or re-acquire the states of the LoadBalancer. This way the
* picker only needs to synchronize its own states, which is typically trivial to implement.
*
* <p>When the LoadBalancer's state changes, e.g., Subchannels have become or stopped being READY,
* and we want subsequent RPCs to use the latest list of READY Subchannels, the LoadBalancer would
* create a new picker, which holds a snapshot of the latest Subchannel list. Refer to the javadoc
* of {@link #handleSubchannelState handleSubchannelState()} for how to do this properly.
*
* <p>No synchronization should be necessary between LoadBalancer and its pickers if you follow
* the pattern above. It may be possible to implement in a different way, but that would usually
* result in more complicated threading.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
@NotThreadSafe
public abstract class LoadBalancer {
/**
* Handles newly resolved server groups and metadata attributes from name resolution system.
* {@code servers} contained in {@link ResolvedServerInfoGroup} should be considered equivalent
* but may be flattened into a single list if needed.
*
* <p>Implementations should not modify the given {@code servers}.
*
* @param servers the resolved server addresses, never empty.
* @param attributes extra metadata from naming system.
*/
public abstract void handleResolvedAddresses(
List<ResolvedServerInfoGroup> servers, Attributes attributes);
/**
* Handles an error from the name resolution system.
*
* @param error a non-OK status
*/
public abstract void handleNameResolutionError(Status error);
/**
* Handles a state change on a Subchannel.
*
* <p>The initial state of a Subchannel is IDLE. You won't get a notification for the initial IDLE
* state.
*
* <p>If the new state is not SHUTDOWN, this method should create a new picker and call {@link
* Helper#updatePicker Helper.updatePicker()}. Failing to do so may result in unnecessary delays
* of RPCs. Please refer to {@link PickResult#withSubchannel PickResult.withSubchannel()}'s
* javadoc for more information.
*
* <p>SHUTDOWN can only happen in two cases. One is that LoadBalancer called {@link
* Subchannel#shutdown} earlier, thus it should have already discarded this Subchannel. The other
* is that Channel is doing a {@link ManagedChannel#shutdownNow forced shutdown} or has already
* terminated, thus there won't be further requests to LoadBalancer. Therefore, SHUTDOWN can be
* safely ignored.
*
* @param subchannel the involved Subchannel
* @param stateInfo the new state
*/
public abstract void handleSubchannelState(
Subchannel subchannel, ConnectivityStateInfo stateInfo);
/**
* The channel asks the load-balancer to shutdown. No more callbacks will be called after this
* method. The implementation should shutdown all Subchannels and OOB channels, and do any other
* cleanup as necessary.
*/
public abstract void shutdown();
/**
* The main balancing logic. It <strong>must be thread-safe</strong>. Typically it should only
* synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
*
* <p>Note: Implementations should override exactly one {@code pickSubchannel}.
*/
@ThreadSafe
public abstract static class SubchannelPicker {
/**
* Make a balancing decision for a new RPC.
*
* @param affinity the affinity attributes provided via {@link CallOptions#withAffinity}
* @param headers the headers container of the RPC. It can be mutated within this method.
* @deprecated this signature is going to be removed in the next minor release. Implementations
* should instead override the {@link #pickSubchannel(LoadBalancer.PickSubchannelArgs)}.
*/
@Deprecated
public PickResult pickSubchannel(Attributes affinity, Metadata headers) {
throw new UnsupportedOperationException();
}
/**
* Make a balancing decision for a new RPC.
*
* @param args the pick arguments
*/
// TODO(lukaszx0) make it abstract once deprecated overload will be removed.
public PickResult pickSubchannel(PickSubchannelArgs args) {
return pickSubchannel(args.getCallOptions().getAffinity(), args.getHeaders());
}
}
/**
* Provides arguments for a {@link SubchannelPicker#pickSubchannel(
* LoadBalancer.PickSubchannelArgs)}.
*/
public abstract static class PickSubchannelArgs {
/**
* Call options.
*/
public abstract CallOptions getCallOptions();
/**
* Call metadata.
*/
public abstract Metadata getHeaders();
/**
* Call method.
*/
public abstract MethodDescriptor<?, ?> getMethodDescriptor();
}
/**
* A balancing decision made by {@link SubchannelPicker SubchannelPicker} for an RPC.
*
* <p>The outcome of the decision will be one of the following:
* <ul>
* <li>Proceed: if a Subchannel is provided via {@link #withSubchannel withSubchannel()}, and is
* in READY state when the RPC tries to start on it, the RPC will proceed on that
* Subchannel.</li>
* <li>Error: if an error is provided via {@link #withError withError()}, and the RPC is not
* wait-for-ready (i.e., {@link CallOptions#withWaitForReady} was not called), the RPC will
* fail immediately with the given error.</li>
* <li>Buffer: in all other cases, the RPC will be buffered in the Channel, until the next
* picker is provided via {@link Helper#updatePicker Helper.updatePicker()}, when the RPC
* will go through the same picking process again.</li>
* </ul>
*/
@Immutable
public static final class PickResult {
private static final PickResult NO_RESULT = new PickResult(null, Status.OK);
@Nullable private final Subchannel subchannel;
// An error to be propagated to the application if subchannel == null
// Or OK if there is no error.
// subchannel being null and error being OK means RPC needs to wait
private final Status status;
private PickResult(@Nullable Subchannel subchannel, Status status) {
this.subchannel = subchannel;
this.status = Preconditions.checkNotNull(status, "status");
}
/**
* A decision to proceed the RPC on a Subchannel.
*
* <p>Only Subchannels returned by {@link Helper#createSubchannel Helper.createSubchannel()}
* will work. DO NOT try to use your own implementations of Subchannels, as they won't work.
*
* <p>When the RPC tries to use the return Subchannel, which is briefly after this method
* returns, the state of the Subchannel will decide where the RPC would go:
*
* <ul>
* <li>READY: the RPC will proceed on this Subchannel.</li>
* <li>IDLE: the RPC will be buffered. Subchannel will attempt to create connection.</li>
* <li>All other states: the RPC will be buffered.</li>
* </ul>
*
* <p><strong>All buffered RPCs will stay buffered</strong> until the next call of {@link
* Helper#updatePicker Helper.updatePicker()}, which will trigger a new picking process.
*
* <p>Note that Subchannel's state may change at the same time the picker is making the
* decision, which means the decision may be made with (to-be) outdated information. For
* example, a picker may return a Subchannel known to be READY, but it has become IDLE when is
* about to be used by the RPC, which makes the RPC to be buffered. The LoadBalancer will soon
* learn about the Subchannels' transition from READY to IDLE, create a new picker and allow the
* RPC to use another READY transport if there is any.
*
* <p>You will want to avoid running into a situation where there are READY Subchannels out
* there but some RPCs are still buffered for longer than a brief time.
* <ul>
* <li>This can happen if you return Subchannels with states other than READY and IDLE. For
* example, suppose you round-robin on 2 Subchannels, in READY and CONNECTING states
* respectively. If the picker ignores the state and pick them equally, 50% of RPCs will
* be stuck in buffered state until both Subchannels are READY.</li>
* <li>This can also happen if you don't create a new picker at key state changes of
* Subchannels. Take the above round-robin example again. Suppose you do pick only READY
* and IDLE Subchannels, and initially both Subchannels are READY. Now one becomes IDLE,
* then CONNECTING and stays CONNECTING for a long time. If you don't create a new picker
* in response to the CONNECTING state to exclude that Subchannel, 50% of RPCs will hit it
* and be buffered even though the other Subchannel is READY.</li>
* </ul>
*
* <p>In order to prevent unnecessary delay of RPCs, the rules of thumb are:
* <ol>
* <li>The picker should only pick Subchannels that are known as READY or IDLE. Whether to
* pick IDLE Subchannels depends on whether you want Subchannels to connect on-demand or
* actively:
* <ul>
* <li>If you want connect-on-demand, include IDLE Subchannels in your pick results,
* because when an RPC tries to use an IDLE Subchannel, the Subchannel will try to
* connect.</li>
* <li>If you want Subchannels to be always connected even when there is no RPC, you
* would call {@link Subchannel#requestConnection Subchannel.requestConnection()}
* whenever the Subchannel has transitioned to IDLE, then you don't need to include
* IDLE Subchannels in your pick results.</li>
* </ul></li>
* <li>Always create a new picker and call {@link Helper#updatePicker Helper.updatePicker()}
* whenever {@link #handleSubchannelState handleSubchannelState()} is called, unless the
* new state is SHUTDOWN. See {@code handleSubchannelState}'s javadoc for more
* details.</li>
* </ol>
*/
public static PickResult withSubchannel(Subchannel subchannel) {
return new PickResult(Preconditions.checkNotNull(subchannel, "subchannel"), Status.OK);
}
/**
* A decision to report a connectivity error to the RPC. If the RPC is {@link
* CallOptions#withWaitForReady wait-for-ready}, it will stay buffered. Otherwise, it will fail
* with the given error.
*
* @param error the error status. Must not be OK.
*/
public static PickResult withError(Status error) {
Preconditions.checkArgument(!error.isOk(), "error status shouldn't be OK");
return new PickResult(null, error);
}
/**
* No decision could be made. The RPC will stay buffered.
*/
public static PickResult withNoResult() {
return NO_RESULT;
}
/**
* The Subchannel if this result was created by {@link #withSubchannel withSubchannel()}, or
* null otherwise.
*/
@Nullable
public Subchannel getSubchannel() {
return subchannel;
}
/**
* The status associated with this result. Non-{@code OK} if created with {@link #withError
* withError}, or {@code OK} otherwise.
*/
public Status getStatus() {
return status;
}
@Override
public String toString() {
return "[subchannel=" + subchannel + " status=" + status + "]";
}
}
/**
* Provides essentials for LoadBalancer implementations.
*/
@ThreadSafe
public abstract static class Helper {
/**
* Creates a Subchannel, which is a logical connection to the given group of addresses which are
* considered equivalent. The {@code attrs} are custom attributes associated with this
* Subchannel, and can be accessed later through {@link Subchannel#getAttributes
* Subchannel.getAttributes()}.
*
* <p>The LoadBalancer is responsible for closing unused Subchannels, and closing all
* Subchannels within {@link #shutdown}.
*/
public abstract Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs);
/**
* Out-of-band channel for LoadBalancer’s own RPC needs, e.g., talking to an external
* load-balancer service.
*
* <p>The LoadBalancer is responsible for closing unused OOB channels, and closing all OOB
* channels within {@link #shutdown}.
*/
public abstract ManagedChannel createOobChannel(
EquivalentAddressGroup eag, String authority);
/**
* Set a new picker to the channel.
*
* <p>When a new picker is provided via {@code updatePicker()}, the channel will apply the
* picker on all buffered RPCs, by calling {@link SubchannelPicker#pickSubchannel(
* LoadBalancer.PickSubchannelArgs)}.
*
* <p>The channel will hold the picker and use it for all RPCs, until {@code updatePicker()} is
* called again and a new picker replaces the old one. If {@code updatePicker()} has never been
* called, the channel will buffer all RPCs until a picker is provided.
*/
public abstract void updatePicker(SubchannelPicker picker);
/**
* Schedule a task to be run in the Channel Executor, which serializes the task with the
* callback methods on the {@link LoadBalancer} interface.
*/
public abstract void runSerialized(Runnable task);
/**
* Returns the NameResolver of the channel.
*/
public abstract NameResolver.Factory getNameResolverFactory();
/**
* Returns the authority string of the channel, which is derived from the DNS-style target name.
*/
public abstract String getAuthority();
}
/**
* A logical connection to a server, or a group of equivalent servers represented by an {@link
* EquivalentAddressGroup}.
*
* <p>It maintains at most one physical connection (aka transport) for sending new RPCs, while
* also keeps track of previous transports that has been shut down but not terminated yet.
*
* <p>If there isn't an active transport yet, and an RPC is assigned to the Subchannel, it will
* create a new transport. It won't actively create transports otherwise. {@link
* #requestConnection requestConnection()} can be used to ask Subchannel to create a transport if
* there isn't any.
*/
@ThreadSafe
public abstract static class Subchannel {
/**
* Shuts down the Subchannel. After this method is called, this Subchannel should no longer
* be returned by the latest {@link SubchannelPicker picker}, and can be safely discarded.
*/
public abstract void shutdown();
/**
* Asks the Subchannel to create a connection (aka transport), if there isn't an active one.
*/
public abstract void requestConnection();
/**
* Returns the addresses that this Subchannel is bound to.
*/
public abstract EquivalentAddressGroup getAddresses();
/**
* The same attributes passed to {@link Helper#createSubchannel Helper.createSubchannel()}.
* LoadBalancer can use it to attach additional information here, e.g., the shard this
* Subchannel belongs to.
*/
public abstract Attributes getAttributes();
}
@ThreadSafe
public abstract static class Factory {
/**
* Creates a {@link LoadBalancer} that will be used inside a channel.
*/
public abstract LoadBalancer newLoadBalancer(Helper helper);
}
}