-
Notifications
You must be signed in to change notification settings - Fork 173
/
RoundRobinLoadBalancer.java
1234 lines (1101 loc) · 57.9 KB
/
RoundRobinLoadBalancer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;
import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConnectionLimitReachedException;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.PublisherSource.Processor;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.loadbalancer.Exceptions.StacklessConnectionRejectedException;
import io.servicetalk.loadbalancer.Exceptions.StacklessNoActiveHostException;
import io.servicetalk.loadbalancer.Exceptions.StacklessNoAvailableHostException;
import io.servicetalk.utils.internal.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map.Entry;
import java.util.Spliterator;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT;
import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_READY_EVENT;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.AVAILABLE;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.EXPIRED;
import static io.servicetalk.client.api.ServiceDiscovererEvent.Status.UNAVAILABLE;
import static io.servicetalk.concurrent.api.AsyncCloseables.newCompositeCloseable;
import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow;
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.Single.defer;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static java.lang.Integer.toHexString;
import static java.lang.Math.min;
import static java.lang.System.identityHashCode;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
import static java.util.stream.Collectors.toList;
/**
* Consult {@link RoundRobinLoadBalancerFactory} for a description of this {@link LoadBalancer} type.
*
* @param <ResolvedAddress> The resolved address type.
* @param <C> The type of connection.
*/
final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnection>
implements TestableLoadBalancer<ResolvedAddress, C> {
private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class);
// Shared zero-length array; used as the connection array of Host's empty ConnState constants.
private static final Object[] EMPTY_ARRAY = new Object[0];
// CAS access to usedHosts. Raw types are required because a class literal cannot carry generic parameters.
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<RoundRobinLoadBalancer, List> usedHostsUpdater =
AtomicReferenceFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, List.class, "usedHosts");
// Atomic access to the round-robin cursor (index); incremented once per selectConnection0 call.
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<RoundRobinLoadBalancer> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, "index");
// Used by selectConnection0 to claim the RESUBSCRIBING state (via CAS) before re-subscribing to service discovery.
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<RoundRobinLoadBalancer> nextResubscribeTimeUpdater =
AtomicLongFieldUpdater.newUpdater(RoundRobinLoadBalancer.class, "nextResubscribeTime");
// Sentinel value of nextResubscribeTime: a (re)subscribe is in progress and owned by exactly one thread.
private static final long RESUBSCRIBING = -1L;
/**
* With a relatively small number of connections we can minimize connection creation under moderate concurrency by
* exhausting the full search space without sacrificing too much latency caused by the cost of a CAS operation per
* selection attempt.
*/
private static final int MIN_RANDOM_SEARCH_SPACE = 64;
/**
* For larger search spaces, due to the cost of a CAS operation per selection attempt we see diminishing returns for
* trying to locate an available connection when most connections are in use. This increases tail latencies, thus
* after some number of failed attempts it appears to be more beneficial to open a new connection instead.
* <p>
* The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection
* counts, larger connection counts, low connection churn, high connection churn.
*/
private static final float RANDOM_SEARCH_FACTOR = 0.75f;
// Earliest time (nanoseconds, health-check executor clock) at which a re-subscribe may be attempted, or
// RESUBSCRIBING while a subscribe is in progress. Only advanced when health checking is enabled.
private volatile long nextResubscribeTime = RESUBSCRIBING;
// Round-robin cursor; only accessed through indexUpdater, hence "unused".
@SuppressWarnings("unused")
private volatile int index;
// Current hosts. The list is replaced as a whole via usedHostsUpdater (never mutated in place) and becomes a
// ClosedList once closing starts.
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
// Identifier used by toString(); the constructor appends the instance's identity hash code.
private final String id;
private final String targetResource;
private final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher;
// Source of LB ready/not-ready events; buffers at most 32 signals (drop-head on overflow, per the factory name).
private final Processor<Object, Object> eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32);
// replay(1) view of eventStreamProcessor that is returned from eventStream().
private final Publisher<Object> eventStream;
// Holds the Subscription of the current service-discovery subscription so it can be cancelled on close/resubscribe.
private final SequentialCancellable discoveryCancellable = new SequentialCancellable();
private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;
// Number of leading connections per host that selectConnection0 checks in order before probing randomly.
private final int linearSearchSpace;
// null disables health checking and re-subscription to service discovery.
@Nullable
private final HealthCheckConfig healthCheckConfig;
// Backs onClose/onClosing/closeAsync/closeAsyncGracefully; built in the constructor.
private final ListenableAsyncCloseable asyncCloseable;
/**
* Creates a new instance.
*
* @param id a (unique) ID to identify the created {@link RoundRobinLoadBalancer}.
* @param targetResourceName {@link String} representation of the target resource for which this instance
* is performing load balancing.
* @param eventPublisher provides a stream of addresses to connect to.
* @param connectionFactory a function which creates new connections.
* @param linearSearchSpace the number of connections of each host that are checked in order before the selection
* falls back to randomly probing the remaining connections.
* @param healthCheckConfig configuration for the health checking mechanism, which monitors hosts that
* are unable to have a connection established. Providing {@code null} disables this mechanism (meaning the host
* continues being eligible for connecting on the request path).
* @see RoundRobinLoadBalancerFactory
*/
RoundRobinLoadBalancer(
final String id,
final String targetResourceName,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final int linearSearchSpace,
@Nullable final HealthCheckConfig healthCheckConfig) {
// Suffix the identity hash so that instances created with the same id can be told apart in logs.
this.id = id + '@' + toHexString(identityHashCode(this));
this.targetResource = requireNonNull(targetResourceName);
this.eventPublisher = requireNonNull(eventPublisher);
this.eventStream = fromSource(eventStreamProcessor)
.replay(1); // Allow for multiple subscribers and provide new subscribers with last signal.
this.connectionFactory = requireNonNull(connectionFactory);
this.linearSearchSpace = linearSearchSpace;
this.healthCheckConfig = healthCheckConfig;
this.asyncCloseable = toAsyncCloseable(graceful -> {
// Stop consuming service discovery and terminate the LB event stream before closing hosts.
discoveryCancellable.cancel();
eventStreamProcessor.onComplete();
final CompositeCloseable compositeCloseable;
// Atomically swap in a ClosedList (unless already closed), then close exactly the hosts that list holds,
// followed by the connection factory. Retry if a concurrent update wins the CAS.
for (;;) {
List<Host<ResolvedAddress, C>> currentList = usedHosts;
if (isClosedList(currentList) ||
usedHostsUpdater.compareAndSet(this, currentList, new ClosedList<>(currentList))) {
compositeCloseable = newCompositeCloseable().appendAll(currentList).appendAll(connectionFactory);
LOGGER.debug("{} is closing {}gracefully. Last seen addresses (size={}): {}.",
this, graceful ? "" : "non", currentList.size(), currentList);
break;
}
}
// When closing finishes, leave an empty ClosedList behind. A failed *graceful* close keeps the current list,
// presumably so that a follow-up forced close can still reach the remaining hosts.
return (graceful ? compositeCloseable.closeAsyncGracefully() : compositeCloseable.closeAsync())
.beforeOnError(t -> {
if (!graceful) {
usedHosts = new ClosedList<>(emptyList());
}
})
.beforeOnComplete(() -> usedHosts = new ClosedList<>(emptyList()));
});
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
eventStream.ignoreElements().subscribe();
// Start consuming service discovery. nextResubscribeTime is initialized to RESUBSCRIBING, which
// subscribeToEvents asserts.
subscribeToEvents(false);
}
/**
 * Subscribes a fresh {@link EventSubscriber} to the service discovery publisher. On a re-subscribe the previous
 * subscription is cancelled first. When health checking is enabled, this also schedules the earliest time of the
 * next allowed re-subscribe.
 *
 * @param resubscribe {@code true} when replacing an existing subscription.
 */
