forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
patch.go
479 lines (429 loc) · 17.4 KB
/
patch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package handlers
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/evanphx/json-patch"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
)
// PatchResource returns a function that will handle a resource patch
// TODO: Eventually PatchResource should just use GuaranteedUpdate and this routine should be a bit cleaner
func PatchResource(r rest.Patcher, scope RequestScope, admit admission.Interface, converter runtime.ObjectConvertor) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// TODO: we either want to remove timeout or document it (if we
// document, move timeout out of this function and declare it in
// api_installer)
timeout := parseTimeout(req.URL.Query().Get("timeout"))
namespace, name, err := scope.Namer.Name(req)
if err != nil {
scope.err(err, w, req)
return
}
ctx := scope.ContextFunc(req)
ctx = request.WithNamespace(ctx, namespace)
versionedObj, err := converter.ConvertToVersion(r.New(), scope.Kind.GroupVersion())
if err != nil {
scope.err(err, w, req)
return
}
// TODO: handle this in negotiation
contentType := req.Header.Get("Content-Type")
// Remove "; charset=" if included in header.
if idx := strings.Index(contentType, ";"); idx > 0 {
contentType = contentType[:idx]
}
patchType := types.PatchType(contentType)
patchJS, err := readBody(req)
if err != nil {
scope.err(err, w, req)
return
}
ae := request.AuditEventFrom(ctx)
audit.LogRequestPatch(ae, patchJS)
s, ok := runtime.SerializerInfoForMediaType(scope.Serializer.SupportedMediaTypes(), runtime.ContentTypeJSON)
if !ok {
scope.err(fmt.Errorf("no serializer defined for JSON"), w, req)
return
}
gv := scope.Kind.GroupVersion()
codec := runtime.NewCodec(
scope.Serializer.EncoderForVersion(s.Serializer, gv),
scope.Serializer.DecoderToVersion(s.Serializer, schema.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal}),
)
userInfo, _ := request.UserFrom(ctx)
staticAdmissionAttributes := admission.NewAttributesRecord(nil, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo)
updateMutation := func(updatedObject runtime.Object, currentObject runtime.Object) error {
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && admit.Handles(admission.Update) {
return mutatingAdmission.Admit(admission.NewAttributesRecord(updatedObject, currentObject, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Update, userInfo))
}
return nil
}
result, err := patchResource(
ctx,
updateMutation,
rest.AdmissionToValidateObjectFunc(admit, staticAdmissionAttributes),
rest.AdmissionToValidateObjectUpdateFunc(admit, staticAdmissionAttributes),
timeout, versionedObj,
r,
name,
patchType,
patchJS,
scope.Namer, scope.Creater, scope.Defaulter, scope.UnsafeConvertor, scope.Kind, scope.Resource, codec)
if err != nil {
scope.err(err, w, req)
return
}
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
}
if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
scope.err(err, w, req)
return
}
transformResponseObject(ctx, scope, req, w, http.StatusOK, result)
}
}
type mutateObjectUpdateFunc func(obj, old runtime.Object) error
// patchResource divides PatchResource for easier unit testing
func patchResource(
ctx request.Context,
updateMutation mutateObjectUpdateFunc,
createValidation rest.ValidateObjectFunc,
updateValidation rest.ValidateObjectUpdateFunc,
timeout time.Duration,
versionedObj runtime.Object,
patcher rest.Patcher,
name string,
patchType types.PatchType,
patchJS []byte,
namer ScopeNamer,
creater runtime.ObjectCreater,
defaulter runtime.ObjectDefaulter,
unsafeConvertor runtime.ObjectConvertor,
kind schema.GroupVersionKind,
resource schema.GroupVersionResource,
codec runtime.Codec,
) (runtime.Object, error) {
namespace := request.NamespaceValue(ctx)
var (
originalObjJS []byte
originalPatchedObjJS []byte
originalObjMap map[string]interface{}
getOriginalPatchMap func() (map[string]interface{}, error)
lastConflictErr error
originalResourceVersion string
)
// applyPatch is called every time GuaranteedUpdate asks for the updated object,
// and is given the currently persisted object as input.
applyPatch := func(_ request.Context, _, currentObject runtime.Object) (runtime.Object, error) {
// Make sure we actually have a persisted currentObject
if hasUID, err := hasUID(currentObject); err != nil {
return nil, err
} else if !hasUID {
return nil, errors.NewNotFound(resource.GroupResource(), name)
}
currentResourceVersion := ""
if currentMetadata, err := meta.Accessor(currentObject); err == nil {
currentResourceVersion = currentMetadata.GetResourceVersion()
}
switch {
case originalObjJS == nil && originalObjMap == nil:
// first time through,
// 1. apply the patch
// 2. save the original and patched to detect whether there were conflicting changes on retries
originalResourceVersion = currentResourceVersion
objToUpdate := patcher.New()
// For performance reasons, in case of strategicpatch, we avoid json
// marshaling and unmarshaling and operate just on map[string]interface{}.
// In case of other patch types, we still have to operate on JSON
// representations.
switch patchType {
case types.JSONPatchType, types.MergePatchType:
originalJS, patchedJS, err := patchObjectJSON(patchType, codec, currentObject, patchJS, objToUpdate, versionedObj)
if err != nil {
return nil, interpretPatchError(err)
}
originalObjJS, originalPatchedObjJS = originalJS, patchedJS
// Make a getter that can return a fresh strategic patch map if needed for conflict retries
// We have to rebuild it each time we need it, because the map gets mutated when being applied
var originalPatchBytes []byte
getOriginalPatchMap = func() (map[string]interface{}, error) {
if originalPatchBytes == nil {
// Compute once
originalPatchBytes, err = strategicpatch.CreateTwoWayMergePatch(originalObjJS, originalPatchedObjJS, versionedObj)
if err != nil {
return nil, interpretPatchError(err)
}
}
// Return a fresh map every time
originalPatchMap := make(map[string]interface{})
if err := json.Unmarshal(originalPatchBytes, &originalPatchMap); err != nil {
return nil, errors.NewBadRequest(err.Error())
}
return originalPatchMap, nil
}
case types.StrategicMergePatchType:
// Since the patch is applied on versioned objects, we need to convert the
// current object to versioned representation first.
currentVersionedObject, err := unsafeConvertor.ConvertToVersion(currentObject, kind.GroupVersion())
if err != nil {
return nil, err
}
versionedObjToUpdate, err := creater.New(kind)
if err != nil {
return nil, err
}
// Capture the original object map and patch for possible retries.
originalMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentVersionedObject)
if err != nil {
return nil, err
}
if err := strategicPatchObject(codec, defaulter, currentVersionedObject, patchJS, versionedObjToUpdate, versionedObj); err != nil {
return nil, err
}
// Convert the object back to unversioned.
gvk := kind.GroupKind().WithVersion(runtime.APIVersionInternal)
unversionedObjToUpdate, err := unsafeConvertor.ConvertToVersion(versionedObjToUpdate, gvk.GroupVersion())
if err != nil {
return nil, err
}
objToUpdate = unversionedObjToUpdate
// Store unstructured representation for possible retries.
originalObjMap = originalMap
// Make a getter that can return a fresh strategic patch map if needed for conflict retries
// We have to rebuild it each time we need it, because the map gets mutated when being applied
getOriginalPatchMap = func() (map[string]interface{}, error) {
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
return nil, errors.NewBadRequest(err.Error())
}
return patchMap, nil
}
}
if err := checkName(objToUpdate, name, namespace, namer); err != nil {
return nil, err
}
return objToUpdate, nil
default:
// on a conflict,
// 1. build a strategic merge patch from originalJS and the patchedJS. Different patch types can
// be specified, but a strategic merge patch should be expressive enough handle them. Build the
// patch with this type to handle those cases.
// 2. build a strategic merge patch from originalJS and the currentJS
// 3. ensure no conflicts between the two patches
// 4. apply the #1 patch to the currentJS object
// Since the patch is applied on versioned objects, we need to convert the
// current object to versioned representation first.
currentVersionedObject, err := unsafeConvertor.ConvertToVersion(currentObject, kind.GroupVersion())
if err != nil {
return nil, err
}
currentObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(currentVersionedObject)
if err != nil {
return nil, err
}
var currentPatchMap map[string]interface{}
if originalObjMap != nil {
var err error
currentPatchMap, err = strategicpatch.CreateTwoWayMergeMapPatch(originalObjMap, currentObjMap, versionedObj)
if err != nil {
return nil, interpretPatchError(err)
}
} else {
// Compute current patch.
currentObjJS, err := runtime.Encode(codec, currentObject)
if err != nil {
return nil, err
}
currentPatch, err := strategicpatch.CreateTwoWayMergePatch(originalObjJS, currentObjJS, versionedObj)
if err != nil {
return nil, interpretPatchError(err)
}
currentPatchMap = make(map[string]interface{})
if err := json.Unmarshal(currentPatch, ¤tPatchMap); err != nil {
return nil, errors.NewBadRequest(err.Error())
}
}
// Get a fresh copy of the original strategic patch each time through, since applying it mutates the map
originalPatchMap, err := getOriginalPatchMap()
if err != nil {
return nil, err
}
hasConflicts, err := mergepatch.HasConflicts(originalPatchMap, currentPatchMap)
if err != nil {
return nil, err
}
if hasConflicts {
diff1, _ := json.Marshal(currentPatchMap)
diff2, _ := json.Marshal(originalPatchMap)
patchDiffErr := fmt.Errorf("there is a meaningful conflict (firstResourceVersion: %q, currentResourceVersion: %q):\n diff1=%v\n, diff2=%v\n", originalResourceVersion, currentResourceVersion, string(diff1), string(diff2))
glog.V(4).Infof("patchResource failed for resource %s, because there is a meaningful conflict(firstResourceVersion: %q, currentResourceVersion: %q):\n diff1=%v\n, diff2=%v\n", name, originalResourceVersion, currentResourceVersion, string(diff1), string(diff2))
// Return the last conflict error we got if we have one
if lastConflictErr != nil {
return nil, lastConflictErr
}
// Otherwise manufacture one of our own
return nil, errors.NewConflict(resource.GroupResource(), name, patchDiffErr)
}
versionedObjToUpdate, err := creater.New(kind)
if err != nil {
return nil, err
}
if err := applyPatchToObject(codec, defaulter, currentObjMap, originalPatchMap, versionedObjToUpdate, versionedObj); err != nil {
return nil, err
}
// Convert the object back to unversioned.
gvk := kind.GroupKind().WithVersion(runtime.APIVersionInternal)
objToUpdate, err := unsafeConvertor.ConvertToVersion(versionedObjToUpdate, gvk.GroupVersion())
if err != nil {
return nil, err
}
return objToUpdate, nil
}
}
// applyAdmission is called every time GuaranteedUpdate asks for the updated object,
// and is given the currently persisted object and the patched object as input.
applyAdmission := func(ctx request.Context, patchedObject runtime.Object, currentObject runtime.Object) (runtime.Object, error) {
return patchedObject, updateMutation(patchedObject, currentObject)
}
updatedObjectInfo := rest.DefaultUpdatedObjectInfo(nil, applyPatch, applyAdmission)
return finishRequest(timeout, func() (runtime.Object, error) {
updateObject, _, updateErr := patcher.Update(ctx, name, updatedObjectInfo, createValidation, updateValidation)
for i := 0; i < MaxRetryWhenPatchConflicts && (errors.IsConflict(updateErr)); i++ {
lastConflictErr = updateErr
updateObject, _, updateErr = patcher.Update(ctx, name, updatedObjectInfo, createValidation, updateValidation)
}
return updateObject, updateErr
})
}
// patchObjectJSON patches the <originalObject> with <patchJS> and stores
// the result in <objToUpdate>.
// Currently it also returns the original and patched objects serialized to
// JSONs (this may not be needed once we can apply patches at the
// map[string]interface{} level).
func patchObjectJSON(
patchType types.PatchType,
codec runtime.Codec,
originalObject runtime.Object,
patchJS []byte,
objToUpdate runtime.Object,
versionedObj runtime.Object,
) (originalObjJS []byte, patchedObjJS []byte, retErr error) {
js, err := runtime.Encode(codec, originalObject)
if err != nil {
return nil, nil, err
}
originalObjJS = js
switch patchType {
case types.JSONPatchType:
patchObj, err := jsonpatch.DecodePatch(patchJS)
if err != nil {
return nil, nil, err
}
if patchedObjJS, err = patchObj.Apply(originalObjJS); err != nil {
return nil, nil, err
}
case types.MergePatchType:
if patchedObjJS, err = jsonpatch.MergePatch(originalObjJS, patchJS); err != nil {
return nil, nil, err
}
case types.StrategicMergePatchType:
if patchedObjJS, err = strategicpatch.StrategicMergePatch(originalObjJS, patchJS, versionedObj); err != nil {
return nil, nil, err
}
default:
// only here as a safety net - go-restful filters content-type
return nil, nil, fmt.Errorf("unknown Content-Type header for patch: %v", patchType)
}
if err := runtime.DecodeInto(codec, patchedObjJS, objToUpdate); err != nil {
return nil, nil, err
}
return
}
// strategicPatchObject applies a strategic merge patch of <patchJS> to
// <originalObject> and stores the result in <objToUpdate>.
// It additionally returns the map[string]interface{} representation of the
// <originalObject> and <patchJS>.
// NOTE: Both <originalObject> and <objToUpdate> are supposed to be versioned.
func strategicPatchObject(
codec runtime.Codec,
defaulter runtime.ObjectDefaulter,
originalObject runtime.Object,
patchJS []byte,
objToUpdate runtime.Object,
versionedObj runtime.Object,
) error {
originalObjMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(originalObject)
if err != nil {
return err
}
patchMap := make(map[string]interface{})
if err := json.Unmarshal(patchJS, &patchMap); err != nil {
return errors.NewBadRequest(err.Error())
}
if err := applyPatchToObject(codec, defaulter, originalObjMap, patchMap, objToUpdate, versionedObj); err != nil {
return err
}
return nil
}
// applyPatchToObject applies a strategic merge patch of <patchMap> to
// <originalMap> and stores the result in <objToUpdate>.
// NOTE: <objToUpdate> must be a versioned object.
func applyPatchToObject(
codec runtime.Codec,
defaulter runtime.ObjectDefaulter,
originalMap map[string]interface{},
patchMap map[string]interface{},
objToUpdate runtime.Object,
versionedObj runtime.Object,
) error {
patchedObjMap, err := strategicpatch.StrategicMergeMapPatch(originalMap, patchMap, versionedObj)
if err != nil {
return interpretPatchError(err)
}
// Rather than serialize the patched map to JSON, then decode it to an object, we go directly from a map to an object
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(patchedObjMap, objToUpdate); err != nil {
return err
}
// Decoding from JSON to a versioned object would apply defaults, so we do the same here
defaulter.Default(objToUpdate)
return nil
}
// interpretPatchError interprets the error type and returns an error with appropriate HTTP code.
func interpretPatchError(err error) error {
switch err {
case mergepatch.ErrBadJSONDoc, mergepatch.ErrBadPatchFormatForPrimitiveList, mergepatch.ErrBadPatchFormatForRetainKeys, mergepatch.ErrBadPatchFormatForSetElementOrderList, mergepatch.ErrUnsupportedStrategicMergePatchFormat:
return errors.NewBadRequest(err.Error())
case mergepatch.ErrNoListOfLists, mergepatch.ErrPatchContentNotMatchRetainKeys:
return errors.NewGenericServerResponse(http.StatusUnprocessableEntity, "", schema.GroupResource{}, "", err.Error(), 0, false)
default:
return err
}
}