/
provide.go
91 lines (78 loc) · 2.47 KB
/
provide.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package crwatcher
import (
"context"
"os"
"go.uber.org/fx"
"github.com/fluxninja/aperture/v2/pkg/config"
"github.com/fluxninja/aperture/v2/pkg/log"
"github.com/fluxninja/aperture/v2/pkg/notifiers"
)
const (
// ConfigKey is the configuration key under which CRWatcherConfig is looked up
// when the Kubernetes watcher is set up.
ConfigKey = "policies.cr_watcher"
)
// CRWatcherConfig holds fields to configure the Kubernetes watcher for Aperture Policy custom resource.
// It is unmarshalled from the ConfigKey section of the configuration.
// swagger:model
// +kubebuilder:object:generate=true
type CRWatcherConfig struct {
// Enabled indicates whether the Kubernetes watcher is enabled.
// When false, no watcher is created. The `default` tag declares false as the default value.
Enabled bool `json:"enabled" default:"false"`
}
// Constructor holds fields to create an annotated instance of Kubernetes Watcher.
// Both names are required: Annotate logs at Panic level if either one is empty.
type Constructor struct {
// Name of tracker instance. Annotate uses it to tag the policy trackers
// parameter of the watcher setup function.
Name string
// Name of dynamic config tracker instance. Annotate uses it to tag the
// policy dynamic config trackers parameter of the watcher setup function.
DynamicConfigName string
}
// Annotate builds the fx option that wires up the Kubernetes watcher.
//
// The returned option invokes setupWatcher with its two notifiers.Trackers
// parameters tagged by config.NameTag(Name) and
// config.NameTag(DynamicConfigName), in that order. Both names must be
// non-empty; otherwise a Panic-level message is logged.
func (constructor Constructor) Annotate() fx.Option {
	namesMissing := constructor.Name == "" || constructor.DynamicConfigName == ""
	if namesMissing {
		log.Panic().Msg("Kubernetes watcher name is required")
	}

	// Tags are positional: policy trackers first, dynamic config trackers second.
	trackerTags := fx.ParamTags(
		config.NameTag(constructor.Name),
		config.NameTag(constructor.DynamicConfigName),
	)
	watcherSetup := fx.Annotate(constructor.setupWatcher, trackerTags)

	return fx.Options(fx.Invoke(watcherSetup))
}
// setupWatcher creates a Kubernetes watcher for the Policy Custom Resource and
// registers it with the fx lifecycle.
//
// It reads CRWatcherConfig from ConfigKey. When the watcher is disabled it
// logs that fact and returns nil without creating anything. Otherwise it makes
// sure the APERTURE_CONTROLLER_NAMESPACE environment variable is non-empty
// (falling back to "default"), builds the watcher from the two tracker sets
// and appends hooks that start and stop it.
//
// An error is returned when the config cannot be unmarshalled, the namespace
// fallback cannot be written to the environment, or the watcher cannot be
// constructed.
func (constructor Constructor) setupWatcher(policyTrackers, policyDynamicConfigTrackers notifiers.Trackers, unmarshaller config.Unmarshaller, lifecycle fx.Lifecycle) error {
	// Named watcherConfig rather than config so the imported config package
	// is not shadowed inside this function.
	var watcherConfig CRWatcherConfig
	if err := unmarshaller.UnmarshalKey(ConfigKey, &watcherConfig); err != nil {
		log.Error().Err(err).Msg("Failed to unmarshal Kubernetes watcher config")
		return err
	}

	if !watcherConfig.Enabled {
		log.Info().Msg("Kubernetes watcher is disabled")
		return nil
	}

	// An empty value is treated the same as an unset variable. This mutates
	// the process environment.
	// NOTE(review): presumably NewWatcher (or something it calls) reads this
	// variable — confirm against its implementation.
	if os.Getenv("APERTURE_CONTROLLER_NAMESPACE") == "" {
		if err := os.Setenv("APERTURE_CONTROLLER_NAMESPACE", "default"); err != nil {
			log.Error().Err(err).Msg("Failed to set default APERTURE_CONTROLLER_NAMESPACE")
			return err
		}
	}

	watcher, err := NewWatcher(policyTrackers, policyDynamicConfigTrackers)
	if err != nil {
		log.Error().Err(err).Msg("Failed to create Policy Kubernetes watcher")
		return err
	}

	lifecycle.Append(fx.Hook{
		OnStart: func(context.Context) error {
			if err := watcher.Start(); err != nil {
				log.Error().Err(err).Msg("Failed to start watcher")
				return err
			}
			return nil
		},
		OnStop: func(context.Context) error {
			return watcher.Stop()
		},
	})
	return nil
}