/
controller.go
249 lines (219 loc) · 9.26 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and Gardener contributors.
//
// SPDX-License-Identifier: Apache-2.0
package controller
import (
"context"
"fmt"
"github.com/gardener/landscaper/controller-utils/pkg/logging"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/gardener/k8syncer/pkg/config"
"github.com/gardener/k8syncer/pkg/persist"
"github.com/gardener/k8syncer/pkg/persist/transformers"
"github.com/gardener/k8syncer/pkg/state"
"github.com/gardener/k8syncer/pkg/utils"
"github.com/gardener/k8syncer/pkg/utils/constants"
)
// basicTransformer is the shared transformer instance that NewController
// assigns to every StorageConfiguration it builds.
var basicTransformer = transformers.NewBasic() // will probably be configurable somehow in the future
// Controller reconciles objects of a single GroupVersionKind and mirrors
// them into the storages referenced by its sync configuration.
type Controller struct {
// Client is used to fetch the reconciled object from the cluster.
Client client.Client
// Config is the overall configuration handed to NewController; its
// storage definitions are matched against the sync configuration's storage references.
Config *config.K8SyncerConfiguration
// SyncConfig is the sync configuration this controller was created for.
SyncConfig *config.SyncConfig
// StorageConfigs holds one entry per storage reference of SyncConfig, in the same order.
StorageConfigs []*StorageConfiguration
// GVK is the group/version/kind of the reconciled resource, derived from SyncConfig.Resource.
GVK schema.GroupVersionKind
// StateDisplay is only set by NewController when SyncConfig configures a
// state type other than 'none'; otherwise it is left at its zero value.
StateDisplay state.StateDisplay
}
// StorageConfiguration is a helper struct to bundle a storage reference with its definition.
// NewController constructs it with a positional literal, so the field order must not change.
type StorageConfiguration struct {
// StorageReference is the reference taken from the sync configuration.
*config.StorageReference
// StorageDefinition is the storage definition whose name matches the reference.
*config.StorageDefinition
// Persister is the backend used to persist, look up and delete the resource's data.
Persister persist.Persister
// Transformer is handed to Persister.Persist together with the resource.
Transformer persist.Transformer
}
// Name returns the name of the storage this configuration belongs to.
// It is read from the embedded storage reference; the embedded storage
// definition is expected to carry the very same name.
func (sc *StorageConfiguration) Name() string {
	ref := sc.StorageReference
	return ref.Name
}
// NewController builds a Controller for a single sync configuration.
//
// It derives the GroupVersionKind from syncConfig.Resource, sets up the state
// display if one is configured, and resolves every storage reference of the
// sync configuration against cfg.StorageDefinitions and the given persisters
// (keyed by storage definition name).
//
// cfg and syncConfig are dereferenced and must not be nil.
//
// An error is returned if
//   - the state type is 'status' but its configuration is missing,
//   - the state type is unknown,
//   - a storage reference has no matching storage definition, or
//   - no persister is available for a referenced storage definition.
//
// The first three cases should already be prevented by config validation.
func NewController(client client.Client, cfg *config.K8SyncerConfiguration, syncConfig *config.SyncConfig, persisters map[string]persist.Persister) (*Controller, error) {
	ctrl := &Controller{
		Client:     client,
		Config:     cfg,
		SyncConfig: syncConfig,
	}

	// set GVK
	ctrl.GVK = schema.GroupVersionKind{
		Group:   syncConfig.Resource.Group,
		Version: syncConfig.Resource.Version,
		Kind:    syncConfig.Resource.Kind,
	}

	// configure state display, if any
	if syncConfig.State != nil && syncConfig.State.Type != config.STATE_TYPE_NONE {
		sdCfg := syncConfig.State
		switch sdCfg.Type {
		case config.STATE_TYPE_ANNOTATION:
			ctrl.StateDisplay = state.NewAnnotationStateDisplay(state.StateVerbosity(sdCfg.Verbosity))
		case config.STATE_TYPE_STATUS:
			stCfg := sdCfg.StatusStateConfig
			if stCfg == nil {
				// should be prevented by validation
				return nil, fmt.Errorf("missing state configuration for state type '%s' in sync configuration with id %s", string(syncConfig.State.Type), syncConfig.ID)
			}
			ctrl.StateDisplay = state.NewStatusStateDisplay(stCfg.GenerationPath, stCfg.PhasePath, stCfg.DetailPath, state.StateVerbosity(sdCfg.Verbosity))
		default:
			// should not happen, as this check is already part of the config validation
			return nil, fmt.Errorf("unknown state type '%s' in sync configuration with id %s", string(syncConfig.State.Type), syncConfig.ID)
		}
	}

	// build storage configurations
	ctrl.StorageConfigs = make([]*StorageConfiguration, len(syncConfig.StorageRefs))
	for idx, stRef := range syncConfig.StorageRefs {
		var stCfg *StorageConfiguration
		for _, stDef := range cfg.StorageDefinitions {
			if stDef.Name != stRef.Name {
				continue
			}
			// Fail early instead of storing a nil persister, which would only
			// surface as a nil dereference during the first reconciliation.
			persister := persisters[stDef.Name]
			if persister == nil {
				return nil, fmt.Errorf("no persister available for storage definition '%s', which is referenced at index %d in sync configuration with id %s", stDef.Name, idx, syncConfig.ID)
			}
			stCfg = &StorageConfiguration{
				StorageReference:  stRef,
				StorageDefinition: stDef,
				Persister:         persister,
				Transformer:       basicTransformer,
			}
			break
		}
		if stCfg == nil {
			// should not happen, as this check is already part of the config validation
			return nil, fmt.Errorf("unable to find storage definition '%s', which is referenced at index %d in sync configuration with id %s", stRef.Name, idx, syncConfig.ID)
		}
		ctrl.StorageConfigs[idx] = stCfg
	}

	return ctrl, nil
}
// Reconcile handles a single reconcile request for the controller's resource kind.
//
// The object named in the request is fetched from the cluster. If it cannot be
// found, or if it carries a non-zero deletion timestamp, the deletion handling
// runs; otherwise the create/update handling runs. Any other fetch error is
// returned wrapped. The returned Result is always empty, so a requeue only
// happens through a non-nil error.
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	logger := logging.FromContextOrDiscard(ctx).WithValues(
		constants.Logging.KEY_RESOURCE_NAME, req.Name,
		constants.Logging.KEY_RESOURCE_NAMESPACE, req.Namespace,
	)
	ctx = logging.NewContext(ctx, logger)
	logger.Info("Starting reconcile")

	// Prepare an empty object that carries just enough identity for the lookup
	// and, if the object is already gone, for the deletion handling.
	target := &unstructured.Unstructured{}
	target.SetName(req.Name)
	target.SetNamespace(req.Namespace)
	target.SetGroupVersionKind(c.GVK)

	if err := c.Client.Get(ctx, req.NamespacedName, target); err != nil {
		if !apierrors.IsNotFound(err) {
			return reconcile.Result{}, fmt.Errorf("error fetching resource from cluster: %w", err)
		}
		// The object no longer exists in the cluster: clean up its persisted data.
		return reconcile.Result{}, c.handleDelete(ctx, target)
	}

	deleting := false
	if ts := target.GetDeletionTimestamp(); ts != nil {
		deleting = !ts.IsZero()
	}
	if deleting {
		return reconcile.Result{}, c.handleDelete(ctx, target)
	}

	return reconcile.Result{}, c.handleCreateOrUpdate(ctx, target)
}
// handleCreateOrUpdate persists the given object into every configured storage.
//
// Steps, in order:
//  1. If finalization is enabled in the sync configuration and the object has
//     no finalizer yet, the finalizer is added.
//  2. The state on the resource is set to phase 'progressing'.
//  3. The object is handed to each storage's persister.
//  4. The state is set to phase 'finished' together with the object's generation.
//
// If adding the finalizer or persisting fails, the error is logged, an attempt
// is made to write an error state to the resource, and the aggregated error is
// returned; remaining storages are not processed.
func (c *Controller) handleCreateOrUpdate(ctx context.Context, obj *unstructured.Unstructured) error {
	logger := logging.FromContextOrDiscard(ctx)
	logger.Info("Handling creation or update")

	// add finalizer, if needed
	finalize := c.SyncConfig.Finalize
	if finalize != nil && *finalize && !utils.HasFinalizer(obj) {
		addFinalizer := func(o *unstructured.Unstructured) (sets.Set[string], error) {
			utils.AddFinalizer(o)
			return sets.New[string]("metadata"), nil
		}
		if err := c.updateWithRetry(ctx, obj, addFinalizer, retryLimit); err != nil {
			msg := "error adding finalizer"
			logger.Error(err, msg)
			failures := utils.NewErrorList(fmt.Errorf("%s: %w", msg, err))
			stateErr := c.updateStateOnResource(ctx, obj, state.STATE_FIELD_PHASE, state.PHASE_ERROR, state.STATE_FIELD_DETAIL, failures.Aggregate().Error())
			failures.Append(stateErr)
			return failures.Aggregate()
		}
	}

	// mark the resource as being processed and clear any previous detail message
	if err := c.updateStateOnResource(ctx, obj, state.STATE_FIELD_PHASE, state.PHASE_PROGRESSING, state.STATE_FIELD_DETAIL, ""); err != nil {
		return err
	}

	for _, st := range c.StorageConfigs {
		stLog := logger.WithValues(constants.Logging.KEY_RESOURCE_STORAGE_ID, st.Name())
		stCtx := logging.NewContext(ctx, stLog)

		// persist changes
		_, changed, err := st.Persister.Persist(stCtx, obj, st.Transformer, st.SubPath)
		if err != nil {
			msg := "error while persisting resource"
			stLog.Error(err, msg)
			failures := utils.NewErrorList(fmt.Errorf("[%s] %s: %w", st.Name(), msg, err))
			stateErr := c.updateStateOnResource(ctx, obj, state.STATE_FIELD_PHASE, state.PHASE_ERROR, state.STATE_FIELD_DETAIL, failures.Aggregate().Error())
			failures.Append(stateErr)
			return failures.Aggregate()
		}
		if changed {
			continue
		}
		// the persister reported that nothing had to be written
		stLog.Debug("No relevant fields have changed, resource has not been updated in storage")
	}

	// all storages are done: record the synced generation and mark the resource as finished
	if err := c.updateStateOnResource(ctx, obj, state.STATE_FIELD_LAST_SYNCED_GENERATION, obj.GetGeneration(), state.STATE_FIELD_PHASE, state.PHASE_FINISHED, state.STATE_FIELD_DETAIL, ""); err != nil {
		return err
	}
	return nil
}
// handleDelete removes the persisted data of the given object from every
// configured storage and, if the object carries the finalizer, removes the
// finalizer afterwards.
//
// State is only written to the resource while it still has the finalizer;
// without one, the resource could be gone before the state can be written.
//
// For each storage, the persister is first asked whether data exists and only
// then asked to delete it. The first failure aborts the deletion: the error is
// logged, the phase is set to 'error deleting' (if a finalizer is present) and
// the aggregated error is returned. Errors originating from a storage are
// prefixed with that storage's name so the failing backend can be identified.
func (c *Controller) handleDelete(ctx context.Context, obj *unstructured.Unstructured) error {
	log := logging.FromContextOrDiscard(ctx)
	log.Info("Handling deletion")

	hasFinalizer := utils.HasFinalizer(obj)

	// fail records the deletion error on the resource (only possible while the
	// finalizer keeps the resource alive) and returns the aggregated error,
	// including a potential error from writing the state.
	fail := func(cause error) error {
		errs := utils.NewErrorList(cause)
		if hasFinalizer {
			err2 := c.updateStateOnResource(ctx, obj, state.STATE_FIELD_PHASE, state.PHASE_ERROR_DELETING, state.STATE_FIELD_DETAIL, errs.Aggregate().Error())
			errs.Append(err2)
		}
		return errs.Aggregate()
	}

	if hasFinalizer {
		// only update state if there is a finalizer on the resource, otherwise it could be gone before the state can be written
		if err := c.updateStateOnResource(ctx, obj, state.STATE_FIELD_PHASE, state.PHASE_DELETING, state.STATE_FIELD_DETAIL, ""); err != nil {
			return err
		}
	}

	for _, storage := range c.StorageConfigs {
		curLog := log.WithValues(constants.Logging.KEY_RESOURCE_STORAGE_ID, storage.Name())
		curCtx := logging.NewContext(ctx, curLog)

		exists, err := storage.Persister.Exists(curCtx, obj.GetName(), obj.GetNamespace(), c.GVK, storage.SubPath)
		if err != nil {
			errMsg := "error while checking for data existence"
			curLog.Error(err, errMsg)
			return fail(fmt.Errorf("[%s] %s: %w", storage.Name(), errMsg, err))
		}
		if !exists {
			curLog.Debug("No data found for current resource")
			continue
		}

		if err := storage.Persister.Delete(curCtx, obj.GetName(), obj.GetNamespace(), c.GVK, storage.SubPath); err != nil {
			errMsg := "error while deleting data"
			curLog.Error(err, errMsg)
			return fail(fmt.Errorf("[%s] %s: %w", storage.Name(), errMsg, err))
		}
	}

	// remove finalizer if any
	if hasFinalizer {
		err := c.updateWithRetry(ctx, obj, func(obj *unstructured.Unstructured) (sets.Set[string], error) {
			utils.RemoveFinalizer(obj)
			return sets.New[string]("metadata"), nil
		}, retryLimit)
		if err != nil {
			errMsg := "error removing finalizer"
			log.Error(err, errMsg)
			// The resource is still being deleted, so report the same
			// deletion-specific error phase as the other failures above.
			return fail(fmt.Errorf("%s: %w", errMsg, err))
		}
	}

	return nil
}