Skip to content

Commit

Permalink
Fix inability to access NodePort in particular case
Browse files Browse the repository at this point in the history
When a Service NodePort and an Egress CRD has the same backend Pod, accessing
to the NodePort Service may fail in particular cases. Assume that the backend
Pod is on Node A and the Egress's external IP is on Node B. If an external
client (not any K8s Node) accesses the NodePort through IP of Node A where
the backend Pod is running, the access will fail. The root cause is that the
reply packets of NodePort is incorrectly matched by the flow installed by Egress
which is used to match the packets sourced from local Pods and destined for
tunneling to Node B. This PR fixes the issue by loading NXM_NX_REG0[0..3]
(PktSourceField, field to mark packet source) to NXM_NX_CT_MARK[0..3] when Service
connection is committed, then the reply packets of Service connection sourced
from Antrea gateway can be matched by NXM_NX_CT_MARK[0..3] and forced back to
Antrea gateway.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Mar 3, 2022
1 parent 57ef15c commit a767cfe
Show file tree
Hide file tree
Showing 13 changed files with 189 additions and 68 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/flowexporter/connections/conntrack_linux_test.go
Expand Up @@ -140,7 +140,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) {
// Set expect call for mock ovsCtlClient
ovsctlCmdOutput := []byte("tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=127.0.0.1,dst=8.7.6.5,sport=45170,dport=2379,packets=80743,bytes=5416239),reply=(src=8.7.6.5,dst=127.0.0.1,sport=2379,dport=45170,packets=63361,bytes=4811261),start=2020-07-24T05:07:01.591,id=462801621,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86397,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,labels=0x200000001,mark=4,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,labels=0x200000001,mark=16,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
outputFlow := strings.Split(string(ovsctlCmdOutput), "\n")
expConn := &flowexporter.Connection{
ID: 982464968,
Expand Down
39 changes: 23 additions & 16 deletions pkg/agent/openflow/fields.go
Expand Up @@ -21,6 +21,12 @@ import (

// Fields using reg.
var (
fromTunnelVal = uint32(0)
fromGatewayVal = uint32(1)
fromLocalVal = uint32(2)
fromUplinkVal = uint32(4)
fromBridgeVal = uint32(5)

// reg0 (NXM_NX_REG0)
// reg0[0..3]: Field to mark the packet source. Marks in this field include,
// - 0: from the tunnel port
Expand All @@ -29,11 +35,11 @@ var (
// - 4: from the Bridge interface
// - 5: from the uplink interface
PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource")
FromTunnelRegMark = binding.NewRegMark(PktSourceField, 0)
FromGatewayRegMark = binding.NewRegMark(PktSourceField, 1)
FromLocalRegMark = binding.NewRegMark(PktSourceField, 2)
FromUplinkRegMark = binding.NewRegMark(PktSourceField, 4)
FromBridgeRegMark = binding.NewRegMark(PktSourceField, 5)
FromTunnelRegMark = binding.NewRegMark(PktSourceField, fromTunnelVal)
FromGatewayRegMark = binding.NewRegMark(PktSourceField, fromGatewayVal)
FromLocalRegMark = binding.NewRegMark(PktSourceField, fromLocalVal)
FromUplinkRegMark = binding.NewRegMark(PktSourceField, fromUplinkVal)
FromBridgeRegMark = binding.NewRegMark(PktSourceField, fromBridgeVal)
// reg0[16]: Mark to indicate the ofPort number of an interface is found.
OFPortFoundRegMark = binding.NewOneBitRegMark(0, 16, "OFPortFound")
// reg0[18]: Mark to indicate the packet needs DNAT to virtual IP.
Expand Down Expand Up @@ -139,17 +145,18 @@ var (

// Marks using CT.
var (
//TODO: There is a bug in libOpenflow when CT_MARK range is from 0 to 0, and a wrong mask will be got,
// so bit 0 of CT_MARK is not used for now.

// Mark to indicate the connection is initiated through the host gateway interface
// (i.e. for which the first packet of the connection was received through the gateway).
FromGatewayCTMark = binding.NewCTMark(0b1, 1, 1)
// Mark to indicate DNAT is performed on the connection for Service.
ServiceCTMark = binding.NewCTMark(0b1, 2, 2)
// Mark to indicate the connection is initiated through the host bridge interface
// (i.e. for which the first packet of the connection was received through the bridge).
FromBridgeCTMark = binding.NewCTMark(0x1, 3, 3)
//TODO: There is a bug in libOpenflow when CT_MARK range is from 0 to 0, and a wrong mask will be got. As a result,
// don't just use bit 0 of CT_MARK.

// CTMark (NXM_NX_CT_MARK)
// CTMark[0..3]: Field to mark the source of the connection. This field has the same bits and positions as PktSourceField
// for persisting the value from reg0 to CTMark when committing the first packet of the connection with CT action.
ConnSourceCTMarkField = binding.NewCTMarkField(0, 3, "ConnSourceCTMark")
FromGatewayCTMark = binding.NewCTMark(ConnSourceCTMarkField, fromGatewayVal)
FromBridgeCTMark = binding.NewCTMark(ConnSourceCTMarkField, fromBridgeVal)

// CTMark[4]: Mark to indicate DNAT is performed on the connection for Service.
ServiceCTMark = binding.NewOneBitCTMark(4, "ServiceCTMark")
)

// Fields using CT label.
Expand Down
42 changes: 12 additions & 30 deletions pkg/agent/openflow/pipeline.go
Expand Up @@ -594,7 +594,7 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
// This flow is used to match the Service traffic from Antrea gateway. The Service traffic from gateway
// should enter table serviceConntrackCommitTable, otherwise it will be matched by other flows in
// table connectionTrackCommit.
ConntrackCommitTable.BuildFlow(priorityHigh).MatchProtocol(proto).
ConntrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(proto).
MatchCTMark(ServiceCTMark).
MatchRegMark(FromGatewayRegMark).
Action().GotoTable(ServiceConntrackCommitTable.GetID()).
Expand Down Expand Up @@ -685,31 +685,6 @@ func (c *client) connectionTrackFlows(category cookie.Category) []binding.Flow {
flows = append(flows, c.kubeProxyFlows(category)...)
}

// TODO: following flows should move to function "kubeProxyFlows". Since another PR(#1198) is trying
// to polish the relevant logic, code refactoring is needed after that PR is merged.
for _, proto := range c.ipProtocols {
ctZone := CtZone
if proto == binding.ProtocolIPv6 {
ctZone = CtZoneV6
}
flows = append(flows,
// Connections initiated through the gateway are marked with FromGatewayCTMark.
ConntrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(proto).
MatchRegMark(FromGatewayRegMark).
MatchCTStateNew(true).MatchCTStateTrk(true).
Action().CT(true, ConntrackCommitTable.GetNext(), ctZone).LoadToCtMark(FromGatewayCTMark).CTDone().
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
// Connections initiated through the bridge port are marked with FromBridgeCTMark.
ConntrackCommitTable.BuildFlow(priorityNormal).MatchProtocol(proto).
MatchRegMark(FromBridgeRegMark).
MatchCTStateNew(true).MatchCTStateTrk(true).
Action().CT(true, ConntrackCommitTable.GetNext(), ctZone).LoadToCtMark(FromBridgeCTMark).CTDone().
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
// Add reject response packet bypass flow.
)
}
return flows
}

Expand Down Expand Up @@ -758,7 +733,9 @@ func (c *client) conntrackBasicFlows(category cookie.Category) []binding.Flow {
Done(),
ConntrackCommitTable.BuildFlow(priorityLow).MatchProtocol(proto).
MatchCTStateNew(true).MatchCTStateTrk(true).
Action().CT(true, ConntrackCommitTable.GetNext(), ctZone).CTDone().
Action().CT(true, ConntrackCommitTable.GetNext(), ctZone).
MoveToCtMarkField(PktSourceField, ConnSourceCTMarkField).
CTDone().
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
)
Expand Down Expand Up @@ -1341,8 +1318,7 @@ func (c *client) l3FwdServiceDefaultFlowsViaGW(ipProto binding.Protocol, categor
// - NodePort/LoadBalancer request packets which pass through Antrea gateway and the Service Endpoint is not on
// local Pod CIDR or any remote Pod CIDRs.
// - ClusterIP request packets which are from Antrea gateway and the Service Endpoint is not on local Pod CIDR
// or any remote Pod CIDRs.
// - NodePort/LoadBalancer/ClusterIP response packets.
// or any remote Pod CIDRs.
// The matched packets should leave through Antrea gateway, however, they also enter through Antrea gateway. This
// is hairpin traffic.
// Skip traffic from AntreaFlexibleIPAM Pods.
Expand Down Expand Up @@ -2159,10 +2135,13 @@ func (c *client) snatCommonFlows(nodeIP net.IP, localSubnet net.IPNet, localGate
// should bypass SNAT too. But it has been covered by the gatewayCT related flow generated in l3FwdFlowToGateway
// which forwards all reply traffic for such connections back to the gateway interface with the high priority.

// Send the traffic to external to SNATTable.
// This generates the flow to match the requests packets sourced from local Pods and destined for external, then
// forward the packets to SNATTable.
L3ForwardingTable.BuildFlow(priorityLow).
MatchProtocol(ipProto).
MatchRegMark(FromLocalRegMark).
MatchCTStateRpl(false).
MatchCTStateTrk(true).
Action().GotoTable(SNATTable.GetID()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
Expand All @@ -2171,6 +2150,8 @@ func (c *client) snatCommonFlows(nodeIP net.IP, localSubnet net.IPNet, localGate
L3ForwardingTable.BuildFlow(priorityLow).
MatchProtocol(ipProto).
MatchRegMark(FromTunnelRegMark).
MatchCTStateRpl(false).
MatchCTStateTrk(true).
Action().SetDstMAC(localGatewayMAC).
Action().GotoTable(SNATTable.GetID()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Expand Down Expand Up @@ -2426,6 +2407,7 @@ func (c *client) endpointDNATFlow(endpointIP net.IP, endpointPort uint16, protoc
&binding.PortRange{StartPort: endpointPort, EndPort: endpointPort},
).
LoadToCtMark(ServiceCTMark).
MoveToCtMarkField(PktSourceField, ConnSourceCTMarkField).
CTDone().
Done()
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/ovs/openflow/interfaces.go
Expand Up @@ -342,6 +342,7 @@ type CTAction interface {
LoadToCtMark(mark *CtMark) CTAction
LoadToLabelField(value uint64, labelField *CtLabel) CTAction
MoveToLabel(fromName string, fromRng, labelRng *Range) CTAction
MoveToCtMarkField(fromRegField *RegField, ctMark *CtMarkField) CTAction
// NAT action translates the packet in the way that the connection was committed into the conntrack zone, e.g., if
// a connection was committed with SNAT, the later packets would be translated with the earlier SNAT configurations.
NAT() CTAction
Expand Down Expand Up @@ -442,9 +443,16 @@ type RegMark struct {
// XXRegField specifies a xxreg with a required bit range.
type XXRegField RegField

// CtMarkField specifies a bit range of a CT mark. rng is the range of bits taken by the field. The OF client could use a
// CtMarkField to cache or match varied value.
type CtMarkField struct {
rng *Range
name string
}

// CtMark is used to indicate the connection characteristics.
type CtMark struct {
rng *Range
field *CtMarkField
value uint32
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/ovs/openflow/ofctrl_action.go
Expand Up @@ -94,7 +94,7 @@ func (a *ofCTAction) LoadToMark(value uint32) CTAction {

func (a *ofCTAction) LoadToCtMark(mark *CtMark) CTAction {
field, _, _ := getFieldRange(NxmFieldCtMark)
a.load(field, uint64(mark.value), mark.rng)
a.load(field, uint64(mark.value), mark.field.rng)
return a
}

Expand All @@ -117,6 +117,14 @@ func (a *ofCTAction) MoveToLabel(fromName string, fromRng, labelRng *Range) CTAc
return a
}

// MoveToCtMarkField is an action to move data into ct_mark.
func (a *ofCTAction) MoveToCtMarkField(fromRegField *RegField, ctMarkField *CtMarkField) CTAction {
fromField, _ := openflow13.FindFieldHeaderByName(fromRegField.GetNXFieldName(), false)
toField, _ := openflow13.FindFieldHeaderByName(NxmFieldCtMark, false)
a.move(fromField, toField, uint16(fromRegField.GetRange().Length()), uint16(fromRegField.GetRange()[0]), uint16(ctMarkField.rng[0]))
return a
}

func (a *ofCTAction) move(fromField *openflow13.MatchField, toField *openflow13.MatchField, nBits, fromStart, toStart uint16) {
action := openflow13.NewNXActionRegMove(nBits, fromStart, toStart, fromField, toField)
a.actions = append(a.actions, action)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ovs/openflow/ofctrl_builder.go
Expand Up @@ -237,7 +237,7 @@ func (b *ofFlowBuilder) MatchCTMark(mark *CtMark) FlowBuilder {
b.ofFlow.Match.CtMarkMask = nil
ctmarkKey = fmt.Sprintf("ct_mark=0x%x", mark.value)
} else {
mask := mark.rng.ToNXRange().ToUint32Mask()
mask := mark.field.rng.ToNXRange().ToUint32Mask()
ctmarkKey = fmt.Sprintf("ct_mark=0x%x/0x%x", mark.GetValue(), mask)
b.ofFlow.Match.CtMarkMask = &mask
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ovs/openflow/ofctrl_flow_test.go
Expand Up @@ -31,7 +31,8 @@ func TestCopyToBuilder(t *testing.T) {
id: t0,
next: t1,
}
mark := NewCTMark(12345, 0, 31)

mark := NewCTMark(NewCTMarkField(0, 31, "mark"), 12345)
oriFlow := table.BuildFlow(uint16(100)).MatchProtocol(ProtocolIP).
Cookie(uint64(1004)).
MatchRegFieldWithValue(testField, 0x101).
Expand Down
24 changes: 19 additions & 5 deletions pkg/ovs/openflow/ofctrl_nxfields.go
Expand Up @@ -83,21 +83,35 @@ func NewXXRegField(id int, start, end uint32) *XXRegField {
}

func (m *CtMark) GetRange() *Range {
return m.rng
return m.field.rng
}

// GetValue gets CT mark value with offset since CT mark is used by bit. E.g, CT_MARK_REG[3]==1, the return
// value of this function is 0b1000.
func (m *CtMark) GetValue() uint32 {
return m.value << m.rng.Offset()
return m.value << m.field.rng.Offset()
}

func (m *CtMark) isFullRange() bool {
return m.rng.Length() == 32
return m.field.rng.Length() == 32
}

func NewCTMark(value uint32, start, end uint32) *CtMark {
return &CtMark{value: value, rng: &Range{start, end}}
func NewCTMarkField(start, end uint32, name string) *CtMarkField {
return &CtMarkField{rng: &Range{start, end}, name: name}
}

func NewOneBitCTMark(bit uint32, name string) *CtMark {
field := NewCTMarkField(bit, bit, name)
return &CtMark{value: 1, field: field}
}

func NewOneBitZeroCTMark(bit uint32, name string) *CtMark {
field := NewCTMarkField(bit, bit, name)
return &CtMark{value: 0, field: field}
}

func NewCTMark(field *CtMarkField, value uint32) *CtMark {
return &CtMark{value: value, field: field}
}

func NewCTLabel(start, end uint32, name string) *CtLabel {
Expand Down
16 changes: 15 additions & 1 deletion pkg/ovs/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions test/e2e/framework.go
Expand Up @@ -1521,6 +1521,11 @@ func (data *TestData) createAgnhostNodePortService(serviceName string, affinity,
return data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeNodePort, ipFamily)
}

// createNginxNodePortService creates a NodePort nginx service with the given name.
func (data *TestData) createNginxNodePortService(serviceName string, affinity, nodeLocalExternal bool, ipFamily *corev1.IPFamily) (*corev1.Service, error) {
return data.createService(serviceName, testNamespace, 80, 80, map[string]string{"app": "nginx"}, affinity, nodeLocalExternal, corev1.ServiceTypeNodePort, ipFamily)
}

func (data *TestData) updateServiceExternalTrafficPolicy(serviceName string, nodeLocalExternal bool) (*corev1.Service, error) {
svc, err := data.clientset.CoreV1().Services(testNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
Expand Down

0 comments on commit a767cfe

Please sign in to comment.