-
Notifications
You must be signed in to change notification settings - Fork 75
/
event_based_watchdog.go
107 lines (97 loc) · 3.24 KB
/
event_based_watchdog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"context"
"errors"
"strings"
"time"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/go-logr/logr"
"golang.org/x/exp/maps"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/dds/reconcile"
"github.com/apache/dubbo-kubernetes/pkg/events"
util_maps "github.com/apache/dubbo-kubernetes/pkg/util/maps"
)
type EventBasedWatchdog struct {
Ctx context.Context
Node *envoy_core.Node
EventBus events.EventBus
Reconciler reconcile.Reconciler
ProvidedTypes map[model.ResourceType]struct{}
Log logr.Logger
NewFlushTicker func() *time.Ticker
NewFullResyncTicker func() *time.Ticker
}
func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
listener := e.EventBus.Subscribe(func(event events.Event) bool {
resChange, ok := event.(events.ResourceChangedEvent)
if !ok {
return false
}
if _, ok := e.ProvidedTypes[resChange.Type]; !ok {
return false
}
return true
})
flushTicker := e.NewFlushTicker()
defer flushTicker.Stop()
fullResyncTicker := e.NewFullResyncTicker()
defer fullResyncTicker.Stop()
// for the first reconcile assign all types
changedTypes := maps.Clone(e.ProvidedTypes)
reasons := map[string]struct{}{
ReasonResync: {},
}
for {
select {
case <-stop:
if err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil {
e.Log.Error(err, "reconcile clear failed")
}
listener.Close()
return
case <-flushTicker.C:
if len(changedTypes) == 0 {
continue
}
reason := strings.Join(util_maps.SortedKeys(reasons), "_and_")
e.Log.V(1).Info("reconcile", "changedTypes", changedTypes, "reason", reason)
err, _ := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes, e.Log)
if err != nil && errors.Is(err, context.Canceled) {
e.Log.Error(err, "reconcile failed", "changedTypes", changedTypes, "reason", reason)
} else {
changedTypes = map[model.ResourceType]struct{}{}
reasons = map[string]struct{}{}
}
case <-fullResyncTicker.C:
e.Log.V(1).Info("schedule full resync")
changedTypes = maps.Clone(e.ProvidedTypes)
reasons[ReasonResync] = struct{}{}
case event := <-listener.Recv():
resChange := event.(events.ResourceChangedEvent)
e.Log.V(1).Info("schedule sync for type", "typ", resChange.Type)
changedTypes[resChange.Type] = struct{}{}
reasons[ReasonEvent] = struct{}{}
}
}
}