Skip to content

Commit

Permalink
Make iptables initialization error non fatal
Browse files Browse the repository at this point in the history
In large scale clusters, xtables lock may be hold by kubelet/
kube-proxy/ portmap for a long time, especially when there are many
service rules in nat table. antrea-agent may not be able to acquire the
lock in short time. If the agent blocks on the lock or quit itself, the
CNI server won't be running, causing all CNI requests to fail.  If the
Pods' restart policy is Always and there are dead Pods, container
runtime will keep retrying calling CNIs, during which portmap is called
first, leading to more xtables lock competitor.

This patch makes iptables initialization error non fatal and uses a
goroutine to retry it until success. The agent will start the CNI server
anyway and handle the CNI Del requests but won't handle CNI Add requests
until the network is ready.
  • Loading branch information
tnqn committed Nov 5, 2020
1 parent 714af4d commit 2293e4c
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 44 deletions.
7 changes: 6 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func run(o *Options) error {
// Create an ifaceStore that caches network interfaces managed by this node.
ifaceStore := interfacestore.NewInterfaceStore()

// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// Initialize agent and node network.
agentInitializer := agent.NewInitializer(
k8sClient,
Expand All @@ -127,6 +130,7 @@ func run(o *Options) error {
o.config.DefaultMTU,
serviceCIDRNet,
networkConfig,
networkReadyCh,
features.DefaultFeatureGate.Enabled(features.AntreaProxy))
err = agentInitializer.Initialize()
if err != nil {
Expand Down Expand Up @@ -188,7 +192,8 @@ func run(o *Options) error {
k8sClient,
podUpdates,
isChaining,
routeClient)
routeClient,
networkReadyCh)
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, o.config.OVSDatapathType)
if err != nil {
return fmt.Errorf("error initializing CNI server: %v", err)
Expand Down
16 changes: 15 additions & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"os"
"strconv"
"sync"
"time"

"github.com/containernetworking/plugins/pkg/ip"
Expand Down Expand Up @@ -67,6 +68,9 @@ type Initializer struct {
networkConfig *config.NetworkConfig
nodeConfig *config.NodeConfig
enableProxy bool
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
}

func NewInitializer(
Expand All @@ -80,6 +84,7 @@ func NewInitializer(
mtu int,
serviceCIDR *net.IPNet,
networkConfig *config.NetworkConfig,
networkReadyCh chan<- struct{},
enableProxy bool) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -92,6 +97,7 @@ func NewInitializer(
mtu: mtu,
serviceCIDR: serviceCIDR,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
enableProxy: enableProxy,
}
}
Expand Down Expand Up @@ -195,6 +201,8 @@ func (i *Initializer) initInterfaceStore() error {
// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
// wg is used to wait for the asynchronous initialization.
var wg sync.WaitGroup

if err := i.initNodeLocalConfig(); err != nil {
return err
Expand All @@ -216,14 +224,20 @@ func (i *Initializer) Initialize() error {
return err
}

if err := i.routeClient.Initialize(i.nodeConfig); err != nil {
wg.Add(1)
if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil {
return err
}

if err := i.setupExternalConnectivity(); err != nil {
return err
}

// The Node's network is ready only when both synchronous and asynchronous initialization are done.
go func() {
wg.Wait()
close(i.networkReadyCh)
}()
klog.Infof("Agent initialized NodeConfig=%v, NetworkConfig=%v", i.nodeConfig, i.networkConfig)
return nil
}
Expand Down
35 changes: 26 additions & 9 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"strings"
"sync"
"time"

cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
Expand All @@ -44,6 +45,10 @@ import (
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
)

const (
networkReadyTimeout = 30 * time.Second
)

// containerAccessArbitrator is used to ensure that concurrent goroutines cannot perfom operations
// on the same containerID. Other parts of the code make this assumption (in particular the
// InstallPodFlows / UninstallPodFlows methods of the OpenFlow client, which are invoked
Expand Down Expand Up @@ -100,6 +105,8 @@ type CNIServer struct {
podUpdates chan<- v1beta2.PodReference
isChaining bool
routeClient route.Interface
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
}

var supportedCNIVersionSet map[string]bool
Expand Down Expand Up @@ -348,6 +355,13 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
return response, nil
}

select {
case <-time.After(networkReadyTimeout):
klog.Errorf("Cannot process CmdAdd request for container %v because network is not ready", cniConfig.ContainerId)
return s.tryAgainLaterResponse(), nil
case <-s.networkReadyCh:
}

cniVersion := cniConfig.CNIVersion
result := &current.Result{CNIVersion: cniVersion}
netNS := s.hostNetNsPath(cniConfig.Netns)
Expand All @@ -358,12 +372,12 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// Rollback to delete configurations once ADD is failure.
if !success {
if isInfraContainer {
klog.Warningf("CmdAdd has failed, and try to rollback")
klog.Warningf("CmdAdd for container %v failed, and try to rollback", cniConfig.ContainerId)
if _, err := s.CmdDel(ctx, request); err != nil {
klog.Warningf("Failed to rollback after CNI add failure: %v", err)
}
} else {
klog.Warningf("CmdAdd has failed")
klog.Warningf("CmdAdd for container %v failed", cniConfig.ContainerId)
}
}
}()
Expand Down Expand Up @@ -392,11 +406,11 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// Request IP Address from IPAM driver.
ipamResult, err = ipam.ExecIPAMAdd(cniConfig.CniCmdArgs, cniConfig.IPAM.Type, infraContainer)
if err != nil {
klog.Errorf("Failed to add IP addresses from IPAM driver: %v", err)
klog.Errorf("Failed to request IP addresses for container %v: %v", cniConfig.ContainerId, err)
return s.ipamFailureResponse(err), nil
}
}
klog.Infof("Added ip addresses from IPAM driver, %v", ipamResult)
klog.Infof("Requested ip addresses for container %v: %v", cniConfig.ContainerId, ipamResult)
result.IPs = ipamResult.IPs
result.Routes = ipamResult.Routes
// Ensure interface gateway setting and mapping relations between result.Interfaces and result.IPs
Expand Down Expand Up @@ -425,7 +439,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*

var resultBytes bytes.Buffer
_ = result.PrintTo(&resultBytes)
klog.Infof("CmdAdd succeeded")
klog.Infof("CmdAdd for container %v succeeded", cniConfig.ContainerId)
// mark success as true to avoid rollback
success = true
return &cnipb.CniCmdResponse{CniResult: resultBytes.Bytes()}, nil
Expand All @@ -449,15 +463,16 @@ func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) (
}
// Release IP to IPAM driver
if err := ipam.ExecIPAMDelete(cniConfig.CniCmdArgs, cniConfig.IPAM.Type, infraContainer); err != nil {
klog.Errorf("Failed to delete IP addresses by IPAM driver: %v", err)
klog.Errorf("Failed to delete IP addresses for container %v: %v", cniConfig.ContainerId, err)
return s.ipamFailureResponse(err), nil
}
klog.Info("Deleted IP addresses by IPAM driver")
klog.Infof("Deleted IP addresses for container %v", cniConfig.ContainerId)
// Remove host interface and OVS configuration
if err := s.podConfigurator.removeInterfaces(cniConfig.ContainerId); err != nil {
klog.Errorf("Failed to remove interfaces for container %s: %v", cniConfig.ContainerId, err)
return s.configInterfaceFailureResponse(err), nil
}
klog.Infof("CmdDel for container %v succeeded", cniConfig.ContainerId)
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

