Skip to content

Commit

Permalink
Merge c24e914 into b85d2c0
Browse files Browse the repository at this point in the history
  • Loading branch information
tnqn committed Mar 1, 2022
2 parents b85d2c0 + c24e914 commit b43ad53
Show file tree
Hide file tree
Showing 19 changed files with 383 additions and 108 deletions.
20 changes: 11 additions & 9 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
"antrea.io/antrea/pkg/agent/stats"
"antrea.io/antrea/pkg/agent/types"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/controller/externalippool"
"antrea.io/antrea/pkg/features"
Expand All @@ -62,6 +61,7 @@ import (
ofconfig "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/cipher"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/version"
Expand Down Expand Up @@ -257,10 +257,10 @@ func run(o *Options) error {
}
}

// entityUpdates is a channel for receiving entity updates from CNIServer and
// notifying NetworkPolicyController to reconcile rules related to the
// updated entities.
entityUpdates := make(chan types.EntityReference, 100)
// podUpdateChannel is a channel for receiving Pod updates from CNIServer and
// notifying NetworkPolicyController and EgressController to reconcile rules
// related to the updated Pods.
podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)
// We set flow poll interval as the time interval for rule deletion in the async
// rule cache, which is implemented as part of the idAllocator. This is to preserve
// the rule info for populating NetworkPolicy fields in the Flow Exporter even
Expand All @@ -278,7 +278,7 @@ func run(o *Options) error {
ofClient,
ifaceStore,
nodeConfig.Name,
entityUpdates,
podUpdateChannel,
groupCounters,
groupIDUpdates,
antreaPolicyEnabled,
Expand Down Expand Up @@ -328,7 +328,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.Egress) {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, nodeInformer,
memberlistCluster, egressInformer, podUpdateChannel,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
Expand Down Expand Up @@ -365,12 +365,12 @@ func run(o *Options) error {
var cniPodInfoStore cnipodcache.CNIPodInfoStore
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
cniPodInfoStore = cnipodcache.NewCNIPodInfoStore()
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, entityUpdates, cniPodInfoStore)
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, cniPodInfoStore)
if err != nil {
return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err)
}
} else {
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, entityUpdates, nil)
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, nil)
if err != nil {
return fmt.Errorf("error initializing CNI server: %v", err)
}
Expand Down Expand Up @@ -460,6 +460,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go podUpdateChannel.Run(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)
Expand Down
31 changes: 14 additions & 17 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ import (
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -63,9 +62,9 @@ type podConfigurator struct {
ifaceStore interfacestore.InterfaceStore
gatewayMAC net.HardwareAddr
ifConfigurator *ifConfigurator
// entityUpdates is a channel for notifying updates of local endpoints / entities (most notably Pod)
// to other components which may benefit from this information, i.e NetworkPolicyController.
entityUpdates chan<- types.EntityReference
// podUpdateNotifier is used for notifying updates of local Pods to other components which may benefit from this
// information, i.e. NetworkPolicyController, EgressController.
podUpdateNotifier channel.Notifier
// consumed by secondary network creation.
podInfoStore cnipodcache.CNIPodInfoStore
}
Expand All @@ -78,22 +77,22 @@ func newPodConfigurator(
gatewayMAC net.HardwareAddr,
ovsDatapathType ovsconfig.OVSDatapathType,
isOvsHardwareOffloadEnabled bool,
entityUpdates chan<- types.EntityReference,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled)
if err != nil {
return nil, err
}
return &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
entityUpdates: entityUpdates,
podInfoStore: podInfoStore,
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
podInfoStore: podInfoStore,
}, nil
}

Expand Down Expand Up @@ -486,9 +485,7 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta
// Add containerConfig into local cache
pc.ifaceStore.AddInterface(containerConfig)
// Notify the Pod update event to required components.
pc.entityUpdates <- types.EntityReference{
Pod: &v1beta2.PodReference{Name: containerConfig.PodName, Namespace: containerConfig.PodNamespace},
}
pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName))
return nil
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/util/k8s"
)

Expand All @@ -51,9 +49,7 @@ func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.I
// Update interface config with the ofPort.
ifConfig.OVSPortConfig.OFPort = ofPort
// Notify the Pod update event to required components.
pc.entityUpdates <- types.EntityReference{
Pod: &v1beta2.PodReference{Name: ifConfig.PodName, Namespace: ifConfig.PodNamespace},
}
pc.podUpdateNotifier.Notify(k8s.NamespacedName(ifConfig.PodNamespace, ifConfig.PodName))
return nil
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ import (
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1"
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
)

const (
Expand Down Expand Up @@ -589,7 +589,7 @@ func (s *CNIServer) Initialize(
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
ifaceStore interfacestore.InterfaceStore,
entityUpdates chan<- types.EntityReference,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) error {
var err error
Expand All @@ -602,7 +602,7 @@ func (s *CNIServer) Initialize(

s.podConfigurator, err = newPodConfigurator(
ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC,
ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), entityUpdates,
ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), podUpdateNotifier,
podInfoStore,
)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/cniserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
routetest "antrea.io/antrea/pkg/agent/route/testing"
antreatypes "antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1"
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
)

