/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package istioconnector
import (
"context"
"fmt"
"sync"
"time"
"github.com/apache/servicecomb-service-center/istio/pkg/event"
"github.com/apache/servicecomb-service-center/istio/pkg/utils"
"github.com/go-chassis/cari/discovery"
"istio.io/client-go/pkg/apis/networking/v1alpha3"
"istio.io/client-go/pkg/clientset/versioned"
"istio.io/pkg/log"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
k8s "sigs.k8s.io/controller-runtime/pkg/client/config"
)
// Controller receives service center updates and pushes converted Istio ServiceEntry(s) to k8s api server
type Controller struct {
// Istio istioClient for k8s API
istioClient *versioned.Clientset
// Channel used to send and receive service center change events from the service center controller
events chan []event.ChangeEvent
// Cache of converted service entries, mapped to original service center service id
convertedServiceCache sync.Map
}
// NewController builds a Controller that consumes service center change
// events from e and pushes the converted resources to the Kubernetes API
// server. kubeconfigPath may be empty, in which case in-cluster config is used.
func NewController(kubeconfigPath string, e chan []event.ChangeEvent) (*Controller, error) {
	// Build the Istio-typed Kubernetes client first; nothing else is useful without it.
	istioClient, err := newKubeClient(kubeconfigPath)
	if err != nil {
		log.Errorf("failed to create istio client: %v\n", err)
		return nil, err
	}
	return &Controller{
		istioClient:           istioClient,
		events:                e,
		convertedServiceCache: sync.Map{},
	}, nil
}
// Return a debounced version of a function `fn` that will not run until `wait`
// has passed since it was last called, or until `maxWait` has passed since its
// first call. Once `fn` is executed, both timers are reset.
//
// Fix: `timer` and `maxTimer` were previously mutated from the caller's
// goroutine and from both time.AfterFunc callback goroutines with no
// synchronization — a data race that could also nil-panic on the
// `timer.Stop()` call inside the maxWait callback. A mutex now serializes all
// timer-state transitions. Note: `fn` is invoked while the lock is held, so it
// must not call the debounced function re-entrantly (the callers here do not).
func debounce(fn func(), wait time.Duration, maxWait time.Duration) func() {
	var mu sync.Mutex // guards timer and maxTimer across goroutines
	// Main timer: fires `wait` after the most recent call
	var timer *time.Timer
	// Max wait timer: fires `maxWait` after the first call of a burst
	var maxTimer *time.Timer
	return func() {
		mu.Lock()
		defer mu.Unlock()
		if maxTimer == nil {
			// First debounced event of a burst, start max wait timer;
			// runs target func even if calls keep arriving within `wait`.
			maxTimer = time.AfterFunc(maxWait, func() {
				mu.Lock()
				defer mu.Unlock()
				log.Debugf("debounce: maximum wait time reached, running target fn\n")
				// Only run target func if the main timer hasn't already.
				if timer != nil && timer.Stop() {
					fn()
				}
				// Reset all timers when max wait time is reached.
				timer = nil
				maxTimer = nil
			})
			log.Debugf("debounce: max timer started, will wait max time of %s\n", maxWait)
		}
		if timer != nil {
			// Called again within `wait`: debounce this event by cancelling the pending run.
			timer.Stop()
		}
		// Start (or restart) the main timer; runs target func if not called again within `wait`.
		timer = time.AfterFunc(wait, func() {
			mu.Lock()
			defer mu.Unlock()
			log.Debugf("debounce: timer completed, running target fn\n")
			// Reset all timers and run target func when wait time is reached.
			fn()
			if maxTimer != nil {
				maxTimer.Stop()
			}
			maxTimer = nil
			timer = nil
		})
		log.Debugf("debounce: timer started, will wait %s\n", wait)
	}
}
// Run starts watching for service center updates in a background goroutine
// and returns immediately; the watch loop exits when ctx is cancelled.
func (c *Controller) Run(ctx context.Context) {
	go c.watchServiceCenterUpdate(ctx)
}
// Return a debounced version of the push2Istio method that merges the passed
// events on each call. The merged queue is flushed immediately once it exceeds
// maxEvents; otherwise it is flushed when the debounce timers (wait/maxWait) fire.
//
// Fix: eventQueue was appended to from the caller's goroutine and read/reset
// from the debounce timer goroutine with no synchronization — a data race.
// A mutex now guards every access, and the flush takes ownership of the queue
// under the lock before pushing.
func (c *Controller) getIstioPushDebouncer(wait time.Duration, maxWait time.Duration, maxEvents int) func([]event.ChangeEvent) {
	var mu sync.Mutex                  // guards eventQueue (caller goroutine vs. timer goroutine)
	var eventQueue []event.ChangeEvent // queue of events merged from arguments of each call
	// Make a debounced version of push2istio, with provided wait and maxWait times
	debouncedFn := debounce(func() {
		// Timeout reached: take ownership of the queued events and reset the queue.
		mu.Lock()
		batch := eventQueue
		eventQueue = nil
		mu.Unlock()
		log.Debugf("debounce: push callback fired, pushing events to Istio: %v\n", batch)
		c.push2Istio(batch)
	}, wait, maxWait)
	return func(newEvents []event.ChangeEvent) {
		log.Debugf("debounce: received and merged %d new events\n", len(newEvents))
		// Merge new events with existing event queue for each received call.
		mu.Lock()
		eventQueue = append(eventQueue, newEvents...)
		size := len(eventQueue)
		var overflow []event.ChangeEvent
		if size > maxEvents {
			// Queue too large: flush right away instead of waiting for the timers.
			overflow = eventQueue
			eventQueue = nil
		}
		mu.Unlock()
		log.Debugf("debounce: new total number of events in queue is %d\n", size)
		if overflow != nil {
			c.push2Istio(overflow)
		} else {
			// Make call to debounced push2istio
			debouncedFn()
		}
	}
}
// Watch the Service Center controller for service and instance change events
// until ctx is cancelled. Each received batch is handed to a debounced pusher,
// which introduces latency in the range
// [PUSH_DEBOUNCE_INTERVAL, PUSH_DEBOUNCE_MAX_INTERVAL] between receiving a
// change and pushing it to Istio.
func (c *Controller) watchServiceCenterUpdate(ctx context.Context) {
	// Build the debounced push2Istio function once for the whole loop.
	push := c.getIstioPushDebouncer(utils.PUSH_DEBOUNCE_INTERVAL, utils.PUSH_DEBOUNCE_MAX_INTERVAL, utils.PUSH_DEBOUNCE_MAX_EVENTS)
	for {
		select {
		case <-ctx.Done():
			return
		case evs := <-c.events:
			// Service center change events arrived; forward through the debouncer.
			push(evs)
		}
	}
}
// Push the received service center service/instance change events to Istio.
// Events are applied against a deep copy of the ServiceEntry cache; the copy
// replaces the live cache only after the whole batch has been processed.
func (c *Controller) push2Istio(events []event.ChangeEvent) {
	cachedServiceEntries := deepCopyCache(c.convertedServiceCache) // Get cached serviceentries
	for _, e := range events {
		// Use the value bound by the type switch directly; the previous code
		// redundantly re-asserted e.Event inside each case.
		switch ev := e.Event.(type) {
		case *event.MicroserviceEntry:
			// service center service-level change events
			if err := c.pushServiceEvent(ev, e.Action, cachedServiceEntries); err != nil {
				log.Errorf("failed to push a service center service event to Istio, err[%v]\n", err)
			}
		case *event.InstanceEntry:
			// service center instance-level change events
			if err := c.pushEndpointEvents(ev, e.Action, cachedServiceEntries); err != nil {
				log.Errorf("failed to push a service center instance event to Istio, err[%v]\n", err)
			}
		default:
			log.Errorf("failed to push service center event, event type %T is invalid\n", ev)
		}
	}
	// Save updates to ServiceEntry cache
	c.refreshCache(cachedServiceEntries)
}
// Convert and push a service center service-level change event to Istio,
// recording the result in svcCache keyed by the service center service id.
// NOTE(review): svcCache is a sync.Map passed by value; `go vet` flags copying
// sync.Map (copylocks) — confirm whether *sync.Map should be used across this
// call chain.
func (c *Controller) pushServiceEvent(e *event.MicroserviceEntry, action discovery.EventType, svcCache sync.Map) error {
	serviceId := e.MicroService.ServiceId
	// Convert the service center MicroService to an Istio ServiceEntry.
	se := e.Convert()
	if se == nil {
		return fmt.Errorf("failed to convert service center Service event to Istio ServiceEntry")
	}
	seName := se.ServiceEntry.GetName()
	log.Debugf("syncing %s SERVICE event for service center service id %s...\n", string(action), serviceId)
	switch action {
	case discovery.EVT_CREATE:
		// A CREATE may target a ServiceEntry that already exists in the Istio
		// registry (e.g. the local cache was lost on a controller restart), so
		// it is handled exactly like an UPDATE.
		fallthrough
	case discovery.EVT_UPDATE:
		current, getErr := c.istioClient.NetworkingV1alpha3().ServiceEntries(utils.ISTIO_SYSTEM).Get(context.TODO(), seName, v1.GetOptions{})
		var pushed *v1alpha3.ServiceEntry
		if getErr != nil {
			// Not present in the registry: create it from scratch.
			created, err := c.istioClient.NetworkingV1alpha3().ServiceEntries(utils.ISTIO_SYSTEM).Create(context.TODO(), se.ServiceEntry, v1.CreateOptions{})
			if err != nil {
				return err
			}
			pushed = created
		} else {
			// Keep the live endpoints and ports: only the service itself changes here.
			se.ServiceEntry.Spec.Endpoints = current.Spec.Endpoints
			se.ServiceEntry.Spec.Ports = current.Spec.Ports
			updated, err := c.pushServiceEntryUpdate(current, se.ServiceEntry)
			if err != nil {
				return err
			}
			pushed = updated
		}
		svcCache.Store(serviceId, pushed)
	case discovery.EVT_DELETE:
		if err := c.istioClient.NetworkingV1alpha3().ServiceEntries(utils.ISTIO_SYSTEM).Delete(context.TODO(), seName, v1.DeleteOptions{}); err != nil {
			return err
		}
		svcCache.Delete(serviceId)
	}
	log.Infof("synced %s SERVICE event to Istio\n", string(action))
	return nil
}
// Push an update for an existing ServiceEntry to Istio. The new object is
// stamped with the live object's resourceVersion so the API server accepts it
// as an update of the current revision.
func (c *Controller) pushServiceEntryUpdate(oldServiceEntry, newServiceEntry *v1alpha3.ServiceEntry) (*v1alpha3.ServiceEntry, error) {
	newServiceEntry.SetResourceVersion(oldServiceEntry.GetResourceVersion())
	updated, err := c.istioClient.NetworkingV1alpha3().ServiceEntries(utils.ISTIO_SYSTEM).Update(context.TODO(), newServiceEntry, v1.UpdateOptions{})
	if err != nil {
		return nil, err
	}
	return updated, nil
}
// Convert and push a service center instance-level change event to Istio by
// patching the endpoints of the cached ServiceEntry for the owning service.
// Fails if the owning service has no cached ServiceEntry yet.
// NOTE(review): svcCache is a sync.Map passed by value (`go vet` copylocks);
// confirm whether *sync.Map should be used across this call chain.
func (c *Controller) pushEndpointEvents(e *event.InstanceEntry, action discovery.EventType, svcCache sync.Map) error {
	serviceId := e.ServiceId
	log.Debugf("syncing %s INSTANCE event for instance %s of service center service %s...\n", string(action), e.InstanceId, serviceId)
	cached, ok := svcCache.Load(serviceId)
	if !ok {
		return fmt.Errorf("serviceEntry for service center Service with id %s was not found", e.ServiceId)
	}
	current := cached.(*v1alpha3.ServiceEntry)
	// Work on a copy so the cached object stays untouched if the push fails.
	modified := current.DeepCopy()
	// Apply the instance change to the copy's endpoints.
	if err := updateIstioServiceEndpoints(modified, action, e); err != nil {
		return err
	}
	// Push the updated ServiceEntry to Istio.
	pushed, err := c.pushServiceEntryUpdate(current, modified)
	if err != nil {
		return err
	}
	log.Infof("pushed %s INSTANCE event to Istio\n", string(action))
	svcCache.Store(serviceId, pushed)
	return nil
}
// Apply an instance change event to a ServiceEntry's endpoint(s), in place.
// The ServiceEntry is round-tripped through the service center representation
// so instance semantics (add/update/remove by InstanceId) can be applied, then
// converted back; only the regenerated endpoints and ports are adopted.
//
// Fix: the switch previously converted `action` with discovery.EventType(action)
// even though the parameter already has that type — redundant conversion removed.
func updateIstioServiceEndpoints(se *v1alpha3.ServiceEntry, action discovery.EventType, targetInst *event.InstanceEntry) error {
	targetInstanceId := targetInst.InstanceId
	var seAsMSE *event.MicroserviceEntry
	// Convert ServiceEntry back to service center service to apply changes to its service center instances
	if res := event.NewServiceEntry(se).Convert(); res == nil {
		return fmt.Errorf("failed to parse existing Istio ServiceEntry")
	} else {
		seAsMSE = res
	}
	switch action {
	case discovery.EVT_DELETE:
		// Filter out the deleted instance (pre-sized to avoid growth copies).
		newInsts := make([]*event.InstanceEntry, 0, len(seAsMSE.Instances))
		for _, existingInst := range seAsMSE.Instances {
			if existingInst.InstanceId != targetInstanceId {
				newInsts = append(newInsts, existingInst)
			}
		}
		if len(seAsMSE.Instances) == len(newInsts) {
			// Nothing removed: the instance was already gone; warn but proceed.
			log.Warnf("could not push delete for target Service Center instance id %s, instance was not found\n", targetInstanceId)
		}
		seAsMSE.Instances = newInsts
	case discovery.EVT_CREATE:
		// CREATE still requires check to determine whether the endpoint already exists; UPDATE is used in this case.
		fallthrough
	case discovery.EVT_UPDATE:
		updated := false
		for i, existingInst := range seAsMSE.Instances {
			if existingInst.InstanceId == targetInstanceId {
				// Found existing instance, update with new instance
				seAsMSE.Instances[i] = targetInst
				updated = true
				break
			}
		}
		if !updated {
			// Instance does not already exist, add as new instance
			seAsMSE.Instances = append(seAsMSE.Instances, targetInst)
		}
	}
	// Convert the microservice entry back to an Istio service entry; the service
	// ports for the changed endpoints are regenerated by the conversion logic.
	regened := seAsMSE.Convert()
	if regened == nil {
		return fmt.Errorf("failed to parse changes for Istio ServiceEntry")
	}
	// Only take regenerated ports and workload entries; preserve the rest of the original ServiceEntry.
	se.Spec.Endpoints = regened.ServiceEntry.Spec.Endpoints
	se.Spec.Ports = regened.ServiceEntry.Spec.Ports
	return nil
}
// Save Istio ServiceEntry(s) converted from service center updates by
// replacing the whole cache with the updated copy built during push2Istio.
// NOTE(review): assigning a sync.Map by value copies its internal mutex
// (`go vet` copylocks); confirm whether *sync.Map should be used instead.
func (c *Controller) refreshCache(serviceEntries sync.Map) {
	c.convertedServiceCache = serviceEntries
}
// Get a deep copy of the converted Istio ServiceEntry(s) pushed from service
// center, so a push batch can mutate entries without touching the live cache.
// NOTE(review): sync.Map is taken and returned by value, which `go vet` flags
// as a lock copy (copylocks); *sync.Map would be safer across this call chain.
func deepCopyCache(m sync.Map) sync.Map {
	cp := sync.Map{}
	m.Range(func(key, value interface{}) bool {
		entry := value.(*v1alpha3.ServiceEntry)
		cp.Store(key, entry.DeepCopy())
		return true
	})
	return cp
}
// newKubeClient creates a new Istio-typed Kubernetes clientset. With an empty
// kubeconfigPath it uses the in-cluster configuration; otherwise it loads the
// given kubeconfig file.
func newKubeClient(kubeconfigPath string) (*versioned.Clientset, error) {
	if kubeconfigPath == "" {
		// No explicit kubeconfig: assume we are running inside the cluster.
		conf, err := k8s.GetConfig()
		if err != nil {
			return nil, fmt.Errorf("build default in cluster kube config failed: %w", err)
		}
		return versioned.NewForConfig(conf)
	}
	conf, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return nil, fmt.Errorf("build kube client config from config file failed: %w", err)
	}
	return versioned.NewForConfig(conf)
}