Skip to content

Commit

Permalink
Implement traffic control interfaces via OVS
Browse files Browse the repository at this point in the history
Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Apr 27, 2022
1 parent 00f9d98 commit 04d3598
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 35 deletions.
5 changes: 4 additions & 1 deletion cmd/antrea-agent/agent.go
Expand Up @@ -131,7 +131,10 @@ func run(o *Options) error {
features.DefaultFeatureGate.Enabled(features.FlowExporter),
o.config.AntreaProxy.ProxyAll,
connectUplinkToBridge,
features.DefaultFeatureGate.Enabled(features.Multicast))
features.DefaultFeatureGate.Enabled(features.Multicast),
// TODO: replace with features.DefaultFeatureGate.Enabled(features.TrafficControl),
false,
)

_, serviceCIDRNet, _ := net.ParseCIDR(o.config.ServiceCIDR)
var serviceCIDRNetv6 *net.IPNet
Expand Down
46 changes: 45 additions & 1 deletion pkg/agent/openflow/client.go
Expand Up @@ -28,6 +28,7 @@ import (
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/third_party/proxy"
Expand Down Expand Up @@ -279,6 +280,18 @@ type Client interface {
dstIP net.IP,
outPort uint32,
igmp ofutil.Message) error

// InstallTrafficControlMarkFlows installs the flows to mark the packets for a traffic control rule.
InstallTrafficControlMarkFlows(name string, sourceOFPorts []uint32, targetOFPort uint32, direction v1alpha2.Direction, action v1alpha2.TrafficControlAction) error

// UninstallTrafficControlMarkFlows removes the flows for a traffic control rule.
UninstallTrafficControlMarkFlows(name string) error

// InstallTrafficControlReturnPortFlow installs the flow to classify the packets from a return port.
InstallTrafficControlReturnPortFlow(returnOFPort uint32) error

// UninstallTrafficControlReturnPortFlow removes the flow to classify the packets from a return port.
UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error
}

// GetFlowTableStatus returns an array of flow table status.
Expand Down Expand Up @@ -697,7 +710,8 @@ func (c *client) generatePipelines() {
c.networkConfig,
c.connectUplinkToBridge,
c.enableMulticast,
c.proxyAll)
c.proxyAll,
c.enableTrafficControl)
c.activatedFeatures = append(c.activatedFeatures, c.featurePodConnectivity)
c.traceableFeatures = append(c.traceableFeatures, c.featurePodConnectivity)

Expand Down Expand Up @@ -1129,3 +1143,33 @@ func (c *client) SendIGMPQueryPacketOut(
packetOutObj := packetOutBuilder.Done()
return c.bridge.SendPacketOut(packetOutObj)
}

func (c *client) InstallTrafficControlMarkFlows(name string, sourceOFPorts []uint32, targetOFPort uint32, direction v1alpha2.Direction, action v1alpha2.TrafficControlAction) error {
flows := c.featurePodConnectivity.trafficControlMarkFlows(sourceOFPorts, targetOFPort, direction, action)
cacheKey := fmt.Sprintf("tc_%s", name)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.modifyFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey, flows)
}

func (c *client) UninstallTrafficControlMarkFlows(name string) error {
cacheKey := fmt.Sprintf("tc_%s", name)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey)
}

func (c *client) InstallTrafficControlReturnPortFlow(returnOFPort uint32) error {
cacheKey := fmt.Sprintf("tc_%d", returnOFPort)
flows := []binding.Flow{c.featurePodConnectivity.trafficControlReturnClassifierFlow(returnOFPort)}
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.addFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey, flows)
}

func (c *client) UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error {
cacheKey := fmt.Sprintf("tc_%d", returnOFPort)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.deleteFlows(c.featurePodConnectivity.tcCachedFlows, cacheKey)
}
14 changes: 7 additions & 7 deletions pkg/agent/openflow/client_test.go
Expand Up @@ -107,7 +107,7 @@ func TestIdempotentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestIdempotentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestFlowInstallationFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestConcurrentFlowInstallation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, false, false, false, false)
client := ofClient.(*client)
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
Expand Down Expand Up @@ -417,7 +417,7 @@ func Test_client_SendTraceflowPacket(t *testing.T) {
}

