/
resource_reconciler.go
90 lines (76 loc) · 2.55 KB
/
resource_reconciler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package common
import (
"context"
"time"
"github.com/Azure/azure-provider-external-dns-e2e/pkgResources/controller/controllername"
"github.com/Azure/azure-provider-external-dns-e2e/pkgResources/controller/metrics"
"github.com/Azure/azure-provider-external-dns-e2e/pkgResources/util"
"github.com/go-logr/logr"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// resourceReconciler periodically ensures that a fixed set of objects is
// present in (or removed from) the cluster. It is started through its Start
// method.
type resourceReconciler struct {
	// name identifies this reconciler in logs and controller metrics.
	name controllername.ControllerNamer
	// client performs the delete and upsert calls issued on each pass.
	client client.Client
	// logger receives lifecycle and error messages for this reconciler.
	logger logr.Logger
	// interval is the base wait before the next pass after a successful one.
	interval time.Duration
	// retryInterval is the base wait before the next pass after a failed one.
	retryInterval time.Duration
	// resources are the objects reconciled on every pass. Each pass works on
	// deep copies of these objects.
	resources []client.Object
}
// NewResourceReconciler creates a reconciler that continuously ensures that
// the provided resources are provisioned, and registers it with manager.
//
// Controller metrics are initialized for name before the reconciler is built.
// The reconciler uses the manager's client and a logger derived from the
// manager's logger via name.AddToLogger. reconcileInterval is the wait between
// successful passes; the wait after a failed pass is fixed at one second.
//
// The returned error is whatever manager.Add reports.
func NewResourceReconciler(manager ctrl.Manager, name controllername.ControllerNamer, resources []client.Object, reconcileInterval time.Duration) error {
	metrics.InitControllerMetrics(name)

	reconciler := new(resourceReconciler)
	reconciler.name = name
	reconciler.client = manager.GetClient()
	reconciler.logger = name.AddToLogger(manager.GetLogger())
	reconciler.interval = reconcileInterval
	reconciler.retryInterval = time.Second
	reconciler.resources = resources

	return manager.Add(reconciler)
}
// Start runs reconciliation passes in a loop until ctx is done, at which point
// it returns ctx.Err().
//
// Before every pass the loop waits for a duration obtained from
// util.Jitter(wait, 0.3). The base wait is one nanosecond for the first pass
// (so it runs right away), r.interval after a successful pass, and
// r.retryInterval after a failed one. Failures are logged and never stop the
// loop.
func (r *resourceReconciler) Start(ctx context.Context) error {
	r.logger.Info("starting resource reconciler")
	defer r.logger.Info("stopping resource reconciler")

	// Base delay before the next pass; tiny at first so startup reconciles immediately.
	wait := time.Nanosecond
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(util.Jitter(wait, 0.3)):
		}

		tickErr := r.tick(ctx)
		if tickErr == nil {
			wait = r.interval
			continue
		}

		// A failed pass is retried sooner than the regular cadence.
		r.logger.Error(tickErr, "reconciling resources")
		wait = r.retryInterval
	}
}
// tick performs a single reconciliation pass over r.resources.
//
// Every resource is deep-copied first, so client calls operate on the copy
// rather than on the object stored in the reconciler. A resource whose copy
// carries a deletion timestamp is deleted on a best-effort basis: a NotFound
// result is ignored, any other failure is logged, and the pass moves on to the
// next resource. Every other resource is upserted; the first upsert failure is
// logged, aborts the pass, and is returned.
//
// On exit the pass latency is logged and controller metrics are updated with
// the same error that tick returns.
func (r *resourceReconciler) tick(ctx context.Context) error {
	var err error
	start := time.Now()
	r.logger.Info("starting to reconcile resources")
	defer func() {
		r.logger.Info("finished reconciling resources", "latencySec", time.Since(start).Seconds())
		// Invoked from a closure so that err is read when tick exits, not
		// when the defer statement is evaluated.
		metrics.HandleControllerReconcileMetrics(r.name, ctrl.Result{}, err)
	}()

	for _, res := range r.resources {
		obj := res.DeepCopyObject().(client.Object)
		if obj.GetDeletionTimestamp() != nil {
			// The delete error is kept local on purpose: deletion is
			// best-effort and does not fail the pass, so it must not leak
			// into the err value reported to metrics (a NotFound here is
			// the desired end state, not a reconcile failure).
			if delErr := r.client.Delete(ctx, obj); delErr != nil && !k8serrors.IsNotFound(delErr) {
				r.logger.Error(delErr, "deleting unneeded resources")
			}
			continue
		}

		if err = util.Upsert(ctx, r.client, obj); err != nil {
			r.logger.Error(err, "upserting resources")
			return err
		}
	}

	return nil
}
// NeedLeaderElection always reports true. Presumably this satisfies
// controller-runtime's LeaderElectionRunnable so that the manager only runs
// this reconciler on the elected leader — confirm against the manager setup.
func (*resourceReconciler) NeedLeaderElection() bool {
	return true
}