Skip to content

Commit

Permalink
Implement traffic control interfaces via OVS
Browse files Browse the repository at this point in the history
Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Apr 12, 2022
1 parent c36cd12 commit 1333d66
Show file tree
Hide file tree
Showing 15 changed files with 357 additions and 60 deletions.
5 changes: 4 additions & 1 deletion cmd/antrea-agent/agent.go
Expand Up @@ -130,7 +130,10 @@ func run(o *Options) error {
features.DefaultFeatureGate.Enabled(features.FlowExporter),
o.config.AntreaProxy.ProxyAll,
connectUplinkToBridge,
features.DefaultFeatureGate.Enabled(features.Multicast))
features.DefaultFeatureGate.Enabled(features.Multicast),
// TODO: replace with features.DefaultFeatureGate.Enabled(features.TrafficControl),
false,
)

_, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR)
var serviceCIDRNetv6 *net.IPNet
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Expand Up @@ -177,4 +177,7 @@ require (

// Newer version of github.com/googleapis/gnostic make use of newer gopkg.in/yaml(v3), which conflicts with
// explicit imports of gopkg.in/yaml.v2.
replace github.com/googleapis/gnostic v0.5.5 => github.com/googleapis/gnostic v0.4.1
replace (
antrea.io/ofnet v0.5.5 => github.com/wenyingd/ofnet v0.0.0-20220411025655-52ebfca401f5
github.com/googleapis/gnostic v0.5.5 => github.com/googleapis/gnostic v0.4.1
)
4 changes: 2 additions & 2 deletions go.sum
@@ -1,7 +1,5 @@
antrea.io/libOpenflow v0.6.2 h1:1JMSJ7Lp7yOhKybHey9VDtRI6JuIgkhUWJBX5GIFY9I=
antrea.io/libOpenflow v0.6.2/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/ofnet v0.5.5 h1:4CLYWqQE4/XyuIUaTSqB83Zj+YnuplrlEXPvVm/r0JE=
antrea.io/ofnet v0.5.5/go.mod h1:8TJVF6MLe9/gZ/KbhGUvULs9/TxssepEaYEe+o1SEgs=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -767,6 +765,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae h1:4hwBBUfQCFe3C
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vmware/go-ipfix v0.5.12 h1:mqQknlvnvDY25apPNy9c27ri3FMDFIhzvO68Kk5Qp58=
github.com/vmware/go-ipfix v0.5.12/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
github.com/wenyingd/ofnet v0.0.0-20220411025655-52ebfca401f5 h1:mx94DJfWe5LkwkaIRLpBL/eSILpvs2WvuG38fKOTMF0=
github.com/wenyingd/ofnet v0.0.0-20220411025655-52ebfca401f5/go.mod h1:8TJVF6MLe9/gZ/KbhGUvULs9/TxssepEaYEe+o1SEgs=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
50 changes: 48 additions & 2 deletions pkg/agent/openflow/client.go
Expand Up @@ -28,6 +28,7 @@ import (
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/third_party/proxy"
Expand Down Expand Up @@ -279,6 +280,19 @@ type Client interface {
dstIP net.IP,
outPort uint32,
igmp ofutil.Message) error

// InstallTrafficControlMarkFlows installs the flows to mark the packets for a traffic control rule with the traffic control
// mark value.
InstallTrafficControlMarkFlows(name string, mark uint32, srcOfPorts []uint32, direction v1alpha2.Direction) error

// UninstallTrafficControlMarkFlows removes the flows for a traffic control rule.
UninstallTrafficControlMarkFlows(name string) error

// InstallTrafficControlInitFlows installs the initial flows for the traffic control mark value.
InstallTrafficControlInitFlows(mark uint32, action v1alpha2.Action, dstOfInPort, dstOfOutPort uint32) error

// UninstallTrafficControlInitFlows removes the flows for the traffic control mark value.
UninstallTrafficControlInitFlows(mark uint8) error
}

// GetFlowTableStatus returns an array of flow table status.
Expand Down Expand Up @@ -697,7 +711,8 @@ func (c *client) generatePipelines() {
c.networkConfig,
c.ovsDatapathType,
c.connectUplinkToBridge,
c.enableMulticast)
c.enableMulticast,
c.enableTrafficControl)
c.activatedFeatures = append(c.activatedFeatures, c.featurePodConnectivity)
c.traceableFeatures = append(c.traceableFeatures, c.featurePodConnectivity)

Expand All @@ -718,7 +733,8 @@ func (c *client) generatePipelines() {
c.bridge,
c.enableProxy,
c.proxyAll,
c.connectUplinkToBridge)
c.connectUplinkToBridge,
c.enableTrafficControl)
c.activatedFeatures = append(c.activatedFeatures, c.featureService)
c.traceableFeatures = append(c.traceableFeatures, c.featureService)

Expand Down Expand Up @@ -1128,3 +1144,33 @@ func (c *client) SendIGMPQueryPacketOut(
packetOutObj := packetOutBuilder.Done()
return c.bridge.SendPacketOut(packetOutObj)
}

func (c *client) InstallTrafficControlMarkFlows(name string, mark uint32, srcOfPorts []uint32, direction v1alpha2.Direction) error {
flows := c.featurePodConnectivity.trafficControlMarkFlows(mark, srcOfPorts, direction)
cacheKey := fmt.Sprintf("trafficControl_%s", name)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.addFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey, flows)
}

