-
Notifications
You must be signed in to change notification settings - Fork 76
/
dataplane_status_tracker.go
292 lines (249 loc) · 9.78 KB
/
dataplane_status_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package callbacks
import (
"context"
"os"
"strings"
"sync"
)
import (
"google.golang.org/protobuf/proto"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_runtime "github.com/apache/dubbo-kubernetes/pkg/core/runtime"
core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
util_proto "github.com/apache/dubbo-kubernetes/pkg/util/proto"
util_xds "github.com/apache/dubbo-kubernetes/pkg/util/xds"
)
// statusTrackerLog is the logger used throughout this file; it is derived from
// core.Log with the nested names "xds" and "status-tracker".
var statusTrackerLog = core.Log.WithName("xds").WithName("status-tracker")
// DataplaneStatusTracker combines the xDS stream callbacks (util_xds.Callbacks)
// with a lookup of the status tracked for a given stream.
type DataplaneStatusTracker interface {
util_xds.Callbacks
// GetStatusAccessor returns the status accessor for the stream with the given ID
// and a boolean reporting whether such a stream is currently tracked.
GetStatusAccessor(streamID int64) (SubscriptionStatusAccessor, bool)
}
// SubscriptionStatusAccessor gives access to the status tracked for one stream.
type SubscriptionStatusAccessor interface {
// GetStatus returns the resource key and the discovery subscription associated with the stream.
GetStatus() (core_model.ResourceKey, *mesh_proto.DiscoverySubscription)
}
// DataplaneInsightSinkFactoryFunc builds a DataplaneInsightSink for the given
// resource type, reading status through the given SubscriptionStatusAccessor.
type DataplaneInsightSinkFactoryFunc = func(core_model.ResourceType, SubscriptionStatusAccessor) DataplaneInsightSink
// NewDataplaneStatusTracker returns a DataplaneStatusTracker with an empty
// stream table.
//
// runtimeInfo is stored for later use when subscriptions are created, and
// createStatusSink is stored as the factory for per-stream status sinks.
func NewDataplaneStatusTracker(
	runtimeInfo core_runtime.RuntimeInfo,
	createStatusSink DataplaneInsightSinkFactoryFunc,
) DataplaneStatusTracker {
	tracker := new(dataplaneStatusTracker)
	tracker.runtimeInfo = runtimeInfo
	tracker.createStatusSink = createStatusSink
	// The map must be allocated up front: OnStreamOpen writes into it.
	tracker.streams = map[int64]*streamState{}
	return tracker
}
// Compile-time check that *dataplaneStatusTracker implements DataplaneStatusTracker.
var _ DataplaneStatusTracker = &dataplaneStatusTracker{}
// dataplaneStatusTracker is the DataplaneStatusTracker implementation of this file.
// It keeps one streamState per open stream, keyed by stream ID.
type dataplaneStatusTracker struct {
// NOTE(review): presumably supplies no-op defaults for callbacks not defined here — confirm in util_xds.
util_xds.NoopCallbacks
// runtimeInfo provides the control plane instance ID recorded on each new subscription.
runtimeInfo core_runtime.RuntimeInfo
// createStatusSink builds the sink that is started once a stream's Dataplane ID is known.
createStatusSink DataplaneInsightSinkFactoryFunc
mu sync.RWMutex // protects access to the fields below
// streams maps a stream ID to its state; entries are added in OnStreamOpen and removed in OnStreamClosed.
streams map[int64]*streamState
}
// streamState holds the status tracked for a single xDS stream.
type streamState struct {
// stop is passed to the status sink's Start and is closed by Close.
stop chan struct{} // is used for stopping a goroutine that flushes Dataplane status periodically
mu sync.RWMutex // protects access to the fields below
// dataplaneId stays at its zero value until a request's node ID has been parsed successfully.
dataplaneId core_model.ResourceKey
// subscription is created when the stream opens and is updated on requests, responses and close.
subscription *mesh_proto.DiscoverySubscription
}
// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
//
// It registers a fresh stream state, holding a newly created subscription,
// under streamID. This implementation always returns nil.
func (c *dataplaneStatusTracker) OnStreamOpen(ctx context.Context, streamID int64, typ string) error {
	// The stream table is written below, so the exclusive lock is required.
	c.mu.Lock()
	defer c.mu.Unlock()

	// A single timestamp is shared by the connect time and the initial status.
	connectedAt := core.Now()
	sub := &mesh_proto.DiscoverySubscription{
		Id:                     core.NewUUID(),
		ControlPlaneInstanceId: c.runtimeInfo.GetInstanceId(),
		ConnectTime:            util_proto.MustTimestampProto(connectedAt),
		Status:                 mesh_proto.NewSubscriptionStatus(connectedAt),
		Version:                mesh_proto.NewVersion(),
	}

	// Register the per-stream state; the stop channel is closed when the stream closes.
	c.streams[streamID] = &streamState{
		stop:         make(chan struct{}),
		subscription: sub,
	}

	statusTrackerLog.V(1).Info("proxy connecting", "streamID", streamID, "type", typ, "subscriptionID", sub.Id)
	return nil
}
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
//
// It removes the stream from the table, stamps the subscription with the
// disconnect time and closes the stream state so that a final flush is
// triggered. Unknown stream IDs are logged and otherwise ignored.
func (c *dataplaneStatusTracker) OnStreamClosed(streamID int64) {
	// The stream table is modified below, so the exclusive lock is required.
	c.mu.Lock()
	defer c.mu.Unlock()

	state, found := c.streams[streamID]
	if !found || state == nil {
		statusTrackerLog.Info("[WARNING] proxy disconnected but no state in the status_tracker", "streamID", streamID)
		return
	}
	delete(c.streams, streamID)

	// Finalize the subscription under the per-stream write lock.
	state.mu.Lock()
	sub := state.subscription
	sub.DisconnectTime = util_proto.MustTimestampProto(core.Now())
	state.mu.Unlock()

	// Trigger the final flush; the per-stream lock is already released at this point.
	state.Close()

	logger := statusTrackerLog.WithValues(
		"streamID", streamID,
		"proxyName", state.dataplaneId.Name,
		"mesh", state.dataplaneId.Mesh,
		"subscriptionID", sub.Id,
	)
	// The full subscription is attached only at verbosity level 1.
	if statusTrackerLog.V(1).Enabled() {
		logger = logger.WithValues("subscription", sub)
	}
	logger.Info("proxy disconnected")
}
// OnStreamRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
//
// While the stream has no Dataplane ID yet, the request's node ID is parsed;
// on success the ID is stored on the stream state and the asynchronous status
// sink is started. A request carrying a non-empty response nonce updates the
// subscription counters (rejected when req.HasErrors(), acknowledged
// otherwise); a request without a nonce is only logged.
//
// A request for a stream ID that is not tracked is logged and ignored.
// This implementation always returns nil.
func (c *dataplaneStatusTracker) OnStreamRequest(streamID int64, req util_xds.DiscoveryRequest) error {
	c.mu.RLock() // read access to the map of all ADS streams
	defer c.mu.RUnlock()

	state := c.streams[streamID]
	if state == nil {
		// Without this guard an untracked stream ID would cause a nil pointer
		// dereference below; OnStreamClosed tolerates the same situation.
		statusTrackerLog.Info("[WARNING] request received but no state in the status_tracker", "streamID", streamID)
		return nil
	}

	state.mu.Lock() // write access to the per Dataplane info
	defer state.mu.Unlock()

	if state.dataplaneId == (core_model.ResourceKey{}) {
		// Infer the Dataplane ID.
		proxyId, err := core_xds.ParseProxyIdFromString(req.NodeId())
		if err != nil {
			// The ID stays empty, so parsing is attempted again on the next request.
			statusTrackerLog.Error(err, "failed to parse Dataplane Id out of DiscoveryRequest", "streamid", streamID, "req", req)
		} else {
			state.dataplaneId = proxyId.ToResourceKey()

			md := core_xds.DataplaneMetadataFromXdsMetadata(req.Metadata(), os.TempDir(), state.dataplaneId)

			// If the dataplane was started with a resource YAML, then it
			// will be serialized in the node metadata and we would know
			// the underlying type directly. Since that is optional, we
			// can't depend on it here, so we map from the proxy type,
			// which is guaranteed.
			var dpType core_model.ResourceType
			switch md.GetProxyType() {
			case mesh_proto.IngressProxyType:
				dpType = core_mesh.ZoneIngressType
			case mesh_proto.DataplaneProxyType:
				dpType = core_mesh.DataplaneType
			}

			connLog := statusTrackerLog.WithValues(
				"proxyName", state.dataplaneId.Name,
				"mesh", state.dataplaneId.Mesh,
				"streamID", streamID,
				"type", md.GetProxyType(),
				"subscriptionID", state.subscription.Id,
			)
			if statusTrackerLog.V(1).Enabled() {
				connLog = connLog.WithValues("node", req.Node())
			}
			connLog.Info("proxy connected")

			// Kick off the async Dataplane status flusher.
			// The sink is created here, in the calling goroutine; only Start runs asynchronously.
			go c.createStatusSink(dpType, state).Start(state.stop)
		}
	}

	subscription := state.subscription
	log := statusTrackerLog.WithValues(
		"proxyName", state.dataplaneId.Name,
		"mesh", state.dataplaneId.Mesh,
		"streamID", streamID,
		"type", shortEnvoyType(req.GetTypeUrl()),
		"resourceVersion", req.VersionInfo(),
	)
	if statusTrackerLog.V(1).Enabled() {
		log = log.WithValues(
			"resourceNames", req.GetResourceNames(),
			"subscriptionID", subscription.Id,
			"nonce", req.GetResponseNonce(),
		)
	}

	// update Dataplane status
	if req.GetResponseNonce() != "" {
		subscription.Status.LastUpdateTime = util_proto.MustTimestampProto(core.Now())
		if req.HasErrors() {
			log.Info("config rejected")
			subscription.Status.Total.ResponsesRejected++
			subscription.Status.StatsOf(req.GetTypeUrl()).ResponsesRejected++
		} else {
			log.V(1).Info("config accepted")
			subscription.Status.Total.ResponsesAcknowledged++
			subscription.Status.StatsOf(req.GetTypeUrl()).ResponsesAcknowledged++
		}
	} else {
		if !statusTrackerLog.V(1).Enabled() { // it was already added, no need to add it twice
			log = log.WithValues("resourceNames", req.GetResourceNames())
		}
		log.Info("config requested")
	}
	return nil
}
// OnStreamResponse is called immediately prior to sending a response on a stream.
//
// It refreshes the subscription's last update time and increments the
// "responses sent" counters, both in total and for the response's type URL.
// A response for a stream ID that is not tracked is logged and ignored.
func (c *dataplaneStatusTracker) OnStreamResponse(streamID int64, req util_xds.DiscoveryRequest, resp util_xds.DiscoveryResponse) {
	c.mu.RLock() // read access to the map of all ADS streams
	defer c.mu.RUnlock()

	state := c.streams[streamID]
	if state == nil {
		// Without this guard an untracked stream ID would cause a nil pointer
		// dereference below; OnStreamClosed tolerates the same situation.
		statusTrackerLog.Info("[WARNING] response sent but no state in the status_tracker", "streamID", streamID)
		return
	}

	state.mu.Lock() // write access to the per Dataplane info
	defer state.mu.Unlock()

	// update Dataplane status
	subscription := state.subscription
	subscription.Status.LastUpdateTime = util_proto.MustTimestampProto(core.Now())
	subscription.Status.Total.ResponsesSent++
	subscription.Status.StatsOf(resp.GetTypeUrl()).ResponsesSent++

	// The counters are keyed by the response's type URL, while the logged
	// "type" is taken from the request.
	log := statusTrackerLog.WithValues(
		"proxyName", state.dataplaneId.Name,
		"mesh", state.dataplaneId.Mesh,
		"streamID", streamID,
		"type", shortEnvoyType(req.GetTypeUrl()),
		"resourceVersion", resp.VersionInfo(),
		"requestedResourceNames", req.GetResourceNames(),
		"resourceCount", len(resp.GetResources()),
	)
	if statusTrackerLog.V(1).Enabled() {
		log = log.WithValues(
			"subscriptionID", subscription.Id,
			"nonce", resp.GetNonce(),
		)
	}
	log.V(1).Info("config sent")
}
// shortEnvoyType returns the part of typeURL after its last "." so that logs
// carry a short name such as "Listener" rather than the fully qualified Envoy
// type URL. A typeURL without any "." is returned unchanged.
func shortEnvoyType(typeURL string) string {
	lastDot := strings.LastIndex(typeURL, ".")
	if lastDot < 0 {
		return typeURL
	}
	return typeURL[lastDot+1:]
}
// GetStatusAccessor returns the status accessor of the stream registered under
// streamID. When no such stream is tracked it returns a nil interface and false.
func (c *dataplaneStatusTracker) GetStatusAccessor(streamID int64) (SubscriptionStatusAccessor, bool) {
	// OnStreamOpen and OnStreamClosed modify the map under c.mu, so reading it
	// without the lock would be a data race.
	c.mu.RLock()
	defer c.mu.RUnlock()

	state, ok := c.streams[streamID]
	if !ok {
		// Return an untyped nil: a nil *streamState stored in the interface
		// would make the returned accessor compare unequal to nil.
		return nil, false
	}
	return state, true
}
// Compile-time check that *streamState implements SubscriptionStatusAccessor.
var _ SubscriptionStatusAccessor = &streamState{}
// GetStatus returns the Dataplane key of the stream together with a copy of its
// subscription. The copy is made with proto.Clone while the read lock is held,
// so the returned message is not shared with the tracker.
func (s *streamState) GetStatus() (core_model.ResourceKey, *mesh_proto.DiscoverySubscription) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	key := s.dataplaneId
	snapshot := proto.Clone(s.subscription).(*mesh_proto.DiscoverySubscription)
	return key, snapshot
}
// Close closes the stop channel of the stream state, i.e. the channel that is
// handed to the status sink's Start. Closing an already closed channel panics,
// so Close must be called at most once per stream state.
func (s *streamState) Close() {
	stopCh := s.stop
	close(stopCh)
}