Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Fix fragment tracking test under KUBEPROXY=1 #11098

Merged
merged 2 commits into from Apr 27, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
81 changes: 48 additions & 33 deletions test/k8sT/Services.go
Expand Up @@ -444,28 +444,22 @@ var _ = Describe("K8sServicesTest", func() {
}

// srcPod: Name of pod sending the datagram
// srcIP: IPv4 of pod sending the datagram
// fromNode: Node hosting the pod sending the datagram
// srcPort: Source UDP port
// dstPodIP: Receiver pod IP (for checking in CT table)
// dstPodPort: Receiver pod port (for checking in CT table)
// dstIP: Target endpoint IP for sending the datagram
// dstPort: Target endpoint port for sending the datagram
doFragmentedRequest := func(srcPod string, srcIP string, fromNode string, dstPodPort int, dstIP string, dstPort int32) {
// kubeProxy: True if kube-proxy is enabled
doFragmentedRequest := func(srcPod string, srcPort, dstPodPort int, dstIP string, dstPort int32, kubeProxy bool) {
var (
blockSize = 5120
blockCount = 1
srcPort = 12345
)
ciliumPodK8s1, err := kubectl.GetCiliumPodOnNode(helpers.CiliumNamespace, helpers.K8s1)
Expect(err).Should(BeNil(), fmt.Sprintf("Cannot get cilium pod on k8s1"))
ciliumPodK8s2, err := kubectl.GetCiliumPodOnNode(helpers.CiliumNamespace, helpers.K8s2)
Expect(err).Should(BeNil(), fmt.Sprintf("Cannot get cilium pod on k8s2"))

ciliumPod := ciliumPodK8s1
if fromNode == helpers.K8s2 {
ciliumPod = ciliumPodK8s2
}

_, dstPodIPK8s1 := kubectl.GetPodOnNodeWithOffset(helpers.K8s1, testDS, 1)
_, dstPodIPK8s2 := kubectl.GetPodOnNodeWithOffset(helpers.K8s2, testDS, 1)

Expand All @@ -479,29 +473,41 @@ var _ = Describe("K8sServicesTest", func() {
cmdIn := "cilium bpf ct list global | awk '/%s/ { sub(\".*=\",\"\", $7); print $7 }'"

endpointK8s1 := fmt.Sprintf("%s:%d", dstPodIPK8s1, dstPodPort)
patternInK8s1 := fmt.Sprintf("UDP IN %s:%d -> %s", srcIP, srcPort, endpointK8s1)
patternInK8s1 := fmt.Sprintf("UDP IN [^:]+:%d -> %s", srcPort, endpointK8s1)
cmdInK8s1 := fmt.Sprintf(cmdIn, patternInK8s1)
res := kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdInK8s1)
countInK8s1, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))

endpointK8s2 := fmt.Sprintf("%s:%d", dstPodIPK8s2, dstPodPort)
patternInK8s2 := fmt.Sprintf("UDP IN %s:%d -> %s", srcIP, srcPort, endpointK8s2)
patternInK8s2 := fmt.Sprintf("UDP IN [^:]+:%d -> %s", srcPort, endpointK8s2)
cmdInK8s2 := fmt.Sprintf(cmdIn, patternInK8s2)
res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s2, cmdInK8s2)
countInK8s2, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))

// Field #11 is "TxPackets=<n>"
cmdOut := "cilium bpf ct list global | awk '/%s/ { sub(\".*=\",\"\", $11); print $11 }'"

patternOutK8s1 := fmt.Sprintf("UDP OUT %s:%d -> %s", srcIP, srcPort, endpointK8s1)
if kubeProxy {
// If kube-proxy is enabled, we see packets in ctmap with the
// service's IP address and port, not backend's.
dstIPv4 := strings.Replace(dstIP, "::ffff:", "", 1)
endpointK8s1 = fmt.Sprintf("%s:%d", dstIPv4, dstPort)
endpointK8s2 = endpointK8s1
}
pchaigno marked this conversation as resolved.
Show resolved Hide resolved
patternOutK8s1 := fmt.Sprintf("UDP OUT [^:]+:%d -> %s", srcPort, endpointK8s1)
cmdOutK8s1 := fmt.Sprintf(cmdOut, patternOutK8s1)
res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPod, cmdOutK8s1)
res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s1)
countOutK8s1, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))

patternOutK8s2 := fmt.Sprintf("UDP OUT %s:%d -> %s", srcIP, srcPort, endpointK8s2)
// If kube-proxy is enabled, the two commands are the same and
// there's no point executing it twice.
countOutK8s2 := 0
patternOutK8s2 := fmt.Sprintf("UDP OUT [^:]+:%d -> %s", srcPort, endpointK8s2)
cmdOutK8s2 := fmt.Sprintf(cmdOut, patternOutK8s2)
res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPod, cmdOutK8s2)
countOutK8s2, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
if !kubeProxy {
res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s2)
countOutK8s2, _ = strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
}