func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
c.cookieAllocator = cookie.NewAllocator(0)
c.nodeConfig = nodeConfig
Expand All @@ -443,7 +443,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
}

func prepareSendTraceflowPacket(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
c.nodeConfig = nodeConfig
m := ovsoftest.NewMockBridge(ctrl)
Expand Down Expand Up @@ -531,7 +531,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) {
}

func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, true, false, false, false, false, false, false)
c := ofClient.(*client)
m := ovsoftest.NewMockBridge(ctrl)
c.bridge = m
Expand Down
33 changes: 22 additions & 11 deletions pkg/agent/openflow/fields.go
Expand Up @@ -21,11 +21,12 @@ import (

// Fields using reg.
var (
tunnelVal = uint32(1)
gatewayVal = uint32(2)
localVal = uint32(3)
uplinkVal = uint32(4)
bridgeVal = uint32(5)
tunnelVal = uint32(1)
gatewayVal = uint32(2)
localVal = uint32(3)
uplinkVal = uint32(4)
bridgeVal = uint32(5)
tcReturnVal = uint32(6)

// reg0 (NXM_NX_REG0)
// reg0[0..3]: Field to store the packet source. Marks in this field include:
Expand All @@ -34,12 +35,14 @@ var (
// - 3: from local Pods.
// - 4: from uplink port.
// - 5: from bridge local port.
PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource")
FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal)
FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal)
FromLocalRegMark = binding.NewRegMark(PktSourceField, localVal)
FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal)
FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal)
// - 6: from traffic control return port.
PktSourceField = binding.NewRegField(0, 0, 3, "PacketSource")
FromTunnelRegMark = binding.NewRegMark(PktSourceField, tunnelVal)
FromGatewayRegMark = binding.NewRegMark(PktSourceField, gatewayVal)
FromLocalRegMark = binding.NewRegMark(PktSourceField, localVal)
FromUplinkRegMark = binding.NewRegMark(PktSourceField, uplinkVal)
FromBridgeRegMark = binding.NewRegMark(PktSourceField, bridgeVal)
FromTCRedirectRegMark = binding.NewRegMark(PktSourceField, tcReturnVal)
// reg0[4..7]: Field to store the packet destination. Marks in this field include:
// - 1: to tunnel port.
// - 2: to Antrea gateway port.
Expand Down Expand Up @@ -123,6 +126,10 @@ var (
NotAntreaFlexibleIPAMRegMark = binding.NewOneBitZeroRegMark(4, 20, "NotAntreaFlexibleIPAM")
// reg4[21]: Mark to indicate externalTrafficPolicy of the Service is Cluster.
ToClusterServiceRegMark = binding.NewOneBitRegMark(4, 21, "ToClusterService")
// reg4[22..23]: Field to store the action of a traffic control rule. Marks in this field include:
TrafficControlField = binding.NewRegField(4, 22, 23, "TrafficControlAction")
TrafficControlMirrorRegMark = binding.NewRegMark(TrafficControlField, 0b01)
TrafficControlRedirectRegMark = binding.NewRegMark(TrafficControlField, 0b10)

// reg5(NXM_NX_REG5)
// Field to cache the Egress conjunction ID hit by TraceFlow packet.
Expand All @@ -147,6 +154,10 @@ var (
IPv6CtZoneTypeRegMark = binding.NewRegMark(CtZoneTypeField, 0b0011)
// Field to store the CtZone ID, which is a combination of VLANIDField and CtZoneTypeField to indicate CtZone for DstNAT.
CtZoneField = binding.NewRegField(8, 0, 15, "CtZoneID")

// reg9(NXM_NX_REG9)
// Field to cache the ofPort of the OVS interface to output traffic control packets.
TargetTrafficControlOFPortField = binding.NewRegField(9, 0, 31, "TargetTrafficControlOFPort")
)

// Fields using xxreg.
Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/openflow/framework.go
Expand Up @@ -187,6 +187,13 @@ func (f *featurePodConnectivity) getRequiredTables() []*Table {
}
}
}
if f.enableTrafficControl {
tables = append(tables,
EgressTrafficControlMarkTable,
TrafficControlRedirectOutTable,
IngressTrafficControlMarkTable,
)
}

