package kates
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/datawire/dlib/dlog"
"github.com/pkg/errors"
"github.com/spf13/pflag"
// k8s libraries
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/kubectl/pkg/polymorphichelpers"
// k8s types
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// k8s plugins
_ "k8s.io/client-go/plugin/pkg/client/auth"
)
// The Client struct provides an interface to interact with the kubernetes api-server. You can think
// of it like a programmatic version of the familiar kubectl command line tool. In fact, a goal of
// these APIs is that where possible, your knowledge of kubectl should translate well into using
// these APIs. It provides a golang-friendly way to perform basic CRUD and Watch operations on
// kubernetes resources, as well as providing some additional capabilities.
//
// Differences from kubectl:
//
// - You can also use a Client to update the status of a resource.
// - The Client struct cannot perform an apply operation.
// - The Client provides read/write coherence (more about this below).
// - The Client provides load shedding via event coalescing for watches.
// - The Client provides bootstrapping of multiple watches.
//
// The biggest difference from kubectl (and also from using client-go directly) is the Read/Write
// coherence it provides. Kubernetes Watches are inherently asynchronous. This means that if a
// kubernetes resource is modified at time T0, a client won't find out about it until some later
// time T1. It is normally difficult to notice this since the delay may be quite small, however if
// you are writing a controller that uses watches in combination with modifying the resources it is
// watching, the delay is big enough that a program will often be "notified" with versions of
// resources that do not include updates made by the program itself. This even happens when a
// program has a lock and is guaranteed to be the only process modifying a given resource. Needless
// to say, programming against an API like this can make for some brain-twisting logic. The Client
// struct allows for much simpler code by tracking what changes have been made locally and updating
// all Watch results with the most recent version of an object, thereby providing the guarantee that
// your Watch results will *always* include any changes you have made via the Client performing the
// watch.
//
// Additionally, the Accumulator API provides two key pieces of watch related functionality:
//
// 1. By coalescing multiple updates behind the scenes, the Accumulator API provides a natural
// form of load shedding if a user of the API cannot keep up with every single update.
//
// 2. The Accumulator API is guaranteed to bootstrap (i.e. perform an initial List operation) on
// all watches prior to notifying the user that resources are available to process.
type Client struct {
config *ConfigFlags
cli dynamic.Interface
mapper meta.RESTMapper
disco discovery.CachedDiscoveryInterface
mutex sync.Mutex
canonical map[string]*Unstructured
// This is an internal interface for testing; it lets us deliberately introduce delays into the
// implementation, e.g. effectively increasing the latency to the api server in a controllable
// way and letting us reproduce and test for race conditions far more efficiently than
// otherwise.
watchAdded func(*Unstructured, *Unstructured)
watchUpdated func(*Unstructured, *Unstructured)
watchDeleted func(*Unstructured, *Unstructured)
}
// The ClientConfig struct holds all the parameters and configuration
// that can be passed upon construction of a new Client.
type ClientConfig struct {
Kubeconfig string
Context string
Namespace string
}
// The NewClient function constructs a new client with the supplied ClientConfig.
func NewClient(options ClientConfig) (*Client, error) {
return NewClientFromConfigFlags(config(options))
}
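// A minimal construction sketch (illustrative only; it assumes the standard
// kubeconfig/in-cluster discovery applies when the ClientConfig fields are
// left empty):
//
//	client, err := NewClient(ClientConfig{})
//	if err != nil {
//		// handle the error
//	}
//	info, err := client.ServerVersion()
//	if err != nil {
//		// handle the error
//	}
//	fmt.Println("connected to api-server", info.GitVersion)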
func NewClientFromFlagSet(flags *pflag.FlagSet) (*Client, error) {
config := NewConfigFlags(false)
// We can disable or enable flags by setting them to
// nil/non-nil prior to calling .AddFlags().
//
// .Username and .Password are already disabled by default in
// genericclioptions.NewConfigFlags().
config.AddFlags(flags)
return NewClientFromConfigFlags(config)
}
func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error) {
restconfig, err := config.ToRESTConfig()
if err != nil {
return nil, err
}
cli, err := dynamic.NewForConfig(restconfig)
if err != nil {
return nil, err
}
mapper, disco, err := NewRESTMapper(config)
if err != nil {
return nil, err
}
return &Client{
config: config,
cli: cli,
mapper: mapper,
disco: disco,
canonical: make(map[string]*Unstructured),
watchAdded: func(oldObj, newObj *Unstructured) {},
watchUpdated: func(oldObj, newObj *Unstructured) {},
watchDeleted: func(oldObj, newObj *Unstructured) {},
}, nil
}
func NewRESTMapper(config *ConfigFlags) (meta.RESTMapper, discovery.CachedDiscoveryInterface, error) {
// Throttling is scoped to rest.Config, so we use a dedicated
// rest.Config for discovery so we can disable throttling for
// discovery, but leave it in place for normal requests. This
// is largely the same thing that ConfigFlags.ToRESTMapper()
// does, hence the same thing that kubectl does. There are two
// differences we are introducing here: (1) if no cache dir is
// supplied, we fall back to in-memory caching rather than not
// caching discovery requests at all; and (2) unlike kubectl, we
// do not cache non-discovery requests.
restconfig, err := config.ToRESTConfig()
if err != nil {
return nil, nil, err
}
restconfig.QPS = 1000000
restconfig.Burst = 1000000
var cachedDiscoveryClient discovery.CachedDiscoveryInterface
if config.CacheDir != nil {
cachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(restconfig, *config.CacheDir, "",
time.Duration(10*time.Minute))
if err != nil {
return nil, nil, err
}
} else {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(restconfig)
if err != nil {
return nil, nil, err
}
cachedDiscoveryClient = memory.NewMemCacheClient(discoveryClient)
}
mapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient)
expander := restmapper.NewShortcutExpander(mapper, cachedDiscoveryClient)
return expander, cachedDiscoveryClient, nil
}
// The InCluster function returns true if the process is running inside a kubernetes cluster, and
// false if it is running outside the cluster. This is determined by heuristics; however, it uses the
// exact same heuristics as client-go does. This is copied from
// (client-go/tools/clientcmd/client_config.go), as it is not publicly invocable in its original
// place. This should be re-copied if the original code changes.
func InCluster() bool {
fi, err := os.Stat("/var/run/secrets/kubernetes.io/serviceaccount/token")
return os.Getenv("KUBERNETES_SERVICE_HOST") != "" &&
os.Getenv("KUBERNETES_SERVICE_PORT") != "" &&
err == nil && !fi.IsDir()
}
// DynamicInterface is an accessor method to the k8s dynamic client
func (c *Client) DynamicInterface() dynamic.Interface {
return c.cli
}
func (c *Client) WaitFor(ctx context.Context, kindOrResource string) {
for {
_, err := c.mappingFor(kindOrResource)
if err != nil {
_, ok := err.(*unknownResource)
if ok {
select {
case <-time.After(1 * time.Second):
c.InvalidateCache()
continue
case <-ctx.Done():
return
}
}
}
return
}
}
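// An illustrative sketch of WaitFor: block until a CRD-backed kind is served
// before watching it. The "Mapping" kind here is hypothetical; substitute
// whatever resource your program depends on.
//
//	client.WaitFor(ctx, "Mapping")
//	acc := client.Watch(ctx, Query{Name: "Mappings", Kind: "Mapping"})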
func (c *Client) InvalidateCache() {
// TODO: it's possible that invalidate could be smarter now
// and use the methods on CachedDiscoveryInterface
mapper, disco, err := NewRESTMapper(c.config)
if err != nil {
panic(err)
}
c.mapper = mapper
c.disco = disco
}
// The ServerVersion() method returns a struct with information about
// the kubernetes api-server version.
func (c *Client) ServerVersion() (*VersionInfo, error) {
return c.disco.ServerVersion()
}
// processAPIResourceLists takes a `[]*metav1.APIResourceList` as returned by any of several calls
// to a DiscoveryInterface, and transforms it into a straightforward `[]metav1.APIResource`.
//
// If you weren't paying close-enough attention, you might have thought I said it takes a
// `*metav1.APIResourceList` object, and now you're wondering why this needs to be anything more
// than `return input.APIResources`. Well:
//
// 1. The various DiscoveryInterface calls don't return a List, they actually return an array of
// Lists, where the Lists are grouped by the group/version of the resources. So we need to
// flatten those out.
// 2. I guess the reason they group them that way is to avoid repeating the group and version in
// each resource, because the List objects themselves have .Group and .Version set, but the
// APIResource objects don't. This lets them save tens of bytes on an infrequently used API
// call! Anyway, we'll need to fill those in on the returned objects because we're discarding
// the grouping.
func processAPIResourceLists(listsByGV []*metav1.APIResourceList) []APIResource {
// Do some book-keeping to allow us to pre-allocate the entire list.
count := 0
for _, list := range listsByGV {
if list != nil {
count += len(list.APIResources)
}
}
if count == 0 {
return nil
}
// Build the processed list to return.
ret := make([]APIResource, 0, count)
for _, list := range listsByGV {
if list != nil {
gv, err := schema.ParseGroupVersion(list.GroupVersion)
if err != nil {
continue
}
for _, typeinfo := range list.APIResources {
// I'm not 100% sure that none of the DiscoveryInterface calls fill
// in .Group and .Version, so just in case one of the calls does
// fill them in, we'll only fill them in if they're not already set.
if typeinfo.Group == "" {
typeinfo.Group = gv.Group
}
if typeinfo.Version == "" {
typeinfo.Version = gv.Version
}
ret = append(ret, typeinfo)
}
}
}
return ret
}
// ServerPreferredResources returns the list of resource types that the server supports.
//
// If a resource type supports multiple versions, then *only* the preferred version is returned.
// Use ServerResources to return a list that includes all versions.
func (c *Client) ServerPreferredResources() ([]APIResource, error) {
// It's possible that an error prevented listing some apigroups, but not all; so process the
// output even if there is an error.
listsByGV, err := c.disco.ServerPreferredResources()
return processAPIResourceLists(listsByGV), err
}
// ServerResources returns the list of resource types that the server supports.
//
// If a resource type supports multiple versions, then a list entry for *each* version is returned.
// Use ServerPreferredResources to return a list that includes just the preferred version.
func (c *Client) ServerResources() ([]APIResource, error) {
// It's possible that an error prevented listing some apigroups, but not all; so process the
// output even if there is an error.
_, listsByGV, err := c.disco.ServerGroupsAndResources()
return processAPIResourceLists(listsByGV), err
}
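// A short sketch of enumerating what the cluster serves. Discovery can
// partially fail, so the returned slice may still be useful even when err is
// non-nil (see the comments above).
//
//	resources, err := client.ServerPreferredResources()
//	if err != nil {
//		log.Printf("partial discovery failure: %v", err)
//	}
//	for _, r := range resources {
//		fmt.Printf("%s/%s %s\n", r.Group, r.Version, r.Kind)
//	}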
// ==
// TODO: Query is interpreted a bit differently for List and
// Watch. Should either reconcile this or perhaps split Query into two
// separate types.
// A Query holds all the information needed to List or Watch a set of
// kubernetes resources.
type Query struct {
// The Name field holds the name of the Query. This is used by
// Watch to determine how multiple queries are unmarshaled by
// Accumulator.Update(). This is ignored for List.
Name string
// The Kind field indicates what sort of resource is being queried.
Kind string
// The Namespace field holds the namespace to Query.
Namespace string
// The FieldSelector field holds a string in selector syntax
// that is used to filter results based on field values. The
// only field values supported are metadata.name and
// metadata.namespace. This is only supported for List.
FieldSelector string
// The LabelSelector field holds a string in selector syntax
// that is used to filter results based on label values.
LabelSelector string
}
func (c *Client) Watch(ctx context.Context, queries ...Query) *Accumulator {
return newAccumulator(ctx, c, queries...)
}
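// An illustrative watch loop. This sketch assumes the Accumulator exposes the
// Changed() channel and Update(target) snapshot method implemented in
// accumulator.go, and that snapshot struct field names match the Query names.
//
//	acc := client.Watch(ctx,
//		Query{Name: "Services", Kind: "Service", Namespace: NamespaceAll},
//		Query{Name: "Deployments", Kind: "Deployment", Namespace: NamespaceAll})
//	var snapshot struct {
//		Services    []*Unstructured
//		Deployments []*Unstructured
//	}
//	for {
//		select {
//		case <-acc.Changed():
//			if acc.Update(&snapshot) {
//				// snapshot now holds a coherent view of both kinds,
//				// including any local writes made via this Client.
//			}
//		case <-ctx.Done():
//			return
//		}
//	}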
// ==
func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdate, cli dynamic.ResourceInterface) {
var informer cache.SharedInformer
// we override Watch to let us signal when our initial List is
// complete so we can send an update() even when there are no
// resource instances of the kind being watched
lw := newListWatcher(ctx, cli, query, func(lw *lw) {
if lw.hasSynced() {
target <- rawUpdate{query.Name, true, nil, nil}
}
})
informer = cache.NewSharedInformer(lw, &Unstructured{}, 5*time.Minute)
// TODO: uncomment this when we get to kubernetes 1.19. Right now errors will get logged by
// klog. With this error handler in place we will log them to our own logger and provide a
// more useful error message:
/*
informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
// This is from client-go/tools/cache/reflector.go:563
isExpiredError := func(err error) bool {
// In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
// apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
// and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
}
switch {
case isExpiredError(err):
log.Printf("Watch of %s closed with: %v", query.Kind, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
log.Printf("Watch for %s closed with unexpected EOF: %v", query.Kind, err)
default:
log.Printf("Failed to watch %s: %v", query.Kind, err)
}
})
*/
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// This is for testing. It allows us to deliberately increase the probability of
// race conditions by e.g. introducing sleeps. At some point I'm sure we will want a
// nicer prettier set of hooks, but for now all we need is this hack for
// better/faster tests.
c.watchAdded(nil, obj.(*Unstructured))
lw.countAddEvent()
target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured)}
},
UpdateFunc: func(oldObj, newObj interface{}) {
old := oldObj.(*Unstructured)
new := newObj.(*Unstructured)
// This is for testing. It allows us to deliberately increase the probability of
// race conditions by e.g. introducing sleeps. At some point I'm sure we will want a
// nicer prettier set of hooks, but for now all we need is this hack for
// better/faster tests.
c.watchUpdated(old, new)
target <- rawUpdate{query.Name, lw.hasSynced(), old, new}
},
DeleteFunc: func(obj interface{}) {
var old *Unstructured
switch o := obj.(type) {
case cache.DeletedFinalStateUnknown:
old = o.Obj.(*Unstructured)
case *Unstructured:
old = o
}
// This is for testing. It allows us to deliberately increase the probability of
// race conditions by e.g. introducing sleeps. At some point I'm sure we will want a
// nicer prettier set of hooks, but for now all we need is this hack for
// better/faster tests.
c.watchDeleted(old, nil)
key := unKey(old)
// For the Add and Update cases, we clean out c.canonical in
// patchWatch.
c.mutex.Lock()
delete(c.canonical, key)
c.mutex.Unlock()
target <- rawUpdate{query.Name, lw.hasSynced(), old, nil}
},
},
)
go informer.Run(ctx.Done())
}
type rawUpdate struct {
name string
synced bool
old *unstructured.Unstructured
new *unstructured.Unstructured
}
type lw struct {
// All these fields are read-only and initialized on construction.
ctx context.Context
client dynamic.ResourceInterface
query Query
synced func(*lw)
once sync.Once
// The mutex protects all the read-write fields.
mutex sync.Mutex
initialListDone bool
initialListCount int
addEventCount int
listForbidden bool
}
func newListWatcher(ctx context.Context, client dynamic.ResourceInterface, query Query, synced func(*lw)) *lw {
return &lw{ctx: ctx, client: client, query: query, synced: synced}
}
func (lw *lw) withMutex(f func()) {
lw.mutex.Lock()
defer lw.mutex.Unlock()
f()
}
func (lw *lw) countAddEvent() {
lw.withMutex(func() {
lw.addEventCount++
})
}
// This computes whether we have synced a given watch. We used to use SharedInformer.HasSynced() for
// this, but that seems to be a blatant lie that always returns true. My best guess as to why it lies
// is that it is actually reporting the synced state of an internal queue, but because the
// SharedInformer mechanism adds another layer of dispatch on top of that internal queue, the
// syncedness of that internal queue is irrelevant to whether enough layered events have been
// dispatched to consider things synced at the dispatch layer.
//
// So to track syncedness properly for our users, when we do our first List() we remember how many
// resources there are and we do not consider ourselves synced until we have dispatched at least as
// many Add events as there are resources.
func (lw *lw) hasSynced() (result bool) {
lw.withMutex(func() {
result = lw.initialListDone && lw.addEventCount >= lw.initialListCount
})
return
}
// List is used by a SharedInformer to get a baseline list of resources
// that can then be maintained by a watch.
func (lw *lw) List(opts ListOptions) (runtime.Object, error) {
// Our SharedInformer will call us every so often. Every time through,
// we'll decide whether we can be synchronized, and whether the list was
// forbidden.
synced := false
forbidden := false
opts.FieldSelector = lw.query.FieldSelector
opts.LabelSelector = lw.query.LabelSelector
result, err := lw.client.List(lw.ctx, opts)
if err == nil {
// No error, the list worked out fine. We can be synced now...
synced = true
// ...and the list was not forbidden.
forbidden = false
} else if apierrors.IsForbidden(err) {
// Forbidden. We'll still consider ourselves synchronized, but
// remember the forbidden error!
// dlog.Debugf(lw.ctx, "couldn't list %s (forbidden)", lw.query.Kind)
synced = true
forbidden = true
// Impedance matching for the SharedInformer interface: pretend
// that we got an empty list and no error.
result = &unstructured.UnstructuredList{}
err = nil
} else {
// Any other error we'll consider transient, and try again later.
// We're neither synced nor forbidden
dlog.Infof(lw.ctx, "couldn't list %s (will retry): %s", lw.query.Kind, err)
}
lw.withMutex(func() {
if synced {
if !lw.initialListDone {
lw.initialListDone = true
lw.initialListCount = len(result.Items)
}
}
lw.listForbidden = forbidden
})
return result, err
}
func (lw *lw) Watch(opts ListOptions) (watch.Interface, error) {
lw.once.Do(func() { lw.synced(lw) })
opts.FieldSelector = lw.query.FieldSelector
opts.LabelSelector = lw.query.LabelSelector
iface, err := lw.client.Watch(lw.ctx, opts)
if err != nil {
// If the list was forbidden, this error will likely just be "unknown", since we
// returned an unstructured.UnstructuredList to fake out the lister, so in that
// case just synthesize a slightly nicer error.
if lw.listForbidden {
err = errors.Errorf("can't watch %s: forbidden", lw.query.Kind)
} else {
// Not forbidden. Go ahead and make sure the Kind we're querying for is in
// there, though.
err = errors.Wrap(err, fmt.Sprintf("can't watch %s", lw.query.Kind))
}
}
return iface, err
}
// ==
func (c *Client) cliFor(mapping *meta.RESTMapping, namespace string) dynamic.ResourceInterface {
cli := c.cli.Resource(mapping.Resource)
if mapping.Scope.Name() == meta.RESTScopeNameNamespace && namespace != NamespaceAll {
return cli.Namespace(namespace)
} else {
return cli
}
}
func (c *Client) cliForResource(resource *Unstructured) dynamic.ResourceInterface {
mapping, err := c.mappingFor(resource.GroupVersionKind().GroupKind().String())
if err != nil {
panic(err)
}
// this will canonicalize the kind and version so any
// shortcuts are expanded
resource.SetGroupVersionKind(mapping.GroupVersionKind)
ns := resource.GetNamespace()
if ns == "" {
ns = "default"
}
return c.cliFor(mapping, ns)
}
// mappingFor returns the RESTMapping for the Kind given, or the Kind referenced by the resource.
// Prefers a fully specified GroupVersionResource match. If one is not found, we match on a fully
// specified GroupVersionKind, or fallback to a match on GroupKind.
//
// This is copy/pasted from k8s.io/cli-runtime/pkg/resource.Builder.mappingFor() (which is
// unfortunately private), with modified lines marked with "// MODIFIED".
func (c *Client) mappingFor(resourceOrKind string) (*meta.RESTMapping, error) { // MODIFIED: args
fullySpecifiedGVR, groupResource := schema.ParseResourceArg(resourceOrKind)
gvk := schema.GroupVersionKind{}
// MODIFIED: Don't call b.restMapperFn(), use c.mapper instead.
if fullySpecifiedGVR != nil {
gvk, _ = c.mapper.KindFor(*fullySpecifiedGVR)
}
if gvk.Empty() {
gvk, _ = c.mapper.KindFor(groupResource.WithVersion(""))
}
if !gvk.Empty() {
return c.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
}
fullySpecifiedGVK, groupKind := schema.ParseKindArg(resourceOrKind)
if fullySpecifiedGVK == nil {
gvk := groupKind.WithVersion("")
fullySpecifiedGVK = &gvk
}
if !fullySpecifiedGVK.Empty() {
if mapping, err := c.mapper.RESTMapping(fullySpecifiedGVK.GroupKind(), fullySpecifiedGVK.Version); err == nil {
return mapping, nil
}
}
mapping, err := c.mapper.RESTMapping(groupKind, gvk.Version)
if err != nil {
// if we error out here, it is because we could not match a resource or a kind
// for the given argument. To maintain consistency with previous behavior,
// announce that a resource type could not be found.
// if the error is _not_ a *meta.NoKindMatchError, then we had trouble doing discovery,
// so we should return the original error since it may help a user diagnose what is actually wrong
if meta.IsNoMatchError(err) {
return nil, &unknownResource{resourceOrKind}
}
return nil, err
}
return mapping, nil
}
type unknownResource struct {
arg string
}
func (e *unknownResource) Error() string {
return fmt.Sprintf("the server doesn't have a resource type %q", e.arg)
}
// ==
func (c *Client) List(ctx context.Context, query Query, target interface{}) error {
mapping, err := c.mappingFor(query.Kind)
if err != nil {
return err
}
items := make([]*Unstructured, 0)
if err := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
cli := c.cliFor(mapping, query.Namespace)
res, err := cli.List(ctx, ListOptions{
FieldSelector: query.FieldSelector,
LabelSelector: query.LabelSelector,
})
if err != nil {
return err
}
for _, un := range res.Items {
copy := un.DeepCopy()
key := unKey(copy)
// TODO: Deal with garbage collection in the case
// where there is no watch.
c.canonical[key] = copy
items = append(items, copy)
}
return nil
}(); err != nil {
return err
}
return convert(items, target)
}
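// A sketch of listing with selectors; the label key/value are illustrative.
//
//	var deps []*Unstructured
//	err := client.List(ctx, Query{
//		Kind:          "Deployment",
//		Namespace:     "default",
//		LabelSelector: "app=example",
//	}, &deps)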
// ==
func (c *Client) Get(ctx context.Context, resource interface{}, target interface{}) error {
var un Unstructured
err := convert(resource, &un)
if err != nil {
return err
}
var res *Unstructured
if err := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
cli := c.cliForResource(&un)
res, err = cli.Get(ctx, un.GetName(), GetOptions{})
if err != nil {
return err
}
key := unKey(res)
// TODO: Deal with garbage collection in the case
// where there is no watch.
c.canonical[key] = res
return nil
}(); err != nil {
return err
}
return convert(res, target)
}
// ==
func (c *Client) Create(ctx context.Context, resource interface{}, target interface{}) error {
var un Unstructured
err := convert(resource, &un)
if err != nil {
return err
}
var res *Unstructured
if err := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
cli := c.cliForResource(&un)
res, err = cli.Create(ctx, &un, CreateOptions{})
if err != nil {
return err
}
key := unKey(res)
c.canonical[key] = res
return nil
}(); err != nil {
return err
}
return convert(res, target)
}
// ==
func (c *Client) Update(ctx context.Context, resource interface{}, target interface{}) error {
var un Unstructured
err := convert(resource, &un)
if err != nil {
return err
}
prev := un.GetResourceVersion()
var res *Unstructured
if err := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
cli := c.cliForResource(&un)
res, err = cli.Update(ctx, &un, UpdateOptions{})
if err != nil {
return err
}
if res.GetResourceVersion() != prev {
key := unKey(res)
c.canonical[key] = res
}
return nil
}(); err != nil {
return err
}
return convert(res, target)
}
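// A sketch of a read-modify-write cycle using Get followed by Update; the
// annotation key is purely illustrative, and a real caller would typically
// retry on IsConflict.
//
//	var dep Unstructured
//	dep.SetGroupVersionKind(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"})
//	dep.SetName("example")
//	dep.SetNamespace("default")
//	if err := client.Get(ctx, &dep, &dep); err != nil {
//		// handle the error
//	}
//	annotations := dep.GetAnnotations()
//	if annotations == nil {
//		annotations = map[string]string{}
//	}
//	annotations["example.io/touched"] = "true"
//	dep.SetAnnotations(annotations)
//	if err := client.Update(ctx, &dep, &dep); err != nil {
//		// handle the error
//	}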
// ==
func (c *Client) Patch(ctx context.Context, resource interface{}, pt PatchType, data []byte, target interface{}) error {
var un Unstructured
err := convert(resource, &un)
if err != nil {
return err
}
prev := un.GetResourceVersion()
var res *Unstructured
if err := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
cli := c.cliForResource(&un)
res, err = cli.Patch(ctx, un.GetName(), pt, data, PatchOptions{})
if err != nil {
return err
}
if res.GetResourceVersion() != prev {
key := unKey(res)
c.canonical[key] = res
}
return nil
}(); err != nil {
return err
}
return convert(res, target)
}
// ==
func (c *Client) Upsert(ctx context.Context, resource interface{}, source interface{}, target interface{}) error {
if resource == nil || reflect.ValueOf(resource).IsNil() {
resource = source
}
var un Unstructured
err := convert(resource, &un)
if err != nil {
return err
}
var unsrc Unstructured
err = convert(source, &unsrc)
if err != nil {
return err
}
MergeUpdate(&un, &unsrc)
prev := un.GetResourceVersion()
var res *Unstructured
if err := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
cli := c.cliForResource(&un)
create := false
rsrc := &un
if prev == "" {
stored, err := cli.Get(ctx, un.GetName(), GetOptions{})
if err != nil {
if IsNotFound(err) {
create = true
rsrc = &un
} else {
return err
}
} else {
rsrc = stored
MergeUpdate(rsrc, &unsrc)
}
}
if create {
res, err = cli.Create(ctx, rsrc, CreateOptions{})
} else {
// XXX: need to clean up the conflict case and add a test for it
update:
res, err = cli.Update(ctx, rsrc, UpdateOptions{})
if err != nil && IsConflict(err) {
stored, err := cli.Get(ctx, un.GetName(), GetOptions{})
if err != nil {
return err
}
rsrc = stored
MergeUpdate(rsrc, &unsrc)
goto update
}
}
if err != nil {
return err
}
if res.GetResourceVersion() != prev {
key := unKey(res)
c.canonical[key] = res
}
return nil
}(); err != nil {
return err
}
return convert(res, target)
}
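// A sketch of Upsert's create-or-update behavior: "existing" is a prior read
// of the object (a typed pointer, possibly nil) and "desired" carries the
// fields this process wants to impose; both names are illustrative.
//
//	var result Unstructured
//	if err := client.Upsert(ctx, existing, desired, &result); err != nil {
//		// handle the error
//	}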
// ==
func (c *Client) UpdateStatus(ctx context.Context, resource interface{}, target interface{}) error {
var un Unstructured
err := convert(resource, &un)
if err != nil {
return err
}
prev := un.GetResourceVersion()
var res *Unstructured
if err := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
cli := c.cliForResource(&un)
res, err = cli.UpdateStatus(ctx, &un, UpdateOptions{})
if err != nil {
return err
}
if res.GetResourceVersion() != prev {
key := unKey(res)
c.canonical[key] = res
}
return nil
}(); err != nil {
return err
}
return convert(res, target)
}
// ==
func (c *Client) Delete(ctx context.Context, resource interface{}, target interface{}) error {
var un Unstructured
err := convert(resource, &un)
if err != nil {
return err
}
if err := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
cli := c.cliForResource(&un)
err = cli.Delete(ctx, un.GetName(), DeleteOptions{})
if err != nil {
return err
}
key := unKey(&un)
c.canonical[key] = nil
return nil
}(); err != nil {
return err
}
return convert(nil, target)
}
// ==
// Update the result of a watch with newer items from our local cache. This guarantees we never give
// back stale objects that are known to be modified by us.
func (c *Client) patchWatch(field *field) {
c.mutex.Lock()
defer c.mutex.Unlock()
// The canonical map holds all local changes made by this client which have not been reflected
// back to it through a watch. This should normally be quite small since objects only occupy
// this map for the duration of a round trip to the API server. (XXX: We don't yet handle the
// case of modifying objects that are not watched. Those objects will get stuck in this map, but
// that is ok for our current set of use cases.)
// Loop over the canonical map and patch the watch result.
for key, can := range c.canonical {
item, ok := field.values[key]
if ok {
// An object is both in our canonical map and in the watch.
if can == nil {
// The object is deleted, but has not yet been reported so by the apiserver, so we
// remove it.
log.Println("Patching delete", field.mapping.GroupVersionKind.Kind, key)
delete(field.values, key)
field.deltas[key] = newDelta(ObjectDelete, can)
} else if gteq(item.GetResourceVersion(), can.GetResourceVersion()) {
// The object in the watch result is the same or newer than our canonical value, so
// no need to track it anymore.
log.Println("Patching synced", field.mapping.GroupVersionKind.Kind, key)
delete(c.canonical, key)
} else {
// The object in the watch result is stale, so we update it with the canonical
// version and track it as a delta.
log.Println("Patching update", field.mapping.GroupVersionKind.Kind, key)
field.values[key] = can
field.deltas[key] = newDelta(ObjectUpdate, can)
}
} else if can != nil && can.GroupVersionKind() == field.mapping.GroupVersionKind &&
field.selector.Matches(LabelSet(can.GetLabels())) {
// An object that was created locally is not yet present in the watch result, so we add it.
log.Println("Patching add", field.mapping.GroupVersionKind.Kind, key)
field.values[key] = can
field.deltas[key] = newDelta(ObjectAdd, can)
}
}
}
// ==
// The LogEvent struct is used to communicate log output from a pod. It includes PodID and Timestamp information so that
// LogEvents from different pods can be interleaved without losing information about total ordering and/or pod identity.
type LogEvent struct {
// The PodID field indicates what pod the log output is associated with.
PodID string `json:"podID"`
// The Timestamp field indicates when the log output was created.
Timestamp string `json:"timestamp"`