
Delay removal of flow-restore-wait #6342

Merged
Changes from all commits
48 changes: 33 additions & 15 deletions cmd/antrea-agent/agent.go
@@ -252,6 +252,20 @@ func run(o *Options) error {
// Processes that enable Pod network should wait for it.
podNetworkWait := utilwait.NewGroup()

// flowRestoreCompleteWait is used to wait until "essential" flows have been installed
// successfully in OVS. These flows include NetworkPolicy flows (guaranteed by
// podNetworkWait), Pod forwarding flows and flows installed by the
// NodeRouteController. Additional requirements may be added in the future.
flowRestoreCompleteWait := utilwait.NewGroup()
// We ensure that flowRestoreCompleteWait.Wait() cannot return until podNetworkWait.Wait()
// returns. This is not strictly necessary because it is guaranteed by the CNIServer Pod
// reconciliation logic but it helps with readability.
flowRestoreCompleteWait.Increment()
go func() {
defer flowRestoreCompleteWait.Done()
podNetworkWait.Wait()
}()

// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
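
For readers less familiar with the wait-group chaining used above: incrementing flowRestoreCompleteWait and decrementing it from a goroutine that blocks on podNetworkWait.Wait() is what guarantees the ordering between the two groups. Below is a minimal sketch of the same pattern using the standard library's sync.WaitGroup, assuming utilwait.Group offers comparable Increment/Done/Wait semantics; names and timings are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var podNetworkWait, flowRestoreCompleteWait sync.WaitGroup

	// Chain the two groups: flowRestoreCompleteWait cannot be released
	// until podNetworkWait has been released.
	podNetworkWait.Add(1)
	flowRestoreCompleteWait.Add(1)
	go func() {
		defer flowRestoreCompleteWait.Done()
		podNetworkWait.Wait()
	}()

	// Simulate the NetworkPolicy flows becoming ready a bit later.
	go func() {
		time.Sleep(100 * time.Millisecond)
		podNetworkWait.Done()
	}()

	// Returns only after podNetworkWait has been released.
	flowRestoreCompleteWait.Wait()
	fmt.Println("essential flows installed, flow-restore-wait can be removed")
}
```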
@@ -299,6 +313,7 @@ func run(o *Options) error {
egressConfig,
serviceConfig,
podNetworkWait,
flowRestoreCompleteWait,
stopCh,
o.nodeType,
o.config.ExternalNode.ExternalNodeNamespace,
@@ -332,6 +347,7 @@ func run(o *Options) error {
nodeConfig,
agentInitializer.GetWireGuardClient(),
ipsecCertController,
flowRestoreCompleteWait,
)
}

@@ -596,7 +612,8 @@ func run(o *Options) error {
enableAntreaIPAM,
o.config.DisableTXChecksumOffload,
networkConfig,
podNetworkWait)
podNetworkWait,
flowRestoreCompleteWait)

err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel)
if err != nil {
@@ -638,20 +655,6 @@ func run(o *Options) error {
o.enableAntreaProxy)
}

// TODO: we should call this after installing flows for initial node routes
// and initial NetworkPolicies so that no packets will be mishandled.
if err := agentInitializer.FlowRestoreComplete(); err != nil {
return err
}
// ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete
if connectUplinkToBridge {
// Restore network config before shutdown. ovsdbConnection must be alive when restore.
defer agentInitializer.RestoreOVSBridge()
if err := agentInitializer.ConnectUplinkToOVSBridge(); err != nil {
return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err)
}
}

if err := antreaClientProvider.RunOnce(); err != nil {
return err
}
@@ -848,6 +851,21 @@ func run(o *Options) error {
go mcStrechedNetworkPolicyController.Run(stopCh)
}

klog.InfoS("Waiting for flow restoration to complete")
Contributor Author:
@tnqn I wasn't sure what would be the best place for this code. I put it here because I think by that point all the controllers that need to install flows have been instantiated, initialized and "started" (even though at the moment, flowRestoreCompleteWait only waits for the NetworkPolicyController + NodeRouteController + CNIServer).

Member:
It makes sense to me. I tried to delay FlowRestoreComplete but was unsure whether moving ConnectUplinkToOVSBridge has any impact. cc @gran-vmv

Contributor:
We can run flexible-ipam-e2e multiple times to check if this change can work.

flowRestoreCompleteWait.Wait()
if err := agentInitializer.FlowRestoreComplete(); err != nil {
return err
}
klog.InfoS("Flow restoration has completed")
// ConnectUplinkToOVSBridge must be run immediately after FlowRestoreComplete
if connectUplinkToBridge {
// Restore network config before shutdown. ovsdbConnection must be alive when restore.
defer agentInitializer.RestoreOVSBridge()
if err := agentInitializer.ConnectUplinkToOVSBridge(); err != nil {
return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err)
}
}

// statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for
// NetworkPolicy stats and Multicast stats.
if features.DefaultFeatureGate.Enabled(features.NetworkPolicyStats) {
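For context on what FlowRestoreComplete() ultimately does: OVS's other_config:flow-restore-wait flag on the Open_vSwitch table roughly tells ovs-vswitchd to hold off on normal packet processing until the flag is cleared, which is why this PR only clears it once the essential flows have been replayed. Below is a hedged sketch of the OVS-level operation, shelling out to ovs-vsctl purely for illustration; the agent performs this through its own OVSDB client, which this diff does not show.

```go
package main

import (
	"fmt"
	"os/exec"
)

// clearFlowRestoreWait removes other_config:flow-restore-wait from the
// Open_vSwitch table, which is the OVS-level equivalent of what
// FlowRestoreComplete() is expected to achieve. Shelling out to ovs-vsctl
// is for illustration only.
func clearFlowRestoreWait() error {
	cmd := exec.Command("ovs-vsctl", "remove", "Open_vSwitch", ".", "other_config", "flow-restore-wait")
	if out, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("failed to clear flow-restore-wait: %v (%s)", err, out)
	}
	return nil
}

func main() {
	if err := clearFlowRestoreWait(); err != nil {
		fmt.Println(err)
	}
}
```

With the change above, this clearing step (and ConnectUplinkToOVSBridge for the bridging-mode case) now runs only after flowRestoreCompleteWait.Wait() returns.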
73 changes: 40 additions & 33 deletions pkg/agent/agent.go
@@ -132,10 +132,14 @@ type Initializer struct {
enableAntreaProxy bool
// podNetworkWait should be decremented once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
podNetworkWait *utilwait.Group
stopCh <-chan struct{}
nodeType config.NodeType
externalNodeNamespace string
podNetworkWait *utilwait.Group
// flowRestoreCompleteWait is used to indicate that required flows have
// been installed. We use it to determine whether flows from previous
// rounds can be deleted.
flowRestoreCompleteWait *utilwait.Group
stopCh <-chan struct{}
nodeType config.NodeType
externalNodeNamespace string
}

func NewInitializer(
@@ -154,6 +158,7 @@ func NewInitializer(
egressConfig *config.EgressConfig,
serviceConfig *config.ServiceConfig,
podNetworkWait *utilwait.Group,
flowRestoreCompleteWait *utilwait.Group,
stopCh <-chan struct{},
nodeType config.NodeType,
externalNodeNamespace string,
@@ -163,29 +168,30 @@ func NewInitializer(
enableL7FlowExporter bool,
) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
ovsCtlClient: ovsCtlClient,
client: k8sClient,
crdClient: crdClient,
ifaceStore: ifaceStore,
ofClient: ofClient,
routeClient: routeClient,
ovsBridge: ovsBridge,
hostGateway: hostGateway,
mtu: mtu,
networkConfig: networkConfig,
wireGuardConfig: wireGuardConfig,
egressConfig: egressConfig,
serviceConfig: serviceConfig,
l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
podNetworkWait: podNetworkWait,
stopCh: stopCh,
nodeType: nodeType,
externalNodeNamespace: externalNodeNamespace,
connectUplinkToBridge: connectUplinkToBridge,
enableAntreaProxy: enableAntreaProxy,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableL7FlowExporter: enableL7FlowExporter,
ovsBridgeClient: ovsBridgeClient,
ovsCtlClient: ovsCtlClient,
client: k8sClient,
crdClient: crdClient,
ifaceStore: ifaceStore,
ofClient: ofClient,
routeClient: routeClient,
ovsBridge: ovsBridge,
hostGateway: hostGateway,
mtu: mtu,
networkConfig: networkConfig,
wireGuardConfig: wireGuardConfig,
egressConfig: egressConfig,
serviceConfig: serviceConfig,
l7NetworkPolicyConfig: &config.L7NetworkPolicyConfig{},
podNetworkWait: podNetworkWait,
flowRestoreCompleteWait: flowRestoreCompleteWait,
stopCh: stopCh,
nodeType: nodeType,
externalNodeNamespace: externalNodeNamespace,
connectUplinkToBridge: connectUplinkToBridge,
enableAntreaProxy: enableAntreaProxy,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableL7FlowExporter: enableL7FlowExporter,
}
}

@@ -529,13 +535,14 @@ func (i *Initializer) initOpenFlowPipeline() error {
// the new round number), otherwise we would disrupt the dataplane. Unfortunately,
// the time required for convergence may be large and there is no simple way to
// determine when is a right time to perform the cleanup task.
// TODO: introduce a deterministic mechanism through which the different entities
// responsible for installing flows can notify the agent that this deletion
// operation can take place. A waitGroup can be created here and notified when
// full sync in agent networkpolicy controller is complete. This would signal NP
// flows have been synced once. Other mechanisms are still needed for node flows
// fullSync check.
// We took a first step towards introducing a deterministic mechanism through which
// the different entities responsible for installing flows can notify the agent that
// this deletion operation can take place. i.flowRestoreCompleteWait.Wait() will
// block until some key flows (NetworkPolicy flows, Pod flows, Node route flows)
// have been installed. But not all entities responsible for installing flows
// currently use this wait group, so we block for a minimum of 10 seconds.
time.Sleep(10 * time.Second)
i.flowRestoreCompleteWait.Wait()
klog.Info("Deleting stale flows from previous round if any")
if err := i.ofClient.DeleteStaleFlows(); err != nil {
klog.Errorf("Error when deleting stale flows from previous round: %v", err)
13 changes: 11 additions & 2 deletions pkg/agent/cniserver/pod_configuration.go
@@ -19,6 +19,7 @@ import (
"fmt"
"net"
"strings"
"sync"

current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/cni/pkg/version"
@@ -404,7 +405,7 @@ func parsePrevResult(conf *types.NetworkConfig) error {
return nil
}

func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait *wait.Group) error {
func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *containerAccessArbitrator, podNetworkWait, flowRestoreCompleteWait *wait.Group) error {
// desiredPods is the set of Pods that should be present, based on the
// current list of Pods got from the Kubernetes API.
desiredPods := sets.New[string]()
@@ -413,6 +414,8 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// knownInterfaces is the list of interfaces currently in the local cache.
knownInterfaces := pc.ifaceStore.GetInterfacesByType(interfacestore.ContainerInterface)

var podWg sync.WaitGroup

for _, pod := range pods {
// Skip Pods for which we are not in charge of the networking.
if pod.Spec.HostNetwork {
@@ -436,7 +439,9 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
missingIfConfigs = append(missingIfConfigs, containerConfig)
continue
}
podWg.Add(1)
go func(containerID, pod, namespace string) {
defer podWg.Done()
// Do not install Pod flows until all preconditions are met.
podNetworkWait.Wait()
// To avoid race condition with CNIServer CNI event handlers.
@@ -452,7 +457,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
klog.InfoS("Syncing Pod interface", "Pod", klog.KRef(namespace, name), "iface", containerConfig.InterfaceName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IPs,
@@ -473,6 +478,10 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// interface should no longer be in store after the call to removeInterfaces
}
}
go func() {
defer flowRestoreCompleteWait.Done()
podWg.Wait()
}()
if len(missingIfConfigs) > 0 {
pc.reconcileMissingPods(missingIfConfigs, containerAccess)
}
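
The reconcile() change above fans out one goroutine per existing Pod to replay its flows, tracks them with a local sync.WaitGroup (podWg), and decrements flowRestoreCompleteWait from a separate goroutine once every replay has finished, so reconciliation itself never blocks on flow installation. A minimal sketch of that fan-out-then-signal pattern is shown below; function and variable names are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// replayAll fans out one goroutine per item, then signals done() exactly once
// after every goroutine has finished, without blocking the caller.
func replayAll(items []string, replay func(string), done func()) {
	var wg sync.WaitGroup
	for _, item := range items {
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			replay(it)
		}(item)
	}
	go func() {
		wg.Wait()
		done()
	}()
}

func main() {
	var flowRestoreCompleteWait sync.WaitGroup
	flowRestoreCompleteWait.Add(1)

	pods := []string{"podA/ns1", "podB/ns2"}
	replayAll(pods, func(pod string) {
		fmt.Println("replaying flows for", pod)
	}, flowRestoreCompleteWait.Done)

	flowRestoreCompleteWait.Wait()
	fmt.Println("all Pod flows replayed")
}
```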
9 changes: 6 additions & 3 deletions pkg/agent/cniserver/server.go
@@ -123,6 +123,8 @@ type CNIServer struct {
networkConfig *config.NetworkConfig
// podNetworkWait notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
podNetworkWait *wait.Group
// flowRestoreCompleteWait will be decremented when Pod reconciliation is completed.
flowRestoreCompleteWait *wait.Group
}

var supportedCNIVersionSet map[string]bool
@@ -630,7 +632,7 @@ func New(
routeClient route.Interface,
isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool,
networkConfig *config.NetworkConfig,
podNetworkWait *wait.Group,
podNetworkWait, flowRestoreCompleteWait *wait.Group,
) *CNIServer {
return &CNIServer{
cniSocket: cniSocket,
Expand All @@ -646,6 +648,7 @@ func New(
enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM,
networkConfig: networkConfig,
podNetworkWait: podNetworkWait,
flowRestoreCompleteWait: flowRestoreCompleteWait.Increment(),
}
}

@@ -748,7 +751,7 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse,
// installing Pod flows, so as part of this reconciliation process we retrieve the Pod list from the
// K8s apiserver and replay the necessary flows.
func (s *CNIServer) reconcile() error {
klog.InfoS("Reconciliation for CNI server")
klog.InfoS("Starting reconciliation for CNI server")
// For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from
// the watch cache in kube-apiserver.
pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
@@ -759,7 +762,7 @@ func (s *CNIServer) reconcile() error {
return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err)
}

return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait)
return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait, s.flowRestoreCompleteWait)
}

func init() {
11 changes: 6 additions & 5 deletions pkg/agent/cniserver/server_test.go
@@ -761,11 +761,12 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[

func newCNIServer(t *testing.T) *CNIServer {
cniServer := &CNIServer{
cniSocket: testSocket,
nodeConfig: testNodeConfig,
serverVersion: cni.AntreaCNIVersion,
containerAccess: newContainerAccessArbitrator(),
podNetworkWait: wait.NewGroup(),
cniSocket: testSocket,
nodeConfig: testNodeConfig,
serverVersion: cni.AntreaCNIVersion,
containerAccess: newContainerAccessArbitrator(),
podNetworkWait: wait.NewGroup(),
flowRestoreCompleteWait: wait.NewGroup().Increment(),
}
cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450}
return cniServer