Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement traffic control interfaces via OVS #3580

Merged
merged 1 commit into from May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Expand Up @@ -131,7 +131,9 @@ func run(o *Options) error {
features.DefaultFeatureGate.Enabled(features.FlowExporter),
o.config.AntreaProxy.ProxyAll,
connectUplinkToBridge,
features.DefaultFeatureGate.Enabled(features.Multicast))
features.DefaultFeatureGate.Enabled(features.Multicast),
features.DefaultFeatureGate.Enabled(features.TrafficControl),
)

_, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR)
var serviceCIDRNetv6 *net.IPNet
Expand Down
46 changes: 45 additions & 1 deletion pkg/agent/openflow/client.go
Expand Up @@ -28,6 +28,7 @@ import (
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/third_party/proxy"
Expand Down Expand Up @@ -279,6 +280,18 @@ type Client interface {
dstIP net.IP,
outPort uint32,
igmp ofutil.Message) error

// InstallTrafficControlMarkFlows installs the flows to mark the packets for a traffic control rule.
InstallTrafficControlMarkFlows(name string, sourceOFPorts []uint32, targetOFPort uint32, direction v1alpha2.Direction, action v1alpha2.TrafficControlAction) error

// UninstallTrafficControlMarkFlows removes the flows for a traffic control rule.
UninstallTrafficControlMarkFlows(name string) error

// InstallTrafficControlReturnPortFlow installs the flow to classify the packets from a return port.
InstallTrafficControlReturnPortFlow(returnOFPort uint32) error

// UninstallTrafficControlReturnPortFlow removes the flow to classify the packets from a return port.
UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error
}

// GetFlowTableStatus returns an array of flow table status.
Expand Down Expand Up @@ -697,7 +710,8 @@ func (c *client) generatePipelines() {
c.networkConfig,
c.connectUplinkToBridge,
c.enableMulticast,
c.proxyAll)
c.proxyAll,
c.enableTrafficControl)
c.activatedFeatures = append(c.activatedFeatures, c.featurePodConnectivity)
c.traceableFeatures = append(c.traceableFeatures, c.featurePodConnectivity)

Expand Down Expand Up @@ -1129,3 +1143,33 @@ func (c *client) SendIGMPQueryPacketOut(
packetOutObj := packetOutBuilder.Done()
return c.bridge.SendPacketOut(packetOutObj)
}

func (c *client) InstallTrafficControlMarkFlows(name string, sourceOFPorts []uint32, targetOFPort uint32, direction v1alpha2.Direction, action v1alpha2.TrafficControlAction) error {
flows := c.featurePodConnectivity.trafficControlMarkFlows(sourceOFPorts, targetOFPort, direction, action)
cacheKey := fmt.Sprintf("tc_%s", name)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.modifyFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey, flows)
Copy link
Contributor

@wenyingd wenyingd Apr 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using modifyFlows, it might delete the original tc flows which are not included in the flows generated by this call. I think your target is to append these flows in the cache but not overwrite?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since a traffic rule could be updated, the original tc flows should be deleted and new flows should be installed. Flows in the cache should be also overwritten.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So do you mean InstallTrafficControlMarkFlows is called for both add and update case? And UninstallTrafficControlMarkFlows is called only when delete the tc configuration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

}

func (c *client) UninstallTrafficControlMarkFlows(name string) error {
cacheKey := fmt.Sprintf("tc_%s", name)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey)
}

func (c *client) InstallTrafficControlReturnPortFlow(returnOFPort uint32) error {
cacheKey := fmt.Sprintf("tc_%d", returnOFPort)
flows := []binding.Flow{c.featurePodConnectivity.trafficControlReturnClassifierFlow(returnOFPort)}
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.addFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey, flows)
}

func (c *client) UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error {
cacheKey := fmt.Sprintf("tc_%d", returnOFPort)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey)
}
14 changes: 7 additions & 7 deletions pkg/agent/openflow/client_test.go
Expand Up @@ -107,7 +107,7 @@ func TestIdempotentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestIdempotentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestFlowInstallationFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestConcurrentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -417,7 +417,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) {
}

func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
c.cookieAllocator = cookie.NewAllocator(0)
c.nodeConfig = nodeConfig
Expand All @@ -443,7 +443,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
}

func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
c.nodeConfig = nodeConfig
m := ovsoftest.NewMockBridge(ctrl)
Expand Down Expand Up @@ -531,7 +531,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) {
}

