-
Notifications
You must be signed in to change notification settings - Fork 458
/
add.go
208 lines (186 loc) · 8.39 KB
/
add.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
// Copyright (c) 2020 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package managedresource
import (
"context"
"fmt"
"time"
gardenerconstantsv1beta1 "github.com/gardener/gardener/pkg/apis/core/v1beta1/constants"
resourcesv1alpha1 "github.com/gardener/gardener/pkg/apis/resources/v1alpha1"
"github.com/gardener/gardener/pkg/controllerutils/mapper"
predicateutils "github.com/gardener/gardener/pkg/controllerutils/predicate"
reconcilerutils "github.com/gardener/gardener/pkg/controllerutils/reconciler"
managerpredicate "github.com/gardener/gardener/pkg/resourcemanager/predicate"
"github.com/go-logr/logr"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// ControllerName is the name of the managedresource controller.
const ControllerName = "resource"
// defaultControllerConfig is the default config for the controller.
var defaultControllerConfig ControllerConfig
// ControllerOptions are options for adding the controller to a Manager.
type ControllerOptions struct {
maxConcurrentWorkers int
syncPeriod time.Duration
resourceClass string
alwaysUpdate bool
clusterID string
}
// ControllerConfig is the completed configuration for the controller.
type ControllerConfig struct {
MaxConcurrentWorkers int
SyncPeriod time.Duration
ClassFilter *managerpredicate.ClassFilter
AlwaysUpdate bool
ClusterID string
GarbageCollectorActivated bool
TargetCluster cluster.Cluster
// only used for testing, defaults to 5 seconds
RequeueAfterOnDeletionPending time.Duration
}
// AddToManagerWithOptions adds the controller to a Manager with the given config.
func AddToManagerWithOptions(mgr manager.Manager, conf ControllerConfig) error {
mgr.GetLogger().Info("Using cluster id", "clusterID", conf.ClusterID)
c, err := controller.New(ControllerName, mgr, controller.Options{
MaxConcurrentReconciles: conf.MaxConcurrentWorkers,
Reconciler: reconcilerutils.OperationAnnotationWrapper(
func() client.Object { return &resourcesv1alpha1.ManagedResource{} },
&reconciler{
targetClient: conf.TargetCluster.GetClient(),
targetRESTMapper: conf.TargetCluster.GetRESTMapper(),
targetScheme: conf.TargetCluster.GetScheme(),
class: conf.ClassFilter,
alwaysUpdate: conf.AlwaysUpdate,
syncPeriod: conf.SyncPeriod,
clusterID: conf.ClusterID,
garbageCollectorActivated: conf.GarbageCollectorActivated,
requeueAfterOnDeletionPending: conf.RequeueAfterOnDeletionPending,
},
),
RecoverPanic: true,
})
if err != nil {
return fmt.Errorf("unable to set up managedresource controller: %w", err)
}
if err := c.Watch(
&source.Kind{Type: &resourcesv1alpha1.ManagedResource{}},
&handler.EnqueueRequestForObject{},
conf.ClassFilter,
predicate.Or(
predicate.GenerationChangedPredicate{},
managerpredicate.HasOperationAnnotation(),
managerpredicate.ConditionStatusChanged(resourcesv1alpha1.ResourcesHealthy, managerpredicate.ConditionChangedToUnhealthy),
managerpredicate.NoLongerIgnored(),
// we need to reconcile once if the ManagedResource got marked as ignored in order to update the conditions
managerpredicate.GotMarkedAsIgnored(),
),
// TODO: refactor this predicate chain into a single predicate.Funcs that can be properly tested as a whole
predicate.Or(
// Added again here, as otherwise NotIgnored would filter this add/update event out
managerpredicate.GotMarkedAsIgnored(),
managerpredicate.NotIgnored(),
predicateutils.IsDeleting(),
),
); err != nil {
return fmt.Errorf("unable to watch ManagedResources: %w", err)
}
if err := c.Watch(
&source.Kind{Type: &corev1.Secret{}},
mapper.EnqueueRequestsFrom(SecretToManagedResourceMapper(conf.ClassFilter, predicate.Or(
managerpredicate.NotIgnored(),
predicateutils.IsDeleting(),
)), mapper.UpdateWithOldAndNew),
); err != nil {
return fmt.Errorf("unable to watch Secrets mapping to ManagedResources: %w", err)
}
return nil
}
// AddToManager adds the controller to a Manager using the default config.
func AddToManager(mgr manager.Manager) error {
return AddToManagerWithOptions(mgr, defaultControllerConfig)
}
// AddFlags adds the needed command line flags to the given FlagSet.
func (o *ControllerOptions) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.maxConcurrentWorkers, "max-concurrent-workers", 10, "number of worker threads for concurrent reconciliation of resources")
fs.DurationVar(&o.syncPeriod, "sync-period", time.Minute, "duration how often existing resources should be synced")
fs.StringVar(&o.resourceClass, "resource-class", managerpredicate.DefaultClass, "resource class used to filter resource resources")
fs.StringVar(&o.clusterID, "cluster-id", "", "optional cluster id for source cluster")
fs.BoolVar(&o.alwaysUpdate, "always-update", false, "if set to false then a resource will only be updated if its desired state differs from the actual state. otherwise, an update request will be always sent.")
}
// Complete completes the given command line flags and set the defaultControllerConfig accordingly.
func (o *ControllerOptions) Complete() error {
if o.resourceClass == "" {
o.resourceClass = managerpredicate.DefaultClass
}
defaultControllerConfig = ControllerConfig{
MaxConcurrentWorkers: o.maxConcurrentWorkers,
SyncPeriod: o.syncPeriod,
ClassFilter: managerpredicate.NewClassFilter(o.resourceClass),
AlwaysUpdate: o.alwaysUpdate,
ClusterID: o.clusterID,
RequeueAfterOnDeletionPending: 5 * time.Second,
}
return nil
}
// Completed returns the completed ControllerConfig.
func (o *ControllerOptions) Completed() *ControllerConfig {
return &defaultControllerConfig
}
// ApplyDefaultClusterId sets the cluster id according to a dedicated cluster access
func (c *ControllerConfig) ApplyDefaultClusterId(ctx context.Context, log logr.Logger, restcfg *rest.Config) error {
if c.ClusterID == "<cluster>" || c.ClusterID == "<default>" {
log.Info("Trying to get cluster id from cluster")
tmpClient, err := client.New(restcfg, client.Options{})
if err == nil {
c.ClusterID, err = determineClusterIdentity(ctx, tmpClient, c.ClusterID == "<cluster>")
}
if err != nil {
return fmt.Errorf("unable to determine cluster id: %+v", err)
}
}
return nil
}
// determineClusterIdentity is used to extract the cluster identity from the cluster-identity
// config map. This is intended as fallback if no explicit cluster identity is given.
// in seed-shoot scenario, the cluster id for the managed resources must be explicitly given
// to support the migration of a shoot from one seed to another. Here the identity `seed` should
// be set.
func determineClusterIdentity(ctx context.Context, c client.Client, force bool) (string, error) {
cm := corev1.ConfigMap{}
err := c.Get(ctx, client.ObjectKey{Name: gardenerconstantsv1beta1.ClusterIdentity, Namespace: metav1.NamespaceSystem}, &cm)
if err == nil {
if id, ok := cm.Data[gardenerconstantsv1beta1.ClusterIdentity]; ok {
return id, nil
}
if force {
return "", fmt.Errorf("cannot determine cluster identity from configmap: no cluster-identity entry ")
}
} else {
if force || !apierrors.IsNotFound(err) {
return "", fmt.Errorf("cannot determine cluster identity from configmap: %s", err)
}
}
return "", nil
}