-
Notifications
You must be signed in to change notification settings - Fork 717
/
controller.go
107 lines (95 loc) · 3.41 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package eventbus
import (
"context"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/controllers"
"github.com/argoproj/argo-events/controllers/eventbus/installer"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)
const (
// ControllerName is name of the controller
ControllerName = "eventbus-controller"
finalizerName = ControllerName
)
type reconciler struct {
client client.Client
kubeClient kubernetes.Interface
scheme *runtime.Scheme
config *controllers.GlobalConfig
logger *zap.SugaredLogger
}
// NewReconciler returns a new reconciler
func NewReconciler(client client.Client, kubeClient kubernetes.Interface, scheme *runtime.Scheme, config *controllers.GlobalConfig, logger *zap.SugaredLogger) reconcile.Reconciler {
return &reconciler{client: client, scheme: scheme, config: config, kubeClient: kubeClient, logger: logger}
}
func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
eventBus := &v1alpha1.EventBus{}
if err := r.client.Get(ctx, req.NamespacedName, eventBus); err != nil {
if apierrors.IsNotFound(err) {
r.logger.Warnw("WARNING: eventbus not found", "request", req)
return reconcile.Result{}, nil
}
r.logger.Errorw("unable to get eventbus ctl", zap.Any("request", req), zap.Error(err))
return ctrl.Result{}, err
}
log := r.logger.With("namespace", eventBus.Namespace).With("eventbus", eventBus.Name)
ctx = logging.WithLogger(ctx, log)
busCopy := eventBus.DeepCopy()
reconcileErr := r.reconcile(ctx, busCopy)
if reconcileErr != nil {
log.Errorw("reconcile error", zap.Error(reconcileErr))
}
if r.needsUpdate(eventBus, busCopy) {
if err := r.client.Update(ctx, busCopy); err != nil {
return reconcile.Result{}, err
}
}
if err := r.client.Status().Update(ctx, busCopy); err != nil {
return reconcile.Result{}, err
}
return ctrl.Result{}, reconcileErr
}
// reconcile does the real logic
func (r *reconciler) reconcile(ctx context.Context, eventBus *v1alpha1.EventBus) error {
log := logging.FromContext(ctx)
if !eventBus.DeletionTimestamp.IsZero() {
log.Info("deleting eventbus")
if controllerutil.ContainsFinalizer(eventBus, finalizerName) {
// Finalizer logic should be added here.
if err := installer.Uninstall(ctx, eventBus, r.client, r.kubeClient, r.config, log); err != nil {
log.Errorw("failed to uninstall", zap.Error(err))
return err
}
controllerutil.RemoveFinalizer(eventBus, finalizerName)
}
return nil
}
controllerutil.AddFinalizer(eventBus, finalizerName)
eventBus.Status.InitConditions()
if err := ValidateEventBus(eventBus); err != nil {
log.Errorw("validation failed", zap.Error(err))
eventBus.Status.MarkNotConfigured("InvalidSpec", err.Error())
return err
} else {
eventBus.Status.MarkConfigured()
}
return installer.Install(ctx, eventBus, r.client, r.kubeClient, r.config, log)
}
func (r *reconciler) needsUpdate(old, new *v1alpha1.EventBus) bool {
if old == nil {
return true
}
if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) {
return true
}
return false
}