Skip to content

Commit

Permalink
eni: Fix unexpected IP release when agent restarts
Browse files Browse the repository at this point in the history
When cilium agent in eni mode restarts, ciliumnode custom resource
is cleared and refilled by several updates. Specifically,
Status.IPAM.Used map which holds all used IPs is first updated
to an empty map before endpoints finish restoration.

This becomes critical if `--aws-release-excess-ips` is enabled
since cilium operator treats empty IPAM.Used map as no address used
hence releases addresses arbitraryly, causing restored endpoints
disconnected.

This patch fixes this by combining per endpoint update requests
into one update request after all endpoint restoration finishes so
that Status.IPAM.Used keeps the desired state during agent restart.

Signed-off-by: Jaff Cheng <jaff.cheng.sh@gmail.com>
  • Loading branch information
jaffcheng authored and tgraf committed Mar 24, 2020
1 parent 0dbc05b commit 91071da
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 39 deletions.
11 changes: 11 additions & 0 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,17 @@ func NewDaemon(ctx context.Context, dp datapath.Datapath) (*Daemon, *endpointRes

d.nodeDiscovery.StartDiscovery(node.GetName())

// Trigger refresh and update custom resource in the apiserver with all restored endpoints.
// Trigger after nodeDiscovery.StartDiscovery to avoid custom resource update conflict.
if option.Config.IPAM == option.IPAMCRD || option.Config.IPAM == option.IPAMENI || option.Config.IPAM == option.IPAMAzure {
if option.Config.EnableIPv6 {
d.ipam.IPv6Allocator.RestoreFinished()
}
if option.Config.EnableIPv4 {
d.ipam.IPv4Allocator.RestoreFinished()
}
}

// This needs to be done after the node addressing has been configured
// as the node address is required as suffix.
// well known identities have already been initialized above.
Expand Down
8 changes: 4 additions & 4 deletions daemon/cmd/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (d *Daemon) allocateDatapathIPs(family datapath.NodeAddressingFamily) (rout
// endpoints have been regenerated.
routerIP = family.Router()
if routerIP != nil {
err = d.ipam.AllocateIP(routerIP, "router")
err = d.ipam.AllocateIPWithoutSyncUpstream(routerIP, "router")
if err != nil {
log.Warningf("Router IP could not be re-allocated. Need to re-allocate. This will cause brief network disruption")

Expand All @@ -197,7 +197,7 @@ func (d *Daemon) allocateDatapathIPs(family datapath.NodeAddressingFamily) (rout
if routerIP == nil {
var result *ipam.AllocationResult
family := ipam.DeriveFamily(family.PrimaryExternal())
result, err = d.ipam.AllocateNextFamily(family, "router")
result, err = d.ipam.AllocateNextFamilyWithoutSyncUpstream(family, "router")
if err != nil {
err = fmt.Errorf("Unable to allocate router IP for family %s: %s", family, err)
return
Expand All @@ -212,7 +212,7 @@ func (d *Daemon) allocateHealthIPs() error {
bootstrapStats.healthCheck.Start()
if option.Config.EnableHealthChecking && option.Config.EnableEndpointHealthChecking {
if option.Config.EnableIPv4 {
result, err := d.ipam.AllocateNextFamily(ipam.IPv4, "health")
result, err := d.ipam.AllocateNextFamilyWithoutSyncUpstream(ipam.IPv4, "health")
if err != nil {
return fmt.Errorf("unable to allocate health IPs: %s,see https://cilium.link/ipam-range-full", err)
}
Expand All @@ -222,7 +222,7 @@ func (d *Daemon) allocateHealthIPs() error {
}

if option.Config.EnableIPv6 {
result, err := d.ipam.AllocateNextFamily(ipam.IPv6, "health")
result, err := d.ipam.AllocateNextFamilyWithoutSyncUpstream(ipam.IPv6, "health")
if err != nil {
if d.nodeDiscovery.LocalNode.IPv4HealthIP != nil {
d.ipam.ReleaseIP(d.nodeDiscovery.LocalNode.IPv4HealthIP)
Expand Down
4 changes: 2 additions & 2 deletions daemon/cmd/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (d *Daemon) allocateIPsLocked(ep *endpoint.Endpoint) error {
var err error

if option.Config.EnableIPv6 && ep.IPv6 != nil {
err = d.ipam.AllocateIP(ep.IPv6.IP(), ep.HumanStringLocked()+" [restored]")
err = d.ipam.AllocateIPWithoutSyncUpstream(ep.IPv6.IP(), ep.HumanStringLocked()+" [restored]")
if err != nil {
return fmt.Errorf("unable to reallocate %s IPv6 address: %s", ep.IPv6.IP(), err)
}
Expand All @@ -337,7 +337,7 @@ func (d *Daemon) allocateIPsLocked(ep *endpoint.Endpoint) error {
}

if option.Config.EnableIPv4 && ep.IPv4 != nil {
if err = d.ipam.AllocateIP(ep.IPv4.IP(), ep.HumanStringLocked()+" [restored]"); err != nil {
if err = d.ipam.AllocateIPWithoutSyncUpstream(ep.IPv4.IP(), ep.HumanStringLocked()+" [restored]"); err != nil {
return fmt.Errorf("unable to reallocate %s IPv4 address: %s", ep.IPv4.IP(), err)
}
}
Expand Down
89 changes: 66 additions & 23 deletions pkg/ipam/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,27 @@ func (ipam *IPAM) lookupIPsByOwner(owner string) (ips []net.IP) {

// AllocateIP allocates a IP address.
func (ipam *IPAM) AllocateIP(ip net.IP, owner string) (err error) {
needSyncUpstream := true
return ipam.allocateIP(ip, owner, needSyncUpstream)
}

// AllocateIPWithoutSyncUpstream allocates a IP address without syncing upstream.
func (ipam *IPAM) AllocateIPWithoutSyncUpstream(ip net.IP, owner string) (err error) {
needSyncUpstream := false
return ipam.allocateIP(ip, owner, needSyncUpstream)
}

// AllocateIPString is identical to AllocateIP but takes a string
func (ipam *IPAM) AllocateIPString(ipAddr, owner string) error {
ip := net.ParseIP(ipAddr)
if ip == nil {
return fmt.Errorf("Invalid IP address: %s", ipAddr)
}

return ipam.AllocateIP(ip, owner)
}

func (ipam *IPAM) allocateIP(ip net.IP, owner string, needSyncUpstream bool) (err error) {
ipam.allocatorMutex.Lock()
defer ipam.allocatorMutex.Unlock()

Expand All @@ -75,8 +96,14 @@ func (ipam *IPAM) AllocateIP(ip net.IP, owner string) (err error) {
return
}

if _, err = ipam.IPv4Allocator.Allocate(ip, owner); err != nil {
return
if needSyncUpstream {
if _, err = ipam.IPv4Allocator.Allocate(ip, owner); err != nil {
return
}
} else {
if _, err = ipam.IPv4Allocator.AllocateWithoutSyncUpstream(ip, owner); err != nil {
return
}
}
} else {
family = familyIPv6
Expand All @@ -85,8 +112,14 @@ func (ipam *IPAM) AllocateIP(ip net.IP, owner string) (err error) {
return
}

if _, err = ipam.IPv6Allocator.Allocate(ip, owner); err != nil {
return
if needSyncUpstream {
if _, err = ipam.IPv6Allocator.Allocate(ip, owner); err != nil {
return
}
} else {
if _, err = ipam.IPv6Allocator.AllocateWithoutSyncUpstream(ip, owner); err != nil {
return
}
}
}

Expand All @@ -100,24 +133,30 @@ func (ipam *IPAM) AllocateIP(ip net.IP, owner string) (err error) {
return
}

// AllocateIPString is identical to AllocateIP but takes a string
func (ipam *IPAM) AllocateIPString(ipAddr, owner string) error {
ip := net.ParseIP(ipAddr)
if ip == nil {
return fmt.Errorf("Invalid IP address: %s", ipAddr)
}
func (ipam *IPAM) allocateNextFamily(family Family, owner string, needSyncUpstream bool) (result *AllocationResult, err error) {
var allocator Allocator
switch family {
case IPv6:
allocator = ipam.IPv6Allocator
case IPv4:
allocator = ipam.IPv4Allocator

return ipam.AllocateIP(ip, owner)
}
default:
err = fmt.Errorf("unknown address \"%s\" family requested", family)
return
}

func (ipam *IPAM) allocateNextFamily(family Family, allocator Allocator, owner string) (result *AllocationResult, err error) {
if allocator == nil {
err = fmt.Errorf("%s allocator not available", family)
return
}

for {
result, err = allocator.AllocateNext(owner)
if needSyncUpstream {
result, err = allocator.AllocateNext(owner)
} else {
result, err = allocator.AllocateNextWithoutSyncUpstream(owner)
}
if err != nil {
return
}
Expand All @@ -144,16 +183,20 @@ func (ipam *IPAM) AllocateNextFamily(family Family, owner string) (result *Alloc
ipam.allocatorMutex.Lock()
defer ipam.allocatorMutex.Unlock()

switch family {
case IPv6:
result, err = ipam.allocateNextFamily(family, ipam.IPv6Allocator, owner)
case IPv4:
result, err = ipam.allocateNextFamily(family, ipam.IPv4Allocator, owner)
needSyncUpstream := true

default:
err = fmt.Errorf("unknown address \"%s\" family requested", family)
}
return
return ipam.allocateNextFamily(family, owner, needSyncUpstream)
}

// AllocateNextFamilyWithoutSyncUpstream allocates the next IP of the requested address family
// without syncing upstream
func (ipam *IPAM) AllocateNextFamilyWithoutSyncUpstream(family Family, owner string) (result *AllocationResult, err error) {
ipam.allocatorMutex.Lock()
defer ipam.allocatorMutex.Unlock()

needSyncUpstream := false

return ipam.allocateNextFamily(family, owner, needSyncUpstream)
}

// AllocateNext allocates the next available IPv4 and IPv6 address out of the
Expand Down
64 changes: 60 additions & 4 deletions pkg/ipam/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type nodeStore struct {
// family
allocationPoolSize map[Family]int

// signal for completion of restoration
restoreFinished chan bool

conf Configuration
}

Expand All @@ -89,6 +92,7 @@ func newNodeStore(nodeName string, conf Configuration, owner Owner, k8sEventReg
allocationPoolSize: map[Family]int{},
conf: conf,
}
store.restoreFinished = make(chan bool)
ciliumClient := k8s.CiliumClient()

t, err := trigger.NewTrigger(trigger.Parameters{
Expand Down Expand Up @@ -178,7 +182,12 @@ func newNodeStore(nodeName string, conf Configuration, owner Owner, k8sEventReg
time.Sleep(5 * time.Second)
}

store.refreshTrigger.TriggerWithReason("initial sync")
go func() {
// Initial upstream sync must wait for the allocated IPs
// to be restored
<-store.restoreFinished
store.refreshTrigger.TriggerWithReason("initial sync")
}()

return store
}
Expand Down Expand Up @@ -500,6 +509,30 @@ func (a *crdAllocator) Allocate(ip net.IP, owner string) (*AllocationResult, err
return nil, err
}

a.markAllocated(ip, owner, *ipInfo)
// Update custom resource to reflect the newly allocated IP.
a.store.refreshTrigger.TriggerWithReason(fmt.Sprintf("allocation of IP %s", ip.String()))

return a.buildAllocationResult(ip, ipInfo)
}

// AllocateWithoutSyncUpstream will attempt to find the specified IP in the
// custom resource and allocate it if it is available. If the IP is
// unavailable or already allocated, an error is returned. The custom resource
// will not be updated.
func (a *crdAllocator) AllocateWithoutSyncUpstream(ip net.IP, owner string) (*AllocationResult, error) {
a.mutex.Lock()
defer a.mutex.Unlock()

if _, ok := a.allocated[ip.String()]; ok {
return nil, fmt.Errorf("IP already in use")
}

ipInfo, err := a.store.allocate(ip)
if err != nil {
return nil, err
}

a.markAllocated(ip, owner, *ipInfo)

return a.buildAllocationResult(ip, ipInfo)
Expand All @@ -517,17 +550,16 @@ func (a *crdAllocator) Release(ip net.IP) error {
}

delete(a.allocated, ip.String())
// Update custom resource to reflect the newly released IP.
a.store.refreshTrigger.TriggerWithReason(fmt.Sprintf("release of IP %s", ip.String()))

return nil
}

// markAllocated marks a particular IP as allocated and triggers the custom
// resource update
// markAllocated marks a particular IP as allocated
func (a *crdAllocator) markAllocated(ip net.IP, owner string, ipInfo ipamTypes.AllocationIP) {
ipInfo.Owner = owner
a.allocated[ip.String()] = ipInfo
a.store.refreshTrigger.TriggerWithReason(fmt.Sprintf("allocation of IP %s", ip.String()))
}

// AllocateNext allocates the next available IP as offered by the custom
Expand All @@ -542,6 +574,25 @@ func (a *crdAllocator) AllocateNext(owner string) (*AllocationResult, error) {
return nil, err
}

a.markAllocated(ip, owner, *ipInfo)
// Update custom resource to reflect the newly allocated IP.
a.store.refreshTrigger.TriggerWithReason(fmt.Sprintf("allocation of IP %s", ip.String()))

return a.buildAllocationResult(ip, ipInfo)
}

// AllocateNextWithoutSyncUpstream allocates the next available IP as offered
// by the custom resource or return an error if no IP is available. The custom
// resource will not be updated.
func (a *crdAllocator) AllocateNextWithoutSyncUpstream(owner string) (*AllocationResult, error) {
a.mutex.Lock()
defer a.mutex.Unlock()

ip, ipInfo, err := a.store.allocateNext(a.allocated, a.family)
if err != nil {
return nil, err
}

a.markAllocated(ip, owner, *ipInfo)

return a.buildAllocationResult(ip, ipInfo)
Expand Down Expand Up @@ -569,3 +620,8 @@ func (a *crdAllocator) Dump() (map[string]string, string) {
status := fmt.Sprintf("%d/%d allocated", len(allocs), a.totalPoolSize())
return allocs, status
}

// RestoreFinished marks the status of restoration as done
func (a *crdAllocator) RestoreFinished() {
close(a.store.restoreFinished)
}
20 changes: 20 additions & 0 deletions pkg/ipam/hostscope.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ func (h *hostScopeAllocator) Allocate(ip net.IP, owner string) (*AllocationResul
return &AllocationResult{IP: ip}, nil
}

func (h *hostScopeAllocator) AllocateWithoutSyncUpstream(ip net.IP, owner string) (*AllocationResult, error) {
if err := h.allocator.Allocate(ip); err != nil {
return nil, err
}

return &AllocationResult{IP: ip}, nil
}

func (h *hostScopeAllocator) Release(ip net.IP) error {
return h.allocator.Release(ip)
}
Expand All @@ -63,6 +71,15 @@ func (h *hostScopeAllocator) AllocateNext(owner string) (*AllocationResult, erro
return &AllocationResult{IP: ip}, nil
}

func (h *hostScopeAllocator) AllocateNextWithoutSyncUpstream(owner string) (*AllocationResult, error) {
ip, err := h.allocator.AllocateNext()
if err != nil {
return nil, err
}

return &AllocationResult{IP: ip}, nil
}

func (h *hostScopeAllocator) Dump() (map[string]string, string) {
var origIP *big.Int
alloc := map[string]string{}
Expand All @@ -88,3 +105,6 @@ func (h *hostScopeAllocator) Dump() (map[string]string, string) {

return alloc, status
}

// RestoreFinished marks the status of restoration as done
func (h *hostScopeAllocator) RestoreFinished() {}
11 changes: 11 additions & 0 deletions pkg/ipam/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,29 @@ type Allocator interface {
// Allocate allocates a specific IP or fails
Allocate(ip net.IP, owner string) (*AllocationResult, error)

// AllocateWithoutSyncUpstream allocates a specific IP without syncing
// upstream or fails
AllocateWithoutSyncUpstream(ip net.IP, owner string) (*AllocationResult, error)

// Release releases a previously allocated IP or fails
Release(ip net.IP) error

// AllocateNext allocates the next available IP or fails if no more IPs
// are available
AllocateNext(owner string) (*AllocationResult, error)

// AllocateNextWithoutSyncUpstream allocates the next available IP without syncing
// upstream or fails if no more IPs are available
AllocateNextWithoutSyncUpstream(owner string) (*AllocationResult, error)

// Dump returns a map of all allocated IPs with the IP represented as
// key in the map. Dump must also provide a status one-liner to
// represent the overall status, e.g. number of IPs allocated and
// overall health information if available.
Dump() (map[string]string, string)

// RestoreFinished marks the status of restoration as done
RestoreFinished()
}

// IPNetWithOwner is a structure containing a net.IPNet struct with the owner
Expand Down

0 comments on commit 91071da

Please sign in to comment.