-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
load_store_wrapper.go
120 lines (109 loc) · 3.74 KB
/
load_store_wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package loadstore contains the loadStoreWrapper shared by the balancers.
package loadstore
import (
"sync"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)
// NewWrapper creates a Wrapper with all fields zero-valued; the store and
// perCluster reporter remain nil (and thus no-op) until the balancer enables
// LRS and calls the Update* methods.
func NewWrapper() *Wrapper {
	return new(Wrapper)
}
// Wrapper wraps a load store with cluster and edsService.
//
// Its store and cluster/edsService can be updated separately. And it will
// update its internal perCluster store so that new stats will be added to the
// correct perCluster.
//
// Note that this struct is a temporary workaround before we implement graceful
// switch for EDS. Any update to the clusterName and serviceName is too early,
// the perfect timing is when the picker is updated with the new connection.
// This early update could cause picks for the old SubConn being reported to the
// new services.
//
// When the graceful switch in EDS is done, there should be no need for this
// struct. The policies that record/report load shouldn't need to handle update
// of lrsServerName/cluster/edsService. Its parent should do a graceful switch
// of the whole tree when one of that changes.
type Wrapper struct {
	// mu guards every field below: the Update* methods take it in write mode,
	// the Call* recording methods take it in read mode.
	mu         sync.RWMutex
	cluster    string
	edsService string
	// store and perCluster are initialized as nil. They are only set by the
	// balancer when LRS is enabled. Before that, all functions to record loads
	// are no-op.
	store      *load.Store
	perCluster load.PerClusterReporter
}
// UpdateClusterAndService updates the cluster name and eds service for this
// wrapper. If any one of them is changed from before, the perCluster store in
// this wrapper will also be updated.
func (lsw *Wrapper) UpdateClusterAndService(cluster, edsService string) {
	lsw.mu.Lock()
	defer lsw.mu.Unlock()
	// Nothing changed; keep the current perCluster reporter.
	if lsw.cluster == cluster && lsw.edsService == edsService {
		return
	}
	lsw.cluster, lsw.edsService = cluster, edsService
	// Re-resolve the perCluster reporter for the new cluster/service pair.
	lsw.perCluster = lsw.store.PerCluster(cluster, edsService)
}
// UpdateLoadStore updates the load store for this wrapper. If it is changed
// from before, the perCluster store in this wrapper will also be updated.
func (lsw *Wrapper) UpdateLoadStore(store *load.Store) {
	lsw.mu.Lock()
	defer lsw.mu.Unlock()
	// Same store as before; the existing perCluster reporter stays valid.
	if lsw.store == store {
		return
	}
	lsw.store = store
	// Re-resolve the perCluster reporter against the new store.
	lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
}
// CallStarted records a call started in the store. It is a no-op when LRS is
// not enabled (perCluster is nil).
func (lsw *Wrapper) CallStarted(locality string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallStarted(locality)
	}
}
// CallFinished records a call finished in the store. It is a no-op when LRS is
// not enabled (perCluster is nil).
func (lsw *Wrapper) CallFinished(locality string, err error) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallFinished(locality, err)
	}
}
// CallServerLoad records the server load in the store. It is a no-op when LRS
// is not enabled (perCluster is nil).
func (lsw *Wrapper) CallServerLoad(locality, name string, val float64) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallServerLoad(locality, name, val)
	}
}
// CallDropped records a call dropped in the store. It is a no-op when LRS is
// not enabled (perCluster is nil).
func (lsw *Wrapper) CallDropped(category string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if pc := lsw.perCluster; pc != nil {
		pc.CallDropped(category)
	}
}