func (c *client) UninstallTrafficControlMarkFlows(name string) error {
cacheKey := fmt.Sprintf("trafficControl_%s", name)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey)
}

func (c *client) InstallTrafficControlInitFlows(mark uint32, action v1alpha2.Action, dstOfInPort, dstOfOutPort uint32) error {
cacheKey := fmt.Sprintf("trafficControl_%d", mark)
flows := c.featurePodConnectivity.trafficControlInitFlows(mark, action, dstOfInPort, dstOfOutPort)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.addFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey, flows)
}

func (c *client) UninstallTrafficControlInitFlows(mark uint8) error {
cacheKey := fmt.Sprintf("trafficControl_%d", mark)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey)
}
14 changes: 7 additions & 7 deletions pkg/agent/openflow/client_test.go
Expand Up @@ -107,7 +107,7 @@ func TestIdempotentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestIdempotentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestFlowInstallationFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestConcurrentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -417,7 +417,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) {
}

func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
c.cookieAllocator = cookie.NewAllocator(0)
c.nodeConfig = nodeConfig
Expand All @@ -443,7 +443,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
}

func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
c.nodeConfig = nodeConfig
m := ovsoftest.NewMockBridge(ctrl)
Expand Down Expand Up @@ -531,7 +531,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) {
}

func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
m := ovsoftest.NewMockBridge(ctrl)
c.bridge = m
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/openflow/cookie/allocator.go
Expand Up @@ -37,6 +37,7 @@ const (
Egress
Multicast
Traceflow
TrafficControl
)

func (c Category) String() string {
Expand Down
34 changes: 22 additions & 12 deletions pkg/agent/openflow/fields.go
Expand Up @@ -21,11 +21,12 @@ import (

// Fields using reg.
var (
tunnelVal = uint32(1)
gatewayVal = uint32(2)
localVal = uint32(3)
uplinkVal = uint32(4)
bridgeVal = uint32(5)
tunnelVal = uint32(1)
gatewayVal = uint32(2)
localVal = uint32(3)
uplinkVal = uint32(4)
bridgeVal = uint32(5)
redirectVal = uint32(6)

// reg0 (NXM_NX_REG0)
// reg0[0..3]: Field to store the packet source. Marks in this field include:
Expand All @@ -34,12 +35,14 @@ var (
// - 3: from local Pods.
// - 4: from uplink port.
// - 5: from bridge local port.
PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource")
FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal)
FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal)
FromLocalRegMark = binding.NewRegMark(PktSourceField, localVal)
FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal)
FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal)
// - 6: from traffic control redirecting out port.
PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource")
FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal)
FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal)
FromLocalRegMark = binding.NewRegMark(PktSourceField, localVal)
FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal)
FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal)
FromRedirectRegMark = binding.NewRegMark(PktSourceField, redirectVal)
// reg0[4..7]: Field to store the packet destination. Marks in this field include:
// - 1: to tunnel port.
// - 2: to Antrea gateway port.
Expand Down Expand Up @@ -82,6 +85,8 @@ var (
CustomReasonDenyRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDeny)
CustomReasonDNSRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDNS)
CustomReasonIGMPRegMark = binding.NewRegMark(CustomReasonField, CustomReasonIGMP)
// reg0[17..18]: Field to store the mark value for the packets that need to be mirrored or redirected.
TrafficControlField = binding.NewRegField(0, 18, 21, "TrafficControl")

// reg1(NXM_NX_REG1)
// Field to cache the ofPort of the OVS interface where to output packet.
Expand Down Expand Up @@ -175,7 +180,8 @@ var (

// CTMark[4]: Mark to indicate DNAT is performed on the connection for Service.
// This CT mark is used in CtZone / CtZoneV6 and SNATCtZone / SNATCtZoneV6.
ServiceCTMark = binding.NewOneBitCTMark(4)
ServiceCTMark = binding.NewOneBitCTMark(4)
NotServiceCTMark = binding.NewOneBitZeroCTMark(4)

// CTMark[5]: Mark to indicate SNAT should be performed on the connection for Service.
// This CT mark is only used in CtZone / CtZoneV6.
Expand All @@ -184,6 +190,10 @@ var (
// CTMark[6]: Mark to indicate the connection is hairpin.
// This CT mark is used in CtZone / CtZoneV6 and SNATCtZone / SNATCtZoneV6.
HairpinCTMark = binding.NewOneBitCTMark(6)

// CTMark[6]: Field to store the mark value for connections that need to be mirrored or redirected.
// This CT mark is only used in CtZone / CtZoneV6.
ConnTrafficControlCTMarkField = binding.NewCTMarkField(7, 10)
)

// Fields using CT label.
Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/openflow/framework.go
Expand Up @@ -187,6 +187,13 @@ func (f *featurePodConnectivity) getRequiredTables() []*Table {
}
}
}
if f.enableTrafficControl {
tables = append(tables,
EgressTrafficControlMarkTable,
TrafficControlRedirectOutTable,
IngressTrafficControlMarkTable,
)
}

return tables
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/network_policy_test.go
Expand Up @@ -519,7 +519,7 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOperations := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, false, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, ovsconfig.OVSDatapathSystem, false, true, false, false, false, false, false, false)
c = ofClient.(*client)
c.cookieAllocator = cookie.NewAllocator(0)
c.ofEntryOperations = mockOperations
Expand Down

0 comments on commit 1333d66

Please sign in to comment.