private void subscribeToEvents(boolean resubscribe) {
    // Callers must own the RESUBSCRIBING state, so at most one thread runs this method at a time.
    assert nextResubscribeTime == RESUBSCRIBING;
    if (resubscribe) {
        LOGGER.debug("{}: resubscribing to the ServiceDiscoverer event publisher.", this);
        discoveryCancellable.cancelCurrent();
    }
    final EventSubscriber subscriber = new EventSubscriber(resubscribe);
    toSource(eventPublisher).subscribe(subscriber);

    final HealthCheckConfig config = healthCheckConfig;
    if (config == null) {
        // Without health checking we never re-subscribe, so there is nothing to schedule.
        return;
    }
    assert config.executor instanceof NormalizedTimeSourceExecutor;
    nextResubscribeTime = nextResubscribeTime(config, this);
}
/**
 * Computes the earliest time at which a re-subscribe to service discovery may happen. The result is the current
 * time of the config's executor plus a random delay drawn from the configured resubscribe bounds (inclusive).
 *
 * @param config health check configuration that provides the clock and the bounds.
 * @param lb the load balancer, used only for logging.
 * @return the earliest re-subscribe time, in nanoseconds of the executor's clock.
 */
private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(
        final HealthCheckConfig config, final RoundRobinLoadBalancer<R, C> lb) {
    final long minDelay = config.healthCheckResubscribeLowerBound;
    final long maxDelay = config.healthCheckResubscribeUpperBound;
    final long now = config.executor.currentTime(NANOSECONDS);
    final long earliest = now + RandomUtils.nextLongInclusive(minDelay, maxDelay);
    LOGGER.debug("{}: current time {}, next resubscribe attempt can be performed at {}.",
            lb, now, earliest);
    return earliest;
}
/**
 * Returns {@code true} only if the list is non-empty and every host in it is currently unhealthy.
 *
 * @param usedHosts the hosts to inspect.
 * @return whether all (and at least one) hosts are unhealthy.
 */
private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(
        final List<Host<ResolvedAddress, C>> usedHosts) {
    if (usedHosts.isEmpty()) {
        return false;
    }
    for (Host<ResolvedAddress, C> candidate : usedHosts) {
        if (!Host.isUnhealthy(candidate.connState)) {
            return false;
        }
    }
    return true;
}
/**
 * Returns {@code true} only if the collection is non-empty and every event in it has status AVAILABLE.
 *
 * @param events the service discovery events to inspect.
 * @return whether all (and at least one) events are AVAILABLE.
 */
private static <ResolvedAddress> boolean onlyAvailable(
        final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
    if (events.isEmpty()) {
        return false;
    }
    for (ServiceDiscovererEvent<ResolvedAddress> candidate : events) {
        if (!AVAILABLE.equals(candidate.status())) {
            return false;
        }
    }
    return true;
}
/**
 * Returns {@code true} if none of the events refers to the host's address (regardless of the event status).
 *
 * @param host the host to look up.
 * @param events the service discovery events to search.
 * @return {@code true} when the host's address does not appear in {@code events}.
 */
private static <ResolvedAddress, C extends LoadBalancedConnection> boolean notAvailable(
        final Host<ResolvedAddress, C> host,
        final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
    for (ServiceDiscovererEvent<ResolvedAddress> candidate : events) {
        if (host.address.equals(candidate.address())) {
            return false;
        }
    }
    return true;
}
/**
 * {@link Subscriber} of the service discovery event stream. Each received {@link ServiceDiscovererEvent} is applied
 * to {@code usedHosts} through {@code usedHostsUpdater}, and LB ready / not-ready signals are published to
 * {@code eventStreamProcessor}. Events are processed synchronously in {@link #onNext(Collection)}.
 */
