Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Realize Egress for a Pod once its network is created #3360

Merged
merged 1 commit into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
"antrea.io/antrea/pkg/agent/stats"
"antrea.io/antrea/pkg/agent/types"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/controller/externalippool"
"antrea.io/antrea/pkg/features"
Expand All @@ -62,6 +61,7 @@ import (
ofconfig "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/cipher"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/version"
Expand Down Expand Up @@ -258,10 +258,10 @@ func run(o *Options) error {
}
}

// entityUpdates is a channel for receiving entity updates from CNIServer and
// notifying NetworkPolicyController to reconcile rules related to the
// updated entities.
entityUpdates := make(chan types.EntityReference, 100)
// podUpdateChannel is a channel for receiving Pod updates from CNIServer and
// notifying NetworkPolicyController and EgressController to reconcile rules
// related to the updated Pods.
podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)
// We set flow poll interval as the time interval for rule deletion in the async
// rule cache, which is implemented as part of the idAllocator. This is to preserve
// the rule info for populating NetworkPolicy fields in the Flow Exporter even
Expand All @@ -279,7 +279,7 @@ func run(o *Options) error {
ofClient,
ifaceStore,
nodeConfig.Name,
entityUpdates,
podUpdateChannel,
groupCounters,
groupIDUpdates,
antreaPolicyEnabled,
Expand Down Expand Up @@ -329,7 +329,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.Egress) {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, nodeInformer,
memberlistCluster, egressInformer, podUpdateChannel,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
Expand Down Expand Up @@ -367,12 +367,12 @@ func run(o *Options) error {
var cniPodInfoStore cnipodcache.CNIPodInfoStore
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
cniPodInfoStore = cnipodcache.NewCNIPodInfoStore()
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, entityUpdates, cniPodInfoStore)
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, cniPodInfoStore)
if err != nil {
return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err)
}
} else {
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, entityUpdates, nil)
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, nil)
if err != nil {
return fmt.Errorf("error initializing CNI server: %v", err)
}
Expand Down Expand Up @@ -461,6 +461,8 @@ func run(o *Options) error {

log.StartLogFileNumberMonitor(stopCh)

go podUpdateChannel.Run(stopCh)

go routeClient.Run(stopCh)

go cniServer.Run(stopCh)
Expand Down
31 changes: 14 additions & 17 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ import (
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -63,9 +62,9 @@ type podConfigurator struct {
ifaceStore interfacestore.InterfaceStore
gatewayMAC net.HardwareAddr
ifConfigurator *ifConfigurator
// entityUpdates is a channel for notifying updates of local endpoints / entities (most notably Pod)
// to other components which may benefit from this information, i.e NetworkPolicyController.
entityUpdates chan<- types.EntityReference
// podUpdateNotifier is used for notifying updates of local Pods to other components which may benefit from this
// information, i.e. NetworkPolicyController, EgressController.
podUpdateNotifier channel.Notifier
// consumed by secondary network creation.
podInfoStore cnipodcache.CNIPodInfoStore
}
Expand All @@ -78,22 +77,22 @@ func newPodConfigurator(
gatewayMAC net.HardwareAddr,
ovsDatapathType ovsconfig.OVSDatapathType,
isOvsHardwareOffloadEnabled bool,
entityUpdates chan<- types.EntityReference,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled)
if err != nil {
return nil, err
}
return &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
entityUpdates: entityUpdates,
podInfoStore: podInfoStore,
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
routeClient: routeClient,
ifaceStore: ifaceStore,
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
podInfoStore: podInfoStore,
}, nil
}

Expand Down Expand Up @@ -486,9 +485,7 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta
// Add containerConfig into local cache
pc.ifaceStore.AddInterface(containerConfig)
// Notify the Pod update event to required components.
pc.entityUpdates <- types.EntityReference{
Pod: &v1beta2.PodReference{Name: containerConfig.PodName, Namespace: containerConfig.PodNamespace},
}
pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName))
return nil
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/util/k8s"
)

