forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
factory.go
508 lines (453 loc) · 19.5 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"os/user"
"path"
"sort"
"strconv"
"strings"
"time"
"github.com/emicklei/go-restful/swagger"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/printers"
)
const (
FlagMatchBinaryVersion = "match-server-version"
)
// Factory provides abstractions that allow the Kubectl command to be extended across multiple types
// of resources and different API sets.
// The rings are here for a reason. In order for composers to be able to provide alternative factory implementations
// they need to provide low level pieces of *certain* functions so that when the factory calls back into itself
// it uses the custom version of the function. Rather than try to enumerate everything that someone would want to override
// we split the factory into rings, where each ring can depend on methods an earlier ring, but cannot depend
// upon peer methods in its own ring.
// TODO: make the functions interfaces
// TODO: pass the various interfaces on the factory directly into the command constructors (so the
// commands are decoupled from the factory).
type Factory interface {
ClientAccessFactory
ObjectMappingFactory
BuilderFactory
}
type DiscoveryClientFactory interface {
// Returns a discovery client
DiscoveryClient() (discovery.CachedDiscoveryInterface, error)
}
// ClientAccessFactory holds the first level of factory methods.
// Generally provides discovery, negotiation, and no-dep calls.
// TODO The polymorphic calls probably deserve their own interface.
type ClientAccessFactory interface {
DiscoveryClientFactory
// ClientSet gives you back an internal, generated clientset
ClientSet() (internalclientset.Interface, error)
// Returns a RESTClient for accessing Kubernetes resources or an error.
RESTClient() (*restclient.RESTClient, error)
// Returns a client.Config for accessing the Kubernetes server.
ClientConfig() (*restclient.Config, error)
// BareClientConfig returns a client.Config that has NOT been negotiated. It's
// just directions to the server. People use this to build RESTMappers on top of
BareClientConfig() (*restclient.Config, error)
// TODO this should probably be removed and collapsed into whatever we want to use long term
// probably returning a restclient for a version and leaving contruction up to someone else
FederationClientSetForVersion(version *schema.GroupVersion) (fedclientset.Interface, error)
// TODO remove this should be rolled into restclient with the right version
FederationClientForVersion(version *schema.GroupVersion) (*restclient.RESTClient, error)
// TODO remove. This should be rolled into `ClientSet`
ClientSetForVersion(requiredVersion *schema.GroupVersion) (internalclientset.Interface, error)
// TODO remove. This should be rolled into `ClientConfig`
ClientConfigForVersion(requiredVersion *schema.GroupVersion) (*restclient.Config, error)
// Returns interfaces for decoding objects - if toInternal is set, decoded objects will be converted
// into their internal form (if possible). Eventually the internal form will be removed as an option,
// and only versioned objects will be returned.
Decoder(toInternal bool) runtime.Decoder
// Returns an encoder capable of encoding a provided object into JSON in the default desired version.
JSONEncoder() runtime.Encoder
// UpdatePodSpecForObject will call the provided function on the pod spec this object supports,
// return false if no pod spec is supported, or return an error.
UpdatePodSpecForObject(obj runtime.Object, fn func(*api.PodSpec) error) (bool, error)
// MapBasedSelectorForObject returns the map-based selector associated with the provided object. If a
// new set-based selector is provided, an error is returned if the selector cannot be converted to a
// map-based selector
MapBasedSelectorForObject(object runtime.Object) (string, error)
// PortsForObject returns the ports associated with the provided object
PortsForObject(object runtime.Object) ([]string, error)
// ProtocolsForObject returns the <port, protocol> mapping associated with the provided object
ProtocolsForObject(object runtime.Object) (map[string]string, error)
// LabelsForObject returns the labels associated with the provided object
LabelsForObject(object runtime.Object) (map[string]string, error)
// Returns internal flagset
FlagSet() *pflag.FlagSet
// Command will stringify and return all environment arguments ie. a command run by a client
// using the factory.
Command(cmd *cobra.Command, showSecrets bool) string
// BindFlags adds any flags that are common to all kubectl sub commands.
BindFlags(flags *pflag.FlagSet)
// BindExternalFlags adds any flags defined by external projects (not part of pflags)
BindExternalFlags(flags *pflag.FlagSet)
// TODO: Break the dependency on cmd here.
DefaultResourceFilterOptions(cmd *cobra.Command, withNamespace bool) *printers.PrintOptions
// DefaultResourceFilterFunc returns a collection of FilterFuncs suitable for filtering specific resource types.
DefaultResourceFilterFunc() kubectl.Filters
// SuggestedPodTemplateResources returns a list of resource types that declare a pod template
SuggestedPodTemplateResources() []schema.GroupResource
// Returns a Printer for formatting objects of the given type or an error.
Printer(mapping *meta.RESTMapping, options printers.PrintOptions) (printers.ResourcePrinter, error)
// Pauser marks the object in the info as paused. Currently supported only for Deployments.
// Returns the patched object in bytes and any error that occured during the encoding or
// in case the object is already paused.
Pauser(info *resource.Info) ([]byte, error)
// Resumer resumes a paused object inside the info. Currently supported only for Deployments.
// Returns the patched object in bytes and any error that occured during the encoding or
// in case the object is already resumed.
Resumer(info *resource.Info) ([]byte, error)
// ResolveImage resolves the image names. For kubernetes this function is just
// passthrough but it allows to perform more sophisticated image name resolving for
// third-party vendors.
ResolveImage(imageName string) (string, error)
// Returns the default namespace to use in cases where no
// other namespace is specified and whether the namespace was
// overridden.
DefaultNamespace() (string, bool, error)
// Generators returns the generators for the provided command
Generators(cmdName string) map[string]kubectl.Generator
// Check whether the kind of resources could be exposed
CanBeExposed(kind schema.GroupKind) error
// Check whether the kind of resources could be autoscaled
CanBeAutoscaled(kind schema.GroupKind) error
// EditorEnvs returns a group of environment variables that the edit command
// can range over in order to determine if the user has specified an editor
// of their choice.
EditorEnvs() []string
// PrintObjectSpecificMessage prints object-specific messages on the provided writer
PrintObjectSpecificMessage(obj runtime.Object, out io.Writer)
}
// ObjectMappingFactory holds the second level of factory methods. These functions depend upon ClientAccessFactory methods.
// Generally they provide object typing and functions that build requests based on the negotiated clients.
type ObjectMappingFactory interface {
// Returns interfaces for dealing with arbitrary runtime.Objects.
Object() (meta.RESTMapper, runtime.ObjectTyper)
// Returns interfaces for dealing with arbitrary
// runtime.Unstructured. This performs API calls to discover types.
UnstructuredObject() (meta.RESTMapper, runtime.ObjectTyper, error)
// Returns interface for expanding categories like `all`.
CategoryExpander() resource.CategoryExpander
// Returns a RESTClient for working with the specified RESTMapping or an error. This is intended
// for working with arbitrary resources and is not guaranteed to point to a Kubernetes APIServer.
ClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error)
// Returns a RESTClient for working with Unstructured objects.
UnstructuredClientForMapping(mapping *meta.RESTMapping) (resource.RESTClient, error)
// Returns a Describer for displaying the specified RESTMapping type or an error.
Describer(mapping *meta.RESTMapping) (printers.Describer, error)
// LogsForObject returns a request for the logs associated with the provided object
LogsForObject(object, options runtime.Object, timeout time.Duration) (*restclient.Request, error)
// Returns a Scaler for changing the size of the specified RESTMapping type or an error
Scaler(mapping *meta.RESTMapping) (kubectl.Scaler, error)
// Returns a Reaper for gracefully shutting down resources.
Reaper(mapping *meta.RESTMapping) (kubectl.Reaper, error)
// Returns a HistoryViewer for viewing change history
HistoryViewer(mapping *meta.RESTMapping) (kubectl.HistoryViewer, error)
// Returns a Rollbacker for changing the rollback version of the specified RESTMapping type or an error
Rollbacker(mapping *meta.RESTMapping) (kubectl.Rollbacker, error)
// Returns a StatusViewer for printing rollout status.
StatusViewer(mapping *meta.RESTMapping) (kubectl.StatusViewer, error)
// AttachablePodForObject returns the pod to which to attach given an object.
AttachablePodForObject(object runtime.Object, timeout time.Duration) (*api.Pod, error)
// Returns a schema that can validate objects stored on disk.
Validator(validate bool, cacheDir string) (validation.Schema, error)
// SwaggerSchema returns the schema declaration for the provided group version kind.
SwaggerSchema(schema.GroupVersionKind) (*swagger.ApiDeclaration, error)
}
// BuilderFactory holds the second level of factory methods. These functions depend upon ObjectMappingFactory and ClientAccessFactory methods.
// Generally they depend upon client mapper functions
type BuilderFactory interface {
// PrinterForCommand returns the default printer for the command. It requires that certain options
// are declared on the command (see AddPrinterFlags). Returns a printer, true if the printer is
// generic (is not internal), or an error if a printer could not be found.
// TODO: Break the dependency on cmd here.
PrinterForCommand(cmd *cobra.Command) (printers.ResourcePrinter, bool, error)
// PrinterForMapping returns a printer suitable for displaying the provided resource type.
// Requires that printer flags have been added to cmd (see AddPrinterFlags).
PrinterForMapping(cmd *cobra.Command, mapping *meta.RESTMapping, withNamespace bool) (printers.ResourcePrinter, error)
// PrintObject prints an api object given command line flags to modify the output format
PrintObject(cmd *cobra.Command, mapper meta.RESTMapper, obj runtime.Object, out io.Writer) error
// One stop shopping for a Builder
NewBuilder() *resource.Builder
}
func getGroupVersionKinds(gvks []schema.GroupVersionKind, group string) []schema.GroupVersionKind {
result := []schema.GroupVersionKind{}
for ix := range gvks {
if gvks[ix].Group == group {
result = append(result, gvks[ix])
}
}
return result
}
type factory struct {
ClientAccessFactory
ObjectMappingFactory
BuilderFactory
}
// NewFactory creates a factory with the default Kubernetes resources defined
// if optionalClientConfig is nil, then flags will be bound to a new clientcmd.ClientConfig.
// if optionalClientConfig is not nil, then this factory will make use of it.
func NewFactory(optionalClientConfig clientcmd.ClientConfig) Factory {
clientAccessFactory := NewClientAccessFactory(optionalClientConfig)
objectMappingFactory := NewObjectMappingFactory(clientAccessFactory)
builderFactory := NewBuilderFactory(clientAccessFactory, objectMappingFactory)
return &factory{
ClientAccessFactory: clientAccessFactory,
ObjectMappingFactory: objectMappingFactory,
BuilderFactory: builderFactory,
}
}
// GetFirstPod returns a pod matching the namespace and label selector
// and the number of all pods that match the label selector.
func GetFirstPod(client coreclient.PodsGetter, namespace string, selector labels.Selector, timeout time.Duration, sortBy func([]*v1.Pod) sort.Interface) (*api.Pod, int, error) {
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := client.Pods(namespace).List(options)
if err != nil {
return nil, 0, err
}
pods := []*v1.Pod{}
for i := range podList.Items {
pod := podList.Items[i]
externalPod := &v1.Pod{}
v1.Convert_api_Pod_To_v1_Pod(&pod, externalPod, nil)
pods = append(pods, externalPod)
}
if len(pods) > 0 {
sort.Sort(sortBy(pods))
internalPod := &api.Pod{}
v1.Convert_v1_Pod_To_api_Pod(pods[0], internalPod, nil)
return internalPod, len(podList.Items), nil
}
// Watch until we observe a pod
options.ResourceVersion = podList.ResourceVersion
w, err := client.Pods(namespace).Watch(options)
if err != nil {
return nil, 0, err
}
defer w.Stop()
condition := func(event watch.Event) (bool, error) {
return event.Type == watch.Added || event.Type == watch.Modified, nil
}
event, err := watch.Until(timeout, w, condition)
if err != nil {
return nil, 0, err
}
pod, ok := event.Object.(*api.Pod)
if !ok {
return nil, 0, fmt.Errorf("%#v is not a pod event", event)
}
return pod, 1, nil
}
func makePortsString(ports []api.ServicePort, useNodePort bool) string {
pieces := make([]string, len(ports))
for ix := range ports {
var port int32
if useNodePort {
port = ports[ix].NodePort
} else {
port = ports[ix].Port
}
pieces[ix] = fmt.Sprintf("%s:%d", strings.ToLower(string(ports[ix].Protocol)), port)
}
return strings.Join(pieces, ",")
}
func getPorts(spec api.PodSpec) []string {
result := []string{}
for _, container := range spec.Containers {
for _, port := range container.Ports {
result = append(result, strconv.Itoa(int(port.ContainerPort)))
}
}
return result
}
func getProtocols(spec api.PodSpec) map[string]string {
result := make(map[string]string)
for _, container := range spec.Containers {
for _, port := range container.Ports {
result[strconv.Itoa(int(port.ContainerPort))] = string(port.Protocol)
}
}
return result
}
// Extracts the ports exposed by a service from the given service spec.
func getServicePorts(spec api.ServiceSpec) []string {
result := []string{}
for _, servicePort := range spec.Ports {
result = append(result, strconv.Itoa(int(servicePort.Port)))
}
return result
}
// Extracts the protocols exposed by a service from the given service spec.
func getServiceProtocols(spec api.ServiceSpec) map[string]string {
result := make(map[string]string)
for _, servicePort := range spec.Ports {
result[strconv.Itoa(int(servicePort.Port))] = string(servicePort.Protocol)
}
return result
}
type clientSwaggerSchema struct {
c restclient.Interface
cacheDir string
}
const schemaFileName = "schema.json"
type schemaClient interface {
Get() *restclient.Request
}
func recursiveSplit(dir string) []string {
parent, file := path.Split(dir)
if len(parent) == 0 {
return []string{file}
}
return append(recursiveSplit(parent[:len(parent)-1]), file)
}
func substituteUserHome(dir string) (string, error) {
if len(dir) == 0 || dir[0] != '~' {
return dir, nil
}
parts := recursiveSplit(dir)
if len(parts[0]) == 1 {
parts[0] = os.Getenv("HOME")
} else {
usr, err := user.Lookup(parts[0][1:])
if err != nil {
return "", err
}
parts[0] = usr.HomeDir
}
return path.Join(parts...), nil
}
func writeSchemaFile(schemaData []byte, cacheDir, cacheFile, prefix, groupVersion string) error {
if err := os.MkdirAll(path.Join(cacheDir, prefix, groupVersion), 0755); err != nil {
return err
}
tmpFile, err := ioutil.TempFile(cacheDir, "schema")
if err != nil {
// If we can't write, keep going.
if os.IsPermission(err) {
return nil
}
return err
}
if _, err := io.Copy(tmpFile, bytes.NewBuffer(schemaData)); err != nil {
return err
}
if err := os.Link(tmpFile.Name(), cacheFile); err != nil {
// If we can't write due to file existing, or permission problems, keep going.
if os.IsExist(err) || os.IsPermission(err) {
return nil
}
return err
}
return nil
}
func getSchemaAndValidate(c schemaClient, data []byte, prefix, groupVersion, cacheDir string, delegate validation.Schema) (err error) {
var schemaData []byte
var firstSeen bool
fullDir, err := substituteUserHome(cacheDir)
if err != nil {
return err
}
cacheFile := path.Join(fullDir, prefix, groupVersion, schemaFileName)
if len(cacheDir) != 0 {
if schemaData, err = ioutil.ReadFile(cacheFile); err != nil && !os.IsNotExist(err) {
return err
}
}
if schemaData == nil {
firstSeen = true
schemaData, err = downloadSchemaAndStore(c, cacheDir, fullDir, cacheFile, prefix, groupVersion)
if err != nil {
return err
}
}
schema, err := validation.NewSwaggerSchemaFromBytes(schemaData, delegate)
if err != nil {
return err
}
err = schema.ValidateBytes(data)
if _, ok := err.(validation.TypeNotFoundError); ok && !firstSeen {
// As a temporary hack, kubectl would re-get the schema if validation
// fails for type not found reason.
// TODO: runtime-config settings needs to make into the file's name
schemaData, err = downloadSchemaAndStore(c, cacheDir, fullDir, cacheFile, prefix, groupVersion)
if err != nil {
return err
}
schema, err := validation.NewSwaggerSchemaFromBytes(schemaData, delegate)
if err != nil {
return err
}
return schema.ValidateBytes(data)
}
return err
}
// Download swagger schema from apiserver and store it to file.
func downloadSchemaAndStore(c schemaClient, cacheDir, fullDir, cacheFile, prefix, groupVersion string) (schemaData []byte, err error) {
schemaData, err = c.Get().
AbsPath("/swaggerapi", prefix, groupVersion).
Do().
Raw()
if err != nil {
return
}
if len(cacheDir) != 0 {
if err = writeSchemaFile(schemaData, fullDir, cacheFile, prefix, groupVersion); err != nil {
return
}
}
return
}
func (c *clientSwaggerSchema) ValidateBytes(data []byte) error {
gvk, err := json.DefaultMetaFactory.Interpret(data)
if err != nil {
return err
}
if ok := api.Registry.IsEnabledVersion(gvk.GroupVersion()); !ok {
// if we don't have this in our scheme, just skip validation because its an object we don't recognize
return nil
}
switch gvk.Group {
case api.GroupName:
return getSchemaAndValidate(c.c, data, "api", gvk.GroupVersion().String(), c.cacheDir, c)
default:
return getSchemaAndValidate(c.c, data, "apis/", gvk.GroupVersion().String(), c.cacheDir, c)
}
}