private final class EventSubscriber
        implements Subscriber<Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> {

    /**
     * {@code true} for a subscriber created by a re-subscribe until it receives its first non-empty batch of events.
     */
    private boolean firstEventsAfterResubscribe;

    EventSubscriber(boolean resubscribe) {
        this.firstEventsAfterResubscribe = resubscribe;
    }

    @Override
    public void onSubscribe(final Subscription s) {
        // We request max value here to make sure we do not access Subscription concurrently
        // (requestN here and cancel from discoveryCancellable). If we request-1 in onNext we would have to wrap
        // the Subscription in a ConcurrentSubscription which is costly.
        // Since, we synchronously process onNexts we do not really care about flow control.
        s.request(Long.MAX_VALUE);
        discoveryCancellable.nextCancellable(s);
    }

    @Override
    public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
        if (events == null) {
            LOGGER.debug("{}: unexpectedly received null instead of events.", RoundRobinLoadBalancer.this);
            return;
        }
        for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
            final ServiceDiscovererEvent.Status eventStatus = event.status();
            LOGGER.debug("{}: received new ServiceDiscoverer event {}. Inferred status: {}.",
                    RoundRobinLoadBalancer.this, event, eventStatus);

            // NOTE: updateAndGet may re-apply this function when its CAS loses a race, so the UNAVAILABLE branch
            // can call Host.markClosed() more than once for the same host.
            @SuppressWarnings("unchecked")
            final List<Host<ResolvedAddress, C>> usedAddresses =
                    usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, oldHosts -> {
                        if (isClosedList(oldHosts)) {
                            // The LB is closing: ignore further discovery updates.
                            return oldHosts;
                        }
                        final ResolvedAddress addr = requireNonNull(event.address());
                        @SuppressWarnings("unchecked")
                        final List<Host<ResolvedAddress, C>> oldHostsTyped =
                                (List<Host<ResolvedAddress, C>>) oldHosts;
                        if (AVAILABLE.equals(eventStatus)) {
                            return addHostToList(oldHostsTyped, addr);
                        } else if (EXPIRED.equals(eventStatus)) {
                            if (oldHostsTyped.isEmpty()) {
                                return emptyList();
                            } else {
                                return markHostAsExpired(oldHostsTyped, addr);
                            }
                        } else if (UNAVAILABLE.equals(eventStatus)) {
                            return listWithHostRemoved(oldHostsTyped, host -> {
                                boolean match = host.address.equals(addr);
                                if (match) {
                                    host.markClosed();
                                }
                                return match;
                            });
                        } else {
                            LOGGER.error("{}: Unexpected Status in event:" +
                                            " {} (mapped to {}). Leaving usedHosts unchanged: {}",
                                    RoundRobinLoadBalancer.this, event, eventStatus, oldHosts);
                            return oldHosts;
                        }
                    });

            LOGGER.debug("{}: now using addresses (size={}): {}.",
                    RoundRobinLoadBalancer.this, usedAddresses.size(), usedAddresses);

            // Signal readiness on the first available host, and non-readiness when the last host goes away.
            if (AVAILABLE.equals(eventStatus)) {
                if (usedAddresses.size() == 1) {
                    eventStreamProcessor.onNext(LOAD_BALANCER_READY_EVENT);
                }
            } else if (usedAddresses.isEmpty()) {
                eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
            }
        }

        if (firstEventsAfterResubscribe) {
            // We can enter this path only if we re-subscribed because all previous hosts were UNHEALTHY.
            if (events.isEmpty()) {
                return; // Wait for the next collection of events.
            }
            firstEventsAfterResubscribe = false;

            if (!onlyAvailable(events)) {
                // Looks like the current ServiceDiscoverer maintains a state between re-subscribes. It already
                // assigned correct states to all hosts. Even if some of them were left UNHEALTHY, we should keep
                // running health-checks.
                return;
            }
            // Looks like the current ServiceDiscoverer doesn't maintain a state between re-subscribes and always
            // starts from an empty state propagating only AVAILABLE events. To be in sync with the
            // ServiceDiscoverer we should clean up and close gracefully all hosts that are not present in the
            // initial collection of events, regardless of their current state.
            final List<Host<ResolvedAddress, C>> currentHosts = usedHosts;
            for (Host<ResolvedAddress, C> host : currentHosts) {
                if (notAvailable(host, events)) {
                    host.closeAsyncGracefully().subscribe();
                }
            }
        }
    }

    /**
     * Marks the host with the given address as expired. The list itself is returned unchanged; the host removes
     * itself later through the onClose callback registered in {@link #createHost(Object)}.
     */
    private List<Host<ResolvedAddress, C>> markHostAsExpired(
            final List<Host<ResolvedAddress, C>> oldHostsTyped, final ResolvedAddress addr) {
        for (Host<ResolvedAddress, C> host : oldHostsTyped) {
            if (host.address.equals(addr)) {
                // Host removal will be handled by the Host's onClose::afterFinally callback
                host.markExpired();
                break; // because duplicates are not allowed, we can stop iteration
            }
        }
        return oldHostsTyped;
    }

    /**
     * Creates a host for {@code addr} and registers a callback that removes it from {@code usedHosts} once the
     * host has closed.
     */
    private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
        Host<ResolvedAddress, C> host = new Host<>(RoundRobinLoadBalancer.this.toString(), addr, healthCheckConfig);
        host.onClose().afterFinally(() ->
                usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> {
                            @SuppressWarnings("unchecked")
                            List<Host<ResolvedAddress, C>> previousHostsTyped =
                                    (List<Host<ResolvedAddress, C>>) previousHosts;
                            return listWithHostRemoved(previousHostsTyped, current -> current == host);
                        }
                )).subscribe();
        return host;
    }

    /**
     * Returns a list that contains a (re-)activated host for {@code addr}. An existing non-closed entry for the
     * address is re-activated in place. A new host is appended if there is no entry, or if the existing entry is
     * already CLOSED.
     */
    private List<Host<ResolvedAddress, C>> addHostToList(
            List<Host<ResolvedAddress, C>> oldHostsTyped, ResolvedAddress addr) {
        if (oldHostsTyped.isEmpty()) {
            return singletonList(createHost(addr));
        }

        // duplicates are not allowed
        for (Host<ResolvedAddress, C> host : oldHostsTyped) {
            if (host.address.equals(addr)) {
                if (!host.markActiveIfNotClosed()) {
                    // If the host is already in CLOSED state, we should create a new entry.
                    // For duplicate ACTIVE events or for repeated activation due to failed CAS
                    // of replacing the usedHosts array the marking succeeds so we will not add a new entry.
                    break;
                }
                return oldHostsTyped;
            }
        }

        final List<Host<ResolvedAddress, C>> newHosts = new ArrayList<>(oldHostsTyped.size() + 1);
        newHosts.addAll(oldHostsTyped);
        newHosts.add(createHost(addr));
        return newHosts;
    }

    /**
     * Returns {@code oldHostsTyped} without its first host that matches {@code hostPredicate}. The predicate is
     * evaluated in list order and is not evaluated past the first match.
     * <p>
     * If the input is a {@code ClosedList}, the result is also a {@code ClosedList}. Otherwise, a host finishing its
     * close while the load balancer is shutting down would make the load balancer look open again: new hosts
     * could then be added by late discovery events and never closed. If no host matches, the input list is
     * returned unchanged.
     */
    private List<Host<ResolvedAddress, C>> listWithHostRemoved(
            List<Host<ResolvedAddress, C>> oldHostsTyped, Predicate<Host<ResolvedAddress, C>> hostPredicate) {
        if (oldHostsTyped.isEmpty()) {
            // this can happen when an expired host is removed during closing of the RoundRobinLoadBalancer,
            // but all of its connections have already been closed
            return oldHostsTyped;
        }
        final int size = oldHostsTyped.size();
        for (int i = 0; i < size; ++i) {
            if (hostPredicate.test(oldHostsTyped.get(i))) {
                final List<Host<ResolvedAddress, C>> newHosts;
                if (size == 1) {
                    newHosts = emptyList();
                } else {
                    newHosts = new ArrayList<>(size - 1);
                    for (int x = 0; x < size; ++x) {
                        if (x != i) {
                            newHosts.add(oldHostsTyped.get(x));
                        }
                    }
                }
                if (isClosedList(oldHostsTyped)) {
                    // Keep the "closed" marker so selection and discovery updates still see the LB as closed.
                    return new ClosedList<>(newHosts);
                }
                return newHosts;
            }
        }
        // Nothing matched: keep the current list (including its closed marker, if any).
        return oldHostsTyped;
    }

    @Override
    public void onError(final Throwable t) {
        List<Host<ResolvedAddress, C>> hosts = usedHosts;
        if (healthCheckConfig == null) {
            // Terminate processor only if we will never re-subscribe
            eventStreamProcessor.onError(t);
        }
        LOGGER.error(
                "{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.",
                RoundRobinLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
    }

    @Override
    public void onComplete() {
        List<Host<ResolvedAddress, C>> hosts = usedHosts;
        if (healthCheckConfig == null) {
            // Terminate processor only if we will never re-subscribe
            eventStreamProcessor.onComplete();
        }
        LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.",
                RoundRobinLoadBalancer.this, hosts.size(), hosts);
    }
}
/**
 * Creates a failed {@link Single} that reports this load balancer as closed.
 *
 * @param targetResource the target resource name used in the error message.
 * @return a {@link Single} failing with an {@link IllegalStateException}.
 */