return tables
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/network_policy_test.go
Expand Up @@ -524,7 +524,7 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOperations := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, false, true, false, false, false, false, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, false, true, false, false, false, false, false, false)
c = ofClient.(*client)
c.cookieAllocator = cookie.NewAllocator(0)
c.ofEntryOperations = mockOperations
Expand Down
14 changes: 10 additions & 4 deletions pkg/agent/openflow/pipeline.go
Expand Up @@ -122,7 +122,8 @@ var (
// Tables of pipelineIP are declared below.

// Tables in stageClassifier:
ClassifierTable = newTable("Classifier", stageClassifier, pipelineIP, defaultDrop)
ClassifierTable = newTable("Classifier", stageClassifier, pipelineIP, defaultDrop)
EgressTrafficControlMarkTable = newTable("EgressTrafficControlMark", stageClassifier, pipelineIP)

// Tables in stageValidation:
SpoofGuardTable = newTable("SpoofGuard", stageValidation, pipelineIP, defaultDrop)
Expand Down Expand Up @@ -160,7 +161,9 @@ var (
SNATConntrackCommitTable = newTable("SNATConntrackCommit", stagePostRouting, pipelineIP)

// Tables in stageSwitching:
L2ForwardingCalcTable = newTable("L2ForwardingCalc", stageSwitching, pipelineIP)
L2ForwardingCalcTable = newTable("L2ForwardingCalc", stageSwitching, pipelineIP)
TrafficControlRedirectOutTable = newTable("TrafficControlRedirectOut", stageSwitching, pipelineIP)
IngressTrafficControlMarkTable = newTable("IngressTrafficControlMark", stageSwitching, pipelineIP)

// Tables in stageIngressSecurity:
IngressSecurityClassifierTable = newTable("IngressSecurityClassifier", stageIngressSecurity, pipelineIP)
Expand Down Expand Up @@ -389,6 +392,7 @@ type client struct {
enableDenyTracking bool
enableEgress bool
enableMulticast bool
enableTrafficControl bool
connectUplinkToBridge bool
roundInfo types.RoundInfo
cookieAllocator cookie.Allocator
Expand Down Expand Up @@ -598,7 +602,7 @@ func (f *featurePodConnectivity) podClassifierFlow(podOFPort uint32, isAntreaFle
Cookie(f.cookieAllocator.Request(f.category).Raw()).
MatchInPort(podOFPort).
Action().LoadRegMark(regMarksToLoad...).
Action().GotoStage(stageValidation).
Action().NextTable().
Done()
}

Expand Down Expand Up @@ -2702,7 +2706,8 @@ func NewClient(bridgeName string,
enableDenyTracking bool,
proxyAll bool,
connectUplinkToBridge bool,
enableMulticast bool) Client {
enableMulticast bool,
enableTrafficControl bool) Client {
bridge := binding.NewOFBridge(bridgeName, mgmtAddr)
c := &client{
bridge: bridge,
Expand All @@ -2712,6 +2717,7 @@ func NewClient(bridgeName string,
enableDenyTracking: enableDenyTracking,
enableEgress: enableEgress,
enableMulticast: enableMulticast,
enableTrafficControl: enableTrafficControl,
connectUplinkToBridge: connectUplinkToBridge,
pipelines: make(map[binding.PipelineID]binding.Pipeline),
packetInHandlers: map[uint8]map[string]PacketInHandler{},
Expand Down

0 comments on commit 04d3598

Please sign in to comment.