// Send datagram
By("Sending a fragmented packet from %s to endpoint %s:%d", srcPod, dstIP, dstPort)
Expand Down Expand Up @@ -533,16 +539,21 @@ var _ = Describe("K8sServicesTest", func() {
Expect([]int{newCountInK8s1, newCountInK8s2}).To(SatisfyAny(
Equal([]int{countInK8s1, countInK8s2 + delta}),
Equal([]int{countInK8s1 + delta, countInK8s2}),
), "Failed to account for IPv4 fragments (in)")
), "Failed to account for IPv4 fragments to %s (in)", dstIP)

res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPod, cmdOutK8s1)
res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s1)
newCountOutK8s1, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPod, cmdOutK8s2)
newCountOutK8s2, _ := strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
// If kube-proxy is enabled, the two commands are the same and
// there's no point executing it twice.
newCountOutK8s2 := 0
if !kubeProxy {
res = kubectl.CiliumExecMustSucceed(context.TODO(), ciliumPodK8s1, cmdOutK8s2)
newCountOutK8s2, _ = strconv.Atoi(strings.TrimSpace(res.GetStdOut()))
}
Expect([]int{newCountOutK8s1, newCountOutK8s2}).To(SatisfyAny(
Equal([]int{countOutK8s1, countOutK8s2 + delta}),
Equal([]int{countOutK8s1 + delta, countOutK8s2}),
), "Failed to account for IPv4 fragments (out)")
), "Failed to account for IPv4 fragments to %s (out)", dstIP)
}

testNodePort := func(bpfNodePort bool) {
Expand Down Expand Up @@ -827,44 +838,48 @@ var _ = Describe("K8sServicesTest", func() {
}

testIPv4FragmentSupport := func() {
var data v1.Service
var (
data v1.Service
srcPort = 12345
)
k8s1Name, k8s1IP := kubectl.GetNodeInfo(helpers.K8s1)
k8s2Name, k8s2IP := kubectl.GetNodeInfo(helpers.K8s2)
kubeProxy := !helpers.RunsWithoutKubeProxy()

waitPodsDs()

// Get testDSClient and testDS pods running on k8s1.
// This is because we search for new packets in the
// conntrack table for node k8s1.
clientPod, clientIP := kubectl.GetPodOnNodeWithOffset(helpers.K8s1, testDSClient, 1)
clientPod, _ := kubectl.GetPodOnNodeWithOffset(helpers.K8s1, testDSClient, 1)

err := kubectl.Get(helpers.DefaultNamespace, "service test-nodeport").Unmarshal(&data)
Expect(err).Should(BeNil(), "Cannot retrieve service")
nodePort := data.Spec.Ports[1].NodePort
serverPort := data.Spec.Ports[1].TargetPort.IntValue()

// With ClusterIP
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, data.Spec.ClusterIP, data.Spec.Ports[1].Port)
doFragmentedRequest(clientPod, srcPort, serverPort, data.Spec.ClusterIP, data.Spec.Ports[1].Port, false)

// From pod via node IPs
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, k8s1IP, nodePort)
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, "::ffff:"+k8s1IP, nodePort)
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, k8s2IP, nodePort)
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, "::ffff:"+k8s2IP, nodePort)
doFragmentedRequest(clientPod, srcPort+1, serverPort, k8s1IP, nodePort, kubeProxy)
doFragmentedRequest(clientPod, srcPort+2, serverPort, "::ffff:"+k8s1IP, nodePort, kubeProxy)
doFragmentedRequest(clientPod, srcPort+3, serverPort, k8s2IP, nodePort, kubeProxy)
doFragmentedRequest(clientPod, srcPort+4, serverPort, "::ffff:"+k8s2IP, nodePort, kubeProxy)

if helpers.RunsWithoutKubeProxy() {
if !kubeProxy {
localCiliumHostIPv4, err := kubectl.GetCiliumHostIPv4(context.TODO(), k8s1Name)
Expect(err).Should(BeNil(), "Cannot retrieve local cilium_host ipv4")
remoteCiliumHostIPv4, err := kubectl.GetCiliumHostIPv4(context.TODO(), k8s2Name)
Expect(err).Should(BeNil(), "Cannot retrieve remote cilium_host ipv4")

// From pod via local cilium_host
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, localCiliumHostIPv4, nodePort)
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, "::ffff:"+localCiliumHostIPv4, nodePort)
doFragmentedRequest(clientPod, srcPort+5, serverPort, localCiliumHostIPv4, nodePort, kubeProxy)
doFragmentedRequest(clientPod, srcPort+6, serverPort, "::ffff:"+localCiliumHostIPv4, nodePort, kubeProxy)

// From pod via remote cilium_host
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, remoteCiliumHostIPv4, nodePort)
doFragmentedRequest(clientPod, clientIP, helpers.K8s1, serverPort, "::ffff:"+remoteCiliumHostIPv4, nodePort)
doFragmentedRequest(clientPod, srcPort+7, serverPort, remoteCiliumHostIPv4, nodePort, kubeProxy)
doFragmentedRequest(clientPod, srcPort+8, serverPort, "::ffff:"+remoteCiliumHostIPv4, nodePort, kubeProxy)
}
}

Expand Down