Expand All @@ -51,9 +49,7 @@ func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.I
// Update interface config with the ofPort.
ifConfig.OVSPortConfig.OFPort = ofPort
// Notify the Pod update event to required components.
pc.entityUpdates <- types.EntityReference{
Pod: &v1beta2.PodReference{Name: ifConfig.PodName, Namespace: ifConfig.PodNamespace},
}
pc.podUpdateNotifier.Notify(k8s.NamespacedName(ifConfig.PodNamespace, ifConfig.PodName))
return nil
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ import (
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1"
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
)

const (
Expand Down Expand Up @@ -589,7 +589,7 @@ func (s *CNIServer) Initialize(
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
ifaceStore interfacestore.InterfaceStore,
entityUpdates chan<- types.EntityReference,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) error {
var err error
Expand All @@ -602,7 +602,7 @@ func (s *CNIServer) Initialize(

s.podConfigurator, err = newPodConfigurator(
ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC,
ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), entityUpdates,
ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(), podUpdateNotifier,
podInfoStore,
)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/cniserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
routetest "antrea.io/antrea/pkg/agent/route/testing"
antreatypes "antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1"
"antrea.io/antrea/pkg/cni"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
"antrea.io/antrea/pkg/util/channel"
)

const (
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestValidatePrevResult(t *testing.T) {
cniConfig.Netns = "invalid_netns"
sriovVFDeviceID := ""
prevResult.Interfaces = []*current.Interface{hostIface, containerIface}
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, make(chan antreatypes.EntityReference, 100), nil)
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", false, channel.NewSubscribableChannel("PodUpdate", 100), nil)
response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, k8sPodArgs, prevResult, sriovVFDeviceID)
checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "")
})
Expand All @@ -411,7 +411,7 @@ func TestValidatePrevResult(t *testing.T) {
cniConfig.Netns = "invalid_netns"
sriovVFDeviceID := "0000:03:00.6"
prevResult.Interfaces = []*current.Interface{hostIface, containerIface}
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, make(chan antreatypes.EntityReference, 100), nil)
cniServer.podConfigurator, _ = newPodConfigurator(nil, nil, nil, nil, nil, "", true, channel.NewSubscribableChannel("PodUpdate", 100), nil)
response := cniServer.validatePrevResult(cniConfig.CniCmdArgs, k8sPodArgs, prevResult, sriovVFDeviceID)
checkErrorResponse(t, response, cnipb.ErrorCode_CHECK_INTERFACE_FAILURE, "")
})
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestRemoveInterface(t *testing.T) {
ifaceStore := interfacestore.NewInterfaceStore()
routeMock := routetest.NewMockInterface(controller)
gwMAC, _ := net.ParseMAC("00:00:11:11:11:11")
podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, routeMock, ifaceStore, gwMAC, "system", false, make(chan antreatypes.EntityReference, 100), nil)
podConfigurator, err := newPodConfigurator(mockOVSBridgeClient, mockOFClient, routeMock, ifaceStore, gwMAC, "system", false, channel.NewSubscribableChannel("PodUpdate", 100), nil)
require.Nil(t, err, "No error expected in podConfigurator constructor")

