Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make iptables initialization error non fatal #1497

Merged
merged 1 commit into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func run(o *Options) error {
// Create an ifaceStore that caches network interfaces managed by this node.
ifaceStore := interfacestore.NewInterfaceStore()

// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// Initialize agent and node network.
agentInitializer := agent.NewInitializer(
k8sClient,
Expand All @@ -127,6 +130,7 @@ func run(o *Options) error {
o.config.DefaultMTU,
serviceCIDRNet,
networkConfig,
networkReadyCh,
features.DefaultFeatureGate.Enabled(features.AntreaProxy))
err = agentInitializer.Initialize()
if err != nil {
Expand Down Expand Up @@ -188,7 +192,8 @@ func run(o *Options) error {
k8sClient,
podUpdates,
isChaining,
routeClient)
routeClient,
networkReadyCh)
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, o.config.OVSDatapathType)
if err != nil {
return fmt.Errorf("error initializing CNI server: %v", err)
Expand Down
16 changes: 15 additions & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"os"
"strconv"
"sync"
"time"

"github.com/containernetworking/plugins/pkg/ip"
Expand Down Expand Up @@ -66,6 +67,9 @@ type Initializer struct {
networkConfig *config.NetworkConfig
nodeConfig *config.NodeConfig
enableProxy bool
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
}

func NewInitializer(
Expand All @@ -79,6 +83,7 @@ func NewInitializer(
mtu int,
serviceCIDR *net.IPNet,
networkConfig *config.NetworkConfig,
networkReadyCh chan<- struct{},
enableProxy bool) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -91,6 +96,7 @@ func NewInitializer(
mtu: mtu,
serviceCIDR: serviceCIDR,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
enableProxy: enableProxy,
}
}
Expand Down Expand Up @@ -194,6 +200,8 @@ func (i *Initializer) initInterfaceStore() error {
// Initialize sets up agent initial configurations.
func (i *Initializer) Initialize() error {
klog.Info("Setting up node network")
// wg is used to wait for the asynchronous initialization.
var wg sync.WaitGroup

if err := i.initNodeLocalConfig(); err != nil {
return err
Expand All @@ -215,14 +223,20 @@ func (i *Initializer) Initialize() error {
return err
}

if err := i.routeClient.Initialize(i.nodeConfig); err != nil {
wg.Add(1)
if err := i.routeClient.Initialize(i.nodeConfig, wg.Done); err != nil {
return err
}

if err := i.setupExternalConnectivity(); err != nil {
return err
}

// The Node's network is ready only when both synchronous and asynchronous initialization are done.
go func() {
wg.Wait()
close(i.networkReadyCh)
}()
klog.Infof("Agent initialized NodeConfig=%v, NetworkConfig=%v", i.nodeConfig, i.networkConfig)
return nil
}
Expand Down
42 changes: 32 additions & 10 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"strings"
"sync"
"time"

cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
Expand All @@ -44,6 +45,15 @@ import (
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
)

const (
// networkReadyTimeout is the maximum time the CNI server will wait for network ready when processing CNI Add
// requests. If timeout occurs, tryAgainLaterResponse will be returned.
// The default runtime request timeout of kubelet is 2 minutes.
// https://github.com/kubernetes/kubernetes/blob/v1.19.3/staging/src/k8s.io/kubelet/config/v1beta1/types.go#L451
// networkReadyTimeout is set to a shorter time so it returns a clear message to the runtime.
networkReadyTimeout = 30 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the default kubelet / container runtime timeout for CNI Add? could you add the info as a comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)

// containerAccessArbitrator is used to ensure that concurrent goroutines cannot perfom operations
// on the same containerID. Other parts of the code make this assumption (in particular the
// InstallPodFlows / UninstallPodFlows methods of the OpenFlow client, which are invoked
Expand Down Expand Up @@ -100,6 +110,8 @@ type CNIServer struct {
podUpdates chan<- v1beta2.PodReference
isChaining bool
routeClient route.Interface
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
}

var supportedCNIVersionSet map[string]bool
Expand Down Expand Up @@ -184,7 +196,7 @@ func (s *CNIServer) loadNetworkConfig(request *cnipb.CniCmdRequest) (*CNIConfig,
if cniConfig.MTU == 0 {
cniConfig.MTU = s.nodeConfig.NodeMTU
}
klog.Infof("Load network configurations: %v", cniConfig)
klog.V(3).Infof("Load network configurations: %v", cniConfig)
return cniConfig, nil
}

Expand Down Expand Up @@ -348,6 +360,13 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
return response, nil
}

select {
case <-time.After(networkReadyTimeout):
klog.Errorf("Cannot process CmdAdd request for container %v because network is not ready", cniConfig.ContainerId)
return s.tryAgainLaterResponse(), nil
case <-s.networkReadyCh:
}

cniVersion := cniConfig.CNIVersion
result := &current.Result{CNIVersion: cniVersion}
netNS := s.hostNetNsPath(cniConfig.Netns)
Expand All @@ -358,12 +377,12 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// Rollback to delete configurations once ADD is failure.
if !success {
if isInfraContainer {
klog.Warningf("CmdAdd has failed, and try to rollback")
klog.Warningf("CmdAdd for container %v failed, and try to rollback", cniConfig.ContainerId)
if _, err := s.CmdDel(ctx, request); err != nil {
klog.Warningf("Failed to rollback after CNI add failure: %v", err)
}
} else {
klog.Warningf("CmdAdd has failed")
klog.Warningf("CmdAdd for container %v failed", cniConfig.ContainerId)
}
}
}()
Expand Down Expand Up @@ -392,11 +411,11 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// Request IP Address from IPAM driver.
ipamResult, err = ipam.ExecIPAMAdd(cniConfig.CniCmdArgs, cniConfig.IPAM.Type, infraContainer)
if err != nil {
klog.Errorf("Failed to add IP addresses from IPAM driver: %v", err)
klog.Errorf("Failed to request IP addresses for container %v: %v", cniConfig.ContainerId, err)
return s.ipamFailureResponse(err), nil
}
}
klog.Infof("Added ip addresses from IPAM driver, %v", ipamResult)
klog.Infof("Requested ip addresses for container %v: %v", cniConfig.ContainerId, ipamResult)
result.IPs = ipamResult.IPs
result.Routes = ipamResult.Routes
// Ensure interface gateway setting and mapping relations between result.Interfaces and result.IPs
Expand Down Expand Up @@ -425,7 +444,7 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*

var resultBytes bytes.Buffer
_ = result.PrintTo(&resultBytes)
klog.Infof("CmdAdd succeeded")
klog.Infof("CmdAdd for container %v succeeded", cniConfig.ContainerId)
// mark success as true to avoid rollback
success = true
return &cnipb.CniCmdResponse{CniResult: resultBytes.Bytes()}, nil
Expand All @@ -449,15 +468,16 @@ func (s *CNIServer) CmdDel(_ context.Context, request *cnipb.CniCmdRequest) (
}
// Release IP to IPAM driver
if err := ipam.ExecIPAMDelete(cniConfig.CniCmdArgs, cniConfig.IPAM.Type, infraContainer); err != nil {
klog.Errorf("Failed to delete IP addresses by IPAM driver: %v", err)
klog.Errorf("Failed to delete IP addresses for container %v: %v", cniConfig.ContainerId, err)
return s.ipamFailureResponse(err), nil
}
klog.Info("Deleted IP addresses by IPAM driver")
klog.Infof("Deleted IP addresses for container %v", cniConfig.ContainerId)
// Remove host interface and OVS configuration
if err := s.podConfigurator.removeInterfaces(cniConfig.ContainerId); err != nil {
klog.Errorf("Failed to remove interfaces for container %s: %v", cniConfig.ContainerId, err)
return s.configInterfaceFailureResponse(err), nil
}
klog.Infof("CmdDel for container %v succeeded", cniConfig.ContainerId)
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

Expand All @@ -479,7 +499,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
}

if err := ipam.ExecIPAMCheck(cniConfig.CniCmdArgs, cniConfig.IPAM.Type); err != nil {
klog.Errorf("Failed to check IPAM configuration: %v", err)
klog.Errorf("Failed to check IPAM configuration for container %v: %v", cniConfig.ContainerId, err)
return s.ipamFailureResponse(err), nil
}

Expand All @@ -491,7 +511,7 @@ func (s *CNIServer) CmdCheck(_ context.Context, request *cnipb.CniCmdRequest) (
return response, nil
}
}
klog.Info("Succeed to check network configuration")
klog.Infof("CmdCheck for container %v succeeded", cniConfig.ContainerId)
return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

