-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
rls_child_policy.go
131 lines (115 loc) · 3.97 KB
/
rls_child_policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package e2e
import (
"encoding/json"
"errors"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
const (
// RLSChildPolicyTargetNameField is a top-level field name to add to the child
// policy's config, whose value is set to the target for the child policy.
RLSChildPolicyTargetNameField = "Backend"
// RLSChildPolicyBadTarget is a value which is considered a bad target by the
// child policy. This is useful to test bad child policy configuration.
RLSChildPolicyBadTarget = "bad-target"
)
// ErrParseConfigBadTarget is the error returned from ParseConfig when the
// backend field is set to RLSChildPolicyBadTarget.
var ErrParseConfigBadTarget = errors.New("backend field set to RLSChildPolicyBadTarget")
// BalancerFuncs is a set of callbacks which get invoked when the corresponding
// method on the child policy is invoked.
type BalancerFuncs struct {
UpdateClientConnState func(cfg *RLSChildPolicyConfig) error
Close func()
}
// RegisterRLSChildPolicy registers a balancer builder with the given name, to
// be used as a child policy for the RLS LB policy.
//
// The child policy uses a pickfirst balancer under the hood to send all traffic
// to the single backend specified by the `RLSChildPolicyTargetNameField` field
// in its configuration which looks like: {"Backend": "Backend-address"}.
func RegisterRLSChildPolicy(name string, bf *BalancerFuncs) {
balancer.Register(bb{name: name, bf: bf})
}
type bb struct {
name string
bf *BalancerFuncs
}
func (bb bb) Name() string { return bb.name }
func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
pf := balancer.Get(grpc.PickFirstBalancerName)
b := &bal{
Balancer: pf.Build(cc, opts),
bf: bb.bf,
done: grpcsync.NewEvent(),
}
go b.run()
return b
}
func (bb bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
cfg := &RLSChildPolicyConfig{}
if err := json.Unmarshal(c, cfg); err != nil {
return nil, err
}
if cfg.Backend == RLSChildPolicyBadTarget {
return nil, ErrParseConfigBadTarget
}
return cfg, nil
}
type bal struct {
balancer.Balancer
bf *BalancerFuncs
done *grpcsync.Event
}
// RLSChildPolicyConfig is the LB config for the test child policy.
type RLSChildPolicyConfig struct {
serviceconfig.LoadBalancingConfig
Backend string // The target for which this child policy was created.
Random string // A random field to test child policy config changes.
}
func (b *bal) UpdateClientConnState(c balancer.ClientConnState) error {
cfg, ok := c.BalancerConfig.(*RLSChildPolicyConfig)
if !ok {
return fmt.Errorf("received balancer config of type %T, want %T", c.BalancerConfig, &RLSChildPolicyConfig{})
}
if b.bf != nil && b.bf.UpdateClientConnState != nil {
b.bf.UpdateClientConnState(cfg)
}
return b.Balancer.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{{Addr: cfg.Backend}}},
})
}
func (b *bal) Close() {
b.Balancer.Close()
if b.bf != nil && b.bf.Close != nil {
b.bf.Close()
}
b.done.Fire()
}
// run is a dummy goroutine to make sure that child policies are closed at the
// end of tests. If they are not closed, these goroutines will be picked up by
// the leakcheker and tests will fail.
func (b *bal) run() {
<-b.done.Done()
}