-
Notifications
You must be signed in to change notification settings - Fork 10.4k
/
xds.cc
1865 lines (1699 loc) · 73.3 KB
/
xds.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
#include "src/core/ext/filters/client_channel/lb_policy_factory.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/xds/xds_client.h"
#include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"
#define GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS 10000
#define GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS (15 * 60 * 1000)
#define GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS 10000
namespace grpc_core {
TraceFlag grpc_lb_xds_trace(false, "xds");
namespace {
constexpr char kXds[] = "xds_experimental";
class ParsedXdsConfig : public LoadBalancingPolicy::Config {
public:
ParsedXdsConfig(const char* balancer_name,
RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy)
: balancer_name_(balancer_name),
child_policy_(std::move(child_policy)),
fallback_policy_(std::move(fallback_policy)) {}
const char* name() const override { return kXds; }
const char* balancer_name() const { return balancer_name_; };
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy() const {
return fallback_policy_;
}
private:
const char* balancer_name_ = nullptr;
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_;
};
class XdsLb : public LoadBalancingPolicy {
public:
explicit XdsLb(Args args);
const char* name() const override { return kXds; }
void UpdateLocked(UpdateArgs args) override;
void ResetBackoffLocked() override;
private:
class ClusterWatcher;
class EndpointWatcher;
// We need this wrapper for the following reasons:
// 1. To process per-locality load reporting.
// 2. Since pickers are UniquePtrs we use this RefCounted wrapper to control
// references to it by the xds picker and the locality.
class PickerWrapper : public RefCounted<PickerWrapper> {
public:
PickerWrapper(UniquePtr<SubchannelPicker> picker,
RefCountedPtr<XdsClientStats::LocalityStats> locality_stats)
: picker_(std::move(picker)),
locality_stats_(std::move(locality_stats)) {
locality_stats_->RefByPicker();
}
~PickerWrapper() { locality_stats_->UnrefByPicker(); }
PickResult Pick(PickArgs args);
private:
UniquePtr<SubchannelPicker> picker_;
RefCountedPtr<XdsClientStats::LocalityStats> locality_stats_;
};
// The picker will use a stateless weighting algorithm to pick the locality to
// use for each request.
class Picker : public SubchannelPicker {
public:
// Maintains a weighted list of pickers from each locality that is in ready
// state. The first element in the pair represents the end of a range
// proportional to the locality's weight. The start of the range is the
// previous value in the vector and is 0 for the first element.
using PickerList =
InlinedVector<std::pair<uint32_t, RefCountedPtr<PickerWrapper>>, 1>;
Picker(RefCountedPtr<XdsLb> xds_policy, PickerList pickers)
: xds_policy_(std::move(xds_policy)),
pickers_(std::move(pickers)),
drop_config_(xds_policy_->drop_config_) {}
PickResult Pick(PickArgs args) override;
private:
// Calls the picker of the locality that the key falls within.
PickResult PickFromLocality(const uint32_t key, PickArgs args);
RefCountedPtr<XdsLb> xds_policy_;
PickerList pickers_;
RefCountedPtr<XdsDropConfig> drop_config_;
};
class FallbackHelper : public ChannelControlHelper {
public:
explicit FallbackHelper(RefCountedPtr<XdsLb> parent)
: parent_(std::move(parent)) {}
~FallbackHelper() { parent_.reset(DEBUG_LOCATION, "FallbackHelper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override;
void RequestReresolution() override;
void AddTraceEvent(TraceSeverity severity, StringView message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingFallback() const;
bool CalledByCurrentFallback() const;
RefCountedPtr<XdsLb> parent_;
LoadBalancingPolicy* child_ = nullptr;
};
// There is only one PriorityList instance, which has the same lifetime with
// the XdsLb instance.
class PriorityList {
public:
// Each LocalityMap holds a ref to the XdsLb.
class LocalityMap : public InternallyRefCounted<LocalityMap> {
public:
// Each Locality holds a ref to the LocalityMap it is in.
class Locality : public InternallyRefCounted<Locality> {
public:
Locality(RefCountedPtr<LocalityMap> locality_map,
RefCountedPtr<XdsLocalityName> name);
~Locality();
void UpdateLocked(uint32_t locality_weight,
ServerAddressList serverlist);
void ShutdownLocked();
void ResetBackoffLocked();
void DeactivateLocked();
void Orphan() override;
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
uint32_t weight() const { return weight_; }
RefCountedPtr<PickerWrapper> picker_wrapper() const {
return picker_wrapper_;
}
void set_locality_map(RefCountedPtr<LocalityMap> locality_map) {
locality_map_ = std::move(locality_map);
}
private:
class Helper : public ChannelControlHelper {
public:
explicit Helper(RefCountedPtr<Locality> locality)
: locality_(std::move(locality)) {}
~Helper() { locality_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr<SubchannelInterface> CreateSubchannel(
const grpc_channel_args& args) override;
void UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) override;
// This is a no-op, because we get the addresses from the xds
// client, which is a watch-based API.
void RequestReresolution() override {}
void AddTraceEvent(TraceSeverity severity,
StringView message) override;
void set_child(LoadBalancingPolicy* child) { child_ = child; }
private:
bool CalledByPendingChild() const;
bool CalledByCurrentChild() const;
RefCountedPtr<Locality> locality_;
LoadBalancingPolicy* child_ = nullptr;
};
// Methods for dealing with the child policy.
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const char* name, const grpc_channel_args* args);
grpc_channel_args* CreateChildPolicyArgsLocked(
const grpc_channel_args* args);
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
XdsLb* xds_policy() const { return locality_map_->xds_policy(); }
// The owning locality map.
RefCountedPtr<LocalityMap> locality_map_;
RefCountedPtr<XdsLocalityName> name_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
RefCountedPtr<PickerWrapper> picker_wrapper_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
uint32_t weight_;
// States for delayed removal.
grpc_timer delayed_removal_timer_;
grpc_closure on_delayed_removal_timer_;
bool delayed_removal_timer_callback_pending_ = false;
bool shutdown_ = false;
};
LocalityMap(RefCountedPtr<XdsLb> xds_policy, uint32_t priority);
void UpdateLocked(
const XdsPriorityListUpdate::LocalityMap& locality_map_update);
void ResetBackoffLocked();
void UpdateXdsPickerLocked();
OrphanablePtr<Locality> ExtractLocalityLocked(
const RefCountedPtr<XdsLocalityName>& name);
void DeactivateLocked();
// Returns true if this locality map becomes the currently used one (i.e.,
// its priority is selected) after reactivation.
bool MaybeReactivateLocked();
void MaybeCancelFailoverTimerLocked();
void Orphan() override;
XdsLb* xds_policy() const { return xds_policy_.get(); }
uint32_t priority() const { return priority_; }
grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
}
bool failover_timer_callback_pending() const {
return failover_timer_callback_pending_;
}
private:
void OnLocalityStateUpdateLocked();
void UpdateConnectivityStateLocked();
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error);
static void OnFailoverTimerLocked(void* arg, grpc_error* error);
PriorityList* priority_list() const {
return &xds_policy_->priority_list_;
}
const XdsPriorityListUpdate& priority_list_update() const {
return xds_policy_->priority_list_update_;
}
const XdsPriorityListUpdate::LocalityMap* locality_map_update() const {
return xds_policy_->priority_list_update_.Find(priority_);
}
RefCountedPtr<XdsLb> xds_policy_;
Map<RefCountedPtr<XdsLocalityName>, OrphanablePtr<Locality>,
XdsLocalityName::Less>
localities_;
const uint32_t priority_;
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
// States for delayed removal.
grpc_timer delayed_removal_timer_;
grpc_closure on_delayed_removal_timer_;
bool delayed_removal_timer_callback_pending_ = false;
// States of failover.
grpc_timer failover_timer_;
grpc_closure on_failover_timer_;
bool failover_timer_callback_pending_ = false;
};
explicit PriorityList(XdsLb* xds_policy) : xds_policy_(xds_policy) {}
void UpdateLocked();
void ResetBackoffLocked();
void ShutdownLocked();
void UpdateXdsPickerLocked();
const XdsPriorityListUpdate& priority_list_update() const {
return xds_policy_->priority_list_update_;
}
uint32_t current_priority() const { return current_priority_; }
private:
void MaybeCreateLocalityMapLocked(uint32_t priority);
void FailoverOnConnectionFailureLocked();
void FailoverOnDisconnectionLocked(uint32_t failed_priority);
void SwitchToHigherPriorityLocked(uint32_t priority);
void DeactivatePrioritiesLowerThan(uint32_t priority);
OrphanablePtr<LocalityMap::Locality> ExtractLocalityLocked(
const RefCountedPtr<XdsLocalityName>& name, uint32_t exclude_priority);
// Callers should make sure the priority list is non-empty.
uint32_t LowestPriority() const {
return static_cast<uint32_t>(priorities_.size()) - 1;
}
bool Contains(uint32_t priority) { return priority < priorities_.size(); }
XdsLb* xds_policy_;
// The list of locality maps, indexed by priority. P0 is the highest
// priority.
InlinedVector<OrphanablePtr<LocalityMap>, 2> priorities_;
// The priority that is being used.
uint32_t current_priority_ = UINT32_MAX;
};
~XdsLb();
void ShutdownLocked() override;
// Parses the xds config given the JSON node of the first child of XdsConfig.
// If parsing succeeds, updates \a balancer_name, and updates \a
// child_policy_config_ and \a fallback_policy_config_ if they are also
// found. Does nothing upon failure.
void ParseLbConfig(const ParsedXdsConfig* xds_config);
// Methods for dealing with fallback state.
void MaybeCancelFallbackAtStartupChecks();
static void OnFallbackTimerLocked(void* arg, grpc_error* error);
void UpdateFallbackPolicyLocked();
OrphanablePtr<LoadBalancingPolicy> CreateFallbackPolicyLocked(
const char* name, const grpc_channel_args* args);
void MaybeExitFallbackMode();
// Name of the backend server to connect to.
const char* server_name_ = nullptr;
// Name of the balancer to connect to.
UniquePtr<char> balancer_name_;
// Current channel args from the resolver.
const grpc_channel_args* args_ = nullptr;
// Internal state.
bool shutting_down_ = false;
// The xds client.
OrphanablePtr<XdsClient> xds_client_;
// A pointer to the endpoint watcher, to be used when cancelling the watch.
// Note that this is not owned, so this pointer must never be derefernced.
EndpointWatcher* endpoint_watcher_ = nullptr;
// Whether the checks for fallback at startup are ALL pending. There are
// several cases where this can be reset:
// 1. The fallback timer fires, we enter fallback mode.
// 2. Before the fallback timer fires, the endpoint watcher reports an
// error, we enter fallback mode.
// 3. Before the fallback timer fires, if any child policy in the locality map
// becomes READY, we cancel the fallback timer.
bool fallback_at_startup_checks_pending_ = false;
// Timeout in milliseconds for before using fallback backend addresses.
// 0 means not using fallback.
const grpc_millis lb_fallback_timeout_ms_;
// The backend addresses from the resolver.
ServerAddressList fallback_backend_addresses_;
// Fallback timer.
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
// The policy to use for the fallback backends.
RefCountedPtr<LoadBalancingPolicy::Config> fallback_policy_config_;
// Non-null iff we are in fallback mode.
OrphanablePtr<LoadBalancingPolicy> fallback_policy_;
OrphanablePtr<LoadBalancingPolicy> pending_fallback_policy_;
// The policy to use for the backends.
RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
const grpc_millis locality_retention_interval_ms_;
const grpc_millis locality_map_failover_timeout_ms_;
// A list of locality maps indexed by priority.
PriorityList priority_list_;
// The update for priority_list_.
XdsPriorityListUpdate priority_list_update_;
// The config for dropping calls.
RefCountedPtr<XdsDropConfig> drop_config_;
// The stats for client-side load reporting.
XdsClientStats client_stats_;
};
//
// XdsLb::PickerWrapper::Pick
//
LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick(
LoadBalancingPolicy::PickArgs args) {
// Forward the pick to the picker returned from the child policy.
PickResult result = picker_->Pick(args);
if (result.type != PickResult::PICK_COMPLETE ||
result.subchannel == nullptr || locality_stats_ == nullptr) {
return result;
}
// Record a call started.
locality_stats_->AddCallStarted();
// Intercept the recv_trailing_metadata op to record call completion.
XdsClientStats::LocalityStats* locality_stats =
locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release();
result.recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// combiner or in the data plane mutex.
[locality_stats](grpc_error* error, MetadataInterface* metadata,
CallState* call_state) {
const bool call_failed = error != GRPC_ERROR_NONE;
locality_stats->AddCallFinished(call_failed);
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
};
return result;
}
//
// XdsLb::Picker
//
XdsLb::PickResult XdsLb::Picker::Pick(PickArgs args) {
// Handle drop.
const UniquePtr<char>* drop_category;
if (drop_config_->ShouldDrop(&drop_category)) {
xds_policy_->client_stats_.AddCallDropped(*drop_category);
PickResult result;
result.type = PickResult::PICK_COMPLETE;
return result;
}
// Generate a random number in [0, total weight).
const uint32_t key = rand() % pickers_[pickers_.size() - 1].first;
// Forward pick to whichever locality maps to the range in which the
// random number falls in.
return PickFromLocality(key, args);
}
XdsLb::PickResult XdsLb::Picker::PickFromLocality(const uint32_t key,
PickArgs args) {
size_t mid = 0;
size_t start_index = 0;
size_t end_index = pickers_.size() - 1;
size_t index = 0;
while (end_index > start_index) {
mid = (start_index + end_index) / 2;
if (pickers_[mid].first > key) {
end_index = mid;
} else if (pickers_[mid].first < key) {
start_index = mid + 1;
} else {
index = mid + 1;
break;
}
}
if (index == 0) index = start_index;
GPR_ASSERT(pickers_[index].first > key);
return pickers_[index].second->Pick(args);
}
//
// XdsLb::FallbackHelper
//
bool XdsLb::FallbackHelper::CalledByPendingFallback() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->pending_fallback_policy_.get();
}
bool XdsLb::FallbackHelper::CalledByCurrentFallback() const {
GPR_ASSERT(child_ != nullptr);
return child_ == parent_->fallback_policy_.get();
}
RefCountedPtr<SubchannelInterface> XdsLb::FallbackHelper::CreateSubchannel(
const grpc_channel_args& args) {
if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) {
return nullptr;
}
return parent_->channel_control_helper()->CreateSubchannel(args);
}
void XdsLb::FallbackHelper::UpdateState(grpc_connectivity_state state,
UniquePtr<SubchannelPicker> picker) {
if (parent_->shutting_down_) return;
// If this request is from the pending fallback policy, ignore it until
// it reports READY, at which point we swap it into place.
if (CalledByPendingFallback()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(
GPR_INFO,
"[xdslb %p helper %p] pending fallback policy %p reports state=%s",
parent_.get(), this, parent_->pending_fallback_policy_.get(),
ConnectivityStateName(state));
}
if (state != GRPC_CHANNEL_READY) return;
grpc_pollset_set_del_pollset_set(
parent_->fallback_policy_->interested_parties(),
parent_->interested_parties());
parent_->fallback_policy_ = std::move(parent_->pending_fallback_policy_);
} else if (!CalledByCurrentFallback()) {
// This request is from an outdated fallback policy, so ignore it.
return;
}
parent_->channel_control_helper()->UpdateState(state, std::move(picker));
}
void XdsLb::FallbackHelper::RequestReresolution() {
if (parent_->shutting_down_) return;
const LoadBalancingPolicy* latest_fallback_policy =
parent_->pending_fallback_policy_ != nullptr
? parent_->pending_fallback_policy_.get()
: parent_->fallback_policy_.get();
if (child_ != latest_fallback_policy) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Re-resolution requested from the fallback policy (%p).",
parent_.get(), child_);
}
parent_->channel_control_helper()->RequestReresolution();
}
void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity,
StringView message) {
if (parent_->shutting_down_ ||
(!CalledByPendingFallback() && !CalledByCurrentFallback())) {
return;
}
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}
//
// XdsLb::ClusterWatcher
//
class XdsLb::ClusterWatcher : public XdsClient::ClusterWatcherInterface {
public:
explicit ClusterWatcher(RefCountedPtr<XdsLb> xds_policy)
: xds_policy_(std::move(xds_policy)) {}
void OnClusterChanged(CdsUpdate update) override {
// If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately.
if (update.drop_all) xds_policy_->MaybeExitFallbackMode();
// Update the drop config.
const bool drop_config_changed =
xds_policy_->drop_config_ == nullptr ||
*xds_policy_->drop_config_ != *update.drop_config;
xds_policy_->drop_config_ = std::move(update.drop_config);
// Ignore identical locality update.
if (xds_policy_->priority_list_update_ == update.priority_list_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Incoming locality update identical to current, "
"ignoring. (drop_config_changed=%d)",
xds_policy_.get(), drop_config_changed);
}
if (drop_config_changed) {
xds_policy_->priority_list_.UpdateXdsPickerLocked();
}
return;
}
// Update the priority list.
xds_policy_->priority_list_update_ = std::move(update.priority_list_update);
xds_policy_->priority_list_.UpdateLocked();
}
void OnError(grpc_error* error) override {
// If the fallback-at-startup checks are pending, go into fallback mode
// immediately. This short-circuits the timeout for the
// fallback-at-startup case.
if (xds_policy_->fallback_at_startup_checks_pending_) {
gpr_log(GPR_INFO,
"[xdslb %p] xds watcher reported error; entering fallback "
"mode: %s",
xds_policy_.get(), grpc_error_string(error));
xds_policy_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&xds_policy_->lb_fallback_timer_);
xds_policy_->UpdateFallbackPolicyLocked();
// If the xds call failed, request re-resolution.
// TODO(roth): We check the error string contents here to
// differentiate between the xds call failing and the xds channel
// going into TRANSIENT_FAILURE. This is a pretty ugly hack,
// but it's okay for now, since we're not yet sure whether we will
// continue to support the current fallback functionality. If we
// decide to keep the fallback approach, then we should either
// find a cleaner way to expose the difference between these two
// cases or decide that we're okay re-resolving in both cases.
// Note that even if we do keep the current fallback functionality,
// this re-resolution will only be necessary if we are going to be
// using this LB policy with resolvers other than the xds resolver.
if (strstr(grpc_error_string(error), "xds call failed")) {
xds_policy_->channel_control_helper()->RequestReresolution();
}
}
GRPC_ERROR_UNREF(error);
}
private:
RefCountedPtr<XdsLb> xds_policy_;
};
//
// XdsLb::EndpointWatcher
//
class XdsLb::EndpointWatcher : public XdsClient::EndpointWatcherInterface {
public:
explicit EndpointWatcher(RefCountedPtr<XdsLb> xds_policy)
: xds_policy_(std::move(xds_policy)) {}
void OnEndpointChanged(EdsUpdate update) override {
// If the balancer tells us to drop all the calls, we should exit fallback
// mode immediately.
if (update.drop_all) xds_policy_->MaybeExitFallbackMode();
// Update the drop config.
const bool drop_config_changed =
xds_policy_->drop_config_ == nullptr ||
*xds_policy_->drop_config_ != *update.drop_config;
xds_policy_->drop_config_ = std::move(update.drop_config);
// Ignore identical locality update.
if (xds_policy_->priority_list_update_ == update.priority_list_update) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Incoming locality update identical to current, "
"ignoring. (drop_config_changed=%d)",
xds_policy_.get(), drop_config_changed);
}
if (drop_config_changed) {
xds_policy_->priority_list_.UpdateXdsPickerLocked();
}
return;
}
// Update the priority list.
xds_policy_->priority_list_update_ = std::move(update.priority_list_update);
xds_policy_->priority_list_.UpdateLocked();
}
void OnError(grpc_error* error) override {
// If the fallback-at-startup checks are pending, go into fallback mode
// immediately. This short-circuits the timeout for the
// fallback-at-startup case.
if (xds_policy_->fallback_at_startup_checks_pending_) {
gpr_log(GPR_INFO,
"[xdslb %p] xds watcher reported error; entering fallback "
"mode: %s",
xds_policy_.get(), grpc_error_string(error));
xds_policy_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&xds_policy_->lb_fallback_timer_);
xds_policy_->UpdateFallbackPolicyLocked();
// If the xds call failed, request re-resolution.
// TODO(roth): We check the error string contents here to
// differentiate between the xds call failing and the xds channel
// going into TRANSIENT_FAILURE. This is a pretty ugly hack,
// but it's okay for now, since we're not yet sure whether we will
// continue to support the current fallback functionality. If we
// decide to keep the fallback approach, then we should either
// find a cleaner way to expose the difference between these two
// cases or decide that we're okay re-resolving in both cases.
// Note that even if we do keep the current fallback functionality,
// this re-resolution will only be necessary if we are going to be
// using this LB policy with resolvers other than the xds resolver.
if (strstr(grpc_error_string(error), "xds call failed")) {
xds_policy_->channel_control_helper()->RequestReresolution();
}
}
GRPC_ERROR_UNREF(error);
}
private:
RefCountedPtr<XdsLb> xds_policy_;
};
//
// ctor and dtor
//
XdsLb::XdsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
lb_fallback_timeout_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_XDS_FALLBACK_TIMEOUT_MS,
{GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX})),
locality_retention_interval_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_LOCALITY_RETENTION_INTERVAL_MS,
{GRPC_XDS_DEFAULT_LOCALITY_RETENTION_INTERVAL_MS, 0, INT_MAX})),
locality_map_failover_timeout_ms_(grpc_channel_args_find_integer(
args.args, GRPC_ARG_XDS_FAILOVER_TIMEOUT_MS,
{GRPC_XDS_DEFAULT_FAILOVER_TIMEOUT_MS, 0, INT_MAX})),
priority_list_(this) {
// Record server name.
const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
const char* server_uri = grpc_channel_arg_get_string(arg);
GPR_ASSERT(server_uri != nullptr);
grpc_uri* uri = grpc_uri_parse(server_uri, true);
GPR_ASSERT(uri->path[0] != '\0');
server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO,
"[xdslb %p] Will use '%s' as the server name for LB request.", this,
server_name_);
}
grpc_uri_destroy(uri);
}
XdsLb::~XdsLb() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] destroying xds LB policy", this);
}
gpr_free((void*)server_name_);
grpc_channel_args_destroy(args_);
}
void XdsLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] shutting down", this);
}
shutting_down_ = true;
MaybeCancelFallbackAtStartupChecks();
priority_list_.ShutdownLocked();
if (fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),
interested_parties());
}
if (pending_fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(
pending_fallback_policy_->interested_parties(), interested_parties());
}
fallback_policy_.reset();
pending_fallback_policy_.reset();
// Cancel the endpoint watch here instead of in our dtor, because the
// watcher holds a ref to us.
if (xds_client_ != nullptr) {
xds_client_->CancelEndpointDataWatch(StringView(server_name_),
endpoint_watcher_);
xds_client_->RemoveClientStats(StringView(server_name_), &client_stats_);
xds_client_.reset();
}
}
//
// public methods
//
void XdsLb::ResetBackoffLocked() {
// TODO(roth): When we instantiate the XdsClient in the resolver
// instead of in this LB policy, this should be done in the resolver
// instead of here.
if (xds_client_ != nullptr) xds_client_->ResetBackoff();
priority_list_.ResetBackoffLocked();
if (fallback_policy_ != nullptr) {
fallback_policy_->ResetBackoffLocked();
}
if (pending_fallback_policy_ != nullptr) {
pending_fallback_policy_->ResetBackoffLocked();
}
}
void XdsLb::ParseLbConfig(const ParsedXdsConfig* xds_config) {
if (xds_config == nullptr || xds_config->balancer_name() == nullptr) return;
// TODO(yashykt) : does this need to be a gpr_strdup
// TODO(juanlishen): Read balancer name from bootstrap file.
balancer_name_ = UniquePtr<char>(gpr_strdup(xds_config->balancer_name()));
child_policy_config_ = xds_config->child_policy();
fallback_policy_config_ = xds_config->fallback_policy();
}
void XdsLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = xds_client_ == nullptr;
ParseLbConfig(static_cast<const ParsedXdsConfig*>(args.config.get()));
// TODO(roth): This check should go away once we are getting the xds
// server from the bootstrap file.
if (balancer_name_ == nullptr) {
gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this);
return;
}
// Update fallback address list.
fallback_backend_addresses_ = std::move(args.addresses);
// Update args.
grpc_channel_args_destroy(args_);
args_ = args.args;
args.args = nullptr;
// Create an xds client if we don't have one yet.
if (xds_client_ == nullptr) {
xds_client_ = MakeOrphanable<XdsClient>(
combiner(), interested_parties(), balancer_name_.get(),
StringView(server_name_), nullptr /* service config watcher */, *args_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Created xds client %p", this,
xds_client_.get());
}
endpoint_watcher_ = New<EndpointWatcher>(Ref());
xds_client_->WatchEndpointData(
StringView(server_name_),
UniquePtr<XdsClient::EndpointWatcherInterface>(endpoint_watcher_));
xds_client_->AddClientStats(StringView(server_name_), &client_stats_);
}
// Update priority list.
priority_list_.UpdateLocked();
// Update the existing fallback policy. The fallback policy config and/or the
// fallback addresses may be new.
if (fallback_policy_ != nullptr) UpdateFallbackPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks.
if (is_initial_update) {
grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Held by closure
GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this,
grpc_combiner_scheduler(combiner()));
fallback_at_startup_checks_pending_ = true;
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
}
}
//
// fallback-related methods
//
void XdsLb::MaybeCancelFallbackAtStartupChecks() {
if (!fallback_at_startup_checks_pending_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Cancelling fallback timer", this);
}
grpc_timer_cancel(&lb_fallback_timer_);
fallback_at_startup_checks_pending_ = false;
}
void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
// If some fallback-at-startup check is done after the timer fires but before
// this callback actually runs, don't fall back.
if (xdslb_policy->fallback_at_startup_checks_pending_ &&
!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO,
"[xdslb %p] Child policy not ready after fallback timeout; "
"entering fallback mode",
xdslb_policy);
xdslb_policy->fallback_at_startup_checks_pending_ = false;
xdslb_policy->UpdateFallbackPolicyLocked();
}
xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
}
void XdsLb::UpdateFallbackPolicyLocked() {
if (shutting_down_) return;
// Construct update args.
UpdateArgs update_args;
update_args.addresses = fallback_backend_addresses_;
update_args.config = fallback_policy_config_ == nullptr
? nullptr
: fallback_policy_config_->Ref();
update_args.args = grpc_channel_args_copy(args_);
// If the child policy name changes, we need to create a new child
// policy. When this happens, we leave child_policy_ as-is and store
// the new child policy in pending_child_policy_. Once the new child
// policy transitions into state READY, we swap it into child_policy_,
// replacing the original child policy. So pending_child_policy_ is
// non-null only between when we apply an update that changes the child
// policy name and when the new child reports state READY.
//
// Updates can arrive at any point during this transition. We always
// apply updates relative to the most recently created child policy,
// even if the most recent one is still in pending_child_policy_. This
// is true both when applying the updates to an existing child policy
// and when determining whether we need to create a new policy.
//
// As a result of this, there are several cases to consider here:
//
// 1. We have no existing child policy (i.e., we have started up but
// have not yet received a serverlist from the balancer or gone
// into fallback mode; in this case, both child_policy_ and
// pending_child_policy_ are null). In this case, we create a
// new child policy and store it in child_policy_.
//
// 2. We have an existing child policy and have no pending child policy
// from a previous update (i.e., either there has not been a
// previous update that changed the policy name, or we have already
// finished swapping in the new policy; in this case, child_policy_
// is non-null but pending_child_policy_ is null). In this case:
// a. If child_policy_->name() equals child_policy_name, then we
// update the existing child policy.
// b. If child_policy_->name() does not equal child_policy_name,
// we create a new policy. The policy will be stored in
// pending_child_policy_ and will later be swapped into
// child_policy_ by the helper when the new child transitions
// into state READY.
//
// 3. We have an existing child policy and have a pending child policy
// from a previous update (i.e., a previous update set
// pending_child_policy_ as per case 2b above and that policy has
// not yet transitioned into state READY and been swapped into
// child_policy_; in this case, both child_policy_ and
// pending_child_policy_ are non-null). In this case:
// a. If pending_child_policy_->name() equals child_policy_name,
// then we update the existing pending child policy.
// b. If pending_child_policy->name() does not equal
// child_policy_name, then we create a new policy. The new
// policy is stored in pending_child_policy_ (replacing the one
// that was there before, which will be immediately shut down)
// and will later be swapped into child_policy_ by the helper
// when the new child transitions into state READY.
const char* fallback_policy_name = fallback_policy_config_ == nullptr
? "round_robin"
: fallback_policy_config_->name();
const bool create_policy =
// case 1
fallback_policy_ == nullptr ||
// case 2b
(pending_fallback_policy_ == nullptr &&
strcmp(fallback_policy_->name(), fallback_policy_name) != 0) ||
// case 3b
(pending_fallback_policy_ != nullptr &&
strcmp(pending_fallback_policy_->name(), fallback_policy_name) != 0);
LoadBalancingPolicy* policy_to_update = nullptr;
if (create_policy) {
// Cases 1, 2b, and 3b: create a new child policy.
// If child_policy_ is null, we set it (case 1), else we set
// pending_child_policy_ (cases 2b and 3b).
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(GPR_INFO, "[xdslb %p] Creating new %sfallback policy %s", this,
fallback_policy_ == nullptr ? "" : "pending ",
fallback_policy_name);
}
auto& lb_policy = fallback_policy_ == nullptr ? fallback_policy_
: pending_fallback_policy_;
lb_policy =
CreateFallbackPolicyLocked(fallback_policy_name, update_args.args);
policy_to_update = lb_policy.get();
} else {
// Cases 2a and 3a: update an existing policy.
// If we have a pending child policy, send the update to the pending
// policy (case 3a), else send it to the current policy (case 2a).
policy_to_update = pending_fallback_policy_ != nullptr
? pending_fallback_policy_.get()
: fallback_policy_.get();
}
GPR_ASSERT(policy_to_update != nullptr);
// Update the policy.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_trace)) {
gpr_log(
GPR_INFO, "[xdslb %p] Updating %sfallback policy %p", this,
policy_to_update == pending_fallback_policy_.get() ? "pending " : "",
policy_to_update);
}
policy_to_update->UpdateLocked(std::move(update_args));
}