Skip to content

Commit

Permalink
Fix for crash due to concurrent access of clustersvc Map
Browse files Browse the repository at this point in the history
  • Loading branch information
vklohiya committed Apr 30, 2024
1 parent d552363 commit 11617db
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 72 deletions.
61 changes: 36 additions & 25 deletions pkg/controller/multiClusterWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ func (ctlr *Controller) processResourceExternalClusterServices(rscKey resourceRe
return
}

ctlr.multiClusterResources.Lock()
defer ctlr.multiClusterResources.Unlock()

for _, svc := range clusterSvcs {
if ctlr.checkValidExtendedService(svc) != nil {
// Skip processing invalid extended service
Expand All @@ -31,13 +28,19 @@ func (ctlr *Controller) processResourceExternalClusterServices(rscKey resourceRe
clusterName: svc.ClusterName,
}

if ctlr.multiClusterResources.clusterSvcMap[svc.ClusterName] == nil {
ctlr.multiClusterResources.clusterSvcMap[svc.ClusterName] = make(map[MultiClusterServiceKey]map[MultiClusterServiceConfig]map[PoolIdentifier]struct{})
}
var multiClusterServicePoolMap MultiClusterServicePoolMap
if valInt, ok := ctlr.multiClusterResources.clusterSvcMap.Load(svc.ClusterName); !ok {
multiClusterServicePoolMap = make(MultiClusterServicePoolMap)
} else {
multiClusterServicePoolMap = valInt.(MultiClusterServicePoolMap)

if _, ok := ctlr.multiClusterResources.clusterSvcMap[svc.ClusterName][svcKey]; !ok {
ctlr.multiClusterResources.clusterSvcMap[svc.ClusterName][svcKey] = make(map[MultiClusterServiceConfig]map[PoolIdentifier]struct{})
}
// if service not found in clusterSvcMap, add it
if _, ok := multiClusterServicePoolMap[svcKey]; !ok {
multiClusterServicePoolMap[svcKey] = make(map[MultiClusterServiceConfig]map[PoolIdentifier]struct{})
}
// update the multi cluster resource map
ctlr.multiClusterResources.clusterSvcMap.Store(svc.ClusterName, multiClusterServicePoolMap)

// update the multi cluster resource map
if _, ok := ctlr.multiClusterResources.rscSvcMap[rscKey]; !ok {
Expand Down Expand Up @@ -73,22 +76,21 @@ func (ctlr *Controller) processResourceExternalClusterServices(rscKey resourceRe
//}

func (ctlr *Controller) deleteResourceExternalClusterSvcRouteReference(rsKey resourceRef) {
ctlr.multiClusterResources.Lock()
defer ctlr.multiClusterResources.Unlock()
// remove resource and service mapping
if svcs, ok := ctlr.multiClusterResources.rscSvcMap[rsKey]; ok {
// for service referring to resource, remove the resource from clusterSvcMap
for mSvcKey, port := range svcs {
if _, ok = ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName]; ok {
if _, ok = ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey]; ok {
if poolIdsMap, found := ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey][port]; found {
if valInt, ok := ctlr.multiClusterResources.clusterSvcMap.Load(mSvcKey.clusterName); ok {
multiClusterServicePoolMap := valInt.(MultiClusterServicePoolMap)
if _, ok = multiClusterServicePoolMap[mSvcKey]; ok {
if poolIdsMap, found := multiClusterServicePoolMap[mSvcKey][port]; found {
for poolId := range poolIdsMap {
if poolId.rsKey == rsKey {
delete(poolIdsMap, poolId)
}
}
if len(poolIdsMap) == 0 {
delete(ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey], port)
delete(multiClusterServicePoolMap[mSvcKey], port)
//delete the poolMem Cache as well
log.Debugf("Deleting Service '%v' from CIS cache as it's not referenced by monitored resources", mSvcKey)
ctlr.resources.poolMemCache.Delete(mSvcKey)
Expand All @@ -100,13 +102,15 @@ func (ctlr *Controller) deleteResourceExternalClusterSvcRouteReference(rsKey res
}
}
} else {
ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey][port] = poolIdsMap
multiClusterServicePoolMap[mSvcKey][port] = poolIdsMap
}
}
}
}
if len(ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey]) == 0 {
delete(ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName], mSvcKey)
if len(multiClusterServicePoolMap[mSvcKey]) == 0 {
delete(multiClusterServicePoolMap, mSvcKey)
}
// store the updated clusterSvcMap
ctlr.multiClusterResources.clusterSvcMap.Store(mSvcKey.clusterName, multiClusterServicePoolMap)
}
}
//remove resource entry
Expand All @@ -117,19 +121,26 @@ func (ctlr *Controller) deleteResourceExternalClusterSvcRouteReference(rsKey res
// when route is processed check for the clusters whose services references are removed
// if any cluster is present with no references of services, stop the cluster informers
func (ctlr *Controller) deleteUnrefereedMultiClusterInformers() {

ctlr.multiClusterResources.Lock()
defer ctlr.multiClusterResources.Unlock()

for clusterName, svcs := range ctlr.multiClusterResources.clusterSvcMap {
// Channel to receive keys to delete
keysToDelete := make(chan interface{})
defer close(keysToDelete)
ctlr.multiClusterResources.clusterSvcMap.Range(func(key, value interface{}) bool {
clusterName := key.(string)
svcs := value.(MultiClusterServicePoolMap)
// If no services are referenced from this cluster and this isn't HA peer cluster in case of active-active/ratio
// then remove the clusterName key from the clusterSvcMap and stop the informers for this cluster
if len(svcs) == 0 && ((ctlr.haModeType == StandAloneCIS || ctlr.haModeType == StandBy) ||
ctlr.multiClusterConfigs.HAPairClusterName != clusterName) {
delete(ctlr.multiClusterResources.clusterSvcMap, clusterName)
ctlr.stopMultiClusterInformers(clusterName, true)
keysToDelete <- key
}
return true
})
// Delete keys received from the channel
for key := range keysToDelete {
ctlr.multiClusterResources.clusterSvcMap.Delete(key)
ctlr.stopMultiClusterInformers(key.(string), true)
}

}

func (ctlr *Controller) getSvcPortFromHACluster(svcNameSpace, svcName, portName, rscType string) (int32, error) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/nativeResourceWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,8 @@ func (ctlr *Controller) UpdatePoolHealthMonitors(svcKey MultiClusterServiceKey)
"",
)
// for each cluster -> referred svcs -> for each svc -> port info and bigip vs and dependant resource(route)
if serviceKeys, ok := ctlr.multiClusterResources.clusterSvcMap[svcKey.clusterName]; ok {
if valInt, ok := ctlr.multiClusterResources.clusterSvcMap.Load(svcKey.clusterName); ok {
serviceKeys := valInt.(MultiClusterServicePoolMap)
if svcPorts, ok2 := serviceKeys[svcKey]; ok2 {
for _, poolIds := range svcPorts {
for poolId := range poolIds {
Expand Down Expand Up @@ -1798,8 +1799,6 @@ func (ctlr *Controller) checkValidRoute(route *routeapi.Route, plcSSLProfiles rg
var clusterSvcs []cisapiv1.MultiClusterServiceReference
err := json.Unmarshal([]byte(annotation), &clusterSvcs)
if err == nil {
ctlr.multiClusterResources.Lock()
defer ctlr.multiClusterResources.Unlock()
for _, svc := range clusterSvcs {
err := ctlr.checkValidExtendedService(svc)
if err != nil {
Expand Down
97 changes: 68 additions & 29 deletions pkg/controller/nativeResourceWorker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2203,8 +2203,10 @@ extendedRouteSpec:

It("Process Route with multi cluster annotation without multicluster config", func() {
Expect(mockCtlr.prepareResourceConfigFromRoute(rsCfg, route1, intstr.IntOrString{IntVal: 80}, ps)).To(BeNil())
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(1))
multiClusterServicecPoolMap, ok := mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(1))
})

It("Process Route with multi cluster annotation with multicluster config", func() {
Expand All @@ -2229,9 +2231,12 @@ extendedRouteSpec:
route1.Annotations["virtual-server.f5.com/multiClusterServices"] = `[{"clusterName": "cluster3", "service":"svc", "namespace": "default", "servicePort": "8080" },
{"clusterName": "cluster3", "service":"svc1", "namespace": "default", "servicePort": "8081" }]`
Expect(mockCtlr.prepareResourceConfigFromRoute(rsCfg, route1, intstr.IntOrString{IntVal: 80}, ps)).To(BeNil())
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(2))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(2))
multiClusterServicecPoolMap, ok := mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(2))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(2))

resourceKey := resourceRef{
kind: Route,
Expand All @@ -2254,16 +2259,24 @@ extendedRouteSpec:
mockCtlr.deleteResourceExternalClusterSvcRouteReference(resourceKey)
Expect(mockCtlr.prepareResourceConfigFromRoute(rsCfg, route1, intstr.IntOrString{IntVal: 80}, ps)).To(BeNil())
// for local cluster service mapping must be present
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(0))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(0))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))

route1.Annotations["virtual-server.f5.com/multiClusterServices"] = `[{"clusterName": "cluster3", "service":"svc1", "namespace": "default", "servicePort": "8081" }]`
mockCtlr.deleteResourceExternalClusterSvcRouteReference(resourceKey)
Expect(mockCtlr.prepareResourceConfigFromRoute(rsCfg, route1, intstr.IntOrString{IntVal: 80}, ps)).To(BeNil())
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))

})

Expand Down Expand Up @@ -2528,9 +2541,13 @@ externalClustersConfig:
}
mockCtlr.haModeType = Ratio
mockCtlr.prepareRSConfigFromVirtualServer(rsCfg, vs, false, "")
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(2))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))
multiClusterServicecPoolMap, ok := mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(2))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))