Expand All @@ -502,6 +522,7 @@ func New(
podUpdates chan<- v1beta2.PodReference,
isChaining bool,
routeClient route.Interface,
networkReadyCh <-chan struct{},
) *CNIServer {
return &CNIServer{
cniSocket: cniSocket,
Expand All @@ -514,6 +535,7 @@ func New(
podUpdates: podUpdates,
isChaining: isChaining,
routeClient: routeClient,
networkReadyCh: networkReadyCh,
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/cniserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,15 @@ func translateRawPrevResult(prevResult *current.Result, cniVersion string) (map[
}

func newCNIServer(t *testing.T) *CNIServer {
networkReadyCh := make(chan struct{})
cniServer := &CNIServer{
cniSocket: testSocket,
nodeConfig: testNodeConfig,
serverVersion: cni.AntreaCNIVersion,
containerAccess: newContainerAccessArbitrator(),
networkReadyCh: networkReadyCh,
}
close(networkReadyCh)
cniServer.supportedCNIVersions = buildVersionSet()
return cniServer
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type Interface interface {
// Initialize should initialize all infrastructures required to route container packets in host network.
// It should be idempotent and can be safely called on every startup.
Initialize(nodeConfig *config.NodeConfig) error
Initialize(nodeConfig *config.NodeConfig, done func()) error

// Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs.
Reconcile(podCIDRs []string) error
Expand Down
24 changes: 20 additions & 4 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"strconv"
"sync"
"time"

"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSN

// Initialize initializes all infrastructures required to route container packets in host network.
// It is idempotent and can be safely called on every startup.
func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig

// Sets up the ipset that will be used in iptables.
Expand All @@ -91,9 +92,8 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
}

// Sets up the iptables infrastructure required to route packets in host network.
if err := c.initIPTables(); err != nil {
return fmt.Errorf("failed to initialize iptables: %v", err)
}
// It's called in a goroutine because xtables lock may not be acquired immediately.
go c.initIPTablesOnce(done)

// Sets up the IP routes and IP rule required to route packets in host network.
if err := c.initIPRoutes(); err != nil {
Expand All @@ -103,6 +103,22 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
return nil
}

// initIPTablesOnce starts a loop that initializes the iptables infrastructure.
// It returns after one successful execution.
func (c *Client) initIPTablesOnce(done func()) {
defer done()
backoffTime := 2 * time.Second
for {
if err := c.initIPTables(); err != nil {
klog.Errorf("Failed to initialize iptables: %v - will retry in %v", err, backoffTime)
time.Sleep(backoffTime)
continue
}
klog.Info("Initialized iptables")
return
}
}

// initIPSet ensures that the required ipset exists and it has the initial members.
func (c *Client) initIPSet() error {
// In policy-only mode, Node Pod CIDR is undefined.
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/route/route_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewClient(serviceCIDR *net.IPNet, networkConfig *config.NetworkConfig, noSN

// Initialize sets nodeConfig on Window.
// Service LoadBalancing is provided by OpenFlow.
func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
func (c *Client) Initialize(nodeConfig *config.NodeConfig, done func()) error {
c.nodeConfig = nodeConfig
if err := c.initFwRules(); err != nil {
return err
Expand All @@ -70,6 +70,7 @@ func (c *Client) Initialize(nodeConfig *config.NodeConfig) error {
if err := util.EnableIPForwarding(nodeConfig.GatewayConfig.Name); err != nil {
return err
}
done()
return nil
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/route/route_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ func TestRouteOperation(t *testing.T) {
LinkIndex: gwLink,
},
}
err = client.Initialize(nodeConfig)
called := false
err = client.Initialize(nodeConfig, func() { called = true })
require.Nil(t, err)
require.True(t, called)

// Add initial routes.
err = client.AddRoutes(destCIDR1, peerNodeIP1, gwIP1)
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/route/testing/mock_route.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/agent/util/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *Client) Restore(data []byte, flush bool) error {
if c.restoreWaitSupported {
cmd.Args = append(cmd.Args, "-w", strconv.Itoa(waitSeconds), "-W", strconv.Itoa(waitIntervalMicroSeconds))
} else {
unlockFunc, err := lock(xtablesLockFilePath, waitSeconds*time.Second)
unlockFunc, err := Lock(XtablesLockFilePath, waitSeconds*time.Second)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/util/iptables/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
)

const (
xtablesLockFilePath = "/var/run/xtables.lock"
XtablesLockFilePath = "/var/run/xtables.lock"
xtablesLockFilePermission = 0600
)

// lock acquires the provided file lock. It's thread-safe.
// Lock acquires the provided file lock. It's thread-safe.
// It will block until the lock is acquired or the timeout is reached.
func lock(lockFilePath string, timeout time.Duration) (func() error, error) {
func Lock(lockFilePath string, timeout time.Duration) (func() error, error) {
lockFile, err := os.OpenFile(lockFilePath, os.O_CREATE, xtablesLockFilePermission)
if err != nil {
return nil, fmt.Errorf("error opening xtables lock file: %v", err)
Expand Down
Loading