const (
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestValidatePrevResult(t *testing.T) {
cniConfig.Netns = "invalid_netns"
sriovVFDeviceID := ""
prevResult.Interfaces = []*current.Interface{hostIface, containerIface}
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, make(chan antreatypes.EntityReference, 100), nil)
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, channel.NewSubscribableChannel("PodUpdate", 100), nil)
response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, k8sPodArgs, prevResult, sriovVFDeviceID)
checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "")
})
Expand All @@ -411,7 +411,7 @@ func TestValidatePrevResult(t *testing.T) {
cniConfig.Netns = "invalid_netns"
sriovVFDeviceID := "0000:03:00.6"
prevResult.Interfaces = []*current.Interface{hostIface, containerIface}
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, make(chan antreatypes.EntityReference, 100), nil)
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, channel.NewSubscribableChannel("PodUpdate", 100), nil)
response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, k8sPodArgs, prevResult, sriovVFDeviceID)
checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "")
})
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestRemoveInterface(t *testing.T) {
ifaceStore := interfacestore.NewInterfaceStore()
routeMock := routetest.NewMockInterface(controller)
gwMAC, _ := net.ParseMAC("00:00:11:11:11:11")
podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, routeMock, ifaceStore, gwMAC, "system", false, make(chan antreatypes.EntityReference, 100), nil)
podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, routeMock, ifaceStore, gwMAC, "system", false, channel.NewSubscribableChannel("PodUpdate", 100), nil)
require.Nil(t, err, "No error expected in podConfigurator constructor")

containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff")
Expand Down
19 changes: 17 additions & 2 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
Expand All @@ -48,6 +47,7 @@ import (
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2"
crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2"
"antrea.io/antrea/pkg/controller/metrics"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -154,7 +154,7 @@ func NewEgressController(
nodeTransportInterface string,
cluster *memberlist.Cluster,
egressInformer crdinformers.EgressInformer,
nodeInformer coreinformers.NodeInformer,
podUpdateSubscriber channel.Subscriber,
) (*EgressController, error) {
c := &EgressController{
ofClient: ofClient,
Expand Down Expand Up @@ -207,11 +207,26 @@ func NewEgressController(
},
resyncPeriod,
)
// Subscribe Pod update events from CNIServer to enforce Egress earlier, instead of waiting for their IPs are
// reported to kube-apiserver and processed by antrea-controller.
podUpdateSubscriber.Subscribe(c.processPodUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.cluster.AddClusterEventHandler(c.enqueueEgressesByExternalIPPool)
return c, nil
}

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(pod string) {
c.egressBindingsMutex.Lock()
defer c.egressBindingsMutex.Unlock()
binding, exists := c.egressBindings[pod]
if !exists {
return
}
c.queue.Add(binding.effectiveEgress)
}

// addEgress processes Egress ADD events.
func (c *EgressController) addEgress(obj interface{}) {
egress := obj.(*crdv1a2.Egress)
Expand Down
51 changes: 51 additions & 0 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"antrea.io/antrea/pkg/client/clientset/versioned"
fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -92,6 +93,7 @@ type fakeController struct {
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
mockIPAssigner *ipassignertest.MockIPAssigner
podUpdateChannel *channel.SubscribableChannel
}

func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController {
Expand All @@ -114,6 +116,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
addPodInterface(ifaceStore, "ns3", "pod3", 3)
addPodInterface(ifaceStore, "ns4", "pod4", 4)

podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)

egressController := &EgressController{
ofClient: mockOFClient,
routeClient: mockRouteClient,
Expand All @@ -133,6 +137,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
egressIPStates: map[string]*egressIPState{},
ipAssigner: mockIPAssigner,
}
podUpdateChannel.Subscribe(egressController.processPodUpdate)
return &fakeController{
EgressController: egressController,
mockController: controller,
Expand All @@ -141,6 +146,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
mockIPAssigner: mockIPAssigner,
podUpdateChannel: podUpdateChannel,
}
}

Expand Down Expand Up @@ -541,6 +547,51 @@ func TestSyncEgress(t *testing.T) {
}
}

func TestPodUpdateShouldSyncEgress(t *testing.T) {
egress := &crdv1a2.Egress{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
Spec: crdv1a2.EgressSpec{EgressIP: fakeLocalEgressIP1},
}
egressGroup := &cpv1b2.EgressGroup{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
GroupMembers: []cpv1b2.GroupMember{
{Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}},
{Pod: &cpv1b2.PodReference{Name: "pendingPod", Namespace: "ns1"}},
},
}
c := newFakeController(t, []runtime.Object{egress})
defer c.mockController.Finish()
stopCh := make(chan struct{})
defer close(stopCh)
go c.podUpdateChannel.Run(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

c.mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1)
c.addEgressGroup(egressGroup)
require.Equal(t, 1, c.queue.Len())
item, _ := c.queue.Get()
require.Equal(t, egress.Name, item)
require.NoError(t, c.syncEgress(item.(string)))
c.queue.Done(item)

c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(10), net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1)
// Mock CNIServer
addPodInterface(c.ifaceStore, "ns1", "pendingPod", 10)
c.podUpdateChannel.Notify("ns1/pendingPod")
require.NoError(t, wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) {
return c.queue.Len() == 1, nil
}))
item, _ = c.queue.Get()
require.Equal(t, egress.Name, item)
require.NoError(t, c.syncEgress(item.(string)))
c.queue.Done(item)
}

func TestSyncOverlappingEgress(t *testing.T) {
egress1 := &crdv1a2.Egress{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
Expand Down
Loading

0 comments on commit b43ad53

Please sign in to comment.