// Verify that distinct health monitors are created for all pools in ratio mode
expectedHealthMonitors := make(map[string]struct{})
Expand All @@ -2556,9 +2573,13 @@ externalClustersConfig:
mockCtlr.deleteResourceExternalClusterSvcRouteReference(resourceKey)
mockCtlr.prepareRSConfigFromVirtualServer(rsCfg, vs, false, "")
// for local cluster service mapping must be present
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(2))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(0))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(2))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(0))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))

vs.Spec.Pools[0].MultiClusterServices = []cisapiv1.MultiClusterServiceReference{
{
Expand All @@ -2570,9 +2591,13 @@ externalClustersConfig:
}
mockCtlr.deleteResourceExternalClusterSvcRouteReference(resourceKey)
mockCtlr.prepareRSConfigFromVirtualServer(rsCfg, vs, false, "")
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(2))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(2))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(3))
})

It("Process TS with multi cluster config", func() {
Expand Down Expand Up @@ -2629,9 +2654,15 @@ externalClustersConfig:
}
}
mockCtlr.prepareRSConfigFromTransportServer(rsCfg, ts)
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(2))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(2))
multiClusterServicecPoolMap, ok := mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(2))

multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))

Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(2))

resourceKey := resourceRef{
kind: TransportServer,
Expand All @@ -2643,9 +2674,13 @@ externalClustersConfig:
mockCtlr.deleteResourceExternalClusterSvcRouteReference(resourceKey)
mockCtlr.prepareRSConfigFromTransportServer(rsCfg, ts)
// for local cluster service mapping must be present
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(2))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(0))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(2))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(2))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(0))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(2))