private static <T> Single<T> failedLBClosed(String targetResource) {
    final String message = "LoadBalancer for " + targetResource + " has closed";
    return failed(new IllegalStateException(message));
}
/**
 * Selects an existing connection accepted by {@code selector}, or opens a new one. Selection is deferred until
 * subscribe time.
 */
@Override
public Single<C> selectConnection(final Predicate<C> selector, @Nullable final ContextMap context) {
    return defer(() -> {
        final Single<C> selection = selectConnection0(selector, context, false);
        return selection.shareContextOnSubscribe();
    });
}
/**
 * Always opens (and reserves) a new connection, bypassing reuse of existing connections. Deferred until subscribe
 * time.
 */
@Override
public Single<C> newConnection(@Nullable final ContextMap context) {
    return defer(() -> {
        final Single<C> created = selectConnection0(anyCnx -> true, context, true);
        return created.shareContextOnSubscribe();
    });
}
/**
 * Returns the shared stream of load balancer readiness events; late subscribers receive the most recent signal.
 */
@Override
public Publisher<Object> eventStream() {
    return this.eventStream;
}
/**
 * Returns a short description containing the instance id and the target resource.
 */
@Override
public String toString() {
    return new StringBuilder("RoundRobinLoadBalancer{")
            .append("id=").append(id)
            .append(", targetResource=").append(targetResource)
            .append('}')
            .toString();
}
/**
* Core selection routine behind {@link #selectConnection} and {@link #newConnection}.
* <p>
* Starting at a round-robin cursor, walks {@code usedHosts} at most once. Unless
* {@code forceNewConnectionAndReserve} is set, each visited host's existing connections are checked first: the
* first {@code linearSearchSpace} in order, then a bounded number of random probes among the rest. The first
* connection accepted by {@code selector} is returned. The walk stops at the first host that is active and healthy,
* and a new connection is opened on that host. If no such host exists, the result fails. When every host is
* unhealthy and the resubscribe time has passed, a re-subscribe to service discovery is also triggered.
*
* @param selector predicate that a connection must pass to be returned.
* @param context optional context passed to the connection factory.
* @param forceNewConnectionAndReserve if {@code true}, skip existing connections and require the new connection
* to be reserved via {@code tryReserve()}.
* @return a {@link Single} with the selected or newly created connection, or a failure.
*/
private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
// Read the volatile field once so the whole selection works against a single snapshot.
final List<Host<ResolvedAddress, C>> usedHosts = this.usedHosts;
if (usedHosts.isEmpty()) {
return isClosedList(usedHosts) ? failedLBClosed(targetResource) :
// This is the case when SD has emitted some items but none of the hosts are available.
failed(StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
RoundRobinLoadBalancer.class, "selectConnection0(...)"));
}
// try one loop over hosts and if all are expired, give up
// Masking with Integer.MAX_VALUE keeps the cursor non-negative after the counter overflows.
final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size();
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
Host<ResolvedAddress, C> pickedHost = null;
for (int i = 0; i < usedHosts.size(); ++i) {
// for a particular iteration we maintain a local cursor without contention with other requests
final int localCursor = (cursor + i) % usedHosts.size();
final Host<ResolvedAddress, C> host = usedHosts.get(localCursor);
assert host != null : "Host can't be null.";
if (!forceNewConnectionAndReserve) {
// Try first to see if an existing connection can be used
final Object[] connections = host.connState.connections;
// Exhaust the linear search space first:
final int linearAttempts = min(connections.length, linearSearchSpace);
for (int j = 0; j < linearAttempts; ++j) {
@SuppressWarnings("unchecked")
final C connection = (C) connections[j];
if (selector.test(connection)) {
return succeeded(connection);
}
}
// Try other connections randomly:
if (connections.length > linearAttempts) {
final int diff = connections.length - linearAttempts;
// With small enough search space, attempt number of times equal to number of remaining connections.
// Back off after exploring most of the search space, it gives diminishing returns.
final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff :
(int) (diff * RANDOM_SEARCH_FACTOR);
for (int j = 0; j < randomAttempts; ++j) {
// Probes are drawn from [linearAttempts, connections.length) and may repeat.
@SuppressWarnings("unchecked")
final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)];
if (selector.test(connection)) {
return succeeded(connection);
}
}
}
}
// Don't open new connections for expired or unhealthy hosts, try a different one.
// Unhealthy hosts have no open connections - that's why we don't fail earlier, the loop will not progress.
if (host.isActiveAndHealthy()) {
pickedHost = host;
break;
}
}
if (pickedHost == null) {
// If every host is unhealthy, the discovery data may be stale: re-subscribe once the scheduled time has
// passed. The CAS to RESUBSCRIBING ensures only one caller performs the re-subscribe.
if (healthCheckConfig != null && allUnhealthy(usedHosts)) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) {
subscribeToEvents(true);
}
}
return failed(StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts,
RoundRobinLoadBalancer.class, "selectConnection0(...)"));
}
// No connection was selected: create a new one.
final Host<ResolvedAddress, C> host = pickedHost;
// This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here.
// Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver.
Single<? extends C> establishConnection = connectionFactory.newConnection(host.address, context, null);
if (host.healthCheckConfig != null) {
// Schedule health check before returning
establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(t, connectionFactory));
}
return establishConnection
.flatMap(newCnx -> {
if (forceNewConnectionAndReserve && !newCnx.tryReserve()) {
return newCnx.closeAsync().<C>concat(failed(StacklessConnectionRejectedException.newInstance(
"Newly created connection " + newCnx + " for " + targetResource
+ " could not be reserved.",
RoundRobinLoadBalancer.class, "selectConnection0(...)")))
.shareContextOnSubscribe();
}
// Invoke the selector before adding the connection to the pool, otherwise, connection can be
// used concurrently and hence a new connection can be rejected by the selector.
if (!selector.test(newCnx)) {
// Failure in selection could be the result of connection factory returning cached connection,
// and not having visibility into max-concurrent-requests, or other threads already selected the
// connection which uses all the max concurrent request count.
// If there is caching, propagate the exception and rely upon the retry strategy.
Single<C> failedSingle = failed(StacklessConnectionRejectedException.newInstance(
"Newly created connection " + newCnx + " for " + targetResource
+ " was rejected by the selection filter.",
RoundRobinLoadBalancer.class, "selectConnection0(...)"));
// Just in case the connection is not closed add it to the host so we don't lose track,
// duplicates will be filtered out.
return (host.addConnection(newCnx, null) ?
failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe();
}
if (host.addConnection(newCnx, null)) {
return succeeded(newCnx).shareContextOnSubscribe();
}
// The host refused the connection: close it, and report "closed" if the whole LB has closed meanwhile.
return newCnx.closeAsync().<C>concat(isClosedList(this.usedHosts) ? failedLBClosed(targetResource) :
failed(StacklessConnectionRejectedException.newInstance(
"Failed to add newly created connection " + newCnx + " for " + targetResource
+ " for " + host, RoundRobinLoadBalancer.class, "selectConnection0(...)")))
.shareContextOnSubscribe();
});
}
/**
 * Completes when this load balancer has finished closing.
 */
