
Commit

Merge 11617db into 2457480
vklohiya committed Apr 30, 2024
2 parents 2457480 + 11617db commit 9d3b45a
Showing 11 changed files with 623 additions and 347 deletions.
19 changes: 11 additions & 8 deletions pkg/controller/informers_test.go
@@ -71,7 +71,7 @@ var _ = Describe("Informers Tests", func() {
 			mockCtlr.resourceQueue = workqueue.NewNamedRateLimitingQueue(
 				workqueue.DefaultControllerRateLimiter(), "custom-resource-controller")
 			mockCtlr.resources = NewResourceStore()
-			mockCtlr.resources.ltmConfig = make(map[string]*PartitionConfig, 0)
+			mockCtlr.resources.ltmConfig = sync.Map{}
 			mockCtlr.requestQueue = &requestQueue{sync.Mutex{}, list.New()}
 			mockCtlr.Partition = "test"

@@ -101,13 +101,14 @@ var _ = Describe("Informers Tests", func() {
 				Partition: "dev",
 			})
 			zero := 0
-			mockCtlr.resources.ltmConfig[mockCtlr.Partition] = &PartitionConfig{ResourceMap: make(ResourceMap), Priority: &zero}
+			mockCtlr.resources.ltmConfig.Store(mockCtlr.Partition, &PartitionConfig{ResourceMap: make(ResourceMap), Priority: &zero})
 			mockCtlr.enqueueUpdatedVirtualServer(vs, newVS)
 			key, quit = mockCtlr.resourceQueue.Get()
 			Expect(key).ToNot(BeNil(), "Enqueue Updated VS Failed")
 			Expect(quit).To(BeFalse(), "Enqueue Updated VS Failed")
-			Expect(*mockCtlr.resources.ltmConfig[mockCtlr.Partition].Priority).To(BeEquivalentTo(1), "Priority Not Updated")
-			delete(mockCtlr.resources.ltmConfig, mockCtlr.Partition)
+			pcInt, _ := mockCtlr.resources.ltmConfig.Load(mockCtlr.Partition)
+			Expect(*pcInt.(*PartitionConfig).Priority).To(BeEquivalentTo(1), "Priority Not Updated")
+			mockCtlr.resources.ltmConfig.Delete(mockCtlr.Partition)
 			key, quit = mockCtlr.resourceQueue.Get()
 			Expect(key).ToNot(BeNil(), "Enqueue Updated VS Failed")
 			Expect(quit).To(BeFalse(), "Enqueue Updated VS Failed")
@@ -252,9 +253,10 @@ var _ = Describe("Informers Tests", func() {
 			tsWithPartition := newTS.DeepCopy()
 			tsWithPartition.Spec.Partition = "dev"
 			zero := 0
-			mockCtlr.resources.ltmConfig[mockCtlr.Partition] = &PartitionConfig{ResourceMap: make(ResourceMap), Priority: &zero}
+			mockCtlr.resources.ltmConfig.Store(mockCtlr.Partition, &PartitionConfig{ResourceMap: make(ResourceMap), Priority: &zero})
 			mockCtlr.enqueueUpdatedTransportServer(newTS, tsWithPartition)
-			Expect(*mockCtlr.resources.ltmConfig[mockCtlr.Partition].Priority).To(BeEquivalentTo(1), "Priority Not Updated")
+			pcInt, _ := mockCtlr.resources.ltmConfig.Load(mockCtlr.Partition)
+			Expect(*pcInt.(*PartitionConfig).Priority).To(BeEquivalentTo(1), "Priority Not Updated")
 
 			// Verify TS status update event is not queued for processing
 			queueLen := mockCtlr.resourceQueue.Len()
@@ -326,9 +328,10 @@ var _ = Describe("Informers Tests", func() {
 			ilWithPartition := newIL.DeepCopy()
 			ilWithPartition.Spec.Partition = "dev"
 			zero := 0
-			mockCtlr.resources.ltmConfig[mockCtlr.Partition] = &PartitionConfig{ResourceMap: make(ResourceMap), Priority: &zero}
+			mockCtlr.resources.ltmConfig.Store(mockCtlr.Partition, &PartitionConfig{ResourceMap: make(ResourceMap), Priority: &zero})
 			mockCtlr.enqueueUpdatedIngressLink(newIL, ilWithPartition)
-			Expect(*mockCtlr.resources.ltmConfig[mockCtlr.Partition].Priority).To(BeEquivalentTo(1), "Priority Not Updated")
+			pcInt, _ := mockCtlr.resources.ltmConfig.Load(mockCtlr.Partition)
+			Expect(*pcInt.(*PartitionConfig).Priority).To(BeEquivalentTo(1), "Priority Not Updated")
 
 		})
 
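The test updates above all follow the same `sync.Map` access pattern: `Store` to seed a partition entry, `Load` plus a type assertion to read the `*PartitionConfig` back, and `Delete` to clean up. A minimal runnable sketch of that pattern, with a trimmed-down `PartitionConfig` standing in for the controller's real struct:

```go
package main

import (
	"fmt"
	"sync"
)

// PartitionConfig is a stand-in for the controller's type, reduced to
// the one field the tests assert on.
type PartitionConfig struct {
	Priority *int
}

func main() {
	var ltmConfig sync.Map // partition name -> *PartitionConfig

	zero := 0
	ltmConfig.Store("test", &PartitionConfig{Priority: &zero})

	// Load returns (interface{}, bool); the caller must type-assert
	// back to the concrete pointer type before dereferencing.
	if pcInt, ok := ltmConfig.Load("test"); ok {
		pc := pcInt.(*PartitionConfig)
		*pc.Priority = 1
		fmt.Println("priority:", *pc.Priority) // priority: 1
	}

	ltmConfig.Delete("test")
}
```

The type assertion is the step a plain `map[string]*PartitionConfig` never needed, which is why each converted test now loads into an intermediate `pcInt` first.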
63 changes: 37 additions & 26 deletions pkg/controller/multiClusterWorker.go
@@ -16,9 +16,6 @@ func (ctlr *Controller) processResourceExternalClusterServices(rscKey resourceRe
 		return
 	}
 
-	ctlr.multiClusterResources.Lock()
-	defer ctlr.multiClusterResources.Unlock()
-
 	for _, svc := range clusterSvcs {
 		if ctlr.checkValidExtendedService(svc) != nil {
 			// Skip processing invalid extended service
@@ -31,13 +28,19 @@
 			clusterName: svc.ClusterName,
 		}
 
-		if ctlr.multiClusterResources.clusterSvcMap[svc.ClusterName] == nil {
-			ctlr.multiClusterResources.clusterSvcMap[svc.ClusterName] = make(map[MultiClusterServiceKey]map[MultiClusterServiceConfig]map[PoolIdentifier]struct{})
-		}
+		var multiClusterServicePoolMap MultiClusterServicePoolMap
+		if valInt, ok := ctlr.multiClusterResources.clusterSvcMap.Load(svc.ClusterName); !ok {
+			multiClusterServicePoolMap = make(MultiClusterServicePoolMap)
+		} else {
+			multiClusterServicePoolMap = valInt.(MultiClusterServicePoolMap)
+		}
 
-		if _, ok := ctlr.multiClusterResources.clusterSvcMap[svc.ClusterName][svcKey]; !ok {
-			ctlr.multiClusterResources.clusterSvcMap[svc.ClusterName][svcKey] = make(map[MultiClusterServiceConfig]map[PoolIdentifier]struct{})
-		}
+		// if service not found in clusterSvcMap, add it
+		if _, ok := multiClusterServicePoolMap[svcKey]; !ok {
+			multiClusterServicePoolMap[svcKey] = make(map[MultiClusterServiceConfig]map[PoolIdentifier]struct{})
+		}
+		// update the multi cluster resource map
+		ctlr.multiClusterResources.clusterSvcMap.Store(svc.ClusterName, multiClusterServicePoolMap)
 
 		// update the multi cluster resource map
 		if _, ok := ctlr.multiClusterResources.rscSvcMap[rscKey]; !ok {
@@ -73,25 +76,24 @@ func (ctlr *Controller) processResourceExternalClusterServices(rscKey resourceRe
 //}
 
 func (ctlr *Controller) deleteResourceExternalClusterSvcRouteReference(rsKey resourceRef) {
-	ctlr.multiClusterResources.Lock()
-	defer ctlr.multiClusterResources.Unlock()
 	// remove resource and service mapping
 	if svcs, ok := ctlr.multiClusterResources.rscSvcMap[rsKey]; ok {
 		// for service referring to resource, remove the resource from clusterSvcMap
 		for mSvcKey, port := range svcs {
-			if _, ok = ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName]; ok {
-				if _, ok = ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey]; ok {
-					if poolIdsMap, found := ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey][port]; found {
+			if valInt, ok := ctlr.multiClusterResources.clusterSvcMap.Load(mSvcKey.clusterName); ok {
+				multiClusterServicePoolMap := valInt.(MultiClusterServicePoolMap)
+				if _, ok = multiClusterServicePoolMap[mSvcKey]; ok {
+					if poolIdsMap, found := multiClusterServicePoolMap[mSvcKey][port]; found {
 						for poolId := range poolIdsMap {
 							if poolId.rsKey == rsKey {
 								delete(poolIdsMap, poolId)
 							}
 						}
 						if len(poolIdsMap) == 0 {
-							delete(ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey], port)
+							delete(multiClusterServicePoolMap[mSvcKey], port)
 							//delete the poolMem Cache as well
 							log.Debugf("Deleting Service '%v' from CIS cache as it's not referenced by monitored resources", mSvcKey)
-							delete(ctlr.resources.poolMemCache, mSvcKey)
+							ctlr.resources.poolMemCache.Delete(mSvcKey)
 							// delete the pod cache as well in nodePortLocal mode
 							if ctlr.PoolMemberType == NodePortLocal {
 								pods := ctlr.GetPodsForService(mSvcKey.namespace, mSvcKey.serviceName, mSvcKey.clusterName, true)
@@ -100,13 +102,15 @@ func (ctlr *Controller) deleteResourceExternalClusterSvcRouteReference(rsKey res
 								}
 							}
 						} else {
-							ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey][port] = poolIdsMap
+							multiClusterServicePoolMap[mSvcKey][port] = poolIdsMap
 						}
 					}
 				}
 			}
-			if len(ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName][mSvcKey]) == 0 {
-				delete(ctlr.multiClusterResources.clusterSvcMap[mSvcKey.clusterName], mSvcKey)
+			if len(multiClusterServicePoolMap[mSvcKey]) == 0 {
+				delete(multiClusterServicePoolMap, mSvcKey)
 			}
+			// store the updated clusterSvcMap
+			ctlr.multiClusterResources.clusterSvcMap.Store(mSvcKey.clusterName, multiClusterServicePoolMap)
 		}
 	}
 	//remove resource entry
@@ -117,19 +121,26 @@ func (ctlr *Controller) deleteResourceExternalClusterSvcRouteReference(rsKey res
 // when route is processed check for the clusters whose services references are removed
 // if any cluster is present with no references of services, stop the cluster informers
 func (ctlr *Controller) deleteUnrefereedMultiClusterInformers() {
-
-	ctlr.multiClusterResources.Lock()
-	defer ctlr.multiClusterResources.Unlock()
-
-	for clusterName, svcs := range ctlr.multiClusterResources.clusterSvcMap {
+	// Channel to receive keys to delete
+	keysToDelete := make(chan interface{})
+	defer close(keysToDelete)
+	ctlr.multiClusterResources.clusterSvcMap.Range(func(key, value interface{}) bool {
+		clusterName := key.(string)
+		svcs := value.(MultiClusterServicePoolMap)
 		// If no services are referenced from this cluster and this isn't HA peer cluster in case of active-active/ratio
 		// then remove the clusterName key from the clusterSvcMap and stop the informers for this cluster
 		if len(svcs) == 0 && ((ctlr.haModeType == StandAloneCIS || ctlr.haModeType == StandBy) ||
 			ctlr.multiClusterConfigs.HAPairClusterName != clusterName) {
-			delete(ctlr.multiClusterResources.clusterSvcMap, clusterName)
-			ctlr.stopMultiClusterInformers(clusterName, true)
+			keysToDelete <- key
 		}
+		return true
+	})
+	// Delete keys received from the channel
+	for key := range keysToDelete {
+		ctlr.multiClusterResources.clusterSvcMap.Delete(key)
+		ctlr.stopMultiClusterInformers(key.(string), true)
+	}
 
 }
 
 func (ctlr *Controller) getSvcPortFromHACluster(svcNameSpace, svcName, portName, rscType string) (int32, error) {
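`sync.Map.Range` iterates with a callback and keeps going while the callback returns `true`; the commit collects the keys of unreferenced clusters during the walk and deletes them afterwards. A minimal sketch of that collect-then-delete idea, using a slice in place of the channel above and a simplified value type standing in for `MultiClusterServicePoolMap`:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var clusterSvcMap sync.Map // cluster name -> referenced services

	clusterSvcMap.Store("cluster-a", map[string]struct{}{})           // no references left
	clusterSvcMap.Store("cluster-b", map[string]struct{}{"svc1": {}}) // still referenced

	// Walk the map and remember which keys qualify for deletion.
	var keysToDelete []interface{}
	clusterSvcMap.Range(func(key, value interface{}) bool {
		if svcs := value.(map[string]struct{}); len(svcs) == 0 {
			keysToDelete = append(keysToDelete, key)
		}
		return true // continue iterating
	})

	// Mutate once the iteration is finished.
	for _, key := range keysToDelete {
		clusterSvcMap.Delete(key)
		fmt.Println("stopping informers for", key.(string))
	}
}
```

Deferring the `Delete` calls keeps the mutation explicit, although the `sync.Map` documentation does permit the `Range` callback to call other methods on the map, including `Delete`.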
7 changes: 3 additions & 4 deletions pkg/controller/nativeResourceWorker.go
@@ -780,7 +780,8 @@ func (ctlr *Controller) UpdatePoolHealthMonitors(svcKey MultiClusterServiceKey)
 		"",
 	)
 	// for each cluster -> referred svcs -> for each svc -> port info and bigip vs and dependant resource(route)
-	if serviceKeys, ok := ctlr.multiClusterResources.clusterSvcMap[svcKey.clusterName]; ok {
+	if valInt, ok := ctlr.multiClusterResources.clusterSvcMap.Load(svcKey.clusterName); ok {
+		serviceKeys := valInt.(MultiClusterServicePoolMap)
 		if svcPorts, ok2 := serviceKeys[svcKey]; ok2 {
 			for _, poolIds := range svcPorts {
 				for poolId := range poolIds {
@@ -1109,7 +1110,7 @@ func (ctlr *Controller) processRouteConfigFromGlobalCM(es extendedSpec, isDelete
 				_ = ctlr.processRoutes(routeGroupKey, true)
 				// deleting the bigip partition when partition is changes
 				if ctlr.resources.extdSpecMap[routeGroupKey].partition != newExtdSpecMap[routeGroupKey].partition {
-					if _, ok := ctlr.resources.ltmConfig[ctlr.resources.extdSpecMap[routeGroupKey].partition]; ok {
+					if _, ok := ctlr.resources.ltmConfig.Load(ctlr.resources.extdSpecMap[routeGroupKey].partition); ok {
 						ctlr.resources.updatePartitionPriority(ctlr.resources.extdSpecMap[routeGroupKey].partition, 1)
 					}
 				}
@@ -1798,8 +1799,6 @@ func (ctlr *Controller) checkValidRoute(route *routeapi.Route, plcSSLProfiles rg
 	var clusterSvcs []cisapiv1.MultiClusterServiceReference
 	err := json.Unmarshal([]byte(annotation), &clusterSvcs)
 	if err == nil {
-		ctlr.multiClusterResources.Lock()
-		defer ctlr.multiClusterResources.Unlock()
 		for _, svc := range clusterSvcs {
 			err := ctlr.checkValidExtendedService(svc)
 			if err != nil {
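A unifying detail across these files is the removal of the explicit `Lock()`/`Unlock()` pairs: every individual `sync.Map` operation is already safe for concurrent use, so no external mutex is needed around a single `Load`, `Store`, or `Delete`. The trade-off is that a separate check followed by a `Store` is no longer atomic as a pair; `LoadOrStore` covers the get-or-create case in one step. A small illustrative sketch with toy keys and values, not the controller's types:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var m sync.Map // safe for concurrent use without an external mutex

	// LoadOrStore performs "check, then insert if absent" atomically,
	// closing the gap a separate Load followed by Store would leave open.
	actual, loaded := m.LoadOrStore("partition", 0)
	fmt.Println(actual, loaded) // 0 false -> default value was stored

	actual, loaded = m.LoadOrStore("partition", 42)
	fmt.Println(actual, loaded) // 0 true -> existing value returned
}
```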
