Skip to content

Commit

Permalink
test: Add sessionAffinity integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Martynas Pumputis <m@lambda.lt>
  • Loading branch information
brb authored and qmonnet committed Apr 29, 2020
1 parent f4fd2cf commit dbdf127
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 0 deletions.
16 changes: 16 additions & 0 deletions test/helpers/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ var (
"global.ipv6.enabled": "true",
"global.psp.enabled": "true",
"global.ci.kubeCacheMutationDetector": "true",
// Disable by default, so that 4.9 CI build does not panic due to
// missing LRU support. On 4.19 and net-next we enable it with
// kubeProxyReplacement=strict.
"global.sessionAffinity.enabled": "false",
}

flannelHelmOverrides = map[string]string{
Expand Down Expand Up @@ -2088,6 +2092,18 @@ func (kub *Kubectl) WaitPolicyDeleted(pod string, policyName string) error {
return WithTimeout(body, fmt.Sprintf("Policy %s was not deleted in time", policyName), &TimeoutConfig{Timeout: HelperTimeout})
}

// WaitPodDeleted waits for pods with the given name to be deleted.
func (kub *Kubectl) WaitPodDeleted(namespace string, pod string) error {
body := func() bool {
_, cancel := context.WithTimeout(context.Background(), ShortCommandTimeout)
defer cancel()
res := kub.GetPods(namespace, pod)
return !res.WasSuccessful()
}

return WithTimeout(body, fmt.Sprintf("Pod %s was not deleted in time", pod), &TimeoutConfig{Timeout: HelperTimeout})
}

// CiliumIsPolicyLoaded returns true if the policy is loaded in the given
// cilium Pod. it returns false in case that the policy is not in place
func (kub *Kubectl) CiliumIsPolicyLoaded(pod string, policyCmd string) bool {
Expand Down
93 changes: 93 additions & 0 deletions test/k8sT/Services.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ var _ = Describe("K8sServicesTest", func() {
doRequestsFromThirdHostWithLocalPort :=
func(url string, count int, checkSourceIP bool, fromPort int) {
var cmd string

By("Making %d HTTP requests from outside cluster to %q", count, url)
for i := 1; i <= count; i++ {
if fromPort == 0 {
Expand All @@ -439,6 +440,7 @@ var _ = Describe("K8sServicesTest", func() {
}
}
}

doRequestsFromThirdHost := func(url string, count int, checkSourceIP bool) {
doRequestsFromThirdHostWithLocalPort(url, count, checkSourceIP, 0)
}
Expand Down Expand Up @@ -732,6 +734,87 @@ var _ = Describe("K8sServicesTest", func() {
kubectl.CiliumExecMustSucceed(context.TODO(), pod, "cilium bpf ct flush global", "Unable to flush CT maps")
}

// fromOutside=true tests session affinity implementation from lb.h, while
// fromOutside=false tests from bpf_sock.c.
testSessionAffinity := func(fromOutside bool) {
var (
data v1.Service
dstPod string
count = 10
from string
err error
res *helpers.CmdRes
)

err = kubectl.Get(helpers.DefaultNamespace, "service test-affinity").Unmarshal(&data)
Expect(err).Should(BeNil(), "Cannot retrieve service")
_, k8s1IP := kubectl.GetNodeInfo(helpers.K8s1)

httpURL := getHTTPLink(k8s1IP, data.Spec.Ports[0].NodePort)
cmd := helpers.CurlFail(httpURL) + " | grep 'Hostname:' " // pod name is in the hostname

if fromOutside {
from, _ = kubectl.GetNodeInfo(helpers.GetNodeWithoutCilium())
} else {
pods, err := kubectl.GetPodNames(helpers.DefaultNamespace, testDSClient)
ExpectWithOffset(1, err).Should(BeNil(), "cannot retrieve pod names by filter %q", testDSClient)
from = pods[0]
}

// Send 10 requests to the test-affinity and check that the same backend is chosen

By("Making %d HTTP requests from %s to %q (sessionAffinity)", count, from, httpURL)

for i := 1; i <= count; i++ {
if fromOutside {
res, err = kubectl.ExecInHostNetNS(context.TODO(), from, cmd)
Expect(err).Should(BeNil(), "Cannot exec in %s host netns", from)
} else {
res = kubectl.ExecPodCmd(helpers.DefaultNamespace, from, cmd)
}
ExpectWithOffset(1, res).Should(helpers.CMDSuccess(),
"Cannot connect to service %q from %s (%d/%d)", httpURL, from, i, count)
pod := strings.TrimSpace(strings.Split(res.GetStdOut(), ": ")[1])
if i == 1 {
// Retrieve the destination pod from the first request
dstPod = pod
} else {
// Check that destination pod is always the same
Expect(dstPod).To(Equal(pod))
}
}

// Delete the pod, and check that a new backend is chosen
kubectl.DeleteResource("pod", dstPod).ExpectSuccess("Unable to delete %s pod", dstPod)
kubectl.WaitPodDeleted(helpers.DefaultNamespace, dstPod)
// Unfortunately, it takes a while until cilium-agent receives Endpoint
// update event which triggers a removal of the deleted dstPod from
// the affinity and the service BPF maps. Therefore, the requests below
// are flaky.
// TODO(brb) don't sleep, instead wait for Endpoint obj update (might be complicated though)
time.Sleep(7 * time.Second)

for i := 1; i <= count; i++ {
if fromOutside {
res, err = kubectl.ExecInHostNetNS(context.TODO(), from, cmd)
Expect(err).Should(BeNil(), "Cannot exec in %s host netns", from)
} else {
res = kubectl.ExecPodCmd(helpers.DefaultNamespace, from, cmd)
}
ExpectWithOffset(1, res).Should(helpers.CMDSuccess(),
"Cannot connect to service %q from %s (%d/%d) after restart", httpURL, from, i, count)
pod := strings.TrimSpace(strings.Split(res.GetStdOut(), ": ")[1])
if i == 1 {
// Retrieve the destination pod from the first request
Expect(dstPod).ShouldNot(Equal(pod))
dstPod = pod
} else {
// Check that destination pod is always the same
Expect(dstPod).To(Equal(pod))
}
}
}

testExternalTrafficPolicyLocal := func() {
var (
data v1.Service
Expand Down Expand Up @@ -950,6 +1033,11 @@ var _ = Describe("K8sServicesTest", func() {
testExternalTrafficPolicyLocal()
})

It("Tests NodePort with sessionAffinity", func() {
testSessionAffinity(false)
testSessionAffinity(true)
})

It("Tests HealthCheckNodePort", func() {
testHealthCheckNodePort()
})
Expand All @@ -975,6 +1063,11 @@ var _ = Describe("K8sServicesTest", func() {
testExternalTrafficPolicyLocal()
})

It("Tests NodePort with sessionAffinity", func() {
testSessionAffinity(false)
testSessionAffinity(true)
})

It("Tests HealthCheckNodePort", func() {
testHealthCheckNodePort()
})
Expand Down
19 changes: 19 additions & 0 deletions test/k8sT/manifests/demo_ds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,25 @@ spec:
---
apiVersion: v1
kind: Service
metadata:
name: test-affinity
spec:
type: NodePort
ports:
- port: 10080
targetPort: 80
protocol: TCP
name: http
- port: 10069
targetPort: 69
protocol: UDP
name: tftp
sessionAffinity: ClientIP
selector:
zgroup: testDS
---
apiVersion: v1
kind: Service
metadata:
name: test-nodeport-local
spec:
Expand Down

0 comments on commit dbdf127

Please sign in to comment.