@Override
public Completable onClose() {
    return this.asyncCloseable.onClose();
}
/**
 * Completes when this load balancer starts closing.
 */
@Override
public Completable onClosing() {
    return this.asyncCloseable.onClosing();
}
/**
 * Closes this load balancer, its hosts and the connection factory without waiting for in-flight work.
 */
@Override
public Completable closeAsync() {
    return this.asyncCloseable.closeAsync();
}
/**
 * Gracefully closes this load balancer, its hosts and the connection factory.
 */
@Override
public Completable closeAsyncGracefully() {
    return this.asyncCloseable.closeAsyncGracefully();
}
/**
 * Returns a snapshot of the current hosts, each as an (address, connections) entry, in host-list order.
 */
@Override
public List<Entry<ResolvedAddress, List<C>>> usedAddresses() {
    final List<Host<ResolvedAddress, C>> snapshot = usedHosts;
    final List<Entry<ResolvedAddress, List<C>>> entries = new ArrayList<>(snapshot.size());
    for (Host<ResolvedAddress, C> host : snapshot) {
        entries.add(host.asEntry());
    }
    return entries;
}
/**
 * Returns {@code true} if {@code list} is exactly a {@code ClosedList}, i.e. the load balancer is closing or has
 * closed.
 */
private static boolean isClosedList(List<?> list) {
    return ClosedList.class == list.getClass();
}
private static final class Host<Addr, C extends LoadBalancedConnection> implements ListenableAsyncCloseable {
/**
* Argument-free values of {@code ConnState.state}. States that carry data are represented by instances of other
* classes, listed below.
*/
private enum State {
// The enum is not exhaustive, as other states have dynamic properties.
// For clarity, the other state classes are listed as comments:
// ACTIVE - see ActiveState
// UNHEALTHY - see HealthCheck
// Reported EXPIRED by service discovery while the host still had connections (see markExpired);
// markActiveIfNotClosed can move it back to ACTIVE.
EXPIRED,
// The host is closed (see closeConnState and markExpired).
CLOSED
}
// Shared ACTIVE state (with no recorded failures, per its name).
private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState();
// Initial state of every Host: ACTIVE with no connections.
private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES);
// CLOSED state with no connections.
private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED);
// CAS access to connState; raw Host type because a class literal cannot carry generic parameters.
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<Host, ConnState> connStateUpdater =
newUpdater(Host.class, ConnState.class, "connState");
// The owning load balancer's toString(), used as a log prefix.
private final String lbDescription;
final Addr address;
// null when health checking is disabled.
@Nullable
private final HealthCheckConfig healthCheckConfig;
private final ListenableAsyncCloseable closeable;
// This host's connections and state; always replaced as a whole via connStateUpdater.
private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE;
/**
 * Creates a host in the ACTIVE state with no connections.
 *
 * @param lbDescription description of the owning load balancer, used in log messages.
 * @param address the resolved address of this host.
 * @param healthCheckConfig health checking configuration, or {@code null} if disabled.
 */
Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) {
    this.lbDescription = lbDescription;
    this.address = address;
    this.healthCheckConfig = healthCheckConfig;
    this.closeable = toAsyncCloseable(graceful -> {
        if (graceful) {
            return doClose(AsyncCloseable::closeAsyncGracefully);
        }
        return doClose(AsyncCloseable::closeAsync);
    });
}
/**
 * Moves an EXPIRED host back to ACTIVE, keeping its connections. Every other state is left untouched: a
 * duplicate activation or a retried CAS needs no change, and UNHEALTHY must first pass a health check.
 *
 * @return {@code false} if the host was already CLOSED, {@code true} otherwise.
 */
