forked from knative/eventing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reconcile.go
131 lines (106 loc) · 4.72 KB
/
reconcile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clusterchannelprovisioner
import (
"context"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/logging"
util "github.com/knative/eventing/pkg/provisioners"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
const (
// Name is the name of the GCP PubSub ClusterChannelProvisioner.
Name = "gcp-pubsub"
// Name of the corev1.Events emitted from the reconciliation process
ccpReconciled = "CcpReconciled"
ccpReconcileFailed = "CcpReconcileFailed"
ccpUpdateStatusFailed = "CcpUpdateStatusFailed"
)
type reconciler struct {
client client.Client
recorder record.EventRecorder
logger *zap.Logger
}
// Verify the struct implements reconcile.Reconciler
var _ reconcile.Reconciler = &reconciler{}
func (r *reconciler) InjectClient(c client.Client) error {
r.client = c
return nil
}
func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
ctx := context.TODO()
ctx = logging.WithLogger(ctx, r.logger.With(zap.Any("request", request)))
ccp := &eventingv1alpha1.ClusterChannelProvisioner{}
err := r.client.Get(ctx, request.NamespacedName, ccp)
// The ClusterChannelProvisioner may have been deleted since it was added to the workqueue. If
// so, there is nothing to be done.
if errors.IsNotFound(err) {
logging.FromContext(ctx).Info("Could not find ClusterChannelProvisioner", zap.Error(err))
return reconcile.Result{}, nil
}
// Any other error should be retried in another reconciliation.
if err != nil {
logging.FromContext(ctx).Error("Unable to Get ClusterChannelProvisioner", zap.Error(err))
return reconcile.Result{}, err
}
// Does this Controller control this ClusterChannelProvisioner?
if !shouldReconcile(ccp.Namespace, ccp.Name) {
logging.FromContext(ctx).Info("Not reconciling ClusterChannelProvisioner, it is not controlled by this Controller", zap.String("APIVersion", ccp.APIVersion), zap.String("Kind", ccp.Kind), zap.String("Namespace", ccp.Namespace), zap.String("name", ccp.Name))
return reconcile.Result{}, nil
}
logging.FromContext(ctx).Info("Reconciling ClusterChannelProvisioner.")
// Modify a copy of this object, rather than the original.
ccp = ccp.DeepCopy()
reconcileErr := r.reconcile(ctx, ccp)
if reconcileErr != nil {
logging.FromContext(ctx).Info("Error reconciling ClusterChannelProvisioner", zap.Error(reconcileErr))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, ccpReconcileFailed, "ClusterChannelProvisioner reconciliation failed: %v", err)
// Note that we do not return the error here, because we want to update the Status
// regardless of the error.
} else {
logging.FromContext(ctx).Info("ClusterChannelProvisioner reconciled")
r.recorder.Eventf(ccp, corev1.EventTypeNormal, ccpReconciled, "ClusterChannelProvisioner reconciled: %q", ccp.Name)
}
if err = util.UpdateClusterChannelProvisionerStatus(ctx, r.client, ccp); err != nil {
logging.FromContext(ctx).Info("Error updating ClusterChannelProvisioner Status", zap.Error(err))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, ccpUpdateStatusFailed, "Failed to update ClusterChannelProvisioner's status: %v", err)
return reconcile.Result{}, err
}
return reconcile.Result{}, reconcileErr
}
// IsControlled determines if the gcp-pubsub Channel Controller should control (and therefore
// reconcile) a given object, based on that object's ClusterChannelProvisioner reference.
func IsControlled(ref *corev1.ObjectReference) bool {
if ref != nil {
return shouldReconcile(ref.Namespace, ref.Name)
}
return false
}
// shouldReconcile determines if this Controller should control (and therefore reconcile) a given
// ClusterChannelProvisioner. This Controller only handles gcp-pubsub channels.
func shouldReconcile(namespace, name string) bool {
return namespace == "" && name == Name
}
func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error {
// We are syncing nothing! Just mark it ready.
if ccp.DeletionTimestamp != nil {
return nil
}
ccp.Status.MarkReady()
return nil
}