Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/ipcache: add ipcacher interface #24274

Merged
merged 1 commit into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Configuration struct {
// remote cluster.
RemoteIdentityWatcher RemoteIdentityWatcher

IPCache *ipcache.IPCache
IPCache ipcache.IPCacher
}

func SetClusterConfig(clusterName string, config *cmtypes.CiliumClusterConfig, backend kvstore.BackendOperations) error {
Expand Down Expand Up @@ -144,7 +144,7 @@ type ClusterMesh struct {
controllers *controller.Manager
configWatcher *configDirectoryWatcher

ipcache *ipcache.IPCache
ipcache ipcache.IPCacher

// globalServices is a list of all global services. The datastructure
// is protected by its own mutex inside the structure.
Expand Down
8 changes: 8 additions & 0 deletions pkg/ipcache/ipcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,3 +809,11 @@ func (m *K8sMetadata) Equal(o *K8sMetadata) bool {
}
return m.Namespace == o.Namespace && m.PodName == o.PodName
}

func (ipc *IPCache) ForEachListener(f func(listener IPIdentityMappingListener)) {
ipc.mutex.Lock()
defer ipc.mutex.Unlock()
for _, listener := range ipc.listeners {
f(listener)
}
}
20 changes: 12 additions & 8 deletions pkg/ipcache/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

var (
// IPIdentitiesPath is the path to where endpoint IPs are stored in the key-value
//store.
// store.
IPIdentitiesPath = path.Join(kvstore.BaseKeyPrefix, "state", "ip", "v1")

// AddressSpace is the address space (cluster, etc.) in which policy is
Expand Down Expand Up @@ -201,12 +201,18 @@ type IPIdentityWatcher struct {

clusterID uint32

ipcache *IPCache
ipcache IPCacher
}

type IPCacher interface {
Upsert(ip string, hostIP net.IP, hostKey uint8, k8sMeta *K8sMetadata, newIdentity Identity) (bool, error)
ForEachListener(f func(listener IPIdentityMappingListener))
Delete(IP string, source source.Source) (namedPortsChanged bool)
}

// NewIPIdentityWatcher creates a new IPIdentityWatcher using the specified
// kvstore backend.
func NewIPIdentityWatcher(ipc *IPCache, backend kvstore.BackendOperations) *IPIdentityWatcher {
func NewIPIdentityWatcher(ipc IPCacher, backend kvstore.BackendOperations) *IPIdentityWatcher {
return NewClusterIPIdentityWatcher(0, ipc, backend)
}

Expand All @@ -215,7 +221,7 @@ func NewIPIdentityWatcher(ipc *IPCache, backend kvstore.BackendOperations) *IPId
// is that each IP <=> Identity mapping will be annotated with ClusterID. Thus, it
// can be used for watching the kvstore of the remote cluster with overlapping PodCIDR.
// Calling this function with clusterID = 0 is identical to calling NewIPIdentityWatcher.
func NewClusterIPIdentityWatcher(clusterID uint32, ipc *IPCache, backend kvstore.BackendOperations) *IPIdentityWatcher {
func NewClusterIPIdentityWatcher(clusterID uint32, ipc IPCacher, backend kvstore.BackendOperations) *IPIdentityWatcher {
watcher := &IPIdentityWatcher{
clusterID: clusterID,
backend: backend,
Expand Down Expand Up @@ -275,11 +281,9 @@ restart:
// the deletion event.
switch event.Typ {
case kvstore.EventTypeListDone:
iw.ipcache.Lock()
for _, listener := range iw.ipcache.listeners {
iw.ipcache.ForEachListener(func(listener IPIdentityMappingListener) {
listener.OnIPIdentityCacheGC()
}
iw.ipcache.Unlock()
})
iw.closeSynced()

case kvstore.EventTypeCreate, kvstore.EventTypeModify:
Expand Down