ts.Spec.Pool.MultiClusterServices = []cisapiv1.MultiClusterServiceReference{
{
Expand All @@ -2657,9 +2692,13 @@ externalClustersConfig:
}
mockCtlr.deleteResourceExternalClusterSvcRouteReference(resourceKey)
mockCtlr.prepareRSConfigFromTransportServer(rsCfg, ts)
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap[""])).To(Equal(2))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap["cluster3"])).To(Equal(1))
Expect(len(mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(2))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(2))
multiClusterServicecPoolMap, ok = mockCtlr.multiClusterResources.clusterSvcMap.Load("cluster3")
Expect(ok).To(BeTrue())
Expect(len(multiClusterServicecPoolMap.(MultiClusterServicePoolMap))).To(Equal(1))
Expect(test.LenSyncMap(&mockCtlr.multiClusterResources.clusterSvcMap)).To(Equal(2))

})
})
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/node_poll_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (ctlr *Controller) ProcessNodeUpdate(obj interface{}, clusterName string) {
log.Debugf("Processing Node Updates for local cluster")
// Update node cache
ctlr.oldNodes = newNodes
if _, ok := ctlr.multiClusterResources.clusterSvcMap[clusterName]; ok {
if _, ok := ctlr.multiClusterResources.clusterSvcMap.Load(clusterName); ok {
ctlr.UpdatePoolMembersForNodeUpdate(clusterName)
}
}
Expand All @@ -76,8 +76,8 @@ func (ctlr *Controller) ProcessNodeUpdate(obj interface{}, clusterName string) {
log.Debugf("[MultiCluster] Processing Node Updates for cluster: %s", clusterName)
// Update node cache
nodeInf.oldNodes = newNodes
if ctlr.multiClusterResources.clusterSvcMap != nil {
if _, ok := ctlr.multiClusterResources.clusterSvcMap[clusterName]; ok {
if &ctlr.multiClusterResources.clusterSvcMap != nil {
if _, ok := ctlr.multiClusterResources.clusterSvcMap.Load(clusterName); ok {
ctlr.UpdatePoolMembersForNodeUpdate(clusterName)
}
}
Expand All @@ -99,7 +99,8 @@ func (ctlr *Controller) ProcessNodeUpdate(obj interface{}, clusterName string) {
}

func (ctlr *Controller) UpdatePoolMembersForNodeUpdate(clusterName string) {
if svcKeys, ok := ctlr.multiClusterResources.clusterSvcMap[clusterName]; ok {
if sKey, ok := ctlr.multiClusterResources.clusterSvcMap.Load(clusterName); ok {
svcKeys, _ := sKey.(MultiClusterServicePoolMap)
for svcKey, _ := range svcKeys {
ctlr.updatePoolMembersForService(svcKey, false)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/resourceConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewResourceStore() *ResourceStore {
func newMultiClusterResourceStore() *MultiClusterResourceStore {
var rs MultiClusterResourceStore
rs.rscSvcMap = make(map[resourceRef]map[MultiClusterServiceKey]MultiClusterServiceConfig)
rs.clusterSvcMap = make(map[string]map[MultiClusterServiceKey]map[MultiClusterServiceConfig]map[PoolIdentifier]struct{})
rs.clusterSvcMap = sync.Map{}
return &rs
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,9 +1409,10 @@ type (

MultiClusterResourceStore struct {
rscSvcMap map[resourceRef]map[MultiClusterServiceKey]MultiClusterServiceConfig
clusterSvcMap map[string]map[MultiClusterServiceKey]map[MultiClusterServiceConfig]map[PoolIdentifier]struct{}
sync.Mutex
clusterSvcMap sync.Map
}
MultiClusterServicePoolMap map[MultiClusterServiceKey]map[MultiClusterServiceConfig]map[PoolIdentifier]struct{}

MultiClusterServiceKey struct {
serviceName string
clusterName string
Expand Down

0 comments on commit 11617db

Please sign in to comment.