forked from knative/eventing
/
reconcile.go
113 lines (89 loc) · 4.05 KB
/
reconcile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
util "github.com/knative/eventing/pkg/provisioners"
)
const (
// Name is the name of the kafka ClusterChannelProvisioner. Reconcile skips
// any ClusterChannelProvisioner whose name does not match this value.
Name = "kafka"
)
// Reconcile compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Provisioner resource
// with the current status of the resource.
func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
ctx := context.TODO()
r.logger.Info("reconciling ClusterChannelProvisioner", zap.Any("request", request))
// Workaround until https://github.com/kubernetes-sigs/controller-runtime/issues/214 is fixed.
// The reconcile requests triggered because of objects owned by this ClusterChannelProvisioner (e.g k8s service)
// will contain the namespace of that object. Since ClusterChannelProvisioner is cluster-scoped we need to unset the
// namespace or otherwise the provisioner object cannot be found.
request.NamespacedName.Namespace = ""
provisioner := &v1alpha1.ClusterChannelProvisioner{}
err := r.client.Get(context.TODO(), request.NamespacedName, provisioner)
if errors.IsNotFound(err) {
r.logger.Info("could not find ClusterChannelProvisioner", zap.Any("request", request))
return reconcile.Result{}, nil
}
if err != nil {
r.logger.Error("could not fetch ClusterChannelProvisioner", zap.Error(err))
return reconcile.Result{}, err
}
// Skip channel provisioners that we don't manage
if provisioner.Name != Name {
r.logger.Info("not reconciling ClusterChannelProvisioner, it is not controlled by this Controller", zap.Any("request", request))
return reconcile.Result{}, nil
}
newProvisioner := provisioner.DeepCopy()
// Reconcile this copy of the Provisioner and then write back any status
// updates regardless of whether the reconcile error out.
err = r.reconcile(ctx, newProvisioner)
if err != nil {
r.logger.Info("error reconciling ClusterProvisioner", zap.Error(err))
// Note that we do not return the error here, because we want to update the Status
// regardless of the error.
}
if updateStatusErr := util.UpdateClusterChannelProvisionerStatus(ctx, r.client, newProvisioner); updateStatusErr != nil {
r.logger.Info("error updating ClusterChannelProvisioner Status", zap.Error(updateStatusErr))
return reconcile.Result{}, updateStatusErr
}
// Requeue if the resource is not ready:
return reconcile.Result{}, err
}
func (r *reconciler) reconcile(ctx context.Context, provisioner *v1alpha1.ClusterChannelProvisioner) error {
// See if the provisioner has been deleted
if provisioner.DeletionTimestamp != nil {
r.logger.Info(fmt.Sprintf("DeletionTimestamp: %v", provisioner.DeletionTimestamp))
return nil
}
provisioner.Status.InitializeConditions()
svc, err := util.CreateDispatcherService(ctx, r.client, provisioner)
if err != nil {
r.logger.Info("error creating the ClusterProvisioner's K8s Service", zap.Error(err))
return err
}
// Check if this ClusterChannelProvisioner is the owner of the K8s service.
if !metav1.IsControlledBy(svc, provisioner) {
r.logger.Warn("ClusterChannelProvisioner's K8s Service is not owned by the ClusterChannelProvisioner", zap.Any("clusterChannelProvisioner", provisioner), zap.Any("service", svc))
}
// Update Status as Ready
provisioner.Status.MarkReady()
return nil
}