boolean markActiveIfNotClosed() {
    final ConnState previous = connStateUpdater.getAndUpdate(this, current ->
            current.state == State.EXPIRED ?
                    new ConnState(current.connections, STATE_ACTIVE_NO_FAILURES) :
                    current);
    return previous.state != State.CLOSED;
}
/**
 * Switches this host to CLOSED, cancels any pending health check, and starts a graceful close of every
 * connection held in the state observed before closing.
 */
void markClosed() {
    final ConnState beforeClose = closeConnState();
    final Object[] connections = beforeClose.connections;
    cancelIfHealthCheck(beforeClose);
    LOGGER.debug("{}: closing {} connection(s) gracefully to the closed address: {}.",
            lbDescription, connections.length, address);
    for (int i = 0; i < connections.length; ++i) {
        @SuppressWarnings("unchecked")
        final C connection = (C) connections[i];
        // The graceful close is subscribed to but not awaited here.
        connection.closeAsyncGracefully().subscribe();
    }
}
/**
 * Atomically switches the state to CLOSED while keeping the connection array.
 *
 * @return the state observed right before the switch, or the current state if it was already CLOSED.
 */
private ConnState closeConnState() {
    // The connections are kept even once CLOSED. A user may call closeGracefully with a timeout, have it fail,
    // and then force a close. Discarding the connections when closeGracefully starts could leak them.
    ConnState observed;
    do {
        observed = connState;
    } while (observed.state != State.CLOSED && !connStateUpdater.compareAndSet(this, observed,
            new ConnState(observed.connections, State.CLOSED)));
    return observed;
}
/**
 * Marks this host as EXPIRED. If it has no connections, the host is moved directly to CLOSED and closed.
 * <p>
 * Does nothing if the host is already EXPIRED or CLOSED. On a successful transition, any pending health check
 * is cancelled.
 */
void markExpired() {
    while (true) {
        final ConnState current = connStateUpdater.get(this);
        if (current.state == State.EXPIRED || current.state == State.CLOSED) {
            return;
        }
        final boolean noConnections = current.connections.length == 0;
        final Object target = noConnections ? State.CLOSED : State.EXPIRED;
        if (!connStateUpdater.compareAndSet(this, current, new ConnState(current.connections, target))) {
            // Lost the race with a concurrent update; retry against the fresh state.
            continue;
        }
        cancelIfHealthCheck(current);
        if (noConnections) {
            // Trigger the callback to remove the host from usedHosts array.
            this.closeAsync().subscribe();
        }
        return;
    }
}
/**
 * Moves an UNHEALTHY host back to ACTIVE with a reset failure counter. Other states are left untouched.
 *
 * @param originalHealthCheckState the health check the caller considers current. If the replaced state holds
 * a different health check, that one is cancelled.
 */
void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
    // Marking healthy is called to recover from an unexpected error. In the meantime, the host may have gone
    // EXPIRED -> ACTIVE -> UNHEALTHY (after failing to open connections) before this thread continues. In that
    // case the state replaced here is not the health check that just succeeded. If subsequent connection
    // attempts kept failing, that other health check task would leak and never be cancelled. So it is
    // cancelled here, and later failures may trigger a new health check.
    final ConnState replaced = connStateUpdater.getAndUpdate(this, current -> Host.isUnhealthy(current) ?
            new ConnState(current.connections, STATE_ACTIVE_NO_FAILURES) : current);
    if (replaced.state == originalHealthCheckState) {
        return;
    }
    cancelIfHealthCheck(replaced);
}
/**
 * Records a failed connection attempt. Once {@code healthCheckConfig.failedThreshold} failures have been
 * counted, the host becomes UNHEALTHY and a background health check is scheduled.
 * <p>
 * Failures are only counted while the host is ACTIVE with no connections, and never for a
 * {@code ConnectionLimitReachedException}. Otherwise the failure is only logged at debug level.
 * Requires {@code healthCheckConfig} to be non-null (asserted).
 *
 * @param cause the reason the connection attempt failed.
 * @param connectionFactory factory handed to the health check, if one is created.
 */
