Fix inability to access NodePort in particular case
When a Service NodePort and an Egress CRD have the same backend Pod, access
to the NodePort Service may fail in a particular case. Assume that the backend
Pod is on Node A and the Egress's external IP is on Node B. If an external
client (not a K8s Node) accesses the NodePort through the IP of Node A, where
the backend Pod is running, the access fails. The root cause is that the reply
packets of the NodePort connection are incorrectly matched by a flow installed
by Egress, which is meant to match packets sourced from local Pods and destined
for the tunnel to Node B. This PR fixes the issue by copying NXM_NX_REG0[0..3]
(PktSourceField, the field that marks the packet source) into NXM_NX_CT_MARK[0..3]
when a Service connection is committed; reply packets of Service connections
sourced from the Antrea gateway can then be matched on NXM_NX_CT_MARK[0..3] and
forced back to the Antrea gateway.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
hongliangl committed Mar 2, 2022
1 parent 57ef15c commit 5317506
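
How the fix plays out at the OVS level (a sketch assembled from the integration-test expectations later in this diff, not a live flow dump): the Service DNAT commit now records the packet source alongside the Service mark, and replies of gateway-sourced Service connections are matched on ct_mark[0..3] and steered back to the gateway:

    ct(commit,table=EgressRule,zone=65520,nat(dst=<endpointIP>:<port>),exec(load:0x1->NXM_NX_CT_MARK[4],move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3]))
    priority=210,ct_state=+rpl+trk,ct_mark=0x1/0xf,ip actions=set_field:<gatewayMAC>->eth_dst,goto_table:L2Forwarding

Before this change, such replies fell through to the Egress flow that tunnels local-Pod traffic to Node B, breaking the NodePort reply path.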
Showing 9 changed files with 153 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/connections/conntrack_linux_test.go
@@ -140,7 +140,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) {
// Set expect call for mock ovsCtlClient
ovsctlCmdOutput := []byte("tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=127.0.0.1,dst=8.7.6.5,sport=45170,dport=2379,packets=80743,bytes=5416239),reply=(src=8.7.6.5,dst=127.0.0.1,sport=2379,dport=45170,packets=63361,bytes=4811261),start=2020-07-24T05:07:01.591,id=462801621,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86397,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,labels=0x200000001,mark=4,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,labels=0x200000001,mark=16,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
outputFlow := strings.Split(string(ovsctlCmdOutput), "\n")
expConn := &flowexporter.Connection{
ID: 982464968,
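
An aside on the expected conntrack mark flipping from 4 to 16: ServiceCTMark moves from bit 2 to bit 4 in this PR (see pkg/agent/openflow/fields.go below), which frees ct_mark[0..3] for the packet-source value. A self-contained Go sketch of the arithmetic:

    package main

    import "fmt"

    func main() {
    	// ServiceCTMark carries a single bit; this commit only moves its position.
    	oldServiceMark := uint32(0b1) << 2 // NewCTMark(0b1, 2, 2) -> mark=4
    	newServiceMark := uint32(0b1) << 4 // NewCTMark(0b1, 4, 4) -> mark=16
    	fmt.Println(oldServiceMark, newServiceMark) // prints: 4 16
    }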
27 changes: 16 additions & 11 deletions pkg/agent/openflow/fields.go
@@ -21,6 +21,11 @@ import (

// Fields using reg.
var (
+ fromTunnelVal = uint32(0)
+ fromGatewayVal = uint32(1)
+ fromLocalVal = uint32(2)
+ fromUplinkVal = uint32(3)
+ fromBridgeVal = uint32(4)
// reg0 (NXM_NX_REG0)
// reg0[0..3]: Field to mark the packet source. Marks in this field include,
// - 0: from the tunnel port
@@ -29,11 +34,11 @@ var (
// - 4: from the Bridge interface
// - 5: from the uplink interface
PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource")
- FromTunnelRegMark = binding.NewRegMark(PktSourceField, 0)
- FromGatewayRegMark = binding.NewRegMark(PktSourceField, 1)
- FromLocalRegMark = binding.NewRegMark(PktSourceField, 2)
- FromUplinkRegMark = binding.NewRegMark(PktSourceField, 4)
- FromBridgeRegMark = binding.NewRegMark(PktSourceField, 5)
+ FromTunnelRegMark = binding.NewRegMark(PktSourceField, fromTunnelVal)
+ FromGatewayRegMark = binding.NewRegMark(PktSourceField, fromGatewayVal)
+ FromLocalRegMark = binding.NewRegMark(PktSourceField, fromLocalVal)
+ FromUplinkRegMark = binding.NewRegMark(PktSourceField, fromUplinkVal)
+ FromBridgeRegMark = binding.NewRegMark(PktSourceField, fromBridgeVal)
// reg0[16]: Mark to indicate the ofPort number of an interface is found.
OFPortFoundRegMark = binding.NewOneBitRegMark(0, 16, "OFPortFound")
// reg0[18]: Mark to indicate the packet needs DNAT to virtual IP.
@@ -139,17 +144,17 @@ var (

// Marks using CT.
var (
- //TODO: There is a bug in libOpenflow when CT_MARK range is from 0 to 0, and a wrong mask will be got,
- // so bit 0 of CT_MARK is not used for now.
+ // TODO: There is a bug in libOpenflow: when a CT_MARK range is 0 to 0, a wrong mask is generated. As a result,
+ // never use bit 0 of CT_MARK on its own.

// Mark to indicate the connection is initiated through the host gateway interface
// (i.e. for which the first packet of the connection was received through the gateway).
- FromGatewayCTMark = binding.NewCTMark(0b1, 1, 1)
- // Mark to indicate DNAT is performed on the connection for Service.
- ServiceCTMark = binding.NewCTMark(0b1, 2, 2)
+ FromGatewayCTMark = binding.NewCTMark(fromGatewayVal, 0, 3)
// Mark to indicate the connection is initiated through the host bridge interface
// (i.e. for which the first packet of the connection was received through the bridge).
- FromBridgeCTMark = binding.NewCTMark(0x1, 3, 3)
+ FromBridgeCTMark = binding.NewCTMark(fromBridgeVal, 0, 3)
+ // Mark to indicate DNAT is performed on the connection for Service.
+ ServiceCTMark = binding.NewCTMark(0b1, 4, 4)
)
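
For orientation, the resulting ct_mark layout (a sketch derived from the definitions above, not an authoritative table from the repo):

    // ct_mark[0..3]: packet source, copied from reg0[0..3] when a Service
    //                connection is committed (0: tunnel, 1: gateway, 2: local
    //                Pods, 3: uplink, 4: bridge)
    // ct_mark[4]   : Service connection (DNAT performed)
    // Example: a Service connection initiated through the gateway commits
    // ct_mark = 0x10 | 0x1 = 0x11; its reply matches ct_mark=0x1/0xf.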

// Fields using CT label.
24 changes: 16 additions & 8 deletions pkg/agent/openflow/pipeline.go
@@ -1337,14 +1337,16 @@ func (c *client) l3FwdServiceDefaultFlowsViaGW(ipProto binding.Protocol, categor
gatewayMAC := c.nodeConfig.GatewayConfig.MAC

flows := []binding.Flow{
- // This flow is used to match the packets of Service traffic:
- // - NodePort/LoadBalancer request packets which pass through Antrea gateway and the Service Endpoint is not on
- //   local Pod CIDR or any remote Pod CIDRs.
- // - ClusterIP request packets which are from Antrea gateway and the Service Endpoint is not on local Pod CIDR
- //   or any remote Pod CIDRs.
- // - NodePort/LoadBalancer/ClusterIP response packets.
- // The matched packets should leave through Antrea gateway, however, they also enter through Antrea gateway. This
- // is hairpin traffic.
+ // This generates the default flow to match Service packets. Traffic cases include:
+ // - When Egress is not enabled:
+ //   - Service request packets sourced from Antrea gateway and destined for an external Endpoint.
+ //   - Service request packets sourced from local Pods and destined for an external Endpoint.
+ // - When Egress is enabled:
+ //   - Service request packets sourced from Antrea gateway and destined for an external Endpoint.
+ // Note that, when Egress is enabled, Service request packets sourced from local Pods and destined for an
+ // external Endpoint should not be matched by this flow; they should be matched by the higher-priority flow
+ // that Egress installs in L3ForwardingTable.
// Skip traffic from AntreaFlexibleIPAM Pods.
L3ForwardingTable.BuildFlow(priorityLow).MatchProtocol(ipProto).
MatchCTMark(ServiceCTMark).
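
With ServiceCTMark relocated to bit 4 (NewCTMark(0b1, 4, 4)), this default flow should match Service connections on something like ct_mark=0x10/0x10 at priorityLow (a deduction from the mark definition in fields.go, not a dump taken from this PR):

    priority=190,ct_mark=0x10/0x10,ip actions=set_field:<gatewayMAC>->eth_dst,goto_table:L2Forwarding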
@@ -2159,10 +2161,13 @@ func (c *client) snatCommonFlows(nodeIP net.IP, localSubnet net.IPNet, localGate
// should bypass SNAT too. But it has been covered by the gatewayCT related flow generated in l3FwdFlowToGateway
// which forwards all reply traffic for such connections back to the gateway interface with the high priority.

- // Send the traffic to external to SNATTable.
+ // This generates the flow to match request packets sourced from local Pods and destined for the external
+ // network, then forward the packets to SNATTable.
L3ForwardingTable.BuildFlow(priorityLow).
MatchProtocol(ipProto).
MatchRegMark(FromLocalRegMark).
+ MatchCTStateRpl(false).
+ MatchCTStateTrk(true).
Action().GotoTable(SNATTable.GetID()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
@@ -2171,6 +2176,8 @@
L3ForwardingTable.BuildFlow(priorityLow).
MatchProtocol(ipProto).
MatchRegMark(FromTunnelRegMark).
+ MatchCTStateRpl(false).
+ MatchCTStateTrk(true).
Action().SetDstMAC(localGatewayMAC).
Action().GotoTable(SNATTable.GetID()).
Cookie(c.cookieAllocator.Request(category).Raw()).
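
The two ct_state matches added above restrict these flows to originating-direction packets; reply packets are excluded here and handled by the FromGatewayCTMark flow instead. Per the updated integration-test expectations later in this diff, the flows should render roughly as:

    priority=190,ct_state=-rpl+trk,ip,reg0=0x2/0xf actions=goto_table:SNAT
    priority=190,ct_state=-rpl+trk,ip,reg0=0/0xf actions=set_field:<gatewayMAC>->eth_dst,goto_table:SNAT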
@@ -2426,6 +2433,7 @@ func (c *client) endpointDNATFlow(endpointIP net.IP, endpointPort uint16, protoc
&binding.PortRange{StartPort: endpointPort, EndPort: endpointPort},
).
LoadToCtMark(ServiceCTMark).
+ MoveToCtMark(PktSourceField.GetNXFieldName(), PktSourceField.GetRange(), &binding.Range{0, 3}).
CTDone().
Done()
}
1 change: 1 addition & 0 deletions pkg/ovs/openflow/interfaces.go
@@ -342,6 +342,7 @@ type CTAction interface {
LoadToCtMark(mark *CtMark) CTAction
LoadToLabelField(value uint64, labelField *CtLabel) CTAction
MoveToLabel(fromName string, fromRng, labelRng *Range) CTAction
MoveToCtMark(fromName string, fromRng, ctMarkRng *Range) CTAction
// NAT action translates the packet in the way that the connection was committed into the conntrack zone, e.g., if
// a connection was committed with SNAT, the later packets would be translated with the earlier SNAT configurations.
NAT() CTAction
8 changes: 8 additions & 0 deletions pkg/ovs/openflow/ofctrl_action.go
@@ -117,6 +117,14 @@ func (a *ofCTAction) MoveToLabel(fromName string, fromRng, labelRng *Range) CTAc
return a
}

// MoveToCtMark is an action to move data into ct_mark.
func (a *ofCTAction) MoveToCtMark(fromName string, fromRng, ctMarkRng *Range) CTAction {
fromField, _ := openflow13.FindFieldHeaderByName(fromName, false)
toField, _ := openflow13.FindFieldHeaderByName(NxmFieldCtMark, false)
a.move(fromField, toField, uint16(fromRng.Length()), uint16(fromRng[0]), uint16(ctMarkRng[0]))
return a
}
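
For reference, a minimal usage sketch of the new method, mirroring endpointDNATFlow above (it assumes ctAction was obtained from a flow builder via Action().CT(true /* commit */, nextTable, ctZone); all names are from this PR):

    ctAction.
    	LoadToCtMark(ServiceCTMark).
    	MoveToCtMark(PktSourceField.GetNXFieldName(), PktSourceField.GetRange(), &binding.Range{0, 3}).
    	CTDone()

In a flow dump this renders as exec(load:0x1->NXM_NX_CT_MARK[4],move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3]).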

func (a *ofCTAction) move(fromField *openflow13.MatchField, toField *openflow13.MatchField, nBits, fromStart, toStart uint16) {
action := openflow13.NewNXActionRegMove(nBits, fromStart, toStart, fromField, toField)
a.actions = append(a.actions, action)
16 changes: 15 additions & 1 deletion pkg/ovs/openflow/testing/mock_openflow.go

Generated file; diff not rendered.

5 changes: 5 additions & 0 deletions test/e2e/framework.go
@@ -1521,6 +1521,11 @@ func (data *TestData) createAgnhostNodePortService(serviceName string, affinity,
return data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeNodePort, ipFamily)
}

// createNginxNodePortService creates a NodePort nginx service with the given name.
func (data *TestData) createNginxNodePortService(serviceName string, affinity, nodeLocalExternal bool, ipFamily *corev1.IPFamily) (*corev1.Service, error) {
return data.createService(serviceName, testNamespace, 80, 80, map[string]string{"app": "nginx"}, affinity, nodeLocalExternal, corev1.ServiceTypeNodePort, ipFamily)
}

func (data *TestData) updateServiceExternalTrafficPolicy(serviceName string, nodeLocalExternal bool) (*corev1.Service, error) {
svc, err := data.clientset.CoreV1().Services(testNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
84 changes: 84 additions & 0 deletions test/e2e/proxy_test.go
@@ -27,6 +27,9 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"antrea.io/antrea/pkg/agent/config"
agentconfig "antrea.io/antrea/pkg/config/agent"
controllerconfig "antrea.io/antrea/pkg/config/controller"
"antrea.io/antrea/pkg/features"
)

@@ -381,6 +384,87 @@ func nodePortTestCases(t *testing.T, data *TestData, portStrCluster, portStrLoca
})
}

func TestNodePortAndEgressWithTheSameBackendPod(t *testing.T) {
skipIfHasWindowsNodes(t)
skipIfNotIPv4Cluster(t)
skipIfNumNodesLessThan(t, 2)
skipIfProxyDisabled(t)

data, err := setupTest(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)
skipIfProxyAllDisabled(t, data)
skipIfEncapModeIsNot(t, data, config.TrafficEncapModeEncap) // Egress works for encap mode only.

cc := func(config *controllerconfig.ControllerConfig) {
config.FeatureGates["Egress"] = true
}
ac := func(config *agentconfig.AgentConfig) {
config.FeatureGates["Egress"] = true
}

if err := data.mutateAntreaConfigMap(cc, ac, true, true); err != nil {
t.Fatalf("Failed to enable Egress feature: %v", err)
}

// Create a NodePort Service.
nodePortIP := controlPlaneNodeIPv4()
ipProtocol := corev1.IPv4Protocol
var portStr string
nodePortSvc, err := data.createNginxNodePortService("test-nodeport-svc", true, false, &ipProtocol)
require.NoError(t, err)
for _, port := range nodePortSvc.Spec.Ports {
if port.NodePort != 0 {
portStr = fmt.Sprint(port.NodePort)
break
}
}
testNodePortURL := net.JoinHostPort(nodePortIP, portStr)

// Create an Egress whose external IP is on worker Node.
egressNodeIP := workerNodeIPv4(1)
egress := data.createEgress(t, "test-egress", nil, map[string]string{"app": "nginx"}, "", egressNodeIP)
defer data.crdClient.CrdV1alpha2().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{})

// Create the backend Pod on control plane Node.
backendPodName := "test-nodeport-egress-backend-pod"
require.NoError(t, data.createNginxPodOnNode(backendPodName, testNamespace, controlPlaneNodeName(), false))
defer deletePodWrapper(t, data, testNamespace, backendPodName)
if err := data.podWaitForRunning(defaultTimeout, backendPodName, testNamespace); err != nil {
t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", backendPodName)
}

// Create another netns on the host-network Pod to fake an external network.
testPod := "test-client"
testNetns := "test-ns"
cmd := fmt.Sprintf(`ip netns add %[1]s && \
ip link add dev %[1]s-a type veth peer name %[1]s-b && \
ip link set dev %[1]s-a netns %[1]s && \
ip addr add %[3]s/%[4]d dev %[1]s-b && \
ip link set dev %[1]s-b up && \
ip netns exec %[1]s ip addr add %[2]s/%[4]d dev %[1]s-a && \
ip netns exec %[1]s ip link set dev %[1]s-a up && \
ip netns exec %[1]s ip route replace default via %[3]s && \
sleep 3600
`, testNetns, "1.1.1.1", "1.1.1.254", 24)
if err := data.createPodOnNode(testPod, testNamespace, controlPlaneNodeName(), agnhostImage, []string{"sh", "-c", cmd}, nil, nil, nil, true, func(pod *corev1.Pod) {
privileged := true
pod.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{Privileged: &privileged}
}); err != nil {
t.Fatalf("Failed to create client Pod: %v", err)
}
defer deletePodWrapper(t, data, testNamespace, testPod)
if err := data.podWaitForRunning(defaultTimeout, testPod, testNamespace); err != nil {
t.Fatalf("Error when waiting for Pod '%s' to be in the Running state", testPod)
}
// Connect to the NodePort on the control plane Node from the fake external network.
cmd = fmt.Sprintf("ip netns exec %s curl --connect-timeout 1 --retry 5 --retry-connrefused %s", testNetns, testNodePortURL)
_, _, err = data.runCommandFromPod(testNamespace, testPod, agnhostContainerName, []string{"sh", "-c", cmd})
require.NoError(t, err, "the NodePort Service should be reachable from the external network when Egress is enabled")
}

func createAgnhostPod(t *testing.T, data *TestData, podName string, node string, hostNetwork bool) {
args := []string{"netexec", "--http-port=8080"}
ports := []corev1.ContainerPort{
14 changes: 7 additions & 7 deletions test/integration/agent/openflow_test.go
@@ -740,7 +740,7 @@ func expectedProxyServiceGroupAndFlows(gid uint32, svc svcConfig, endpointList [
unionVal := (0b010 << 16) + uint32(epPort)
epDNATFlows.flows = append(epDNATFlows.flows, &ofTestUtils.ExpectFlow{
MatchStr: fmt.Sprintf("priority=200,%s,reg3=%s,reg4=0x%x/0x7ffff", string(svc.protocol), epIP, unionVal),
ActStr: fmt.Sprintf("ct(commit,table=EgressRule,zone=65520,nat(dst=%s:%d),exec(load:0x1->NXM_NX_CT_MARK[2])", ep.IP(), epPort),
ActStr: fmt.Sprintf("ct(commit,table=EgressRule,zone=65520,nat(dst=%s:%d),exec(load:0x1->NXM_NX_CT_MARK[4],move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3])", ep.IP(), epPort),
})

if ep.GetIsLocal() {
@@ -1223,11 +1223,11 @@ func prepareGatewayFlows(gwIPs []net.IP, gwMAC net.HardwareAddr, vMAC net.Hardwa
"L3Forwarding",
[]*ofTestUtils.ExpectFlow{
{
MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x2/0x2,%s,reg0=0x2/0xf", ipProtoStr),
MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x1/0xf,%s,reg0=0x2/0xf", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:L2Forwarding", gwMAC.String()),
},
{
MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x2/0x2,%s,reg0=0/0xf", ipProtoStr),
MatchStr: fmt.Sprintf("priority=210,ct_state=+rpl+trk,ct_mark=0x1/0xf,%s,reg0=0/0xf", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:L2Forwarding", gwMAC.String()),
},
},
@@ -1346,7 +1346,7 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows {
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ip", ActStr: "drop"},
)
table105Flows.flows = append(table105Flows.flows,
&ofTestUtils.ExpectFlow{MatchStr: "priority=200,ct_state=+new+trk,ip,reg0=0x1/0xf", ActStr: "ct(commit,table=HairpinSNAT,zone=65520,exec(load:0x1->NXM_NX_CT_MARK[1])"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=200,ct_state=+new+trk,ip,reg0=0x1/0xf", ActStr: "ct(commit,table=HairpinSNAT,zone=65520,exec(load:0x1->NXM_NX_CT_MARK[0..3])"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+new+trk,ip", ActStr: "ct(commit,table=HairpinSNAT,zone=65520)"},
)
table72Flows.flows = append(table72Flows.flows,
@@ -1362,7 +1362,7 @@
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ipv6", ActStr: "drop"},
)
table105Flows.flows = append(table105Flows.flows,
&ofTestUtils.ExpectFlow{MatchStr: "priority=200,ct_state=+new+trk,ipv6,reg0=0x1/0xf", ActStr: "ct(commit,table=HairpinSNAT,zone=65510,exec(load:0x1->NXM_NX_CT_MARK[1])"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=200,ct_state=+new+trk,ipv6,reg0=0x1/0xf", ActStr: "ct(commit,table=HairpinSNAT,zone=65510,exec(load:0x1->NXM_NX_CT_MARK[0..3])"},
&ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+new+trk,ipv6", ActStr: "ct(commit,table=HairpinSNAT,zone=65510)"},
)
table72Flows.flows = append(table72Flows.flows,
@@ -1481,11 +1481,11 @@ func expectedExternalFlows(nodeIP net.IP, localSubnet *net.IPNet, gwMAC net.Hard
ActStr: "goto_table:L2Forwarding",
},
{
MatchStr: fmt.Sprintf("priority=190,%s,reg0=0x2/0xf", ipProtoStr),
MatchStr: fmt.Sprintf("priority=190,ct_state=-rpl+trk,%s,reg0=0x2/0xf", ipProtoStr),
ActStr: "goto_table:SNAT",
},
{
MatchStr: fmt.Sprintf("priority=190,%s,reg0=0/0xf", ipProtoStr),
MatchStr: fmt.Sprintf("priority=190,ct_state=-rpl+trk,%s,reg0=0/0xf", ipProtoStr),
ActStr: fmt.Sprintf("set_field:%s->eth_dst,goto_table:SNAT", gwMAC.String()),
},
},
