/
watcher.go
80 lines (68 loc) · 2.73 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package connector
import (
"reflect"
"github.com/flomesh-io/fsm/pkg/announcements"
configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3"
"github.com/flomesh-io/fsm/pkg/k8s/events"
"github.com/flomesh-io/fsm/pkg/logger"
"github.com/flomesh-io/fsm/pkg/messaging"
)
var (
GatewayAPIEnabled = false
)
// WatchMeshConfigUpdated watches update of meshconfig
func WatchMeshConfigUpdated(
connectController ConnectController,
msgBroker *messaging.Broker,
stop <-chan struct{}) {
kubePubSub := msgBroker.GetKubeEventPubSub()
meshCfgUpdateChan := kubePubSub.Sub(announcements.MeshConfigUpdated.String())
defer msgBroker.Unsub(kubePubSub, meshCfgUpdateChan)
for {
select {
case <-stop:
log.Info().Msg("Received stop signal, exiting log level update routine")
return
case event := <-meshCfgUpdateChan:
msg, ok := event.(events.PubSubMessage)
if !ok {
log.Error().Msgf("Error casting to PubSubMessage, got type %T", msg)
continue
}
prevObj, prevOk := msg.OldObj.(*configv1alpha3.MeshConfig)
newObj, newOk := msg.NewObj.(*configv1alpha3.MeshConfig)
if !prevOk || !newOk {
log.Error().Msgf("Error casting to *MeshConfig, got type prev=%T, new=%T", prevObj, newObj)
}
// Update the log level if necessary
if prevObj.Spec.Observability.FSMLogLevel != newObj.Spec.Observability.FSMLogLevel {
if err := logger.SetLogLevel(newObj.Spec.Observability.FSMLogLevel); err != nil {
log.Error().Err(err).Msgf("Error setting controller log level to %s", newObj.Spec.Observability.FSMLogLevel)
}
}
if prevObj.Spec.GatewayAPI.Enabled != newObj.Spec.GatewayAPI.Enabled {
GatewayAPIEnabled = newObj.Spec.GatewayAPI.Enabled
}
if prevObj.Spec.ClusterSet.Name != newObj.Spec.ClusterSet.Name &&
prevObj.Spec.ClusterSet.Group != newObj.Spec.ClusterSet.Group &&
prevObj.Spec.ClusterSet.Zone != newObj.Spec.ClusterSet.Zone &&
prevObj.Spec.ClusterSet.Region != newObj.Spec.ClusterSet.Region {
connectController.SetClusterSet(newObj.Spec.ClusterSet.Name,
newObj.Spec.ClusterSet.Group,
newObj.Spec.ClusterSet.Zone,
newObj.Spec.ClusterSet.Region)
}
if !reflect.DeepEqual(prevObj.Spec.Connector, newObj.Spec.Connector) {
viaGateway := &newObj.Spec.Connector.ViaGateway
if len(viaGateway.IngressAddr) > 0 && len(viaGateway.EgressAddr) > 0 {
connectController.SetViaIngressAddr(viaGateway.IngressAddr)
connectController.SetViaIngressHTTPPort(viaGateway.IngressHTTPPort)
connectController.SetViaIngressGRPCPort(viaGateway.IngressGRPCPort)
connectController.SetViaEgressAddr(viaGateway.EgressAddr)
connectController.SetViaEgressHTTPPort(viaGateway.EgressHTTPPort)
connectController.SetViaEgressGRPCPort(viaGateway.EgressGRPCPort)
}
}
}
}
}