void markUnhealthy(final Throwable cause, final ConnectionFactory<Addr, ? extends C> connectionFactory) {
    assert healthCheckConfig != null;
    while (true) {
        final ConnState current = connStateUpdater.get(this);
        final boolean countsAsFailure = Host.isActive(current) && current.connections.length == 0
                && !(cause instanceof ConnectionLimitReachedException);
        if (!countsAsFailure) {
            LOGGER.debug("{}: failed to open a new connection to the host on address {}. {}.",
                    lbDescription, address, current, cause);
            return;
        }
        final ActiveState activeState = (ActiveState) current.state;
        if (activeState.failedConnections + 1 >= healthCheckConfig.failedThreshold) {
            // Threshold reached: swap in a health check. The health check is only scheduled if the CAS wins.
            final HealthCheck<Addr, C> healthCheck = new HealthCheck<>(connectionFactory, this, cause);
            if (connStateUpdater.compareAndSet(this, current, new ConnState(current.connections, healthCheck))) {
                LOGGER.info("{}: failed to open a new connection to the host on address {} " +
                                "{} time(s) in a row. Error counting threshold reached, marking this host as " +
                                "UNHEALTHY for the selection algorithm and triggering background health-checking.",
                        lbDescription, address, healthCheckConfig.failedThreshold, cause);
                healthCheck.schedule(cause);
                return;
            }
        } else {
            final ActiveState incremented = activeState.forNextFailedConnection();
            if (connStateUpdater.compareAndSet(this, current, new ConnState(current.connections, incremented))) {
                LOGGER.debug("{}: failed to open a new connection to the host on address {}" +
                                " {} time(s) ({} consecutive failures will trigger health-checking).",
                        lbDescription, address, incremented.failedConnections,
                        healthCheckConfig.failedThreshold, cause);
                return;
            }
        }
        // Another thread won the race; re-read the state and try again.
    }
}
/**
 * Returns {@code true} when the current state is ACTIVE (an {@code ActiveState}).
 */
boolean isActiveAndHealthy() {
    final ConnState snapshot = connState;
    return Host.isActive(snapshot);
}
/**
 * Returns {@code true} when the state held by {@code connState} is exactly an {@code ActiveState}.
 */
static boolean isActive(final ConnState connState) {
    final Class<?> stateType = connState.state.getClass();
    return stateType == ActiveState.class;
}
/**
 * Returns {@code true} when the state held by {@code connState} is exactly a {@code HealthCheck},
 * i.e. the host is UNHEALTHY.
 */
static boolean isUnhealthy(final ConnState connState) {
    final Class<?> stateType = connState.state.getClass();
    return stateType == HealthCheck.class;
}
/**
 * Adds {@code connection} to this host's connection array and registers a listener on its
 * {@code onClose()} that removes it again.
 * <p>
 * On a successful add, an ACTIVE or UNHEALTHY host is reset to {@code STATE_ACTIVE_NO_FAILURES}, and an EXPIRED
 * host stays EXPIRED. When the last connection of an EXPIRED host is later removed, the host switches to
 * {@code CLOSED_CONN_STATE} and is closed.
 *
 * @param connection the connection to add.
 * @param currentHealthCheck the health check the caller is running on behalf of, or {@code null}. If the replaced
 * UNHEALTHY state holds a different health check, that one is cancelled here.
 * @return {@code false} if the host is CLOSED and the connection was not added. {@code true} if it was added,
 * or was already present (in which case no second close listener is registered).
 */
boolean addConnection(final C connection, final @Nullable HealthCheck<Addr, C> currentHealthCheck) {
int addAttempt = 0;
for (;;) {
final ConnState previous = connStateUpdater.get(this);
if (previous.state == State.CLOSED) {
return false;
}
++addAttempt;
final Object[] existing = previous.connections;
// Brute force iteration to avoid duplicates. If connections grow larger and faster lookup is required
// we can keep a Set for faster lookups (at the cost of more memory) as well as array.
for (final Object o : existing) {
if (o.equals(connection)) {
return true;
}
}
// Build a new, one-longer array with the connection appended; it is published by the CAS below.
Object[] newList = Arrays.copyOf(existing, existing.length + 1);
newList[existing.length] = connection;
// If we were able to add a new connection to the list, we should mark the host as ACTIVE again and
// reset its failures counter.
// CLOSED was handled above, so the only remaining state kept as-is here is EXPIRED.
final Object newState = Host.isActive(previous) || Host.isUnhealthy(previous) ?
STATE_ACTIVE_NO_FAILURES : previous.state;
if (connStateUpdater.compareAndSet(this,
previous, new ConnState(newList, newState))) {
// It could happen that the Host turned into UNHEALTHY state either concurrently with adding a new
// connection or with passing a previous health-check (if SD turned it into ACTIVE state). In both
// cases we have to cancel the "previous" ongoing health check. See "markHealthy" for more context.
if (Host.isUnhealthy(previous) &&
(currentHealthCheck == null || previous.state != currentHealthCheck)) {
assert newState == STATE_ACTIVE_NO_FAILURES;
cancelIfHealthCheck(previous);
}
break;
}
// CAS lost to a concurrent update: re-read the state and retry.
}
LOGGER.trace("{}: added a new connection {} to {} after {} attempt(s).",
lbDescription, connection, this, addAttempt);
// Instrument the new connection so we prune it on close
connection.onClose().beforeFinally(() -> {
// CAS retry loop that removes this connection from the current array.
int removeAttempt = 0;
for (;;) {
final ConnState currentConnState = this.connState;
if (currentConnState.state == State.CLOSED) {
// The host is CLOSED; its array is left as-is so that a later forced close can still reach it.
break;
}
assert currentConnState.connections.length > 0;
++removeAttempt;
int i = 0;
final Object[] connections = currentConnState.connections;
// Search for the connection in the list.
for (; i < connections.length; ++i) {
if (connections[i].equals(connection)) {
break;
}
}
if (i == connections.length) {
// Connection was already removed, nothing to do.
break;
} else if (connections.length == 1) {
// Removing the only remaining connection.
assert !Host.isUnhealthy(currentConnState) : "Cannot be UNHEALTHY with #connections > 0";
if (Host.isActive(currentConnState)) {
// ACTIVE: keep the same state object (including its failure counter) and just empty the array.
if (connStateUpdater.compareAndSet(this, currentConnState,
new ConnState(EMPTY_ARRAY, currentConnState.state))) {
break;
}
} else if (currentConnState.state == State.EXPIRED
// We're closing the last connection, close the Host.
// Closing the host will trigger the Host's onClose method, which will remove the host
// from used hosts list. If a race condition appears and a new connection was added
// in the meantime, that would mean the host is available again and the CAS operation
// will allow for determining that. It will prevent closing the Host and will only
// remove the connection (previously considered as the last one) from the array
// in the next iteration.
&& connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) {
this.closeAsync().subscribe();
break;
}
// Falling through means a CAS failed (or the state was neither ACTIVE nor EXPIRED); retry.
} else {
// More than one connection: copy every element except index i into a one-shorter array.
Object[] newList = new Object[connections.length - 1];
System.arraycopy(connections, 0, newList, 0, i);
System.arraycopy(connections, i + 1, newList, i, newList.length - i);
if (connStateUpdater.compareAndSet(this,
currentConnState, new ConnState(newList, currentConnState.state))) {
break;
}
}
}
// Note: also logged when the loop exited without removing anything (host CLOSED or connection absent).
LOGGER.trace("{}: removed connection {} from {} after {} attempt(s).",
lbDescription, connection, this, removeAttempt);
}).onErrorComplete(t -> {
// Use onErrorComplete instead of whenOnError to avoid double logging of an error inside subscribe():
// SimpleCompletableSubscriber.
LOGGER.error("{}: unexpected error while processing connection.onClose() for {}.",
lbDescription, connection, t);
return true;
}).subscribe();
return true;
}
/**
 * Returns this host's address paired with a snapshot of its current connections.
 * <p>
 * Used for testing only.
 */
