From 1333d6693779bff45469f3f9f63068ab492e99c1 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Fri, 1 Apr 2022 11:54:37 +0800 Subject: [PATCH] Implement traffic control interfaces via OVS Signed-off-by: Hongliang Liu --- cmd/antrea-agent/agent.go | 5 +- go.mod | 5 +- go.sum | 4 +- pkg/agent/openflow/client.go | 50 +++++++++- pkg/agent/openflow/client_test.go | 14 +-- pkg/agent/openflow/cookie/allocator.go | 1 + pkg/agent/openflow/fields.go | 34 ++++--- pkg/agent/openflow/framework.go | 7 ++ pkg/agent/openflow/network_policy_test.go | 2 +- pkg/agent/openflow/pipeline.go | 84 +++++++++++++--- pkg/agent/openflow/pod_connectivity.go | 103 +++++++++++++++++++- pkg/agent/openflow/service.go | 5 +- pkg/agent/openflow/testing/mock_openflow.go | 57 +++++++++++ pkg/apis/crd/v1alpha2/types.go | 16 +++ test/integration/agent/openflow_test.go | 30 +++--- 15 files changed, 357 insertions(+), 60 deletions(-) diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 1741a0774f..0e5c1ed5f6 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -130,7 +130,10 @@ func run(o *Options) error { features.DefaultFeatureGate.Enabled(features.FlowExporter), o.config.AntreaProxy.ProxyAll, connectUplinkToBridge, - features.DefaultFeatureGate.Enabled(features.Multicast)) + features.DefaultFeatureGate.Enabled(features.Multicast), + // TODO: replace with features.DefaultFeatureGate.Enabled(features.TrafficControl), + false, + ) _, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR) var serviceCIDRNetv6 *net.IPNet diff --git a/go.mod b/go.mod index ad8d828932..e49e8a01c9 100644 --- a/go.mod +++ b/go.mod @@ -177,4 +177,7 @@ require ( // Newer version of github.com/googleapis/gnostic make use of newer gopkg.in/yaml(v3), which conflicts with // explicit imports of gopkg.in/yaml.v2. -replace github.com/googleapis/gnostic v0.5.5 => github.com/googleapis/gnostic v0.4.1 +replace ( + antrea.io/ofnet v0.5.5 => github.com/wenyingd/ofnet v0.0.0-20220411025655-52ebfca401f5 + github.com/googleapis/gnostic v0.5.5 => github.com/googleapis/gnostic v0.4.1 +) diff --git a/go.sum b/go.sum index a73e293351..ec4651e96e 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ antrea.io/libOpenflow v0.6.2 h1:1JMSJ7Lp7yOhKybHey9VDtRI6JuIgkhUWJBX5GIFY9I= antrea.io/libOpenflow v0.6.2/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o= -antrea.io/ofnet v0.5.5 h1:4CLYWqQE4/XyuIUaTSqB83Zj+YnuplrlEXPvVm/r0JE= -antrea.io/ofnet v0.5.5/go.mod h1:8TJVF6MLe9/gZ/KbhGUvULs9/TxssepEaYEe+o1SEgs= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= @@ -767,6 +765,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae h1:4hwBBUfQCFe3C github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vmware/go-ipfix v0.5.12 h1:mqQknlvnvDY25apPNy9c27ri3FMDFIhzvO68Kk5Qp58= github.com/vmware/go-ipfix v0.5.12/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY= +github.com/wenyingd/ofnet v0.0.0-20220411025655-52ebfca401f5 h1:mx94DJfWe5LkwkaIRLpBL/eSILpvs2WvuG38fKOTMF0= +github.com/wenyingd/ofnet v0.0.0-20220411025655-52ebfca401f5/go.mod h1:8TJVF6MLe9/gZ/KbhGUvULs9/TxssepEaYEe+o1SEgs= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index bb852c62cb..7fef29a176 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -28,6 +28,7 @@ import ( "antrea.io/antrea/pkg/agent/openflow/cookie" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" + "antrea.io/antrea/pkg/apis/crd/v1alpha2" binding "antrea.io/antrea/pkg/ovs/openflow" utilip "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/third_party/proxy" @@ -279,6 +280,19 @@ type Client interface { dstIP net.IP, outPort uint32, igmp ofutil.Message) error + + // InstallTrafficControlMarkFlows installs the flows to mark the packets for a traffic control rule with the traffic control + // mark value. + InstallTrafficControlMarkFlows(name string, mark uint32, srcOfPorts []uint32, direction v1alpha2.Direction) error + + // UninstallTrafficControlMarkFlows removes the flows for a traffic control rule. + UninstallTrafficControlMarkFlows(name string) error + + // InstallTrafficControlInitFlows installs the initial flows for the traffic control mark value. + InstallTrafficControlInitFlows(mark uint32, action v1alpha2.Action, dstOfInPort, dstOfOutPort uint32) error + + // UninstallTrafficControlInitFlows removes the flows for the traffic control mark value. + UninstallTrafficControlInitFlows(mark uint8) error } // GetFlowTableStatus returns an array of flow table status. @@ -697,7 +711,8 @@ func (c *client) generatePipelines() { c.networkConfig, c.ovsDatapathType, c.connectUplinkToBridge, - c.enableMulticast) + c.enableMulticast, + c.enableTrafficControl) c.activatedFeatures = append(c.activatedFeatures, c.featurePodConnectivity) c.traceableFeatures = append(c.traceableFeatures, c.featurePodConnectivity) @@ -718,7 +733,8 @@ func (c *client) generatePipelines() { c.bridge, c.enableProxy, c.proxyAll, - c.connectUplinkToBridge) + c.connectUplinkToBridge, + c.enableTrafficControl) c.activatedFeatures = append(c.activatedFeatures, c.featureService) c.traceableFeatures = append(c.traceableFeatures, c.featureService) @@ -1128,3 +1144,33 @@ func (c *client) SendIGMPQueryPacketOut( packetOutObj := packetOutBuilder.Done() return c.bridge.SendPacketOut(packetOutObj) } + +func (c *client) InstallTrafficControlMarkFlows(name string, mark uint32, srcOfPorts []uint32, direction v1alpha2.Direction) error { + flows := c.featurePodConnectivity.trafficControlMarkFlows(mark, srcOfPorts, direction) + cacheKey := fmt.Sprintf("trafficControl_%s", name) + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey, flows) +} + +func (c *client) UninstallTrafficControlMarkFlows(name string) error { + cacheKey := fmt.Sprintf("trafficControl_%s", name) + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey) +} + +func (c *client) InstallTrafficControlInitFlows(mark uint32, action v1alpha2.Action, dstOfInPort, dstOfOutPort uint32) error { + cacheKey := fmt.Sprintf("trafficControl_%d", mark) + flows := c.featurePodConnectivity.trafficControlInitFlows(mark, action, dstOfInPort, dstOfOutPort) + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.addFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey, flows) +} + +func (c *client) UninstallTrafficControlInitFlows(mark uint8) error { + cacheKey := fmt.Sprintf("trafficControl_%d", mark) + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey) +} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index ec11a49af1..3ee73085ba 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -107,7 +107,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -140,7 +140,7 @@ func TestIdempotentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -186,7 +186,7 @@ func TestFlowInstallationFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -225,7 +225,7 @@ func TestConcurrentFlowInstallation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() m := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false, false) client := ofClient.(*client) client.cookieAllocator = cookie.NewAllocator(0) client.ofEntryOperations = m @@ -417,7 +417,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) { } func prepareTraceflowFlow(ctrl *gomock.Controller) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false, false) c := ofClient.(*client) c.cookieAllocator = cookie.NewAllocator(0) c.nodeConfig = nodeConfig @@ -443,7 +443,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client { } func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false, false) c := ofClient.(*client) c.nodeConfig = nodeConfig m := ovsoftest.NewMockBridge(ctrl) @@ -531,7 +531,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) { } func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false, false) c := ofClient.(*client) m := ovsoftest.NewMockBridge(ctrl) c.bridge = m diff --git a/pkg/agent/openflow/cookie/allocator.go b/pkg/agent/openflow/cookie/allocator.go index 72df44b35d..21aa02eb0c 100644 --- a/pkg/agent/openflow/cookie/allocator.go +++ b/pkg/agent/openflow/cookie/allocator.go @@ -37,6 +37,7 @@ const ( Egress Multicast Traceflow + TrafficControl ) func (c Category) String() string { diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index f6e756d5f5..2159444d7f 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -21,11 +21,12 @@ import ( // Fields using reg. var ( - tunnelVal = uint32(1) - gatewayVal = uint32(2) - localVal = uint32(3) - uplinkVal = uint32(4) - bridgeVal = uint32(5) + tunnelVal = uint32(1) + gatewayVal = uint32(2) + localVal = uint32(3) + uplinkVal = uint32(4) + bridgeVal = uint32(5) + redirectVal = uint32(6) // reg0 (NXM_NX_REG0) // reg0[0..3]: Field to store the packet source. Marks in this field include: @@ -34,12 +35,14 @@ var ( // - 3: from local Pods. // - 4: from uplink port. // - 5: from bridge local port. - PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource") - FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal) - FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal) - FromLocalRegMark = binding.NewRegMark(PktSourceField, localVal) - FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal) - FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal) + // - 6: from traffic control redirecting out port. + PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource") + FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal) + FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal) + FromLocalRegMark = binding.NewRegMark(PktSourceField, localVal) + FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal) + FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal) + FromRedirectRegMark = binding.NewRegMark(PktSourceField, redirectVal) // reg0[4..7]: Field to store the packet destination. Marks in this field include: // - 1: to tunnel port. // - 2: to Antrea gateway port. @@ -82,6 +85,8 @@ var ( CustomReasonDenyRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDeny) CustomReasonDNSRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDNS) CustomReasonIGMPRegMark = binding.NewRegMark(CustomReasonField, CustomReasonIGMP) + // reg0[17..18]: Field to store the mark value for the packets that need to be mirrored or redirected. + TrafficControlField = binding.NewRegField(0, 18, 21, "TrafficControl") // reg1(NXM_NX_REG1) // Field to cache the ofPort of the OVS interface where to output packet. @@ -175,7 +180,8 @@ var ( // CTMark[4]: Mark to indicate DNAT is performed on the connection for Service. // This CT mark is used in CtZone / CtZoneV6 and SNATCtZone / SNATCtZoneV6. - ServiceCTMark = binding.NewOneBitCTMark(4) + ServiceCTMark = binding.NewOneBitCTMark(4) + NotServiceCTMark = binding.NewOneBitZeroCTMark(4) // CTMark[5]: Mark to indicate SNAT should be performed on the connection for Service. // This CT mark is only used in CtZone / CtZoneV6. @@ -184,6 +190,10 @@ var ( // CTMark[6]: Mark to indicate the connection is hairpin. // This CT mark is used in CtZone / CtZoneV6 and SNATCtZone / SNATCtZoneV6. HairpinCTMark = binding.NewOneBitCTMark(6) + + // CTMark[6]: Field to store the mark value for connections that need to be mirrored or redirected. + // This CT mark is only used in CtZone / CtZoneV6. + ConnTrafficControlCTMarkField = binding.NewCTMarkField(7, 10) ) // Fields using CT label. diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index c01edea5ca..1cf78e8429 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -187,6 +187,13 @@ func (f *featurePodConnectivity) getRequiredTables() []*Table { } } } + if f.enableTrafficControl { + tables = append(tables, + EgressTrafficControlMarkTable, + TrafficControlRedirectOutTable, + IngressTrafficControlMarkTable, + ) + } return tables } diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index 967f234b2d..5fa0bbe4be 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -519,7 +519,7 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOperations := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, false, true, false, false, false, false, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, false, true, false, false, false, false, false, false) c = ofClient.(*client) c.cookieAllocator = cookie.NewAllocator(0) c.ofEntryOperations = mockOperations diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 771c6b17b2..9932f88081 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -123,7 +123,8 @@ var ( // Tables of pipelineIP are declared below. // Tables in stageClassifier: - ClassifierTable = newTable("Classifier", stageClassifier, pipelineIP, defaultDrop) + ClassifierTable = newTable("Classifier", stageClassifier, pipelineIP, defaultDrop) + EgressTrafficControlMarkTable = newTable("EgressTrafficControlMark", stageClassifier, pipelineIP) // Tables in stageValidation: SpoofGuardTable = newTable("SpoofGuard", stageValidation, pipelineIP, defaultDrop) @@ -161,7 +162,9 @@ var ( SNATConntrackCommitTable = newTable("SNATConntrackCommit", stagePostRouting, pipelineIP) // Tables in stageSwitching: - L2ForwardingCalcTable = newTable("L2ForwardingCalc", stageSwitching, pipelineIP) + L2ForwardingCalcTable = newTable("L2ForwardingCalc", stageSwitching, pipelineIP) + TrafficControlRedirectOutTable = newTable("TrafficControlRedirectOut", stageSwitching, pipelineIP) + IngressTrafficControlMarkTable = newTable("IngressTrafficControlMark", stageSwitching, pipelineIP) // Tables in stageIngressSecurity: IngressSecurityClassifierTable = newTable("IngressSecurityClassifier", stageIngressSecurity, pipelineIP) @@ -390,6 +393,7 @@ type client struct { enableDenyTracking bool enableEgress bool enableMulticast bool + enableTrafficControl bool connectUplinkToBridge bool roundInfo types.RoundInfo cookieAllocator cookie.Allocator @@ -602,7 +606,7 @@ func (f *featurePodConnectivity) podClassifierFlow(podOFPort uint32, isAntreaFle Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchInPort(podOFPort). Action().LoadRegMarks(regMarksToLoad...). - Action().GotoStage(stageValidation). + Action().NextTable(). Done() } @@ -657,8 +661,16 @@ func (f *featurePodConnectivity) podUplinkClassifierFlows(dstMAC net.HardwareAdd func (f *featurePodConnectivity) conntrackFlows() []binding.Flow { cookieID := f.cookieAllocator.Request(f.category).Raw() var flows []binding.Flow - regFieldToCtMarkFieldList := []binding.RegFieldToCtMarkField{{SrcRegField: PktSourceField, DstCTMarkField: ConnSourceCTMarkField}} for _, ipProtocol := range f.ipProtocols { + // This pair is used to mark the source of a connection. + pktSourceField := binding.RegFieldToCtMarkField{SrcRegField: TrafficControlField, DstCTMarkField: ConnTrafficControlCTMarkField} + // This pair is used to indicate that whether the connection requires traffic control. + trafficControlField := binding.RegFieldToCtMarkField{SrcRegField: TrafficControlField, DstCTMarkField: ConnTrafficControlCTMarkField} + regFieldToCtMarkFieldList := []binding.RegFieldToCtMarkField{pktSourceField} + if f.enableTrafficControl { + regFieldToCtMarkFieldList = append(regFieldToCtMarkFieldList, trafficControlField) + } + flows = append(flows, // This generates the flow to transform the destination IP of request packets or source IP of reply packets // from tracked connections in CT zone. @@ -688,17 +700,57 @@ func (f *featurePodConnectivity) conntrackFlows() []binding.Flow { Action().Drop(). Done(), // This generates the flow to match the first packet of non-Service connection and mark the source of the connection - // by copying PktSourceField to ConnSourceCTMarkField. + // by copying SrcRegField to DstCTMarkField of every FieldPair. ConntrackCommitTable.ofTable.BuildFlow(priorityNormal). Cookie(cookieID). MatchProtocol(ipProtocol). MatchCTStateNew(true). MatchCTStateTrk(true). + MatchCTMark(NotServiceCTMark). // TODO: this cannot be realized on OVS due to issue https://github.com/antrea-io/antrea/issues/3583 Action().CT(true, ConntrackCommitTable.GetNext(), f.ctZones[ipProtocol], f.ctZoneSrcField). MoveToCtMarkFields(regFieldToCtMarkFieldList...). CTDone(). Done(), ) + // This generates the flow to match the first packet of Service connection that requires redirecting or mirroring. + // Note that, SNATed connection (hairpin or Service whose externalTrafficPolicy is Cluster) is not supported. + if f.enableTrafficControl { + flows = append(flows, + // This generates the flow to bypass the first packet of SNATed Service connection. Note that, traffic control + // is not supported for SNATed Service connection. + ConntrackCommitTable.ofTable.BuildFlow(priorityHigh+2). + Cookie(cookieID). + MatchProtocol(ipProtocol). + MatchCTStateNew(true). + MatchCTStateTrk(true). + MatchCTMarks(ServiceCTMark, ConnSNATCTMark). + Action().NextTable(). + Done(), + // This generates the flow to bypass the first packet of Service connection without traffic control reg mark. + ConntrackCommitTable.ofTable.BuildFlow(priorityHigh+2). + Cookie(cookieID). + MatchProtocol(ipProtocol). + MatchCTStateNew(true). + MatchCTStateTrk(true). + MatchCTMarks(ServiceCTMark). + MatchRegFieldWithValue(TrafficControlField, 0). + Action().NextTable(). + Done(), + // This generates the flow to match the ingress packets of Service connection that require traffic control. + // Note that, the egress packets that require traffic control is committed before this flow and value of + // ConnTrafficControlCTMarkField has been written. + ConntrackCommitTable.ofTable.BuildFlow(priorityHigh+1). + Cookie(cookieID). + MatchProtocol(ipProtocol). + MatchCTStateNew(true). + MatchCTStateTrk(true). + MatchCTMarks(ServiceCTMark, binding.NewCTMark(ConnTrafficControlCTMarkField, 0)). + Action().CT(true, ConntrackCommitTable.GetNext(), f.ctZones[ipProtocol], f.ctZoneSrcField). + MoveToCtMarkFields(trafficControlField). + CTDone(). + Done(), + ) + } } // This generates default flow to match the first packet of a new connection and forward it to stagePreRouting. flows = append(flows, ConntrackStateTable.ofTable.BuildFlow(priorityMiss). @@ -726,14 +778,6 @@ func (f *featureService) conntrackFlows() []binding.Flow { Action().LoadRegMark(RewriteMACRegMark). Action().GotoStage(stageEgressSecurity). Done(), - // This generates the flow to avoid committing Service connections (with ServiceCTMark) another time. They - // have been committed in EndpointDNATTable, using the same CT zone. - ConntrackCommitTable.ofTable.BuildFlow(priorityHigh). - Cookie(cookieID). - MatchProtocol(ipProtocol). - MatchCTMark(ServiceCTMark). - Action().GotoStage(stageOutput). - Done(), ) } return flows @@ -783,6 +827,7 @@ func (f *featureService) snatConntrackFlows() []binding.Flow { SNAT(&binding.IPRange{StartIP: f.virtualIPs[ipProtocol], EndIP: f.virtualIPs[ipProtocol]}, nil). LoadToCtMark(ServiceCTMark). LoadToCtMark(HairpinCTMark). + LoadToCtMark(ConnSNATCTMark). CTDone(). Done(), // This generates the flow to match the first packet of hairpin Service connection initiated through a Pod with @@ -799,6 +844,7 @@ func (f *featureService) snatConntrackFlows() []binding.Flow { SNAT(&binding.IPRange{StartIP: f.gatewayIPs[ipProtocol], EndIP: f.gatewayIPs[ipProtocol]}, nil). LoadToCtMark(ServiceCTMark). LoadToCtMark(HairpinCTMark). + LoadToCtMark(ConnSNATCTMark). CTDone(). Done(), // This generates the flow to match the first packet of NodePort / LoadBalancer connection (non-hairpin) initiated @@ -813,6 +859,7 @@ func (f *featureService) snatConntrackFlows() []binding.Flow { Action().CT(true, SNATConntrackCommitTable.GetNext(), f.snatCtZones[ipProtocol], nil). SNAT(&binding.IPRange{StartIP: f.gatewayIPs[ipProtocol], EndIP: f.gatewayIPs[ipProtocol]}, nil). LoadToCtMark(ServiceCTMark). + LoadToCtMark(ConnSNATCTMark). CTDone(). Done(), // This generates the flow to match the subsequent request packets of connection whose first request packet has @@ -2395,7 +2442,6 @@ func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16 Cookie(f.cookieAllocator.Request(f.category).Raw()). MatchRegFieldWithValue(EpUnionField, unionVal) ipProtocol := getIPProtocol(endpointIP) - regFieldToCtMarkFieldList := []binding.RegFieldToCtMarkField{{SrcRegField: PktSourceField, DstCTMarkField: ConnSourceCTMarkField}} if ipProtocol == binding.ProtocolIP { ipVal := binary.BigEndian.Uint32(endpointIP.To4()) @@ -2405,6 +2451,12 @@ func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16 flowBuilder = flowBuilder.MatchXXReg(EndpointIP6Field.GetRegID(), ipVal) } + // This pair is used to indicate the initiated location of a connection. + regFieldToCtMarkFieldList := []binding.RegFieldToCtMarkField{{SrcRegField: PktSourceField, DstCTMarkField: ConnSourceCTMarkField}} + if f.enableTrafficControl { + // This pair is used to indicate that whether the connection requires redirecting or mirroring. + regFieldToCtMarkFieldList = append(regFieldToCtMarkFieldList, binding.RegFieldToCtMarkField{SrcRegField: TrafficControlField, DstCTMarkField: ConnTrafficControlCTMarkField}) + } return flowBuilder.Action(). CT(true, EndpointDNATTable.GetNext(), f.dnatCtZones[ipProtocol], f.ctZoneSrcField). DNAT( @@ -2678,7 +2730,8 @@ func NewClient(bridgeName string, enableDenyTracking bool, proxyAll bool, connectUplinkToBridge bool, - enableMulticast bool) Client { + enableMulticast bool, + enableTrafficControl bool) Client { bridge := binding.NewOFBridge(bridgeName, mgmtAddr) c := &client{ bridge: bridge, @@ -2688,6 +2741,7 @@ func NewClient(bridgeName string, enableDenyTracking: enableDenyTracking, enableEgress: enableEgress, enableMulticast: enableMulticast, + enableTrafficControl: enableTrafficControl, connectUplinkToBridge: connectUplinkToBridge, pipelines: make(map[binding.PipelineID]binding.Pipeline), packetInHandlers: map[uint8]map[string]PacketInHandler{}, diff --git a/pkg/agent/openflow/pod_connectivity.go b/pkg/agent/openflow/pod_connectivity.go index d319edf94f..caff1e23b9 100644 --- a/pkg/agent/openflow/pod_connectivity.go +++ b/pkg/agent/openflow/pod_connectivity.go @@ -19,6 +19,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" + "antrea.io/antrea/pkg/apis/crd/v1alpha2" binding "antrea.io/antrea/pkg/ovs/openflow" "antrea.io/antrea/pkg/ovs/ovsconfig" "antrea.io/antrea/pkg/util/runtime" @@ -30,6 +31,7 @@ type featurePodConnectivity struct { nodeCachedFlows *flowCategoryCache podCachedFlows *flowCategoryCache + tcCachedFlows *flowCategoryCache gatewayIPs map[binding.Protocol]net.IP ctZones map[binding.Protocol]int @@ -44,6 +46,7 @@ type featurePodConnectivity struct { ctZoneSrcField *binding.RegField ipCtZoneTypeRegMarks map[binding.Protocol]*binding.RegMark enableMulticast bool + enableTrafficControl bool category cookie.Category } @@ -59,7 +62,8 @@ func newFeaturePodConnectivity( networkConfig *config.NetworkConfig, ovsDatapathType ovsconfig.OVSDatapathType, connectUplinkToBridge bool, - enableMulticast bool) *featurePodConnectivity { + enableMulticast bool, + enableTrafficControl bool) *featurePodConnectivity { ctZones := make(map[binding.Protocol]int) gatewayIPs := make(map[binding.Protocol]net.IP) localCIDRs := make(map[binding.Protocol]net.IPNet) @@ -90,6 +94,7 @@ func newFeaturePodConnectivity( ipProtocols: ipProtocols, nodeCachedFlows: newFlowCategoryCache(), podCachedFlows: newFlowCategoryCache(), + tcCachedFlows: newFlowCategoryCache(), gatewayIPs: gatewayIPs, ctZones: ctZones, localCIDRs: localCIDRs, @@ -98,6 +103,7 @@ func newFeaturePodConnectivity( networkConfig: networkConfig, ovsDatapathType: ovsDatapathType, connectUplinkToBridge: connectUplinkToBridge, + enableTrafficControl: enableTrafficControl, ipCtZoneTypeRegMarks: ipCtZoneTypeRegMarks, ctZoneSrcField: getZoneSrcField(connectUplinkToBridge), enableMulticast: enableMulticast, @@ -157,6 +163,9 @@ func (f *featurePodConnectivity) initFlows() []binding.Flow { // Pod IP will take care of routing the traffic to destination Pod. flows = append(flows, f.l3FwdFlowToLocalPodCIDR()...) } + if f.enableTrafficControl { + flows = append(flows, f.redirectedPktsFwdFlows()...) + } return flows } @@ -164,9 +173,99 @@ func (f *featurePodConnectivity) replayFlows() []binding.Flow { var flows []binding.Flow // Get cached flows. - for _, cachedFlows := range []*flowCategoryCache{f.nodeCachedFlows, f.podCachedFlows} { + for _, cachedFlows := range []*flowCategoryCache{f.nodeCachedFlows, f.podCachedFlows, f.tcCachedFlows} { flows = append(flows, getCachedFlows(cachedFlows)...) } return flows } + +// trafficControlMarkFlows generates the flows to mark the packets that need to be redirected or mirrored with the traffic +// control mark value. +func (f *featurePodConnectivity) trafficControlMarkFlows(mark uint32, srcOfPorts []uint32, direction v1alpha2.Direction) []binding.Flow { + cookieID := f.cookieAllocator.Request(f.category).Raw() + trafficControlRegMark := binding.NewRegMark(TrafficControlField, mark) + var flows []binding.Flow + for _, port := range srcOfPorts { + if direction == v1alpha2.DirectionIn || direction == v1alpha2.DirectionBoth { + flows = append(flows, IngressTrafficControlMarkTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegFieldWithValue(TargetOFPortField, port). + Action().LoadRegMark(trafficControlRegMark). + Action().NextTable(). + Done()) + } + if direction == v1alpha2.DirectionOut || direction == v1alpha2.DirectionBoth { + flows = append(flows, EgressTrafficControlMarkTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchInPort(port). + Action().LoadRegMark(trafficControlRegMark). + Action().NextTable(). + Done()) + } + } + return flows +} + +// trafficControlInitFlows generates the initial flows for the traffic control mark value. +func (f *featurePodConnectivity) trafficControlInitFlows(mark uint32, action v1alpha2.Action, ofInPort, ofOutPort uint32) []binding.Flow { + cookieID := f.cookieAllocator.Request(f.category).Raw() + var flows []binding.Flow + trafficControlCTMark := binding.NewCTMark(ConnTrafficControlCTMarkField, mark) + if action == v1alpha2.ActionMirror { + // When the traffic control mark is for action mirror, this generates the flow to output packets to the original out + // port as well as mirror the packets to the ofInPort which is used to receive mirrored packets in L2ForwardingOutTable. + flows = append(flows, L2ForwardingOutTable.ofTable.BuildFlow(priorityHigh+1). + Cookie(cookieID). + MatchCTMark(trafficControlCTMark). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Action().Output(ofInPort). + Done()) + } else if action == v1alpha2.ActionRedirect { + flows = append(flows, + // When the traffic control mark is for action redirect, the generates the flow to forward the redirected packets + // sourced from the ofOutPort to stageRouting directly in ClassifierTable. Note that, for the packets which are + // to be originally output to a tunnel, value of NXM_NX_TUN_IPV4_DST for the redirected packets needs to be + // loaded in stageRouting. + ClassifierTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchInPort(ofOutPort). + Action().LoadRegMark(FromRedirectRegMark). + Action().GotoStage(stageRouting). + Done(), + // When the traffic control mark is for action redirect, the generates the flow to forward the packets to be + // redirected (with CT mark whose value is equal to the traffic control mark value) to the ofOutPort which is + // used to receive the packets. + L2ForwardingOutTable.ofTable.BuildFlow(priorityHigh+1). + Cookie(cookieID). + MatchCTMark(trafficControlCTMark). + MatchRegMark(OFPortFoundRegMark). + Action().Output(ofInPort). + Done(), + ) + } + return flows +} + +// redirectedPktsFwdFlows generates the flows to forward the redirected packets. +func (f *featurePodConnectivity) redirectedPktsFwdFlows() []binding.Flow { + cookieID := f.cookieAllocator.Request(f.category).Raw() + return []binding.Flow{ + // This generates the flow to forward the redirected packets to stageOutput directly in TrafficControlRedirectOutTable, + // after loading output port to reg1 in L2ForwardingCalcTable. + TrafficControlRedirectOutTable.ofTable.BuildFlow(priorityNormal). + Cookie(cookieID). + MatchRegMark(FromRedirectRegMark). + MatchRegMark(OFPortFoundRegMark). + Action().GotoStage(stageOutput). + Done(), + // This generates the flow to forward the redirected packets to the original out port. + L2ForwardingOutTable.ofTable.BuildFlow(priorityHigh + 2). + Cookie(cookieID). + MatchRegMark(FromRedirectRegMark). + MatchRegMark(OFPortFoundRegMark). + Action().OutputToRegField(TargetOFPortField). + Done(), + } +} diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index b27dc64c12..0c8ba83153 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -44,6 +44,7 @@ type featureService struct { enableProxy bool proxyAll bool connectUplinkToBridge bool + enableTrafficControl bool ctZoneSrcField *binding.RegField category cookie.Category @@ -61,7 +62,8 @@ func newFeatureService( bridge binding.Bridge, enableProxy, proxyAll, - connectUplinkToBridge bool) *featureService { + connectUplinkToBridge, + enableTrafficControl bool) *featureService { gatewayIPs := make(map[binding.Protocol]net.IP) virtualIPs := make(map[binding.Protocol]net.IP) dnatCtZones := make(map[binding.Protocol]int) @@ -106,6 +108,7 @@ func newFeatureService( enableProxy: enableProxy, proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, + enableTrafficControl: enableTrafficControl, ctZoneSrcField: getZoneSrcField(connectUplinkToBridge), category: cookie.Service, } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 740cad169d..894446269b 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -22,6 +22,7 @@ package testing import ( config "antrea.io/antrea/pkg/agent/config" types "antrea.io/antrea/pkg/agent/types" + v1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" openflow "antrea.io/antrea/pkg/ovs/openflow" ip "antrea.io/antrea/pkg/util/ip" proxy "antrea.io/antrea/third_party/proxy" @@ -421,6 +422,34 @@ func (mr *MockClientMockRecorder) InstallTraceflowFlows(arg0, arg1, arg2, arg3, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTraceflowFlows", reflect.TypeOf((*MockClient)(nil).InstallTraceflowFlows), arg0, arg1, arg2, arg3, arg4, arg5, arg6) } +// InstallTrafficControlInitFlows mocks base method +func (m *MockClient) InstallTrafficControlInitFlows(arg0 uint32, arg1 v1alpha2.Action, arg2, arg3 uint32) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallTrafficControlInitFlows", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallTrafficControlInitFlows indicates an expected call of InstallTrafficControlInitFlows +func (mr *MockClientMockRecorder) InstallTrafficControlInitFlows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTrafficControlInitFlows", reflect.TypeOf((*MockClient)(nil).InstallTrafficControlInitFlows), arg0, arg1, arg2, arg3) +} + +// InstallTrafficControlMarkFlows mocks base method +func (m *MockClient) InstallTrafficControlMarkFlows(arg0 string, arg1 uint32, arg2 []uint32, arg3 v1alpha2.Direction) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallTrafficControlMarkFlows", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallTrafficControlMarkFlows indicates an expected call of InstallTrafficControlMarkFlows +func (mr *MockClientMockRecorder) InstallTrafficControlMarkFlows(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTrafficControlMarkFlows", reflect.TypeOf((*MockClient)(nil).InstallTrafficControlMarkFlows), arg0, arg1, arg2, arg3) +} + // IsConnected mocks base method func (m *MockClient) IsConnected() bool { m.ctrl.T.Helper() @@ -738,6 +767,34 @@ func (mr *MockClientMockRecorder) UninstallTraceflowFlows(arg0 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallTraceflowFlows", reflect.TypeOf((*MockClient)(nil).UninstallTraceflowFlows), arg0) } +// UninstallTrafficControlInitFlows mocks base method +func (m *MockClient) UninstallTrafficControlInitFlows(arg0 byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallTrafficControlInitFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallTrafficControlInitFlows indicates an expected call of UninstallTrafficControlInitFlows +func (mr *MockClientMockRecorder) UninstallTrafficControlInitFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallTrafficControlInitFlows", reflect.TypeOf((*MockClient)(nil).UninstallTrafficControlInitFlows), arg0) +} + +// UninstallTrafficControlMarkFlows mocks base method +func (m *MockClient) UninstallTrafficControlMarkFlows(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallTrafficControlMarkFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallTrafficControlMarkFlows indicates an expected call of UninstallTrafficControlMarkFlows +func (mr *MockClientMockRecorder) UninstallTrafficControlMarkFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallTrafficControlMarkFlows", reflect.TypeOf((*MockClient)(nil).UninstallTrafficControlMarkFlows), arg0) +} + // MockOFEntryOperations is a mock of OFEntryOperations interface type MockOFEntryOperations struct { ctrl *gomock.Controller diff --git a/pkg/apis/crd/v1alpha2/types.go b/pkg/apis/crd/v1alpha2/types.go index 43e9361a82..11f6f9e942 100644 --- a/pkg/apis/crd/v1alpha2/types.go +++ b/pkg/apis/crd/v1alpha2/types.go @@ -379,3 +379,19 @@ type IPPoolList struct { Items []IPPool `json:"items"` } + +// TODO: remove this after #3487 is merged. +type Direction string + +const ( + DirectionIn Direction = "In" + DirectionOut Direction = "Out" + DirectionBoth Direction = "Both" +) + +type Action string + +const ( + ActionRedirect Action = "Redirect" + ActionMirror Action = "Mirror" +) diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 06d14f5c98..64219674dc 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -116,7 +116,7 @@ func TestConnectivityFlows(t *testing.T) { antrearuntime.WindowsOS = runtime.GOOS } - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -164,7 +164,7 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, true, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, true, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -223,7 +223,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -265,7 +265,7 @@ func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -442,7 +442,7 @@ func TestNetworkPolicyFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -556,7 +556,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, true, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -604,7 +604,7 @@ func TestProxyServiceFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, true, false, false, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1389,21 +1389,20 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows { &ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ip", ActStr: "drop"}, ) tableConntrackCommitFlows.flows = append(tableConntrackCommitFlows.flows, - &ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_mark=0x10/0x10,ip", ActStr: fmt.Sprintf("goto_table:%s", outputStageTable)}, &ofTestUtils.ExpectFlow{MatchStr: "priority=200,ct_state=+new+trk,ip", ActStr: fmt.Sprintf("ct(commit,table=%s,zone=%s,exec(move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3]))", outputStageTable, ctZone)}, ) tableSNATConntrackCommitFlows.flows = append(tableSNATConntrackCommitFlows.flows, &ofTestUtils.ExpectFlow{ MatchStr: "priority=200,ct_state=+new+trk,ct_mark=0x40/0x40,ip,reg0=0x2/0xf", - ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65521,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[6]))", config1.VirtualServiceIPv4), + ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65521,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[6],load:0x1->NXM_NX_CT_MARK[5]))", config1.VirtualServiceIPv4), }, &ofTestUtils.ExpectFlow{ MatchStr: "priority=200,ct_state=+new+trk,ct_mark=0x40/0x40,ip,reg0=0x3/0xf", - ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65521,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[6]))", config.nodeConfig.GatewayConfig.IPv4), + ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65521,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[6],load:0x1->NXM_NX_CT_MARK[5]))", config.nodeConfig.GatewayConfig.IPv4), }, &ofTestUtils.ExpectFlow{ MatchStr: "priority=190,ct_state=+new+trk,ct_mark=0x20/0x20,ip,reg0=0x2/0xf", - ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65521,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4]))", config.nodeConfig.GatewayConfig.IPv4), + ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65521,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[5]))", config.nodeConfig.GatewayConfig.IPv4), }, &ofTestUtils.ExpectFlow{ MatchStr: "priority=200,ct_state=-new-rpl+trk,ct_mark=0x20/0x20,ip", @@ -1434,21 +1433,20 @@ func prepareDefaultFlows(config *testConfig) []expectTableFlows { &ofTestUtils.ExpectFlow{MatchStr: "priority=190,ct_state=+inv+trk,ipv6", ActStr: "drop"}, ) tableConntrackCommitFlows.flows = append(tableConntrackCommitFlows.flows, - &ofTestUtils.ExpectFlow{MatchStr: "priority=210,ct_mark=0x10/0x10,ipv6", ActStr: "goto_table:Output"}, &ofTestUtils.ExpectFlow{MatchStr: "priority=200,ct_state=+new+trk,ipv6", ActStr: fmt.Sprintf("ct(commit,table=Output,zone=%s,exec(move:NXM_NX_REG0[0..3]->NXM_NX_CT_MARK[0..3]))", ctZoneV6)}, ) tableSNATConntrackCommitFlows.flows = append(tableSNATConntrackCommitFlows.flows, &ofTestUtils.ExpectFlow{ MatchStr: "priority=200,ct_state=+new+trk,ct_mark=0x40/0x40,ipv6,reg0=0x2/0xf", - ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65511,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[6]))", config1.VirtualServiceIPv6), + ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65511,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[6],load:0x1->NXM_NX_CT_MARK[5]))", config1.VirtualServiceIPv6), }, &ofTestUtils.ExpectFlow{ MatchStr: "priority=200,ct_state=+new+trk,ct_mark=0x40/0x40,ipv6,reg0=0x3/0xf", - ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65511,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[6]))", config.nodeConfig.GatewayConfig.IPv6), + ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65511,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[6],load:0x1->NXM_NX_CT_MARK[5]))", config.nodeConfig.GatewayConfig.IPv6), }, &ofTestUtils.ExpectFlow{ MatchStr: "priority=190,ct_state=+new+trk,ct_mark=0x20/0x20,ipv6,reg0=0x2/0xf", - ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65511,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4]))", config.nodeConfig.GatewayConfig.IPv6), + ActStr: fmt.Sprintf("ct(commit,table=L2ForwardingCalc,zone=65511,nat(src=%s),exec(load:0x1->NXM_NX_CT_MARK[4],load:0x1->NXM_NX_CT_MARK[5]))", config.nodeConfig.GatewayConfig.IPv6), }, &ofTestUtils.ExpectFlow{ MatchStr: "priority=200,ct_state=-new-rpl+trk,ct_mark=0x20/0x20,ipv6", @@ -1632,7 +1630,7 @@ func TestEgressMarkFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, false, false, true, false, false, false, false) + c = ofClient.NewClient(br, bridgeMgmtAddr, ovsconfig.OVSDatapathNetdev, false, false, true, false, false, false, false, false) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))