forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
idling.go
468 lines (379 loc) · 16 KB
/
idling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
package idling
import (
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	g "github.com/onsi/ginkgo"
	o "github.com/onsi/gomega"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kapiv1 "k8s.io/kubernetes/pkg/api/v1"

	unidlingproxy "github.com/openshift/origin/pkg/proxy/unidler"
	unidlingapi "github.com/openshift/origin/pkg/unidling/api"
	exutil "github.com/openshift/origin/test/extended/util"
)
// tryEchoUDPOnce sends expectedBuff as a single UDP datagram to ip:udpPort and
// waits up to readTimeout for the echo server to send it back.
//
// On success it returns the echoed bytes, which are equal to expectedBuff.
// A failed write returns (nil, nil) instead of an error, because writes can
// transiently fail while the service is switching over. A nil buffer never
// equals the expected payload, so a polling caller simply tries again.
// Every other failure is reported as an error: dial, short write, setting the
// deadline, read error/timeout, short read, or mismatched contents.
func tryEchoUDPOnce(ip net.IP, udpPort int, expectedBuff []byte, readTimeout time.Duration) ([]byte, error) {
	conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: ip, Port: udpPort})
	if err != nil {
		return nil, fmt.Errorf("unable to connect to service: %v", err)
	}
	defer conn.Close()

	n, err := conn.Write(expectedBuff)
	if err != nil {
		// It's technically possible to get some errors on write while switching over.
		return nil, nil
	}
	if n != len(expectedBuff) {
		return nil, fmt.Errorf("unable to write entire %v bytes to UDP echo server socket", len(expectedBuff))
	}

	if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
		return nil, fmt.Errorf("unable to set deadline on read from echo server: %v", err)
	}

	actualBuff := make([]byte, n)
	amtRead, _, err := conn.ReadFromUDP(actualBuff)
	if err != nil {
		return nil, fmt.Errorf("unable to read from UDP echo server: %v", err)
	}
	if amtRead != n {
		// we should never read back the *wrong* thing
		return nil, fmt.Errorf("read back incorrect number of bytes from echo server")
	}
	if string(expectedBuff) != string(actualBuff) {
		// err is nil here, so it is deliberately not part of the message.
		return nil, fmt.Errorf("written contents %q didn't equal read contents %q from echo server", string(expectedBuff), string(actualBuff))
	}
	return actualBuff, nil
}
// tryEchoUDP repeatedly round-trips a fixed payload through the first UDP port
// exposed on svc's cluster IP until the echo server answers with that payload.
//
// Preconditions are asserted with gomega rather than returned: the service
// must have a cluster IP, the IP must be valid, and a UDP port must be
// exposed. The eventual echo is asserted the same way, so the returned error
// is always nil. Each attempt waits up to five seconds for a reply, and
// attempts continue for up to two minutes. Retrying is needed because
// packets sent to an idled service are dropped rather than queued.
func tryEchoUDP(svc *kapiv1.Service) error {
	rawIP := svc.Spec.ClusterIP
	o.Expect(rawIP).NotTo(o.BeEmpty(), "The service should have a cluster IP set")

	ip := net.ParseIP(rawIP)
	o.Expect(ip).NotTo(o.BeNil(), "The service should have a valid cluster IP, but %q was not valid", rawIP)

	udpPort := 0
	for i := range svc.Spec.Ports {
		if svc.Spec.Ports[i].Protocol == "UDP" {
			udpPort = int(svc.Spec.Ports[i].Port)
			break
		}
	}
	o.Expect(udpPort).NotTo(o.Equal(0), "The service should have a UDP port exposed")

	const (
		attemptTimeout = 5 * time.Second
		overallTimeout = 2 * time.Minute
	)
	payload := []byte("It's time to UDP!\n")
	attempt := func() ([]byte, error) {
		return tryEchoUDPOnce(ip, udpPort, payload, attemptTimeout)
	}
	o.Eventually(attempt, overallTimeout, attemptTimeout).Should(o.Equal(payload))
	return nil
}
// tryEchoTCP opens one TCP connection to the first TCP port of svc's cluster
// IP, writes a fixed payload, and checks that the echo server sends back
// exactly the same bytes.
//
// It returns an error in any of these cases:
//   - the service has no cluster IP, or the IP is invalid;
//   - the service exposes no TCP port;
//   - the connection cannot be established;
//   - the full payload cannot be written or read back;
//   - the echoed contents differ from what was written.
//
// The whole exchange is bounded by a two-minute deadline. The connection is
// always closed before returning.
func tryEchoTCP(svc *kapiv1.Service) error {
	rawIP := svc.Spec.ClusterIP
	if rawIP == "" {
		return fmt.Errorf("no ClusterIP specified on service %s", svc.Name)
	}
	ip := net.ParseIP(rawIP)
	if ip == nil {
		return fmt.Errorf("invalid ClusterIP %q specified on service %s", rawIP, svc.Name)
	}

	var tcpPort int
	for _, port := range svc.Spec.Ports {
		if port.Protocol == "TCP" {
			tcpPort = int(port.Port)
			break
		}
	}
	if tcpPort == 0 {
		return fmt.Errorf("Unable to find any TCP ports on service %s", svc.Name)
	}

	conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{IP: ip, Port: tcpPort})
	if err != nil {
		return fmt.Errorf("unable to connect to service %s: %v", svc.Name, err)
	}
	// Callers run this in polling loops and in many parallel goroutines, so
	// release the socket on every return path.
	defer conn.Close()

	if err := conn.SetDeadline(time.Now().Add(2 * time.Minute)); err != nil {
		return fmt.Errorf("unable to set timeout on TCP connection to service %s: %v", svc.Name, err)
	}

	expectedBuff := []byte("It's time to TCP!\n")
	n, err := conn.Write(expectedBuff)
	if err != nil {
		return fmt.Errorf("unable to write data to echo server for service %s: %v", svc.Name, err)
	}
	if n != len(expectedBuff) {
		return fmt.Errorf("unable to write all data to echo server for service %s", svc.Name)
	}

	// TCP is a byte stream: one Read may return fewer bytes than were written,
	// so keep reading until the whole payload has arrived (or the peer closes,
	// which ReadFull reports as an error).
	actualBuff := make([]byte, n)
	if _, err := io.ReadFull(conn, actualBuff); err != nil {
		return fmt.Errorf("unable to read data from echo server for service %s: %v", svc.Name, err)
	}
	if string(expectedBuff) != string(actualBuff) {
		return fmt.Errorf("written contents %q didn't equal read contents %q from echo server for service %s", string(expectedBuff), string(actualBuff), svc.Name)
	}
	return nil
}
// createFixture runs `oc create -f path -o name` and splits every printed
// "type/name" line into two parallel slices. The i-th element of the first
// slice is the type of the i-th created object, and the i-th element of the
// second slice is its name.
//
// Empty lines are skipped. Any other line that does not split on "/" into
// exactly two parts makes the whole call fail. Errors from the oc command
// itself are returned unchanged.
func createFixture(oc *exutil.CLI, path string) ([]string, []string, error) {
	output, err := oc.Run("create").Args("-f", path, "-o", "name").Output()
	if err != nil {
		return nil, nil, err
	}

	var (
		lines    = strings.Split(output, "\n")
		kinds    = make([]string, 0, len(lines)-1)
		objNames = make([]string, 0, len(lines)-1)
	)
	for _, line := range lines {
		if line == "" {
			continue
		}
		fields := strings.Split(line, "/")
		if len(fields) != 2 {
			return nil, nil, fmt.Errorf("expected type/name syntax, got: %q", line)
		}
		kinds, objNames = append(kinds, fields[0]), append(objNames, fields[1])
	}
	return kinds, objNames, nil
}
// checkSingleIdle idles the services listed in idlingFile with `oc idle` and
// verifies the outcome. It expects a fixture with one service backed by one
// scalable resource.
//
// resources maps a resource type, as printed by `oc create -o name` (e.g.
// "service", "deploymentconfig"), to the names created for that type.
// resourceName selects the scalable resource type to inspect. kind is the
// Kind expected in the recorded unidle target.
//
// The function asserts, via gomega, that:
//   - the scalable resource stays at 0 replicas for 20 seconds;
//   - the service's endpoints carry the idled-at and unidle-target annotations;
//   - the idled-at time is within five minutes of now;
//   - the unidle target records 2 replicas for that resource.
func checkSingleIdle(oc *exutil.CLI, idlingFile string, resources map[string][]string, resourceName string, kind string) {
	g.By("Idling the service")
	_, err := oc.Run("idle").Args("--resource-names-file", idlingFile).Output()
	o.Expect(err).ToNot(o.HaveOccurred())

	g.By("Ensuring the scale is zero (and stays zero)")
	objName := resources[resourceName][0]
	// Keep checking for a while to make sure we don't get woken up by an
	// incorrect router health check or anything like that.
	//
	// Match a 0 that is not part of a larger number. A plain substring match
	// on "0" would also accept replica counts such as 10, 20 or 100. Allowing
	// any non-digit on either side tolerates the quote characters around
	// {.spec.replicas} in the jsonpath template.
	o.Consistently(func() (string, error) {
		return oc.Run("get").Args(resourceName+"/"+objName, "--output=jsonpath=\"{.spec.replicas}\"").Output()
	}, 20*time.Second, 500*time.Millisecond).Should(o.MatchRegexp(`(^|[^0-9])0($|[^0-9])`))

	g.By("Fetching the service and checking the annotations are present")
	serviceName := resources["service"][0]
	endpoints, err := oc.KubeClient().CoreV1().Endpoints(oc.Namespace()).Get(serviceName, metav1.GetOptions{})
	o.Expect(err).NotTo(o.HaveOccurred())
	o.Expect(endpoints.Annotations).To(o.HaveKey(unidlingapi.IdledAtAnnotation))
	o.Expect(endpoints.Annotations).To(o.HaveKey(unidlingapi.UnidleTargetAnnotation))

	g.By("Checking the idled-at time")
	idledAtAnnotation := endpoints.Annotations[unidlingapi.IdledAtAnnotation]
	idledAtTime, err := time.Parse(time.RFC3339, idledAtAnnotation)
	o.Expect(err).ToNot(o.HaveOccurred())
	o.Expect(idledAtTime).To(o.BeTemporally("~", time.Now(), 5*time.Minute))

	g.By("Checking the idle targets")
	unidleTargetAnnotation := endpoints.Annotations[unidlingapi.UnidleTargetAnnotation]
	unidleTargets := []unidlingapi.RecordedScaleReference{}
	err = json.Unmarshal([]byte(unidleTargetAnnotation), &unidleTargets)
	o.Expect(err).ToNot(o.HaveOccurred())
	o.Expect(unidleTargets).To(o.Equal([]unidlingapi.RecordedScaleReference{
		{
			// NOTE(review): assumes the fixtures create 2 replicas — confirm
			// against testdata/idling-echo-server*.yaml.
			Replicas: 2,
			CrossGroupObjectReference: unidlingapi.CrossGroupObjectReference{
				Name: objName,
				Kind: kind,
			},
		},
	}))
}
// The "idling and unidling" suite creates an echo-server fixture for each spec.
// It writes the created service names to a temporary idling file consumed by
// `oc idle --resource-names-file`, and waits for the service's endpoints
// before the spec body runs. The specs then verify idling (scale to zero plus
// annotations) and unidling (traffic wakes the service and the annotations
// are cleared).
var _ = g.Describe("idling and unidling", func() {
	defer g.GinkgoRecover()
	var (
		oc                  = exutil.NewCLI("cli-idling", exutil.KubeConfigPath()).Verbose()
		echoServerFixture   = exutil.FixturePath("testdata", "idling-echo-server.yaml")
		echoServerRcFixture = exutil.FixturePath("testdata", "idling-echo-server-rc.yaml")
		framework           = oc.KubeFramework()
	)

	// path to the fixture; chosen by each container's BeforeEach
	var fixture string
	// path to the idling file
	var idlingFile string
	// map of resource type (as printed by `oc create -o name`) to the names of
	// all objects of that type created from the fixture
	var resources map[string][]string

	// JustBeforeEach runs after the BeforeEach blocks below, so fixture is set.
	g.JustBeforeEach(func() {
		g.By("Creating the resources")
		rawResources, rawResourceNames, err := createFixture(oc, fixture)
		o.Expect(err).ToNot(o.HaveOccurred())

		resources = make(map[string][]string)
		for i, resource := range rawResources {
			resources[resource] = append(resources[resource], rawResourceNames[i])
		}

		g.By("Creating the idling file")
		serviceNames := resources["service"]
		targetFile, err := ioutil.TempFile(exutil.TestContext.OutputDir, "idling-services-")
		o.Expect(err).ToNot(o.HaveOccurred())
		defer targetFile.Close()
		idlingFile = targetFile.Name()
		_, err = targetFile.Write([]byte(strings.Join(serviceNames, "\n")))
		o.Expect(err).ToNot(o.HaveOccurred())

		g.By("Waiting for the endpoints to exist")
		serviceName := resources["service"][0]
		g.By("Waiting for endpoints to be up")
		err = waitForEndpointsAvailable(oc, serviceName)
		o.Expect(err).ToNot(o.HaveOccurred())
	})

	g.AfterEach(func() {
		g.By("Cleaning up the idling file")
		// Best-effort cleanup; a failure to remove the temp file is not fatal.
		os.Remove(idlingFile)
	})

	g.Describe("idling", func() {
		g.Context("with a single service and DeploymentConfig [Conformance]", func() {
			g.BeforeEach(func() {
				framework.BeforeEach()
				fixture = echoServerFixture
			})

			g.It("should idle the service and DeploymentConfig properly", func() {
				checkSingleIdle(oc, idlingFile, resources, "deploymentconfig", "DeploymentConfig")
			})
		})

		g.Context("with a single service and ReplicationController", func() {
			g.BeforeEach(func() {
				framework.BeforeEach()
				fixture = echoServerRcFixture
			})

			g.It("should idle the service and ReplicationController properly", func() {
				checkSingleIdle(oc, idlingFile, resources, "replicationcontroller", "ReplicationController")
			})
		})
	})

	g.Describe("unidling", func() {
		g.BeforeEach(func() {
			framework.BeforeEach()
			fixture = echoServerFixture
		})

		g.It("should work with TCP (when fully idled) [Conformance] [local]", func() {
			idleServicesFromFile(oc, idlingFile)

			g.By("Waiting for the pods to have terminated")
			err := waitForNoPodsAvailable(oc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP and checking the echo")
			serviceName := resources["service"][0]
			svc, err := oc.KubeClient().CoreV1().Services(oc.Namespace()).Get(serviceName, metav1.GetOptions{})
			o.Expect(err).ToNot(o.HaveOccurred())
			err = tryEchoTCP(svc)
			o.Expect(err).ToNot(o.HaveOccurred())

			expectEndpointsUnidled(oc, serviceName)
		})

		g.It("should work with TCP (while idling) [local]", func() {
			idleServicesFromFile(oc, idlingFile)

			g.By("Connecting to the service IP and repeatedly connecting, making sure we seamlessly idle and come back up")
			serviceName := resources["service"][0]
			svc, err := oc.KubeClient().CoreV1().Services(oc.Namespace()).Get(serviceName, metav1.GetOptions{})
			o.Expect(err).ToNot(o.HaveOccurred())
			o.Consistently(func() error { return tryEchoTCP(svc) }, 10*time.Second, 500*time.Millisecond).ShouldNot(o.HaveOccurred())

			expectEndpointsUnidled(oc, serviceName)
		})

		g.It("should handle many TCP connections by dropping those under a certain bound [local]", func() {
			idleServicesFromFile(oc, idlingFile)

			g.By("Waiting for the pods to have terminated")
			serviceName := resources["service"][0]
			err := waitForNoPodsAvailable(oc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP many times and checking the echo")
			svc, err := oc.KubeClient().CoreV1().Services(oc.Namespace()).Get(serviceName, metav1.GetOptions{})
			o.Expect(err).ToNot(o.HaveOccurred())

			connectionsToStart := 100
			connErrs := make([]error, connectionsToStart)
			var connWG sync.WaitGroup
			// Spawn many connections. Each goroutine writes only its own slot
			// of connErrs and never touches the spec's err variable, so the
			// WaitGroup is the only synchronization needed.
			for i := 0; i < connectionsToStart; i++ {
				connWG.Add(1)
				go func(ind int) {
					defer g.GinkgoRecover()
					defer connWG.Done()
					connErrs[ind] = tryEchoTCP(svc)
				}(i)
			}
			connWG.Wait()

			g.By(fmt.Sprintf("Expecting all but %v of those connections to fail", unidlingproxy.MaxHeldConnections))
			errCount := 0
			for _, connErr := range connErrs {
				if connErr != nil {
					errCount++
				}
			}
			o.Expect(errCount).To(o.Equal(connectionsToStart - unidlingproxy.MaxHeldConnections))

			expectEndpointsUnidled(oc, serviceName)
		})

		g.It("should work with UDP [local]", func() {
			idleServicesFromFile(oc, idlingFile)

			g.By("Waiting for the pods to have terminated")
			err := waitForNoPodsAvailable(oc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP and checking the echo")
			serviceName := resources["service"][0]
			svc, err := oc.KubeClient().CoreV1().Services(oc.Namespace()).Get(serviceName, metav1.GetOptions{})
			o.Expect(err).ToNot(o.HaveOccurred())
			err = tryEchoUDP(svc)
			o.Expect(err).ToNot(o.HaveOccurred())

			expectEndpointsUnidled(oc, serviceName)
		})

		// TODO: Work out how to make this test work correctly when run on AWS
		g.XIt("should handle many UDP senders (by continuing to drop all packets on the floor) [local]", func() {
			idleServicesFromFile(oc, idlingFile)

			g.By("Waiting for the pods to have terminated")
			err := waitForNoPodsAvailable(oc)
			o.Expect(err).ToNot(o.HaveOccurred())

			g.By("Connecting to the service IP many times and checking the echo")
			serviceName := resources["service"][0]
			svc, err := oc.KubeClient().CoreV1().Services(oc.Namespace()).Get(serviceName, metav1.GetOptions{})
			o.Expect(err).ToNot(o.HaveOccurred())

			connectionsToStart := 100
			connErrs := make([]error, connectionsToStart)
			var connWG sync.WaitGroup
			// Spawn many senders. Each goroutine writes only its own slot of
			// connErrs; tryEchoUDP asserts through gomega, hence GinkgoRecover.
			for i := 0; i < connectionsToStart; i++ {
				connWG.Add(1)
				go func(ind int) {
					defer g.GinkgoRecover()
					defer connWG.Done()
					connErrs[ind] = tryEchoUDP(svc)
				}(i)
			}
			connWG.Wait()

			// all of the echoers should eventually succeed
			errCount := 0
			for _, connErr := range connErrs {
				if connErr != nil {
					errCount++
				}
			}
			o.Expect(errCount).To(o.Equal(0))

			expectEndpointsUnidled(oc, serviceName)
		})
	})
})

// idleServicesFromFile runs `oc idle` for the services named in idlingFile and
// fails the current spec if the command returns an error.
func idleServicesFromFile(oc *exutil.CLI, idlingFile string) {
	g.By("Idling the service")
	_, err := oc.Run("idle").Args("--resource-names-file", idlingFile).Output()
	o.Expect(err).ToNot(o.HaveOccurred())
}

// expectEndpointsUnidled waits for serviceName's endpoints to become available
// again. It then asserts that the idled-at and unidle-target annotations have
// been removed from the Endpoints object.
func expectEndpointsUnidled(oc *exutil.CLI, serviceName string) {
	g.By("Waiting until we have endpoints")
	err := waitForEndpointsAvailable(oc, serviceName)
	o.Expect(err).ToNot(o.HaveOccurred())
	endpoints, err := oc.KubeClient().CoreV1().Endpoints(oc.Namespace()).Get(serviceName, metav1.GetOptions{})
	o.Expect(err).ToNot(o.HaveOccurred())

	g.By("Making sure the endpoints are no longer marked as idled")
	o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.IdledAtAnnotation))
	o.Expect(endpoints.Annotations).NotTo(o.HaveKey(unidlingapi.UnidleTargetAnnotation))
}