Skip to content

Commit

Permalink
ipcache: fix potential deadlock in GetNamedPorts
Browse files Browse the repository at this point in the history
We have a partial lock ordering of IPCache > Endpoint as established in
InjectLabels, where the IPCache lock is held and all endpoint locks are
acquired one by one. On the other hand, GetNamedPorts was called in a
context with the Endpoint lock held (cf *Endpoint.GetNamedPortLocked),
and attempts to acquire the IPCache lock. This would establish EP >
IPCache, which potentially deadlocks with the above.

To fix it, remove the requirement for a write lock on IPCache for
GetNamedPorts. Instead, we always pre-compute the namedPorts map in
Upsert and Delete and simply return it in GetNamedPorts. To ensure
atomicity, we use atomics to load and store read-only pointer to the
namedPorts map and the needNamedPorts flag.

Co-authored-by: Sebastian Wicki <sebastian@isovalent.com>
Signed-off-by: David Bimmler <david.bimmler@isovalent.com>
  • Loading branch information
bimmlerd and gandro committed Apr 11, 2023
1 parent d91219a commit 93cd67f
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 32 deletions.
81 changes: 50 additions & 31 deletions pkg/ipcache/ipcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"net"
"net/netip"
"sync/atomic"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -103,18 +104,17 @@ type IPCache struct {
// controllers manages the async controllers for this IPCache
controllers *controller.Manager

// needNamedPorts is initially 'false', but will be changd to 'true' when the
// clusterwide named port mappings are needed for network policy computation
// for the first time. This avoids the overhead of maintaining 'namedPorts' map
// when it is known not to be needed.
// Protected by 'mutex'.
needNamedPorts bool
// needNamedPorts is initially 'false', but will atomically be changed to 'true'
// when the clusterwide named port mappings are needed for network policy
// computation for the first time. This avoids the overhead of unnecessarily
// triggering policy updates when it is known not to be needed.
needNamedPorts atomic.Bool

// namedPorts is a collection of all named ports in the cluster. This is needed
// only if an egress policy refers to a port by name.
// This map is returned to users so all updates must be made into a fresh map that
// is then swapped in place while 'mutex' is being held.
namedPorts types.NamedPortMultiMap
// is then swapped in place atomically.
namedPorts atomic.Pointer[types.NamedPortMultiMap]

cacheStatus k8s.CacheStatus

Expand All @@ -141,7 +141,7 @@ func NewIPCache(c *Configuration) *IPCache {
ipToHostIPCache: map[string]IPKeyPair{},
ipToK8sMetadata: map[string]K8sMetadata{},
controllers: controller.NewManager(),
namedPorts: nil,
namedPorts: atomic.Pointer[types.NamedPortMultiMap]{},
metadata: newMetadata(),
Configuration: c,
}
Expand Down Expand Up @@ -244,13 +244,19 @@ func (ipc *IPCache) getK8sMetadata(ip string) *K8sMetadata {
return nil
}

// updateNamedPorts accumulates named ports from all K8sMetadata entries to a single map
func (ipc *IPCache) updateNamedPorts() (namedPortsChanged bool) {
if !ipc.needNamedPorts {
return false
}
// updateNamedPortsRLocked accumulates named ports from all K8sMetadata entries
// to a single map. It is required to hold _at least_ the read lock when calling
// this function, holding the write lock is also allowed.
func (ipc *IPCache) updateNamedPortsRLocked() (namedPortsChanged bool) {
// Collect new named Ports
npm := make(types.NamedPortMultiMap, len(ipc.namedPorts))
var oldLen = 0
var old types.NamedPortMultiMap
if m := ipc.namedPorts.Load(); m != nil {
old = *m
oldLen = len(old)
}

npm := make(types.NamedPortMultiMap, oldLen)
for _, km := range ipc.ipToK8sMetadata {
for name, port := range km.NamedPorts {
if npm[name] == nil {
Expand All @@ -259,15 +265,20 @@ func (ipc *IPCache) updateNamedPorts() (namedPortsChanged bool) {
npm[name][port] = struct{}{}
}
}
namedPortsChanged = !npm.Equal(ipc.namedPorts)
namedPortsChanged = !npm.Equal(old)
if namedPortsChanged {
// swap the new map in
if len(npm) == 0 {
ipc.namedPorts = nil
} else {
ipc.namedPorts = npm
}
// atomically swap the new map in
ipc.namedPorts.Store(&npm)
}

// Set namedPortsChanged to false if they have not (yet) been requested.
// This avoids triggering policy updates if named ports changed, but no
// policy actually needs them. We still pre-compute the namedPorts map,
// so we don't have to acquire the IPCache lock in GetNamedPorts
if !ipc.needNamedPorts.Load() {
namedPortsChanged = false
}

return namedPortsChanged
}

Expand Down Expand Up @@ -465,7 +476,7 @@ func (ipc *IPCache) upsertLocked(
if namedPortsChanged {
// It is possible that some other POD defines same values, check if
// anything changes over all the PODs.
namedPortsChanged = ipc.updateNamedPorts()
namedPortsChanged = ipc.updateNamedPortsRLocked()
}
}

Expand Down Expand Up @@ -658,7 +669,7 @@ func (ipc *IPCache) deleteLocked(ip string, source source.Source) (namedPortsCha
// Update named ports
namedPortsChanged = false
if oldK8sMeta != nil && len(oldK8sMeta.NamedPorts) > 0 {
namedPortsChanged = ipc.updateNamedPorts()
namedPortsChanged = ipc.updateNamedPortsRLocked()
}

if newHostIP != nil {
Expand All @@ -680,15 +691,23 @@ func (ipc *IPCache) deleteLocked(ip string, source source.Source) (namedPortsCha

// GetNamedPorts returns a copy of the named ports map. May return nil.
func (ipc *IPCache) GetNamedPorts() (npm types.NamedPortMultiMap) {
ipc.mutex.Lock()
if !ipc.needNamedPorts {
ipc.needNamedPorts = true
ipc.updateNamedPorts()
}
// We must not acquire the IPCache mutex here, as that would establish a lock ordering of
// Endpoint > IPCache (as endpoint.mutex can be held while calling GetNamedPorts)
// Since InjectLabels requires IPCache > Endpoint, a deadlock can occur otherwise.

// needNamedPorts is initially set to 'false'. This means that we will not trigger
// policy updates upon changes to named ports. Once this is set to 'true' though,
// Upsert and Delete will start to return 'namedPortsChanged = true' if the upsert
// or delete changed a named port, enabling the caller to trigger a policy update.
// Note that at the moment, this will never be set back to false, even if no policy
// uses named ports anymore.
ipc.needNamedPorts.Store(true)

// Caller can keep using the map after the lock is released, as the map is never changed
// once published.
npm = ipc.namedPorts
ipc.mutex.Unlock()
if ptr := ipc.namedPorts.Load(); ptr != nil {
npm = *ptr
}
return npm
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ipcache/ipcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (s *IPCacheTestSuite) TestIPCacheNamedPorts(c *C) {
namedPortsChanged = IPIdentityCache.Delete(endpointIP2, source.Kubernetes)
c.Assert(namedPortsChanged, Equals, true)
npm = IPIdentityCache.GetNamedPorts()
c.Assert(npm, IsNil)
c.Assert(npm, HasLen, 0)
}

type dummyListener struct {
Expand Down

0 comments on commit 93cd67f

Please sign in to comment.