Skip to content

Commit

Permalink
Add trafficControlController to handle TrafficControl requests (#3487)
Browse files Browse the repository at this point in the history
- Use label selectors to filter Pods running on current Node.
- Translate the selected Pods to OVS ports, which will be used
  to filter the packets that should be mirrored or redirected.
- Translate the target device to the OVS port, which will be
  used as the target port the traffic should be mirrored or
  redirected.
- Install OpenFlow rules calculated using the above arguments.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
Co-authored-by: Quan Tian <qtian@vmware.com>
Co-authored-by: Wenqi Qiu <wenqiq@vmware.com>
  • Loading branch information
wenqiq and tnqn committed Jun 10, 2022
1 parent 2736201 commit 4e60600
Show file tree
Hide file tree
Showing 13 changed files with 2,776 additions and 18 deletions.
21 changes: 19 additions & 2 deletions cmd/antrea-agent/agent.go
Expand Up @@ -41,6 +41,7 @@ import (
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/controller/serviceexternalip"
"antrea.io/antrea/pkg/agent/controller/traceflow"
"antrea.io/antrea/pkg/agent/controller/trafficcontrol"
"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/flowexporter/exporter"
"antrea.io/antrea/pkg/agent/interfacestore"
Expand All @@ -65,6 +66,7 @@ import (
"antrea.io/antrea/pkg/monitor"
ofconfig "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/ovs/ovsctl"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/cipher"
Expand Down Expand Up @@ -99,10 +101,12 @@ func run(o *Options) error {
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync)
traceflowInformer := crdInformerFactory.Crd().V1alpha1().Traceflows()
egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses()
externalIPPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools()
trafficControlInformer := crdInformerFactory.Crd().V1alpha2().TrafficControls()
nodeInformer := informerFactory.Core().V1().Nodes()
serviceInformer := informerFactory.Core().V1().Services()
endpointsInformer := informerFactory.Core().V1().Endpoints()
externalIPPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools()
namespaceInformer := informerFactory.Core().V1().Namespaces()

// Create Antrea Clientset for the given config.
antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
Expand Down Expand Up @@ -504,7 +508,8 @@ func run(o *Options) error {
// Initialize localPodInformer for NPLAgent, AntreaIPAMController, and secondary network controller.
var localPodInformer cache.SharedIndexInformer
if enableNodePortLocal || enableBridgingMode ||
features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) ||
features.DefaultFeatureGate.Enabled(features.TrafficControl) {
listOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String()
}
Expand Down Expand Up @@ -578,6 +583,18 @@ func run(o *Options) error {
go podWatchController.Run(stopCh)
}

if features.DefaultFeatureGate.Enabled(features.TrafficControl) {
tcController := trafficcontrol.NewTrafficControlController(ofClient,
ifaceStore,
ovsBridgeClient,
ovsctl.NewClient(o.config.OVSBridge),
trafficControlInformer,
localPodInformer,
namespaceInformer,
podUpdateChannel)
go tcController.Run(stopCh)
}

// Start the localPodInformer
if localPodInformer != nil {
go localPodInformer.Run(stopCh)
Expand Down
5 changes: 4 additions & 1 deletion pkg/agent/agent.go
Expand Up @@ -37,6 +37,7 @@ import (
"antrea.io/antrea/pkg/agent/cniserver"
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/controller/trafficcontrol"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/openflow/cookie"
Expand Down Expand Up @@ -280,6 +281,8 @@ func (i *Initializer) initInterfaceStore() error {
case interfacestore.AntreaContainer:
// The port should be for a container interface.
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort, true)
case interfacestore.AntreaTrafficControl:
intf = trafficcontrol.ParseTrafficControlInterfaceConfig(port, ovsPort)
default:
klog.InfoS("Unknown Antrea interface type", "type", interfaceType)
}
Expand Down Expand Up @@ -750,7 +753,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
externalIDs := map[string]interface{}{
interfacestore.AntreaInterfaceTypeKey: interfacestore.AntreaTunnel,
}
tunnelPortUUID, err := i.ovsBridgeClient.CreateTunnelPortExt(tunnelPortName, i.networkConfig.TunnelType, config.DefaultTunOFPort, shouldEnableCsum, localIPStr, "", "", "", externalIDs)
tunnelPortUUID, err := i.ovsBridgeClient.CreateTunnelPortExt(tunnelPortName, i.networkConfig.TunnelType, config.DefaultTunOFPort, shouldEnableCsum, localIPStr, "", "", "", nil, externalIDs)
if err != nil {
klog.Errorf("Failed to create tunnel port %s type %s on OVS bridge: %v", tunnelPortName, i.networkConfig.TunnelType, err)
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/controller/noderoute/node_route_controller.go
Expand Up @@ -686,6 +686,7 @@ func (c *Controller) createIPSecTunnelPort(nodeName string, nodeIP net.IP) (int3
nodeIP.String(),
remoteName,
psk,
nil,
ovsExternalIDs)
if err != nil {
return 0, fmt.Errorf("failed to create IPsec tunnel port for Node %s", nodeName)
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/controller/noderoute/node_route_controller_test.go
Expand Up @@ -341,11 +341,11 @@ func TestCreateIPSecTunnelPortPSK(t *testing.T) {
node2PortName := util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-2")
c.ovsClient.EXPECT().CreateTunnelPortExt(
node1PortName, ovsconfig.TunnelType("vxlan"), int32(0),
false, "", nodeIP1.String(), "", "changeme",
false, "", nodeIP1.String(), "", "changeme", nil,
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-1"}).Times(1)
c.ovsClient.EXPECT().CreateTunnelPortExt(
node2PortName, ovsconfig.TunnelType("vxlan"), int32(0),
false, "", nodeIP2.String(), "", "changeme",
false, "", nodeIP2.String(), "", "changeme", nil,
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-2"}).Times(1)
c.ovsClient.EXPECT().GetOFPort(node1PortName, false).Return(int32(1), nil)
c.ovsClient.EXPECT().GetOFPort(node2PortName, false).Return(int32(2), nil)
Expand Down Expand Up @@ -404,7 +404,7 @@ func TestCreateIPSecTunnelPortCert(t *testing.T) {
node1PortName := util.GenerateNodeTunnelInterfaceName("xyz-k8s-0-1")
c.ovsClient.EXPECT().CreateTunnelPortExt(
node1PortName, ovsconfig.TunnelType("vxlan"), int32(0),
false, "", nodeIP1.String(), "xyz-k8s-0-1", "",
false, "", nodeIP1.String(), "xyz-k8s-0-1", "", nil,
map[string]interface{}{ovsExternalIDNodeName: "xyz-k8s-0-1"}).Times(1)
c.ovsClient.EXPECT().GetOFPort(node1PortName, false).Return(int32(1), nil)

Expand Down

0 comments on commit 4e60600

Please sign in to comment.