From 17c22a2b7966196d1102ed4756961b8acdc6232a Mon Sep 17 00:00:00 2001
From: Paul Chaignon
Date: Fri, 24 Apr 2020 15:57:50 +0200
Subject: [PATCH 1/2] test: Simplify fragment tracking test

The fragment tracking test sends a couple of fragments to a K8s service
and checks the ctmap content to ensure they were properly sent and
received. It used to filter the ctmap entries based on the 4-tuple;
however, that makes the test convoluted once support for kube-proxy is
added (see the next commit). This commit therefore switches to filtering
based on a 3-tuple: source port, destination IP, and destination port.

Support for sending from k8s2 is also removed since it was unused.

Signed-off-by: Paul Chaignon
---
 test/k8sT/Services.go | 56 ++++++++++++++++++++-----------------------
 1 file changed, 26 insertions(+), 30 deletions(-)

diff --git a/test/k8sT/Services.go b/test/k8sT/Services.go
index ee348b6b6aae..2354f4504335 100644
--- a/test/k8sT/Services.go
+++ b/test/k8sT/Services.go
@@ -444,28 +444,21 @@ var _ = Describe("K8sServicesTest", func() {
 }

 // srcPod: Name of pod sending the datagram
- // srcIP: IPv4 of pod sending the datagram
- // fromNode: Node hosting the pod sending the datagram
+ // srcPort: Source UDP port
 // dstPodIP: Receiver pod IP (for checking in CT table)
 // dstPodPort: Receiver pod port (for checking in CT table)
 // dstIP: Target endpoint IP for sending the datagram
 // dstPort: Target endpoint port for sending the datagram
- doFragmentedRequest := func(srcPod string, srcIP string, fromNode string, dstPodPort int, dstIP string, dstPort int32) {
+ doFragmentedRequest := func(srcPod string, srcPort, dstPodPort int, dstIP string, dstPort int32) {
 var (
 blockSize  = 5120
 blockCount = 1
- srcPort = 12345
 )
 ciliumPodK8s1, err := kubectl.GetCiliumPodOnNode(helpers.CiliumNamespace, helpers.K8s1)
 Expect(err).Should(BeNil(), fmt.Sprintf("Cannot get cilium pod on k8s1"))
 ciliumPodK8s2, err := kubectl.GetCiliumPodOnNode(helpers.CiliumNamespace, helpers.K8s2)
 Expect(err).Should(BeNil(), fmt.Sprintf("Cannot get cilium pod on k8s2"))

- ciliumPod := ciliumPodK8s1
- if fromNode == helpers.K8s2 {
- ciliumPod = ciliumPodK8s2
- }
-
 _, dstPodIPK8s1 := kubectl.GetPodOnNodeWithOffset(helpers.K8s1, testDS, 1)
 _, dstPodIPK8s2 := kubectl.GetPodOnNodeWithOffset(helpers.K8s2, testDS, 1)

@@ -479,13 +472,13 @@ var _ = Describe("K8sServicesTest", func() {
 cmdIn := "cilium bpf ct list global | awk '/%s/ { sub(\".*=\",\"\", $7); print $7 }'"

 endpointK8s1 := fmt.Sprintf("%s:%d", dstPodIPK8s1, dstPodPort)
- patternInK8s1 := fmt.Sprintf("UDP IN %s:%d -> %s", srcIP, srcPort, endpointK8s1)
+ patternInK8s1 := fmt.Sprintf("UDP IN [^:]+:%d -> %s", srcPort, endpointK8s1)
 cmdInK8s1 := fmt.Sprintf(cmdIn, patternInK8s1)
 res := kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdInK8s1)
 countInK8s1, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))

 endpointK8s2 := fmt.Sprintf("%s:%d", dstPodIPK8s2, dstPodPort)
- patternInK8s2 := fmt.Sprintf("UDP IN %s:%d -> %s", srcIP, srcPort, endpointK8s2)
+ patternInK8s2 := fmt.Sprintf("UDP IN [^:]+:%d -> %s", srcPort, endpointK8s2)
 cmdInK8s2 := fmt.Sprintf(cmdIn, patternInK8s2)
 res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s2, cmdInK8s2)
 countInK8s2, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
@@ -493,14 +486,14 @@ var _ = Describe("K8sServicesTest", func() {

 // Field #11 is "TxPackets="
 cmdOut := "cilium bpf ct list global | awk '/%s/ { sub(\".*=\",\"\", $11); print $11 }'"

- patternOutK8s1 := fmt.Sprintf("UDP OUT %s:%d -> %s", srcIP, srcPort, endpointK8s1)
+ patternOutK8s1 := fmt.Sprintf("UDP OUT [^:]+:%d -> %s", srcPort, endpointK8s1)
 cmdOutK8s1 := fmt.Sprintf(cmdOut, patternOutK8s1)
- res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPod, cmdOutK8s1)
+ res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s1)
 countOutK8s1, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))

- patternOutK8s2 := fmt.Sprintf("UDP OUT %s:%d -> %s", srcIP, srcPort, endpointK8s2)
+ patternOutK8s2 := fmt.Sprintf("UDP OUT [^:]+:%d -> %s", srcPort, endpointK8s2)
 cmdOutK8s2 := fmt.Sprintf(cmdOut, patternOutK8s2)
- res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPod, cmdOutK8s2)
+ res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s2)
 countOutK8s2, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))

 // Send datagram
@@ -533,16 +526,16 @@ var _ = Describe("K8sServicesTest", func() {
 Expect([]int{newCountInK8s1, newCountInK8s2}).To(SatisfyAny(
 Equal([]int{countInK8s1, countInK8s2 + delta}),
 Equal([]int{countInK8s1 + delta, countInK8s2}),
- ), "Failed to account for IPv4 fragments (in)")
+ ), "Failed to account for IPv4 fragments to %s (in)", dstIP)

- res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPod, cmdOutK8s1)
+ res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s1)
 newCountOutK8s1, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
- res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPod, cmdOutK8s2)
+ res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s2)
 newCountOutK8s2, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
 Expect([]int{newCountOutK8s1, newCountOutK8s2}).To(SatisfyAny(
 Equal([]int{countOutK8s1, countOutK8s2 + delta}),
 Equal([]int{countOutK8s1 + delta, countOutK8s2}),
- ), "Failed to account for IPv4 fragments (out)")
+ ), "Failed to account for IPv4 fragments to %s (out)", dstIP)
 }

 testNodePort := func(bpfNodePort bool) {
@@ -827,7 +820,10 @@ var _ = Describe("K8sServicesTest", func() {
 }

 testIPv4FragmentSupport := func() {
- var data v1.Service
+ var (
+ data v1.Service
+ srcPort = 12345
+ )
 k8s1Name, k8s1IP := kubectl.GetNodeInfo(helpers.K8s1)
 k8s2Name, k8s2IP := kubectl.GetNodeInfo(helpers.K8s2)

 waitPodsDs()

 // Get testDSClient and testDS pods running on k8s1.
 // This is because we search for new packets in the
 // conntrack table for node k8s1.
- clientPod, clientIP := kubectl.GetPodOnNodeWithOffset(helpers.K8s1, testDSClient, 1)
+ clientPod, _ := kubectl.GetPodOnNodeWithOffset(helpers.K8s1, testDSClient, 1)

 err := kubectl.Get(helpers.DefaultNamespace, "service test-nodeport").Unmarshal(&data)
 Expect(err).Should(BeNil(), "Cannot retrieve service")

 serverPort := data.Spec.Ports[1].TargetPort.IntValue()

 // With ClusterIP
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, data.Spec.ClusterIP, data.Spec.Ports[1].Port)
+ doFragmentedRequest(clientPod, srcPort, serverPort, data.Spec.ClusterIP, data.Spec.Ports[1].Port)

 // From pod via node IPs
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, k8s1IP, nodePort)
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, "::ffff:"+k8s1IP, nodePort)
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, k8s2IP, nodePort)
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, "::ffff:"+k8s2IP, nodePort)
+ doFragmentedRequest(clientPod, srcPort+1, serverPort, k8s1IP, nodePort)
+ doFragmentedRequest(clientPod, srcPort+2, serverPort, "::ffff:"+k8s1IP, nodePort)
+ doFragmentedRequest(clientPod, srcPort+3, serverPort, k8s2IP, nodePort)
+ doFragmentedRequest(clientPod, srcPort+4, serverPort, "::ffff:"+k8s2IP, nodePort)

 if helpers.RunsWithoutKubeProxy() {
 localCiliumHostIPv4, err := kubectl.GetCiliumHostIPv4(context.TODO(), k8s1Name)
 Expect(err).Should(BeNil(), "Cannot retrieve local cilium_host ipv4")
 remoteCiliumHostIPv4, err := kubectl.GetCiliumHostIPv4(context.TODO(), k8s2Name)
 Expect(err).Should(BeNil(), "Cannot retrieve remote cilium_host ipv4")

 // From pod via local cilium_host
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, localCiliumHostIPv4, nodePort)
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, "::ffff:"+localCiliumHostIPv4, nodePort)
+ doFragmentedRequest(clientPod, srcPort+5, serverPort, localCiliumHostIPv4, nodePort)
+ doFragmentedRequest(clientPod, srcPort+6, serverPort, "::ffff:"+localCiliumHostIPv4, nodePort)

 // From pod via remote cilium_host
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, remoteCiliumHostIPv4, nodePort)
- doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, "::ffff:"+remoteCiliumHostIPv4, nodePort)
+ doFragmentedRequest(clientPod, srcPort+7, serverPort, remoteCiliumHostIPv4, nodePort)
+ doFragmentedRequest(clientPod, srcPort+8, serverPort, "::ffff:"+remoteCiliumHostIPv4, nodePort)
 }
 }

From e7c48a2fd1811ad1ff069485e567f36cdcb602d7 Mon Sep 17 00:00:00 2001
From: Paul Chaignon
Date: Fri, 24 Apr 2020 16:03:46 +0200
Subject: [PATCH 2/2] test: Fix fragment tracking test under KUBEPROXY=1

The fragment tracking test currently fails when kube-proxy is enabled
because the destination IP address and port are sometimes not DNATed,
so the awk filter on the ctmap dump output fails to match the expected
entries. This commit fixes the issue by checking whether kube-proxy is
enabled and adapting the destination to match on accordingly.
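For illustration, with source port 12345 the egress filter now matches
CT entries of one of the two following shapes, depending on whether the
destination was translated to the backend (addresses below are
placeholders):

    UDP OUT [^:]+:12345 -> <backend pod IP>:<dstPodPort>   (without kube-proxy)
    UDP OUT [^:]+:12345 -> <dstIP>:<dstPort>               (with kube-proxy)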
Fixes: #10929
Signed-off-by: Paul Chaignon
---
 test/k8sT/Services.go | 49 ++++++++++++++++++++++++++++++-------------
 1 file changed, 34 insertions(+), 15 deletions(-)

diff --git a/test/k8sT/Services.go b/test/k8sT/Services.go
index 2354f4504335..edeeeeeddf2a 100644
--- a/test/k8sT/Services.go
+++ b/test/k8sT/Services.go
@@ -449,7 +449,8 @@ var _ = Describe("K8sServicesTest", func() {
 // dstPodPort: Receiver pod port (for checking in CT table)
 // dstIP: Target endpoint IP for sending the datagram
 // dstPort: Target endpoint port for sending the datagram
- doFragmentedRequest := func(srcPod string, srcPort, dstPodPort int, dstIP string, dstPort int32) {
+ // kubeProxy: True if kube-proxy is enabled
+ doFragmentedRequest := func(srcPod string, srcPort, dstPodPort int, dstIP string, dstPort int32, kubeProxy bool) {
 var (
 blockSize  = 5120
 blockCount = 1
@@ -486,15 +487,27 @@ var _ = Describe("K8sServicesTest", func() {

 // Field #11 is "TxPackets="
 cmdOut := "cilium bpf ct list global | awk '/%s/ { sub(\".*=\",\"\", $11); print $11 }'"

+ if kubeProxy {
+ // If kube-proxy is enabled, we see packets in ctmap with the
+ // service's IP address and port, not the backend's.
+ dstIPv4 := strings.Replace(dstIP, "::ffff:", "", 1)
+ endpointK8s1 = fmt.Sprintf("%s:%d", dstIPv4, dstPort)
+ endpointK8s2 = endpointK8s1
+ }
 patternOutK8s1 := fmt.Sprintf("UDP OUT [^:]+:%d -> %s", srcPort, endpointK8s1)
 cmdOutK8s1 := fmt.Sprintf(cmdOut, patternOutK8s1)
 res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s1)
 countOutK8s1, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))

+ // If kube-proxy is enabled, the two commands are the same and
+ // there's no point executing it twice.
+ countOutK8s2 := 0
 patternOutK8s2 := fmt.Sprintf("UDP OUT [^:]+:%d -> %s", srcPort, endpointK8s2)
 cmdOutK8s2 := fmt.Sprintf(cmdOut, patternOutK8s2)
- res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s2)
- countOutK8s2, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
+ if !kubeProxy {
+ res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s2)
+ countOutK8s2, _ = strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
+ }

 // Send datagram
 By("Sending a fragmented packet from %s to endpoint %s:%d", srcPod, dstIP, dstPort)
@@ -530,8 +543,13 @@ var _ = Describe("K8sServicesTest", func() {
 res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s1)
 newCountOutK8s1, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
- res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s2)
- newCountOutK8s2, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
+ // If kube-proxy is enabled, the two commands are the same and
+ // there's no point executing it twice.
+ newCountOutK8s2 := 0 + if !kubeProxy { + res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s2) + newCountOutK8s2, _ = strconv.Atoi(strings.TrimSpace(res.GetStdOut())) + } Expect([]int{newCountOutK8s1, newCountOutK8s2}).To(SatisfyAny( Equal([]int{countOutK8s1, countOutK8s2 + delta}), Equal([]int{countOutK8s1 + delta, countOutK8s2}), @@ -826,6 +844,7 @@ var _ = Describe("K8sServicesTest", func() { ) k8s1Name, k8s1IP := kubectl.GetNodeInfo(helpers.K8s1) k8s2Name, k8s2IP := kubectl.GetNodeInfo(helpers.K8s2) + kubeProxy := !helpers.RunsWithoutKubeProxy() waitPodsDs() @@ -840,27 +859,27 @@ var _ = Describe("K8sServicesTest", func() { serverPort := data.Spec.Ports[1].TargetPort.IntValue() // With ClusterIP - doFragmentedRequest(clientPod, srcPort, serverPort, data.Spec.ClusterIP, data.Spec.Ports[1].Port) + doFragmentedRequest(clientPod, srcPort, serverPort, data.Spec.ClusterIP, data.Spec.Ports[1].Port, false) // From pod via node IPs - doFragmentedRequest(clientPod, srcPort+1, serverPort, k8s1IP, nodePort) - doFragmentedRequest(clientPod, srcPort+2, serverPort, "::ffff:"+k8s1IP, nodePort) - doFragmentedRequest(clientPod, srcPort+3, serverPort, k8s2IP, nodePort) - doFragmentedRequest(clientPod, srcPort+4, serverPort, "::ffff:"+k8s2IP, nodePort) + doFragmentedRequest(clientPod, srcPort+1, serverPort, k8s1IP, nodePort, kubeProxy) + doFragmentedRequest(clientPod, srcPort+2, serverPort, "::ffff:"+k8s1IP, nodePort, kubeProxy) + doFragmentedRequest(clientPod, srcPort+3, serverPort, k8s2IP, nodePort, kubeProxy) + doFragmentedRequest(clientPod, srcPort+4, serverPort, "::ffff:"+k8s2IP, nodePort, kubeProxy) - if helpers.RunsWithoutKubeProxy() { + if !kubeProxy { localCiliumHostIPv4, err := kubectl.GetCiliumHostIPv4(context.TODO(), k8s1Name) Expect(err).Should(BeNil(), "Cannot retrieve local cilium_host ipv4") remoteCiliumHostIPv4, err := kubectl.GetCiliumHostIPv4(context.TODO(), k8s2Name) Expect(err).Should(BeNil(), "Cannot retrieve remote cilium_host ipv4") // From pod via local cilium_host - doFragmentedRequest(clientPod, srcPort+5, serverPort, localCiliumHostIPv4, nodePort) - doFragmentedRequest(clientPod, srcPort+6, serverPort, "::ffff:"+localCiliumHostIPv4, nodePort) + doFragmentedRequest(clientPod, srcPort+5, serverPort, localCiliumHostIPv4, nodePort, kubeProxy) + doFragmentedRequest(clientPod, srcPort+6, serverPort, "::ffff:"+localCiliumHostIPv4, nodePort, kubeProxy) // From pod via remote cilium_host - doFragmentedRequest(clientPod, srcPort+7, serverPort, remoteCiliumHostIPv4, nodePort) - doFragmentedRequest(clientPod, srcPort+8, serverPort, "::ffff:"+remoteCiliumHostIPv4, nodePort) + doFragmentedRequest(clientPod, srcPort+7, serverPort, remoteCiliumHostIPv4, nodePort, kubeProxy) + doFragmentedRequest(clientPod, srcPort+8, serverPort, "::ffff:"+remoteCiliumHostIPv4, nodePort, kubeProxy) } }
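Note on the awk invocations used throughout both patches: `cilium bpf ct
list global` is piped to awk, which keeps the lines matching the
connection pattern and strips everything up to '=' in field #7
("RxPackets=", ingress) or field #11 ("TxPackets=", egress). The
standalone Go sketch below shows the equivalent extraction; the sample
CT entry is hypothetical, and only the field positions and the
RxPackets=/TxPackets= forms are taken from the test's own comments:

package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// ctPacketCount mirrors the test's awk filter: for every CT dump line
// matching pattern, it takes the given 1-based whitespace-separated field
// (7 for "RxPackets=...", 11 for "TxPackets=..."), strips everything up to
// the last '=', and sums the resulting integers.
func ctPacketCount(ctDump, pattern string, field int) int {
	re := regexp.MustCompile(pattern)
	total := 0
	for _, line := range strings.Split(ctDump, "\n") {
		if !re.MatchString(line) {
			continue
		}
		fields := strings.Fields(line)
		if len(fields) < field {
			continue
		}
		val := fields[field-1]
		// Equivalent of awk's sub(".*=", "", $N): the greedy ".*="
		// removes everything up to and including the last '='.
		if i := strings.LastIndex(val, "="); i >= 0 {
			val = val[i+1:]
		}
		if n, err := strconv.Atoi(val); err == nil {
			total += n
		}
	}
	return total
}

func main() {
	// Hypothetical CT entry; only the field positions (#7 "RxPackets=",
	// #11 "TxPackets=") are taken from the test's comments.
	dump := "UDP IN 10.0.1.2:12345 -> 10.0.2.3:69 expires=4242 RxPackets=2 " +
		"RxBytes=5200 RxFlagsSeen=0x00 LastRxReport=4200 TxPackets=0 TxBytes=0"
	pattern := fmt.Sprintf("UDP IN [^:]+:%d -> %s", 12345, "10.0.2.3:69")
	fmt.Println(ctPacketCount(dump, pattern, 7)) // prints 2
}

Matching on the source port is also why the calls above use distinct
srcPort+N offsets: each doFragmentedRequest run gets its own CT entries,
so successive runs cannot be confused with one another.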