-
Notifications
You must be signed in to change notification settings - Fork 363
/
client.go
188 lines (172 loc) · 5.95 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright 2019 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package antctl
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/serializer"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/rest"
agentapiserver "antrea.io/antrea/pkg/agent/apiserver"
"antrea.io/antrea/pkg/antctl/runtime"
"antrea.io/antrea/pkg/apis"
controllerapiserver "antrea.io/antrea/pkg/apiserver"
flowaggregatorapiserver "antrea.io/antrea/pkg/flowaggregator/apiserver"
)
// requestOption describes options to issue requests.
type requestOption struct {
// commandDefinition is the command being executed; the endpoint to contact is
// taken from it according to the current runtime mode.
commandDefinition *commandDefinition
// kubeconfig is the path to the config file for kubectl.
kubeconfig string
// args are the parameters of the ongoing request. For resource requests the
// "name" and "namespace" entries select the object and the remaining entries
// are sent as query parameters; for non-resource requests every entry is sent
// as a query parameter.
args map[string]string
// timeout specifies a time limit for requests made by the client. The timeout
// duration includes connection setup, all redirects, and reading of the
// response body.
timeout time.Duration
// server is the address and port of the APIServer specified by user explicitly.
// If not set, antctl will connect to 127.0.0.1:10350 in agent mode, and will
// connect to the server set in kubeconfig in controller mode.
// If set, it takes precedence over the above default endpoints.
server string
}
// AntctlClient is the abstraction antctl uses to issue a request described by
// a requestOption.
type AntctlClient interface {
// request issues the request described by opt and returns the result as an
// io.Reader, or an error if the request could not be completed.
request(opt *requestOption) (io.Reader, error)
}
// client issues requests to endpoints. It is the AntctlClient implementation
// returned by newClient.
type client struct {
// codec is the CodecFactory for this command, it is needed for remote accessing.
// It is installed as the NegotiatedSerializer of every rest.Config produced by
// resolveKubeconfig.
codec serializer.CodecFactory
}
// newClient returns an AntctlClient backed by a client that uses codec as its
// serializer factory.
func newClient(codec serializer.CodecFactory) AntctlClient {
	c := new(client)
	c.codec = codec
	return c
}
// resolveKubeconfig builds the rest.Config used to reach the target API server.
//
// When antctl runs inside a Pod (runtime.InPod), the config is created from
// scratch: server certificate verification is disabled, and the host and
// bearer token file are chosen according to runtime.Mode, pointing at the
// loopback address on the matching component's API port. If runtime.Mode
// matches none of the known modes, the host and token file are left unset.
//
// Otherwise the config is obtained from runtime.ResolveKubeconfig using
// opt.kubeconfig, and any error from that call is returned unchanged.
//
// In both cases the client's codec is set as the negotiated serializer.
func (c *client) resolveKubeconfig(opt *requestOption) (*rest.Config, error) {
	if !runtime.InPod {
		cfg, err := runtime.ResolveKubeconfig(opt.kubeconfig)
		if err != nil {
			return nil, err
		}
		cfg.NegotiatedSerializer = c.codec
		return cfg, nil
	}

	const loopback = "127.0.0.1"

	cfg := &rest.Config{}
	// Skip certificate verification and make sure no CA material is configured.
	cfg.Insecure = true
	cfg.CAFile = ""
	cfg.CAData = nil

	switch runtime.Mode {
	case runtime.ModeAgent:
		cfg.Host = net.JoinHostPort(loopback, fmt.Sprint(apis.AntreaAgentAPIPort))
		cfg.BearerTokenFile = agentapiserver.TokenPath
	case runtime.ModeController:
		cfg.Host = net.JoinHostPort(loopback, fmt.Sprint(apis.AntreaControllerAPIPort))
		cfg.BearerTokenFile = controllerapiserver.TokenPath
	case runtime.ModeFlowAggregator:
		cfg.Host = net.JoinHostPort(loopback, fmt.Sprint(apis.FlowAggregatorAPIPort))
		cfg.BearerTokenFile = flowaggregatorapiserver.TokenPath
	}

	cfg.NegotiatedSerializer = c.codec
	return cfg, nil
}
// request picks the endpoint of opt.commandDefinition that matches the current
// runtime mode (agent, flow aggregator, or controller for any other mode) and
// dispatches to resourceRequest when the endpoint declares a resource
// endpoint, or to nonResourceRequest otherwise.
func (c *client) request(opt *requestOption) (io.Reader, error) {
	def := opt.commandDefinition

	var target *endpoint
	switch runtime.Mode {
	case runtime.ModeAgent:
		target = def.agentEndpoint
	case runtime.ModeFlowAggregator:
		target = def.flowAggregatorEndpoint
	default:
		target = def.controllerEndpoint
	}

	if target.resourceEndpoint == nil {
		return c.nonResourceRequest(target.nonResourceEndpoint, opt)
	}
	return c.resourceRequest(target.resourceEndpoint, opt)
}
// nonResourceRequest sends a GET request to the path of the non-resource
// endpoint e, passing every entry of opt.args as a query parameter and
// applying opt.timeout to the request. If opt.server is set it overrides the
// host from the resolved kubeconfig.
//
// On success the raw response body is returned as an io.Reader. If the request
// fails with a *errors.StatusError, the error is replaced by the one produced
// by generateMessageForStatusErr; any other error is returned unchanged.
func (c *client) nonResourceRequest(e *nonResourceEndpoint, opt *requestOption) (io.Reader, error) {
	cfg, err := c.resolveKubeconfig(opt)
	if err != nil {
		return nil, err
	}
	if len(opt.server) > 0 {
		cfg.Host = opt.server
	}

	restClient, err := rest.UnversionedRESTClientFor(cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create rest client: %w", err)
	}

	// Build "<path>?<encoded args>" to use as the request URI.
	params := url.Values{}
	for key, value := range opt.args {
		params.Set(key, value)
	}
	target := url.URL{Path: e.path, RawQuery: params.Encode()}

	body, err := restClient.Get().
		RequestURI(target.RequestURI()).
		Timeout(opt.timeout).
		DoRaw(context.TODO())
	if err == nil {
		return bytes.NewReader(body), nil
	}

	if statusErr, ok := err.(*errors.StatusError); ok {
		return nil, generateMessageForStatusErr(opt.commandDefinition, opt.args, statusErr)
	}
	return nil, err
}
// resourceRequest sends a GET request for the resource described by e and
// returns the raw response body as an io.Reader.
//
// The target is derived from e.groupVersionResource under the API group
// prefix. If opt.server is set it overrides the host from the resolved
// kubeconfig. The "namespace" entry of opt.args scopes the request when the
// resource is namespaced. The object name is e.resourceName when that is
// non-empty, otherwise the "name" entry of opt.args if present. Every other
// entry of opt.args is sent as a query parameter.
//
// opt.timeout bounds the request; zero means no timeout.
//
// If the request fails, the error produced by generateMessage is returned.
func (c *client) resourceRequest(e *resourceEndpoint, opt *requestOption) (io.Reader, error) {
	kubeconfig, err := c.resolveKubeconfig(opt)
	if err != nil {
		return nil, err
	}
	if opt.server != "" {
		kubeconfig.Host = opt.server
	}
	gv := e.groupVersionResource.GroupVersion()
	kubeconfig.GroupVersion = &gv
	kubeconfig.APIPath = genericapiserver.APIGroupPrefix
	// Configure the timeout on the config instead of assigning to
	// restClient.Client.Timeout after construction: depending on the resolved
	// transport, client-go may hand back the shared http.DefaultClient (or, in
	// older releases, leave Client nil), and that must not be mutated or
	// dereferenced here. If timeout is zero, there will be no timeout.
	kubeconfig.Timeout = opt.timeout

	restClient, err := rest.RESTClientFor(kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create rest client: %w", err)
	}

	resGetter := restClient.Get().
		NamespaceIfScoped(opt.args["namespace"], e.namespaced).
		Resource(e.groupVersionResource.Resource)

	// A name fixed by the endpoint takes precedence over a user-supplied one.
	if len(e.resourceName) != 0 {
		resGetter = resGetter.Name(e.resourceName)
	} else if name, ok := opt.args["name"]; ok {
		resGetter = resGetter.Name(name)
	}

	// "name" and "namespace" are already part of the path; forward the rest
	// as query parameters.
	for arg, val := range opt.args {
		if arg != "name" && arg != "namespace" {
			resGetter = resGetter.Param(arg, val)
		}
	}

	result := resGetter.Do(context.TODO())
	if result.Error() != nil {
		return nil, generateMessage(opt, result)
	}
	raw, err := result.Raw()
	if err != nil {
		return nil, err
	}
	return bytes.NewReader(raw), nil
}