@SuppressWarnings("unchecked")
Entry<Addr, List<C>> asEntry() {
    final Object[] snapshot = connState.connections;
    final List<C> connections = Arrays.stream(snapshot)
            .map(conn -> (C) conn)
            .collect(toList());
    return new SimpleImmutableEntry<>(address, connections);
}
/**
 * Delegates to the closeable built in the constructor. Its non-graceful path runs
 * {@code doClose(AsyncCloseable::closeAsync)}.
 */
@Override
public Completable closeAsync() {
    final ListenableAsyncCloseable delegate = this.closeable;
    return delegate.closeAsync();
}
/**
 * Delegates to the closeable built in the constructor. Its graceful path runs
 * {@code doClose(AsyncCloseable::closeAsyncGracefully)}.
 */
@Override
public Completable closeAsyncGracefully() {
    final ListenableAsyncCloseable delegate = this.closeable;
    return delegate.closeAsyncGracefully();
}
/**
 * Returns the {@code onClose()} signal of the closeable built in the constructor.
 */
@Override
public Completable onClose() {
    final ListenableAsyncCloseable delegate = this.closeable;
    return delegate.onClose();
}
/**
 * Returns the {@code onClosing()} signal of the closeable built in the constructor.
 */
@Override
public Completable onClosing() {
    final ListenableAsyncCloseable delegate = this.closeable;
    return delegate.onClosing();
}
/**
 * Builds a deferred close of this host. On subscribe it switches the state to CLOSED, cancels any pending
 * health check, and applies {@code closeFunction} to every connection (errors are delayed until all complete).
 *
 * @param closeFunction the per-connection close operation (graceful or not).
 * @return a {@link Completable} that performs the close when subscribed.
 */
@SuppressWarnings("unchecked")
private Completable doClose(final Function<? super C, Completable> closeFunction) {
    return Completable.defer(() -> {
        final ConnState beforeClose = closeConnState();
        cancelIfHealthCheck(beforeClose);
        final Object[] toClose = beforeClose.connections;
        final Completable closeAll;
        if (toClose.length == 0) {
            closeAll = completed();
        } else {
            closeAll = from(toClose).flatMapCompletableDelayError(connection -> closeFunction.apply((C) connection));
        }
        return closeAll.shareContextOnSubscribe();
    });
}
/**
 * Cancels the health check held by {@code connState} if that state is UNHEALTHY; otherwise does nothing.
 */
private void cancelIfHealthCheck(ConnState connState) {
    if (!Host.isUnhealthy(connState)) {
        return;
    }
    @SuppressWarnings("unchecked")
    final HealthCheck<Addr, C> pending = (HealthCheck<Addr, C>) connState.state;
    LOGGER.debug("{}: health check cancelled for {}.", lbDescription, pending.host);
    pending.cancel();
}
/**
 * Describes this host using a single read of {@code connState}, so the state and connection count are
 * consistent with each other.
 */
@Override
public String toString() {
    final ConnState snapshot = this.connState;
    return new StringBuilder("Host{")
            .append("lbDescription=").append(lbDescription)
            .append(", address=").append(address)
            .append(", state=").append(snapshot.state)
            .append(", #connections=").append(snapshot.connections.length)
            .append('}')
            .toString();
}
/**
 * ACTIVE host state carrying the number of failed connection attempts counted so far.
 * Instances are never modified; a new instance is created for each additional failure.
 */
private static final class ActiveState {
    // Failed connection attempts recorded while in this ACTIVE state.
    private final int failedConnections;

    /** Creates an ACTIVE state with no recorded failures. */
    ActiveState() {
        this.failedConnections = 0;
    }

    private ActiveState(int failedConnections) {
        this.failedConnections = failedConnections;
    }

    /** Returns a new state with the failure counter incremented by one (overflow-protected). */
    ActiveState forNextFailedConnection() {
        final int incremented = addWithOverflowProtection(failedConnections, 1);
        return new ActiveState(incremented);
    }

    @Override
    public String toString() {
        return "ACTIVE(failedConnections=" + failedConnections + ')';
    }
}
private static final class HealthCheck<ResolvedAddress, C extends LoadBalancedConnection>
extends DelayedCancellable {
private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;
private final Host<ResolvedAddress, C> host;
private final Throwable lastError;
private HealthCheck(final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final Host<ResolvedAddress, C> host, final Throwable lastError) {
this.connectionFactory = connectionFactory;
this.host = host;
this.lastError = lastError;
}