Expand All @@ -479,7 +494,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
}

if err := ipam.ExecIPAMCheck(cniConfig.CniCmdArgs, cniConfig.IPAM.Type); err != nil {
klog.Errorf("Failed to check IPAM configuration: %v", err)
klog.Errorf("Failed to check IPAM configuration for container %v: %v", cniConfig.ContainerId, err)
return s.ipamFailureResponse(err), nil
}

Expand All @@ -491,7 +506,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
return response, nil
}
}
klog.Info("Succeed to check network configuration")
klog.Infof("CmdCheck for container %v succeeded", cniConfig.ContainerId)
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

Expand All @@ -502,6 +517,7 @@ func New(
podUpdates chan<- v1beta2.PodReference,
isChaining bool,
routeClient route.Interface,
networkReadyCh <-chan struct{},
) *CNIServer {
return &CNIServer{
cniSocket: cniSocket,
Expand All @@ -514,6 +530,7 @@ func New(
podUpdates: podUpdates,
isChaining: isChaining,
routeClient: routeClient,
networkReadyCh: networkReadyCh,
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/cniserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,15 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[
}

func newCNIServer(t *testing.T) *CNIServer {
networkReadyCh := make(chan struct{})
cniServer := &CNIServer{
cniSocket: testSocket,
nodeConfig: testNodeConfig,
serverVersion: cni.AntreaCNIVersion,
containerAccess: newContainerAccessArbitrator(),
networkReadyCh: networkReadyCh,
}
close(networkReadyCh)
cniServer.supportedCNIVersions = buildVersionSet()
return cniServer
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type Interface interface {
// Initialize should initialize all infrastructures required to route container packets in host network.
// It should be idempotent and can be safely called on every startup.
Initialize(nodeConfig *config.NodeConfig) error
Initialize(nodeConfig *config.NodeConfig, done func()) error

// Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs.
Reconcile(podCIDRs []string) error
Expand Down
22 changes: 18 additions & 4 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"strconv"
"sync"
"time"

"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSN

// Initialize initializes all infrastructures required to route container packets in host network.
// It is idempotent and can be safely called on every startup.
func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig

// Sets up the ipset that will be used in iptables.
Expand All @@ -91,9 +92,8 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
}

// Sets up the iptables infrastructure required to route packets in host network.
if err := c.initIPTables(); err != nil {
return fmt.Errorf("failed to initialize iptables: %v", err)
}
// It's called in a goroutine because xtables lock may not be acquired immediately.
go c.initIPTablesOnce(done)

// Sets up the IP routes and IP rule required to route packets in host network.
if err := c.initIPRoutes(); err != nil {
Expand All @@ -103,6 +103,20 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
return nil
}

// initIPTablesOnce starts a loop that initializes the iptables infrastructure.
// It returns after one successful execution.
func (c *Client) initIPTablesOnce(done func()) {
defer done()
for {
time.Sleep(2 * time.Second)
if err := c.initIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v", err)
continue
}
return
}
}

// initIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) initIPSet() error {
// In policy-only mode, Node Pod CIDR is undefined.
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/route/route_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSN

// Initialize sets nodeConfig on Window.
// Service LoadBalancing is provided by OpenFlow.
func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig
if err := c.initFwRules(); err != nil {
return err
Expand All @@ -70,6 +70,7 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
if err := util.EnableIPForwarding(nodeConfig.GatewayConfig.Name); err != nil {
return err
}
done()
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/route/route_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ func TestRouteOperation(t *testing.T) {
LinkIndex: gwLink,
},
}
err = client.Initialize(nodeConfig)
called := false
err = client.Initialize(nodeConfig, func() { called = true })
require.Nil(t, err)
require.True(called)

// Add initial routes.
err = client.AddRoutes(destCIDR1, peerNodeIP1, gwIP1)
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/route/testing/mock_route.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/agent/util/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (c *Client) Restore(data []byte, flush bool) error {
if c.restoreWaitSupported {
cmd.Args = append(cmd.Args, "-w", strconv.Itoa(waitSeconds), "-W", strconv.Itoa(waitIntervalMicroSeconds))
} else {
unlockFunc, err := lock(xtablesLockFilePath, waitSeconds*time.Second)
unlockFunc, err := Lock(XtablesLockFilePath, waitSeconds*time.Second)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/util/iptables/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
)

const (
xtablesLockFilePath = "/var/run/xtables.lock"
XtablesLockFilePath = "/var/run/xtables.lock"
xtablesLockFilePermission = 0600
)

// lock acquires the provided file lock. It's thread-safe.
// It will block until the lock is acquired or the timeout is reached.
func lock(lockFilePath string, timeout time.Duration) (func() error, error) {
func Lock(lockFilePath string, timeout time.Duration) (func() error, error) {
lockFile, err := os.OpenFile(lockFilePath, os.O_CREATE, xtablesLockFilePermission)
if err != nil {
return nil, fmt.Errorf("error opening xtables lock file: %v", err)
Expand Down
Loading

0 comments on commit 2293e4c

Please sign in to comment.