Skip to content

Commit

Permalink
Fix inability to access NodePort in specific case
Browse files Browse the repository at this point in the history
Assume that a NodePort Service and an Egress CRD have the same backend Pod,
that the backend Pod is on Node A, and that the external IP of the Egress is
on Node B. If an external client (not a K8s Node) accesses the NodePort through
the IP of Node A, on which the backend Pod is running, the access will
fail. The root cause is that the reply packets of the NodePort are incorrectly
matched by the flow installed by Egress, which is meant to match packets
sourced from local Pods and destined to be tunneled to Node B. This PR fixes
the issue in two steps:
- Add match condition ct_state=-rpl+trk to the flow which matches the Egress
  packets sourced from local Pods and destined for external in L3Forwarding.
  The priority of this flow is 190.
- Lower the priority (from 190 to 180) of the default flow to match Service
  packets in table L3Forwarding.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Mar 1, 2022
1 parent 57ef15c commit bf86baf
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 9 deletions.
26 changes: 17 additions & 9 deletions pkg/agent/openflow/pipeline.go
Expand Up @@ -78,6 +78,7 @@ var (
priorityHigh = uint16(210)
priorityNormal = uint16(200)
priorityLow = uint16(190)
priorityDefault = uint16(180)
priorityMiss = uint16(0)
priorityTopAntreaPolicy = uint16(64990)
priorityDNSIntercept = uint16(64991)
Expand Down Expand Up @@ -1337,16 +1338,20 @@ func (c *client) l3FwdServiceDefaultFlowsViaGW(ipProto binding.Protocol, categor
gatewayMAC := c.nodeConfig.GatewayConfig.MAC

flows := []binding.Flow{
// This flow is used to match the packets of Service traffic:
// - NodePort/LoadBalancer request packets which pass through Antrea gateway and the Service Endpoint is not on
// This generates the default flow to match Service packets, traffic cases include:
// - When Egress is not enabled:
// - Service request packets sourced from Antrea gateway and destined for external Endpoint.
// - Service request packets sourced from local Pods and destined for external Endpoint.
// - Service response packets sourced from local Pods.
// local Pod CIDR or any remote Pod CIDRs.
// - ClusterIP request packets which are from Antrea gateway and the Service Endpoint is not on local Pod CIDR
// or any remote Pod CIDRs.
// - NodePort/LoadBalancer/ClusterIP response packets.
// The matched packets should leave through Antrea gateway, however, they also enter through Antrea gateway. This
// is hairpin traffic.
// - When Egress is enabled:
// - Service request packets sourced from Antrea gateway and destined for external Endpoint.
// - Service response packets sourced from local Pods.
// Note that, when Egress is enabled, Service request packets sourced from local Pods and destined for external
// Endpoint should not be matched by the flow, and such packets should be matched by the flow with higher priority
// in L3ForwardingTable, which is installed by Egress.
// Skip traffic from AntreaFlexibleIPAM Pods.
L3ForwardingTable.BuildFlow(priorityLow).MatchProtocol(ipProto).
L3ForwardingTable.BuildFlow(priorityDefault).MatchProtocol(ipProto).
MatchCTMark(ServiceCTMark).
MatchCTStateTrk(true).
MatchRegMark(RewriteMACRegMark).
Expand Down Expand Up @@ -2159,10 +2164,13 @@ func (c *client) snatCommonFlows(nodeIP net.IP, localSubnet net.IPNet, localGate
// should bypass SNAT too. But it has been covered by the gatewayCT related flow generated in l3FwdFlowToGateway
// which forwards all reply traffic for such connections back to the gateway interface with the high priority.

// Send the traffic to external to SNATTable.
// This generates the flow to match the request packets sourced from local Pods and destined for external, then
// forwards the packets to SNATTable.
L3ForwardingTable.BuildFlow(priorityLow).
MatchProtocol(ipProto).
MatchRegMark(FromLocalRegMark).
MatchCTStateRpl(false).
MatchCTStateTrk(true).
Action().GotoTable(SNATTable.GetID()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
Expand Down
5 changes: 5 additions & 0 deletions test/e2e/framework.go
Expand Up @@ -1521,6 +1521,11 @@ func (data *TestData) createAgnhostNodePortService(serviceName string, affinity,
return data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeNodePort, ipFamily)
}

// createNginxNodePortService creates a Service of type NodePort named serviceName in the test namespace.
// Port 80 is used for both port arguments of createService and the selector passed is "app": "nginx".
// affinity, nodeLocalExternal and ipFamily are forwarded to createService unchanged.
// It returns whatever createService returns.
func (data *TestData) createNginxNodePortService(serviceName string, affinity, nodeLocalExternal bool, ipFamily *corev1.IPFamily) (*corev1.Service, error) {
	// Untyped constant so it adapts to the parameter type expected by createService, exactly like the literal 80.
	const nginxPort = 80
	selector := map[string]string{"app": "nginx"}
	return data.createService(
		serviceName,
		testNamespace,
		nginxPort,
		nginxPort,
		selector,
		affinity,
		nodeLocalExternal,
		corev1.ServiceTypeNodePort,
		ipFamily,
	)
}

func (data *TestData) updateServiceExternalTrafficPolicy(serviceName string, nodeLocalExternal bool) (*corev1.Service, error) {
svc, err := data.clientset.CoreV1().Services(testNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
Expand Down
84 changes: 84 additions & 0 deletions test/e2e/proxy_test.go
Expand Up @@ -27,6 +27,9 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"antrea.io/antrea/pkg/agent/config"
agentconfig "antrea.io/antrea/pkg/config/agent"
controllerconfig "antrea.io/antrea/pkg/config/controller"
"antrea.io/antrea/pkg/features"
)

Expand Down Expand Up @@ -381,6 +384,87 @@ func nodePortTestCases(t *testing.T, data *TestData, portStrCluster, portStrLoca
})
}

// TestNodePortAndEgressWithTheSameBackendPod checks that a NodePort Service can be reached from a client outside the
// cluster network when the Service's backend Pod is also selected by an Egress.
//
// Layout used by the test:
//   - the nginx backend Pod is scheduled on the control plane Node;
//   - the Egress IP is the IPv4 address of worker Node 1;
//   - the "external" client is a separate network namespace (1.1.1.1/24) created inside a privileged Pod on the
//     control plane Node, which curls the NodePort through the control plane Node IP.
//
// The test is skipped on clusters with Windows Nodes, non-IPv4 clusters, clusters with fewer than two Nodes, when
// AntreaProxy / proxyAll is disabled, and when the traffic mode is not encap.
func TestNodePortAndEgressWithTheSameBackendPod(t *testing.T) {
	skipIfHasWindowsNodes(t)
	skipIfNotIPv4Cluster(t)
	skipIfNumNodesLessThan(t, 2)
	skipIfProxyDisabled(t)

	data, err := setupTest(t)
	if err != nil {
		t.Fatalf("Error when setting up test: %v", err)
	}
	defer teardownTest(t, data)
	skipIfProxyAllDisabled(t, data)
	skipIfEncapModeIsNot(t, data, config.TrafficEncapModeEncap) // Egress works for encap mode only.

	// The closure parameters are named cfg (not config) so that they do not shadow the imported config package.
	cc := func(cfg *controllerconfig.ControllerConfig) {
		cfg.FeatureGates["Egress"] = true
	}
	ac := func(cfg *agentconfig.AgentConfig) {
		cfg.FeatureGates["Egress"] = true
	}

	if err := data.mutateAntreaConfigMap(cc, ac, true, true); err != nil {
		t.Fatalf("Failed to enable Egress feature: %v", err)
	}

	// Create a NodePort Service.
	nodePortIP := controlPlaneNodeIPv4()
	ipProtocol := corev1.IPv4Protocol
	var portStr string
	nodePortSvc, err := data.createNginxNodePortService("test-nodeport-svc", true, false, &ipProtocol)
	require.NoError(t, err)
	for _, port := range nodePortSvc.Spec.Ports {
		if port.NodePort != 0 {
			portStr = fmt.Sprint(port.NodePort)
			break
		}
	}
	// Fail early with a clear message rather than curling a URL that has an empty port.
	require.NotEmpty(t, portStr, "Service NodePort should have been allocated")
	testNodePortURL := net.JoinHostPort(nodePortIP, portStr)

	// Create an Egress whose external IP is on worker Node.
	egressNodeIP := workerNodeIPv4(1)
	egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "nginx"}, "", egressNodeIP)
	defer func() {
		// Best-effort cleanup: report the failure but do not fail the test because of it.
		if err := data.crdClient.CrdV1alpha2().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}); err != nil {
			t.Logf("Failed to delete Egress '%s': %v", egress.Name, err)
		}
	}()

	// Create the backend Pod on control plane Node.
	backendPodName := "test-nodeport-egress-backend-pod"
	require.NoError(t, data.createNginxPodOnNode(backendPodName, testNamespace, controlPlaneNodeName(), false))
	defer deletePodWrapper(t, data, testNamespace, backendPodName)
	if err := data.podWaitForRunning(defaultTimeout, backendPodName, testNamespace); err != nil {
		t.Fatalf("Error when waiting for Pod '%s' to be in the Running state: %v", backendPodName, err)
	}

	// Create another netns to fake an external network on the host network Pod.
	// A veth pair links the new netns (1.1.1.1) to the Pod's own network namespace (1.1.1.254), and the netns
	// routes everything via 1.1.1.254. The trailing sleep keeps the container alive for the duration of the test.
	testPod := "test-client"
	testNetns := "test-ns"
	cmd := fmt.Sprintf(`ip netns add %[1]s && \
ip link add dev %[1]s-a type veth peer name %[1]s-b && \
ip link set dev %[1]s-a netns %[1]s && \
ip addr add %[3]s/%[4]d dev %[1]s-b && \
ip link set dev %[1]s-b up && \
ip netns exec %[1]s ip addr add %[2]s/%[4]d dev %[1]s-a && \
ip netns exec %[1]s ip link set dev %[1]s-a up && \
ip netns exec %[1]s ip route replace default via %[3]s && \
sleep 3600
`, testNetns, "1.1.1.1", "1.1.1.254", 24)
	// The container must be privileged to be able to create network namespaces and veth devices.
	makePrivileged := func(pod *corev1.Pod) {
		privileged := true
		pod.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{Privileged: &privileged}
	}
	if err := data.createPodOnNode(testPod, testNamespace, controlPlaneNodeName(), agnhostImage, []string{"sh", "-c", cmd}, nil, nil, nil, true, makePrivileged); err != nil {
		t.Fatalf("Failed to create client Pod: %v", err)
	}
	defer deletePodWrapper(t, data, testNamespace, testPod)
	if err := data.podWaitForRunning(defaultTimeout, testPod, testNamespace); err != nil {
		t.Fatalf("Error when waiting for Pod '%s' to be in the Running state: %v", testPod, err)
	}

	// Connect to NodePort on control plane Node in the fake external network.
	cmd = fmt.Sprintf("ip netns exec %s curl --connect-timeout 1 --retry 5 --retry-connrefused %s", testNetns, testNodePortURL)
	_, _, err = data.runCommandFromPod(testNamespace, testPod, agnhostContainerName, []string{"sh", "-c", cmd})
	require.NoError(t, err, "Service NodePort should be able to be connected from external network when Egress is enabled")
}

func createAgnhostPod(t *testing.T, data *TestData, podName string, node string, hostNetwork bool) {
args := []string{"netexec", "--http-port=8080"}
ports := []corev1.ContainerPort{
Expand Down

0 comments on commit bf86baf

Please sign in to comment.