containerMAC, _ := net.ParseMAC("aa:bb:cc:dd:ee:ff")
Expand Down
19 changes: 17 additions & 2 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
Expand All @@ -48,6 +47,7 @@ import (
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2"
crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2"
"antrea.io/antrea/pkg/controller/metrics"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -154,7 +154,7 @@ func NewEgressController(
nodeTransportInterface string,
cluster *memberlist.Cluster,
egressInformer crdinformers.EgressInformer,
nodeInformer coreinformers.NodeInformer,
podUpdateSubscriber channel.Subscriber,
) (*EgressController, error) {
c := &EgressController{
ofClient: ofClient,
Expand Down Expand Up @@ -207,11 +207,26 @@ func NewEgressController(
},
resyncPeriod,
)
// Subscribe Pod update events from CNIServer to enforce Egress earlier, instead of waiting for their IPs are
// reported to kube-apiserver and processed by antrea-controller.
podUpdateSubscriber.Subscribe(c.processPodUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.cluster.AddClusterEventHandler(c.enqueueEgressesByExternalIPPool)
return c, nil
}

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(pod string) {
c.egressBindingsMutex.Lock()
defer c.egressBindingsMutex.Unlock()
binding, exists := c.egressBindings[pod]
if !exists {
return
}
c.queue.Add(binding.effectiveEgress)
}

// addEgress processes Egress ADD events.
func (c *EgressController) addEgress(obj interface{}) {
egress := obj.(*crdv1a2.Egress)
Expand Down
51 changes: 51 additions & 0 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"antrea.io/antrea/pkg/client/clientset/versioned"
fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -92,6 +93,7 @@ type fakeController struct {
crdClient *fakeversioned.Clientset
crdInformerFactory crdinformers.SharedInformerFactory
mockIPAssigner *ipassignertest.MockIPAssigner
podUpdateChannel *channel.SubscribableChannel
}

func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController {
Expand All @@ -114,6 +116,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
addPodInterface(ifaceStore, "ns3", "pod3", 3)
addPodInterface(ifaceStore, "ns4", "pod4", 4)

podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)

egressController := &EgressController{
ofClient: mockOFClient,
routeClient: mockRouteClient,
Expand All @@ -133,6 +137,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
egressIPStates: map[string]*egressIPState{},
ipAssigner: mockIPAssigner,
}
podUpdateChannel.Subscribe(egressController.processPodUpdate)
return &fakeController{
EgressController: egressController,
mockController: controller,
Expand All @@ -141,6 +146,7 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll
crdClient: crdClient,
crdInformerFactory: crdInformerFactory,
mockIPAssigner: mockIPAssigner,
podUpdateChannel: podUpdateChannel,
}
}

Expand Down Expand Up @@ -541,6 +547,51 @@ func TestSyncEgress(t *testing.T) {
}
}

func TestPodUpdateShouldSyncEgress(t *testing.T) {
egress := &crdv1a2.Egress{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
Spec: crdv1a2.EgressSpec{EgressIP: fakeLocalEgressIP1},
}
egressGroup := &cpv1b2.EgressGroup{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
GroupMembers: []cpv1b2.GroupMember{
{Pod: &cpv1b2.PodReference{Name: "pod1", Namespace: "ns1"}},
{Pod: &cpv1b2.PodReference{Name: "pendingPod", Namespace: "ns1"}},
},
}
c := newFakeController(t, []runtime.Object{egress})
defer c.mockController.Finish()
stopCh := make(chan struct{})
defer close(stopCh)
go c.podUpdateChannel.Run(stopCh)
c.crdInformerFactory.Start(stopCh)
c.crdInformerFactory.WaitForCacheSync(stopCh)

c.mockOFClient.EXPECT().InstallSNATMarkFlows(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(1), net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockRouteClient.EXPECT().AddSNATRule(net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1)
c.addEgressGroup(egressGroup)
require.Equal(t, 1, c.queue.Len())
item, _ := c.queue.Get()
require.Equal(t, egress.Name, item)
require.NoError(t, c.syncEgress(item.(string)))
c.queue.Done(item)

c.mockOFClient.EXPECT().InstallPodSNATFlows(uint32(10), net.ParseIP(fakeLocalEgressIP1), uint32(1))
c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1)
// Mock CNIServer
addPodInterface(c.ifaceStore, "ns1", "pendingPod", 10)
c.podUpdateChannel.Notify("ns1/pendingPod")
require.NoError(t, wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) {
return c.queue.Len() == 1, nil
}))
item, _ = c.queue.Get()
require.Equal(t, egress.Name, item)
require.NoError(t, c.syncEgress(item.(string)))
c.queue.Done(item)
}

func TestSyncOverlappingEgress(t *testing.T) {
egress1 := &crdv1a2.Egress{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
Expand Down
Loading