func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
m := ovsoftest.NewMockBridge(ctrl)
c.bridge = m
Expand Down
33 changes: 22 additions & 11 deletions pkg/agent/openflow/fields.go
Expand Up @@ -21,11 +21,12 @@ import (

// Fields using reg.
var (
tunnelVal = uint32(1)
gatewayVal = uint32(2)
localVal = uint32(3)
uplinkVal = uint32(4)
bridgeVal = uint32(5)
tunnelVal = uint32(1)
gatewayVal = uint32(2)
localVal = uint32(3)
uplinkVal = uint32(4)
bridgeVal = uint32(5)
tcReturnVal = uint32(6)

// reg0 (NXM_NX_REG0)
// reg0[0..3]: Field to store the packet source. Marks in this field include:
Expand All @@ -34,12 +35,14 @@ var (
// - 3: from local Pods.
// - 4: from uplink port.
// - 5: from bridge local port.
PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource")
FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal)
FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal)
FromLocalRegMark = binding.NewRegMark(PktSourceField, localVal)
FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal)
FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal)
// - 6: from traffic control return port.
PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource")
FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal)
FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal)
FromLocalRegMark = binding.NewRegMark(PktSourceField, localVal)
FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal)
FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal)
FromTCReturnRegMark = binding.NewRegMark(PktSourceField, tcReturnVal)
// reg0[4..7]: Field to store the packet destination. Marks in this field include:
// - 1: to tunnel port.
// - 2: to Antrea gateway port.
Expand Down Expand Up @@ -123,6 +126,10 @@ var (
NotAntreaFlexibleIPAMRegMark = binding.NewOneBitZeroRegMark(4, 20, "NotAntreaFlexibleIPAM")
// reg4[21]: Mark to indicate externalTrafficPolicy of the Service is Cluster.
ToClusterServiceRegMark = binding.NewOneBitRegMark(4, 21, "ToClusterService")
// reg4[22..23]: Field to store the action of a traffic control rule. Marks in this field include:
TrafficControlActionField = binding.NewRegField(4, 22, 23, "TrafficControlAction")
TrafficControlMirrorRegMark = binding.NewRegMark(TrafficControlActionField, 0b01)
TrafficControlRedirectRegMark = binding.NewRegMark(TrafficControlActionField, 0b10)

// reg5(NXM_NX_REG5)
// Field to cache the Egress conjunction ID hit by TraceFlow packet.
Expand All @@ -147,6 +154,10 @@ var (
IPv6CtZoneTypeRegMark = binding.NewRegMark(CtZoneTypeField, 0b0011)
// Field to store the CtZone ID, which is a combination of VLANIDField and CtZoneTypeField to indicate CtZone for DstNAT.
CtZoneField = binding.NewRegField(8, 0, 15, "CtZoneID")

// reg9(NXM_NX_REG9)
// Field to cache the ofPort of the OVS interface to output traffic control packets.
TrafficControlTargetOFPortField = binding.NewRegField(9, 0, 31, "TrafficControlTargetOFPort")
)

// Fields using xxreg.
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/openflow/framework.go
Expand Up @@ -187,6 +187,9 @@ func (f *featurePodConnectivity) getRequiredTables() []*Table {
}
}
}
if f.enableTrafficControl {
tables = append(tables, TrafficControlTable)
}

return tables
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/network_policy_test.go
Expand Up @@ -524,7 +524,7 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOperations := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, false, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, false, true, false, false, false, false, false, false)
c = ofClient.(*client)
c.cookieAllocator = cookie.NewAllocator(0)
c.ofEntryOperations = mockOperations
Expand Down
6 changes: 5 additions & 1 deletion pkg/agent/openflow/pipeline.go
Expand Up @@ -161,6 +161,7 @@ var (

// Tables in stageSwitching:
L2ForwardingCalcTable = newTable("L2ForwardingCalc", stageSwitching, pipelineIP)
TrafficControlTable = newTable("TrafficControl", stageSwitching, pipelineIP)

// Tables in stageIngressSecurity:
IngressSecurityClassifierTable = newTable("IngressSecurityClassifier", stageIngressSecurity, pipelineIP)
Expand Down Expand Up @@ -389,6 +390,7 @@ type client struct {
enableDenyTracking bool
enableEgress bool
enableMulticast bool
enableTrafficControl bool
connectUplinkToBridge bool
roundInfo types.RoundInfo
cookieAllocator cookie.Allocator
Expand Down Expand Up @@ -2702,7 +2704,8 @@ func NewClient(bridgeName string,
enableDenyTracking bool,
proxyAll bool,
connectUplinkToBridge bool,
enableMulticast bool) Client {
enableMulticast bool,
enableTrafficControl bool) Client {
bridge := binding.NewOFBridge(bridgeName, mgmtAddr)
c := &client{
bridge: bridge,
Expand All @@ -2712,6 +2715,7 @@ func NewClient(bridgeName string,
enableDenyTracking: enableDenyTracking,
enableEgress: enableEgress,
enableMulticast: enableMulticast,
enableTrafficControl: enableTrafficControl,
connectUplinkToBridge: connectUplinkToBridge,
pipelines: make(map[binding.PipelineID]binding.Pipeline),
packetInHandlers: map[uint8]map[string]PacketInHandler{},
Expand Down