forked from k3s-io/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
customresource_handler.go
899 lines (779 loc) · 30.9 KB
/
customresource_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"net/http"
"path"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/go-openapi/spec"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/validate"
"github.com/golang/glog"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/scale"
"k8s.io/client-go/scale/scheme/autoscalingv1"
"k8s.io/client-go/tools/cache"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"k8s.io/apiextensions-apiserver/pkg/apiserver/conversion"
apiservervalidation "k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
informers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion/apiextensions/internalversion"
listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion"
"k8s.io/apiextensions-apiserver/pkg/controller/establish"
"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
"k8s.io/apiextensions-apiserver/pkg/crdserverscheme"
apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
"k8s.io/apiextensions-apiserver/pkg/registry/customresource"
"k8s.io/apiextensions-apiserver/pkg/registry/customresource/tableconvertor"
)
// crdHandler serves the `/apis` endpoint.
// This is registered as a filter so that it never collides with any explicitly registered endpoints
type crdHandler struct {
versionDiscoveryHandler *versionDiscoveryHandler
groupDiscoveryHandler *groupDiscoveryHandler
customStorageLock sync.Mutex
// customStorage contains a crdStorageMap
// atomic.Value has a very good read performance compared to sync.RWMutex
// see https://gist.github.com/dim/152e6bf80e1384ea72e17ac717a5000a
// which is suited for most read and rarely write cases
customStorage atomic.Value
crdLister listers.CustomResourceDefinitionLister
delegate http.Handler
restOptionsGetter generic.RESTOptionsGetter
admission admission.Interface
establishingController *establish.EstablishingController
// MasterCount is used to implement sleep to improve
// CRD establishing process for HA clusters.
masterCount int
}
// crdInfo stores enough information to serve the storage for the custom resource
type crdInfo struct {
// spec and acceptedNames are used to compare against if a change is made on a CRD. We only update
// the storage if one of these changes.
spec *apiextensions.CustomResourceDefinitionSpec
acceptedNames *apiextensions.CustomResourceDefinitionNames
// Storage per version
storages map[string]customresource.CustomResourceStorage
// Request scope per version
requestScopes map[string]handlers.RequestScope
// Scale scope per version
scaleRequestScopes map[string]handlers.RequestScope
// Status scope per version
statusRequestScopes map[string]handlers.RequestScope
// storageVersion is the CRD version used when storing the object in etcd.
storageVersion string
}
// crdStorageMap goes from customresourcedefinition to its storage
type crdStorageMap map[types.UID]*crdInfo
func NewCustomResourceDefinitionHandler(
versionDiscoveryHandler *versionDiscoveryHandler,
groupDiscoveryHandler *groupDiscoveryHandler,
crdInformer informers.CustomResourceDefinitionInformer,
delegate http.Handler,
restOptionsGetter generic.RESTOptionsGetter,
admission admission.Interface,
establishingController *establish.EstablishingController,
masterCount int) *crdHandler {
ret := &crdHandler{
versionDiscoveryHandler: versionDiscoveryHandler,
groupDiscoveryHandler: groupDiscoveryHandler,
customStorage: atomic.Value{},
crdLister: crdInformer.Lister(),
delegate: delegate,
restOptionsGetter: restOptionsGetter,
admission: admission,
establishingController: establishingController,
masterCount: masterCount,
}
crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: ret.updateCustomResourceDefinition,
DeleteFunc: func(obj interface{}) {
ret.removeDeadStorage()
},
})
ret.customStorage.Store(crdStorageMap{})
return ret
}
func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
responsewriters.InternalError(w, req, fmt.Errorf("no RequestInfo found in the context"))
return
}
if !requestInfo.IsResourceRequest {
pathParts := splitPath(requestInfo.Path)
// only match /apis/<group>/<version>
// only registered under /apis
if len(pathParts) == 3 {
r.versionDiscoveryHandler.ServeHTTP(w, req)
return
}
// only match /apis/<group>
if len(pathParts) == 2 {
r.groupDiscoveryHandler.ServeHTTP(w, req)
return
}
r.delegate.ServeHTTP(w, req)
return
}
crdName := requestInfo.Resource + "." + requestInfo.APIGroup
crd, err := r.crdLister.Get(crdName)
if apierrors.IsNotFound(err) {
r.delegate.ServeHTTP(w, req)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !apiextensions.HasServedCRDVersion(crd, requestInfo.APIVersion) {
r.delegate.ServeHTTP(w, req)
return
}
// There is a small chance that a CRD is being served because NamesAccepted condition is true,
// but it becomes "unserved" because another names update leads to a conflict
// and EstablishingController wasn't fast enough to put the CRD into the Established condition.
// We accept this as the problem is small and self-healing.
if !apiextensions.IsCRDConditionTrue(crd, apiextensions.NamesAccepted) &&
!apiextensions.IsCRDConditionTrue(crd, apiextensions.Established) {
r.delegate.ServeHTTP(w, req)
return
}
terminating := apiextensions.IsCRDConditionTrue(crd, apiextensions.Terminating)
crdInfo, err := r.getOrCreateServingInfoFor(crd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
verb := strings.ToUpper(requestInfo.Verb)
resource := requestInfo.Resource
subresource := requestInfo.Subresource
scope := metrics.CleanScope(requestInfo)
supportedTypes := []string{
string(types.JSONPatchType),
string(types.MergePatchType),
}
var handler http.HandlerFunc
switch {
case subresource == "status" && crd.Spec.Subresources != nil && crd.Spec.Subresources.Status != nil:
handler = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case subresource == "scale" && crd.Spec.Subresources != nil && crd.Spec.Subresources.Scale != nil:
handler = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
case len(subresource) == 0:
handler = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
default:
http.Error(w, "the server could not find the requested resource", http.StatusNotFound)
}
if handler != nil {
handler = metrics.InstrumentHandlerFunc(verb, resource, subresource, scope, handler)
handler(w, req)
return
}
}
func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, requestInfo *apirequest.RequestInfo, crdInfo *crdInfo, terminating bool, supportedTypes []string) http.HandlerFunc {
requestScope := crdInfo.requestScopes[requestInfo.APIVersion]
storage := crdInfo.storages[requestInfo.APIVersion].CustomResource
minRequestTimeout := 1 * time.Minute
switch requestInfo.Verb {
case "get":
return handlers.GetResource(storage, storage, requestScope)
case "list":
forceWatch := false
return handlers.ListResource(storage, storage, requestScope, forceWatch, minRequestTimeout)
case "watch":
forceWatch := true
return handlers.ListResource(storage, storage, requestScope, forceWatch, minRequestTimeout)
case "create":
if terminating {
http.Error(w, fmt.Sprintf("%v not allowed while CustomResourceDefinition is terminating", requestInfo.Verb), http.StatusMethodNotAllowed)
return nil
}
return handlers.CreateResource(storage, requestScope, r.admission)
case "update":
return handlers.UpdateResource(storage, requestScope, r.admission)
case "patch":
return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
case "delete":
allowsOptions := true
return handlers.DeleteResource(storage, allowsOptions, requestScope, r.admission)
case "deletecollection":
checkBody := true
return handlers.DeleteCollection(storage, checkBody, requestScope, r.admission)
default:
http.Error(w, fmt.Sprintf("unhandled verb %q", requestInfo.Verb), http.StatusMethodNotAllowed)
return nil
}
}
func (r *crdHandler) serveStatus(w http.ResponseWriter, req *http.Request, requestInfo *apirequest.RequestInfo, crdInfo *crdInfo, terminating bool, supportedTypes []string) http.HandlerFunc {
requestScope := crdInfo.statusRequestScopes[requestInfo.APIVersion]
storage := crdInfo.storages[requestInfo.APIVersion].Status
switch requestInfo.Verb {
case "get":
return handlers.GetResource(storage, nil, requestScope)
case "update":
return handlers.UpdateResource(storage, requestScope, r.admission)
case "patch":
return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
default:
http.Error(w, fmt.Sprintf("unhandled verb %q", requestInfo.Verb), http.StatusMethodNotAllowed)
return nil
}
}
func (r *crdHandler) serveScale(w http.ResponseWriter, req *http.Request, requestInfo *apirequest.RequestInfo, crdInfo *crdInfo, terminating bool, supportedTypes []string) http.HandlerFunc {
requestScope := crdInfo.scaleRequestScopes[requestInfo.APIVersion]
storage := crdInfo.storages[requestInfo.APIVersion].Scale
switch requestInfo.Verb {
case "get":
return handlers.GetResource(storage, nil, requestScope)
case "update":
return handlers.UpdateResource(storage, requestScope, r.admission)
case "patch":
return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
default:
http.Error(w, fmt.Sprintf("unhandled verb %q", requestInfo.Verb), http.StatusMethodNotAllowed)
return nil
}
}
func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{}) {
oldCRD := oldObj.(*apiextensions.CustomResourceDefinition)
newCRD := newObj.(*apiextensions.CustomResourceDefinition)
r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()
// Add CRD to the establishing controller queue.
// For HA clusters, we want to prevent race conditions when changing status to Established,
// so we want to be sure that CRD is Installing at least for 5 seconds before Establishing it.
// TODO: find a real HA safe checkpointing mechanism instead of an arbitrary wait.
if !apiextensions.IsCRDConditionTrue(newCRD, apiextensions.Established) &&
apiextensions.IsCRDConditionTrue(newCRD, apiextensions.NamesAccepted) {
if r.masterCount > 1 {
r.establishingController.QueueCRD(newCRD.Name, 5*time.Second)
} else {
r.establishingController.QueueCRD(newCRD.Name, 0)
}
}
storageMap := r.customStorage.Load().(crdStorageMap)
oldInfo, found := storageMap[newCRD.UID]
if !found {
return
}
if apiequality.Semantic.DeepEqual(&newCRD.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&newCRD.Status.AcceptedNames, oldInfo.acceptedNames) {
glog.V(6).Infof("Ignoring customresourcedefinition %s update because neither spec, nor accepted names changed", oldCRD.Name)
return
}
glog.V(4).Infof("Updating customresourcedefinition %s", oldCRD.Name)
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere.
storageMap2 := storageMap.clone()
if oldInfo, ok := storageMap2[types.UID(oldCRD.UID)]; ok {
for _, storage := range oldInfo.storages {
// destroy only the main storage. Those for the subresources share cacher and etcd clients.
storage.CustomResource.DestroyFunc()
}
delete(storageMap2, types.UID(oldCRD.UID))
}
r.customStorage.Store(storageMap2)
}
// removeDeadStorage removes REST storage that isn't being used
func (r *crdHandler) removeDeadStorage() {
allCustomResourceDefinitions, err := r.crdLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
return
}
r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()
storageMap := r.customStorage.Load().(crdStorageMap)
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere
storageMap2 := storageMap.clone()
for uid, s := range storageMap2 {
found := false
for _, crd := range allCustomResourceDefinitions {
if crd.UID == uid {
found = true
break
}
}
if !found {
glog.V(4).Infof("Removing dead CRD storage for %s/%s", s.spec.Group, s.spec.Names.Kind)
for _, storage := range s.storages {
// destroy only the main storage. Those for the subresources share cacher and etcd clients.
storage.CustomResource.DestroyFunc()
}
delete(storageMap2, uid)
}
}
r.customStorage.Store(storageMap2)
}
// GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter of
// the given crd.
func (r *crdHandler) GetCustomResourceListerCollectionDeleter(crd *apiextensions.CustomResourceDefinition) (finalizer.ListerCollectionDeleter, error) {
info, err := r.getOrCreateServingInfoFor(crd)
if err != nil {
return nil, err
}
return info.storages[info.storageVersion].CustomResource, nil
}
var swaggerMetadataDescriptions = metav1.ObjectMeta{}.SwaggerDoc()
func (r *crdHandler) getOrCreateServingInfoFor(crd *apiextensions.CustomResourceDefinition) (*crdInfo, error) {
storageMap := r.customStorage.Load().(crdStorageMap)
if ret, ok := storageMap[crd.UID]; ok {
return ret, nil
}
r.customStorageLock.Lock()
defer r.customStorageLock.Unlock()
storageMap = r.customStorage.Load().(crdStorageMap)
if ret, ok := storageMap[crd.UID]; ok {
return ret, nil
}
storageVersion, err := apiextensions.GetCRDStorageVersion(crd)
if err != nil {
return nil, err
}
// Scope/Storages per version.
requestScopes := map[string]handlers.RequestScope{}
storages := map[string]customresource.CustomResourceStorage{}
statusScopes := map[string]handlers.RequestScope{}
scaleScopes := map[string]handlers.RequestScope{}
for _, v := range crd.Spec.Versions {
safeConverter, unsafeConverter := conversion.NewCRDConverter(crd)
// In addition to Unstructured objects (Custom Resources), we also may sometimes need to
// decode unversioned Options objects, so we delegate to parameterScheme for such types.
parameterScheme := runtime.NewScheme()
parameterScheme.AddUnversionedTypes(schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name},
&metav1.ListOptions{},
&metav1.ExportOptions{},
&metav1.GetOptions{},
&metav1.DeleteOptions{},
)
parameterCodec := runtime.NewParameterCodec(parameterScheme)
kind := schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.Kind}
typer := newUnstructuredObjectTyper(parameterScheme)
creator := unstructuredCreator{}
validator, _, err := apiservervalidation.NewSchemaValidator(crd.Spec.Validation)
if err != nil {
return nil, err
}
var statusSpec *apiextensions.CustomResourceSubresourceStatus
var statusValidator *validate.SchemaValidator
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceSubresources) && crd.Spec.Subresources != nil && crd.Spec.Subresources.Status != nil {
statusSpec = crd.Spec.Subresources.Status
// for the status subresource, validate only against the status schema
if crd.Spec.Validation != nil && crd.Spec.Validation.OpenAPIV3Schema != nil && crd.Spec.Validation.OpenAPIV3Schema.Properties != nil {
if statusSchema, ok := crd.Spec.Validation.OpenAPIV3Schema.Properties["status"]; ok {
openapiSchema := &spec.Schema{}
if err := apiservervalidation.ConvertJSONSchemaProps(&statusSchema, openapiSchema); err != nil {
return nil, err
}
statusValidator = validate.NewSchemaValidator(openapiSchema, nil, "", strfmt.Default)
}
}
}
var scaleSpec *apiextensions.CustomResourceSubresourceScale
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceSubresources) && crd.Spec.Subresources != nil && crd.Spec.Subresources.Scale != nil {
scaleSpec = crd.Spec.Subresources.Scale
}
table, err := tableconvertor.New(crd.Spec.AdditionalPrinterColumns)
if err != nil {
glog.V(2).Infof("The CRD for %v has an invalid printer specification, falling back to default printing: %v", kind, err)
}
storages[v.Name] = customresource.NewStorage(
schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Status.AcceptedNames.Plural},
schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.Kind},
schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.ListKind},
customresource.NewStrategy(
typer,
crd.Spec.Scope == apiextensions.NamespaceScoped,
kind,
validator,
statusValidator,
statusSpec,
scaleSpec,
),
crdConversionRESTOptionsGetter{
RESTOptionsGetter: r.restOptionsGetter,
converter: safeConverter,
decoderVersion: schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name},
encoderVersion: schema.GroupVersion{Group: crd.Spec.Group, Version: storageVersion},
},
crd.Status.AcceptedNames.Categories,
table,
)
selfLinkPrefix := ""
switch crd.Spec.Scope {
case apiextensions.ClusterScoped:
selfLinkPrefix = "/" + path.Join("apis", crd.Spec.Group, v.Name) + "/" + crd.Status.AcceptedNames.Plural + "/"
case apiextensions.NamespaceScoped:
selfLinkPrefix = "/" + path.Join("apis", crd.Spec.Group, v.Name, "namespaces") + "/"
}
clusterScoped := crd.Spec.Scope == apiextensions.ClusterScoped
requestScopes[v.Name] = handlers.RequestScope{
Namer: handlers.ContextBasedNaming{
SelfLinker: meta.NewAccessor(),
ClusterScoped: clusterScoped,
SelfLinkPathPrefix: selfLinkPrefix,
},
Serializer: unstructuredNegotiatedSerializer{typer: typer, creator: creator, converter: safeConverter},
ParameterCodec: parameterCodec,
Creater: creator,
Convertor: safeConverter,
Defaulter: unstructuredDefaulter{parameterScheme},
Typer: typer,
UnsafeConvertor: unsafeConverter,
Resource: schema.GroupVersionResource{Group: crd.Spec.Group, Version: v.Name, Resource: crd.Status.AcceptedNames.Plural},
Kind: kind,
// a handler for a specific group-version of a custom resource uses that version as the in-memory representation
HubGroupVersion: kind.GroupVersion(),
MetaGroupVersion: metav1.SchemeGroupVersion,
TableConvertor: storages[v.Name].CustomResource,
}
// override scaleSpec subresource values
// shallow copy
scaleScope := requestScopes[v.Name]
scaleConverter := scale.NewScaleConverter()
scaleScope.Subresource = "scale"
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme())
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{
SelfLinker: meta.NewAccessor(),
ClusterScoped: clusterScoped,
SelfLinkPathPrefix: selfLinkPrefix,
SelfLinkPathSuffix: "/scale",
}
scaleScopes[v.Name] = scaleScope
// override status subresource values
// shallow copy
statusScope := requestScopes[v.Name]
statusScope.Subresource = "status"
statusScope.Namer = handlers.ContextBasedNaming{
SelfLinker: meta.NewAccessor(),
ClusterScoped: clusterScoped,
SelfLinkPathPrefix: selfLinkPrefix,
SelfLinkPathSuffix: "/status",
}
statusScopes[v.Name] = statusScope
}
ret := &crdInfo{
spec: &crd.Spec,
acceptedNames: &crd.Status.AcceptedNames,
storages: storages,
requestScopes: requestScopes,
scaleRequestScopes: scaleScopes,
statusRequestScopes: statusScopes,
storageVersion: storageVersion,
}
// Copy because we cannot write to storageMap without a race
// as it is used without locking elsewhere.
storageMap2 := storageMap.clone()
storageMap2[crd.UID] = ret
r.customStorage.Store(storageMap2)
return ret, nil
}
type unstructuredNegotiatedSerializer struct {
typer runtime.ObjectTyper
creator runtime.ObjectCreater
converter runtime.ObjectConvertor
}
func (s unstructuredNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
return []runtime.SerializerInfo{
{
MediaType: "application/json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, s.creator, s.typer, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, s.creator, s.typer, true),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, s.creator, s.typer, false),
Framer: json.Framer,
},
},
{
MediaType: "application/yaml",
EncodesAsText: true,
Serializer: json.NewYAMLSerializer(json.DefaultMetaFactory, s.creator, s.typer),
},
}
}
func (s unstructuredNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return versioning.NewCodec(encoder, nil, s.converter, Scheme, Scheme, Scheme, gv, nil, "crdNegotiatedSerializer")
}
func (s unstructuredNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
d := schemaCoercingDecoder{delegate: decoder, validator: unstructuredSchemaCoercer{}}
return versioning.NewDefaultingCodecForScheme(Scheme, nil, d, nil, gv)
}
type UnstructuredObjectTyper struct {
Delegate runtime.ObjectTyper
UnstructuredTyper runtime.ObjectTyper
}
func newUnstructuredObjectTyper(Delegate runtime.ObjectTyper) UnstructuredObjectTyper {
return UnstructuredObjectTyper{
Delegate: Delegate,
UnstructuredTyper: crdserverscheme.NewUnstructuredObjectTyper(),
}
}
func (t UnstructuredObjectTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, bool, error) {
// Delegate for things other than Unstructured.
if _, ok := obj.(runtime.Unstructured); !ok {
return t.Delegate.ObjectKinds(obj)
}
return t.UnstructuredTyper.ObjectKinds(obj)
}
func (t UnstructuredObjectTyper) Recognizes(gvk schema.GroupVersionKind) bool {
return t.Delegate.Recognizes(gvk) || t.UnstructuredTyper.Recognizes(gvk)
}
type unstructuredCreator struct{}
func (c unstructuredCreator) New(kind schema.GroupVersionKind) (runtime.Object, error) {
ret := &unstructured.Unstructured{}
ret.SetGroupVersionKind(kind)
return ret, nil
}
type unstructuredDefaulter struct {
delegate runtime.ObjectDefaulter
}
func (d unstructuredDefaulter) Default(in runtime.Object) {
// Delegate for things other than Unstructured.
if _, ok := in.(runtime.Unstructured); !ok {
d.delegate.Default(in)
}
}
type CRDRESTOptionsGetter struct {
StorageConfig storagebackend.Config
StoragePrefix string
EnableWatchCache bool
DefaultWatchCacheSize int
EnableGarbageCollection bool
DeleteCollectionWorkers int
CountMetricPollPeriod time.Duration
}
func (t CRDRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &t.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: t.EnableGarbageCollection,
DeleteCollectionWorkers: t.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: t.CountMetricPollPeriod,
}
if t.EnableWatchCache {
ret.Decorator = genericregistry.StorageWithCacher(t.DefaultWatchCacheSize)
}
return ret, nil
}
// clone returns a clone of the provided crdStorageMap.
// The clone is a shallow copy of the map.
func (in crdStorageMap) clone() crdStorageMap {
if in == nil {
return nil
}
out := make(crdStorageMap, len(in))
for key, value := range in {
out[key] = value
}
return out
}
// crdConversionRESTOptionsGetter overrides the codec with one using the
// provided custom converter and custom encoder and decoder version.
type crdConversionRESTOptionsGetter struct {
generic.RESTOptionsGetter
converter runtime.ObjectConvertor
encoderVersion schema.GroupVersion
decoderVersion schema.GroupVersion
}
func (t crdConversionRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret, err := t.RESTOptionsGetter.GetRESTOptions(resource)
if err == nil {
d := schemaCoercingDecoder{delegate: ret.StorageConfig.Codec, validator: unstructuredSchemaCoercer{
// drop invalid fields while decoding old CRs (before we had any ObjectMeta validation)
dropInvalidMetadata: true,
}}
c := schemaCoercingConverter{delegate: t.converter, validator: unstructuredSchemaCoercer{}}
ret.StorageConfig.Codec = versioning.NewCodec(
ret.StorageConfig.Codec,
d,
c,
&unstructuredCreator{},
crdserverscheme.NewUnstructuredObjectTyper(),
&unstructuredDefaulter{delegate: Scheme},
t.encoderVersion,
t.decoderVersion,
"crdRESTOptions",
)
}
return ret, err
}
// schemaCoercingDecoder calls the delegate decoder, and then applies the Unstructured schema validator
// to coerce the schema.
type schemaCoercingDecoder struct {
delegate runtime.Decoder
validator unstructuredSchemaCoercer
}
var _ runtime.Decoder = schemaCoercingDecoder{}
func (d schemaCoercingDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
obj, gvk, err := d.delegate.Decode(data, defaults, into)
if err != nil {
return nil, gvk, err
}
if u, ok := obj.(*unstructured.Unstructured); ok {
if err := d.validator.apply(u); err != nil {
return nil, gvk, err
}
}
return obj, gvk, nil
}
// schemaCoercingConverter calls the delegate converter and applies the Unstructured validator to
// coerce the schema.
type schemaCoercingConverter struct {
delegate runtime.ObjectConvertor
validator unstructuredSchemaCoercer
}
var _ runtime.ObjectConvertor = schemaCoercingConverter{}
func (v schemaCoercingConverter) Convert(in, out, context interface{}) error {
if err := v.delegate.Convert(in, out, context); err != nil {
return err
}
if u, ok := out.(*unstructured.Unstructured); ok {
if err := v.validator.apply(u); err != nil {
return err
}
}
return nil
}
func (v schemaCoercingConverter) ConvertToVersion(in runtime.Object, gv runtime.GroupVersioner) (runtime.Object, error) {
out, err := v.delegate.ConvertToVersion(in, gv)
if err != nil {
return nil, err
}
if u, ok := out.(*unstructured.Unstructured); ok {
if err := v.validator.apply(u); err != nil {
return nil, err
}
}
return out, nil
}
func (v schemaCoercingConverter) ConvertFieldLabel(gvk schema.GroupVersionKind, label, value string) (string, string, error) {
return v.delegate.ConvertFieldLabel(gvk, label, value)
}
// unstructuredSchemaCoercer does the validation for Unstructured that json.Unmarshal
// does for native types. This includes:
// - validating and pruning ObjectMeta (here with optional error instead of pruning)
// - TODO: application of an OpenAPI validator (against the whole object or a top-level field of it).
// - TODO: optionally application of post-validation algorithms like defaulting and/or OpenAPI based pruning.
type unstructuredSchemaCoercer struct {
dropInvalidMetadata bool
}
func (v *unstructuredSchemaCoercer) apply(u *unstructured.Unstructured) error {
// save implicit meta fields that don't have to be specified in the validation spec
kind, foundKind, err := unstructured.NestedString(u.UnstructuredContent(), "kind")
if err != nil {
return err
}
apiVersion, foundApiVersion, err := unstructured.NestedString(u.UnstructuredContent(), "apiVersion")
if err != nil {
return err
}
objectMeta, foundObjectMeta, err := getObjectMeta(u, v.dropInvalidMetadata)
if err != nil {
return err
}
// restore meta fields, starting clean
if foundKind {
u.SetKind(kind)
}
if foundApiVersion {
u.SetAPIVersion(apiVersion)
}
if foundObjectMeta {
if err := setObjectMeta(u, objectMeta); err != nil {
return err
}
}
return nil
}
var encodingjson = json.CaseSensitiveJsonIterator()
func getObjectMeta(u *unstructured.Unstructured, dropMalformedFields bool) (*metav1.ObjectMeta, bool, error) {
metadata, found := u.UnstructuredContent()["metadata"]
if !found {
return nil, false, nil
}
// round-trip through JSON first, hoping that unmarshaling just works
objectMeta := &metav1.ObjectMeta{}
metadataBytes, err := encodingjson.Marshal(metadata)
if err != nil {
return nil, false, err
}
if err = encodingjson.Unmarshal(metadataBytes, objectMeta); err == nil {
// if successful, return
return objectMeta, true, nil
}
if !dropMalformedFields {
// if we're not trying to drop malformed fields, return the error
return nil, true, err
}
metadataMap, ok := metadata.(map[string]interface{})
if !ok {
return nil, false, fmt.Errorf("invalid metadata: expected object, got %T", metadata)
}
// Go field by field accumulating into the metadata object.
// This takes advantage of the fact that you can repeatedly unmarshal individual fields into a single struct,
// each iteration preserving the old key-values.
accumulatedObjectMeta := &metav1.ObjectMeta{}
testObjectMeta := &metav1.ObjectMeta{}
for k, v := range metadataMap {
// serialize a single field
if singleFieldBytes, err := encodingjson.Marshal(map[string]interface{}{k: v}); err == nil {
// do a test unmarshal
if encodingjson.Unmarshal(singleFieldBytes, testObjectMeta) == nil {
// if that succeeds, unmarshal for real
encodingjson.Unmarshal(singleFieldBytes, accumulatedObjectMeta)
}
}
}
return accumulatedObjectMeta, true, nil
}
func setObjectMeta(u *unstructured.Unstructured, objectMeta *metav1.ObjectMeta) error {
if objectMeta == nil {
unstructured.RemoveNestedField(u.UnstructuredContent(), "metadata")
return nil
}
metadata, err := runtime.DefaultUnstructuredConverter.ToUnstructured(objectMeta)
if err != nil {
return err
}
u.UnstructuredContent()["metadata"] = metadata
return nil
}