diff --git a/resourcetopo/event_queue.go b/resourcetopo/event_queue.go index 92483e3a..06db9967 100644 --- a/resourcetopo/event_queue.go +++ b/resourcetopo/event_queue.go @@ -17,8 +17,10 @@ package resourcetopo import ( + "strings" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" ) @@ -33,23 +35,14 @@ const ( ) const ( - defaultNodeEventQueueSize = 1000 - defaultRelationEventQueueSize = 1000 - defaultNodeEventHandlePeriod = time.Second - defaultRelationEventHandlePeriod = time.Second + defaultNodeEventHandleRateMinDelay = time.Millisecond + defaultNodeEventHandleRateMaxDelay = time.Second + defaultRelationEventHandleRateMinDelay = time.Millisecond + defaultRelationEventHandleRateMaxDelay = time.Second + defaultNodeEventHandlePeriod = time.Second + defaultRelationEventHandlePeriod = time.Second ) -type nodeEvent struct { - eventType eventType - node *nodeInfo -} - -type relationEvent struct { - eventType eventType - preNode *nodeInfo - postNode *nodeInfo -} - func (m *manager) startHandleEvent(stopCh <-chan struct{}) { go wait.Until(m.handleNodeEvent, defaultNodeEventHandlePeriod, stopCh) go wait.Until(m.handleRelationEvent, defaultRelationEventHandlePeriod, stopCh) @@ -57,79 +50,165 @@ func (m *manager) startHandleEvent(stopCh <-chan struct{}) { func (m *manager) handleNodeEvent() { for { - select { - case e := <-m.nodeEventQueue: - storage := e.node.storageRef - if storage == nil { - klog.Errorf("Unexpected nil nodeStorage for nodeEvent node %v", e.node) - continue - } + item, shutdown := m.nodeEventQueue.Get() + if shutdown { + return + } + info, ok := item.(string) + if !ok { + klog.Errorf("Unexpected node event queue item %v", item) + continue + } + eventType, node := m.decodeString2NodeEvent(info) + if node == nil { + continue + } - switch e.eventType { - case EventTypeAdd: - for _, h := range storage.nodeUpdateHandler { - h.OnAdd(e.node) - } - case EventTypeUpdate: - for _, h := range storage.nodeUpdateHandler { - h.OnUpdate(e.node) - } - case EventTypeDelete: - for _, h := range storage.nodeUpdateHandler { - h.OnDelete(e.node) - } - case EventTypeRelatedUpdate: - for _, h := range storage.nodeUpdateHandler { - h.OnRelatedUpdate(e.node) - } + storage := node.storageRef + if storage == nil { + klog.Errorf("Unexpected nil nodeStorage for nodeEvent node %v", node) + continue + } + + switch eventType { + case EventTypeAdd: + for _, h := range storage.nodeUpdateHandler { + h.OnAdd(node) + } + case EventTypeUpdate: + for _, h := range storage.nodeUpdateHandler { + h.OnUpdate(node) + } + case EventTypeDelete: + for _, h := range storage.nodeUpdateHandler { + h.OnDelete(node) + } + case EventTypeRelatedUpdate: + for _, h := range storage.nodeUpdateHandler { + h.OnRelatedUpdate(node) } - default: - break } + m.nodeEventQueue.Done(item) } } func (m *manager) handleRelationEvent() { for { - select { - case e := <-m.relationEventQueue: - storage := e.preNode.storageRef - if storage == nil { - klog.Errorf("Unexpected nil nodeStorage for relaltion event preNode node %v", e.preNode) - continue - } - handlers := storage.relationUpdateHandler[e.postNode.storageRef.metaKey] - if handlers == nil { - continue - } + item, shutdown := m.relationEventQueue.Get() + if shutdown { + return + } - switch e.eventType { - case EventTypeAdd: - for _, handler := range handlers { - handler.OnAdd(e.preNode, e.postNode) - } - case EventTypeDelete: - for _, handler := range handlers { - handler.OnDelete(e.preNode, e.postNode) - } + info, ok := item.(string) + 
if !ok { + klog.Errorf("Unexpected relation event queue item %v", item) + continue + } + eventType, preNode, postNode := m.decodeString2RelationEvent(info) + if preNode == nil || postNode == nil { + continue + } + + storage := preNode.storageRef + if storage == nil { + klog.Errorf("Unexpected nil nodeStorage for relaltion event preNode node %v", preNode) + continue + } + handlers := storage.relationUpdateHandler[postNode.storageRef.metaKey] + if handlers == nil { + continue + } + + switch eventType { + case EventTypeAdd: + for _, handler := range handlers { + handler.OnAdd(preNode, postNode) + } + case EventTypeDelete: + for _, handler := range handlers { + handler.OnDelete(preNode, postNode) } - default: - break } + m.relationEventQueue.Done(item) } } func (m *manager) newNodeEvent(info *nodeInfo, eType eventType) { - m.nodeEventQueue <- nodeEvent{ - eventType: eType, - node: info, - } + key := m.encodeNodeEvent2String(eType, info) + m.nodeEventQueue.AddRateLimited(key) } func (m *manager) newRelationEvent(preNode, postNode *nodeInfo, eType eventType) { - m.relationEventQueue <- relationEvent{ - eventType: eType, - preNode: preNode, - postNode: postNode, + key := m.encodeRelationEvent2String(eType, preNode, postNode) + m.relationEventQueue.AddRateLimited(key) +} + +const keySpliter = "%" + +func (m *manager) encodeNodeEvent2String(eType eventType, node *nodeInfo) string { + return strings.Join([]string{ + node.storageRef.meta.APIVersion, + node.storageRef.meta.Kind, + node.cluster, + node.namespace, + node.name, + string(eType), + }, keySpliter) +} + +func (m *manager) decodeString2NodeEvent(key string) (eventType, *nodeInfo) { + info := strings.Split(key, keySpliter) + if len(info) != 6 { + klog.Errorf("Unexpected event key %v", key) + return "", nil + } + + node := m.getOrCreateMockNode(info[0], info[1], info[2], info[3], info[4]) + return eventType(info[5]), node +} + +func (m *manager) encodeRelationEvent2String(eType eventType, preNode, postNode *nodeInfo) string { + return strings.Join([]string{ + preNode.storageRef.meta.APIVersion, + preNode.storageRef.meta.Kind, + preNode.cluster, preNode.namespace, preNode.name, + postNode.storageRef.meta.APIVersion, + postNode.storageRef.meta.Kind, + postNode.cluster, postNode.namespace, postNode.name, + string(eType), + }, keySpliter) +} + +func (m *manager) decodeString2RelationEvent(key string) (eventType, *nodeInfo, *nodeInfo) { + info := strings.Split(key, keySpliter) + if len(info) != 11 { + klog.Errorf("Unexpected relation event key %v", key) + return "", nil, nil + } + preNode := m.getOrCreateMockNode(info[0], info[1], info[2], info[3], info[4]) + postNode := m.getOrCreateMockNode(info[5], info[6], info[7], info[8], info[9]) + + return eventType(info[10]), preNode, postNode +} + +func (m *manager) getOrCreateMockNode(apiVersion, kind, cluster, namespace, name string) *nodeInfo { + s := m.getStorage(metav1.TypeMeta{ + APIVersion: apiVersion, + Kind: kind, + }) + if s == nil { + klog.Errorf("Unexpected nil nodeStorage for %s %s", apiVersion, kind) + return nil + } + node := s.getNode(cluster, namespace, name) + if node == nil { + // node may be deleted, reconstruct it for event handle + node = &nodeInfo{ + storageRef: s, + cluster: cluster, + namespace: namespace, + name: name, + } } + return node } diff --git a/resourcetopo/examples/base_example.go b/resourcetopo/examples/base_example.go index 9b3d13ca..0fa8b0b8 100644 --- a/resourcetopo/examples/base_example.go +++ b/resourcetopo/examples/base_example.go @@ -48,9 +48,7 @@ func main() { 
topoManager, err := resourcetopo.NewResourcesTopoManager( resourcetopo.ManagerConfig{ - NodeEventQueueSize: 1024, - RelationEventQueueSize: 1024, - TopologyConfig: buildExampleTopologyConfig(), // could also be set later by topoManager.AddTopologyConfig + TopologyConfig: buildExampleTopologyConfig(), // could also be set later by topoManager.AddTopologyConfig }, ) if err != nil { diff --git a/resourcetopo/manager.go b/resourcetopo/manager.go index 93f87c83..e3c83c00 100644 --- a/resourcetopo/manager.go +++ b/resourcetopo/manager.go @@ -22,16 +22,16 @@ import ( "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" ) var _ Manager = &manager{} type manager struct { - relationEventQueue chan relationEvent - nodeEventQueue chan nodeEvent + relationEventQueue workqueue.RateLimitingInterface + nodeEventQueue workqueue.RateLimitingInterface configLock sync.Mutex started bool @@ -40,9 +40,12 @@ type manager struct { func NewResourcesTopoManager(cfg ManagerConfig) (Manager, error) { checkManagerConfig(&cfg) + + nodeRatelimiter := workqueue.NewItemExponentialFailureRateLimiter(cfg.NodeEventHandleRateMinDelay, cfg.NodeEventHandleRateMaxDelay) + relationRatelimiter := workqueue.NewItemExponentialFailureRateLimiter(cfg.RelationEventHandleRateMinDelay, cfg.RelationEventHandleRateMaxDelay) m := &manager{ - nodeEventQueue: make(chan nodeEvent, cfg.NodeEventQueueSize), - relationEventQueue: make(chan relationEvent, cfg.RelationEventQueueSize), + nodeEventQueue: workqueue.NewNamedRateLimitingQueue(nodeRatelimiter, "resourcetopoNodeEventQueue"), + relationEventQueue: workqueue.NewNamedRateLimitingQueue(relationRatelimiter, "resourcetopoRelaltionEventQueue"), storages: make(map[string]*nodeStorage), } @@ -211,10 +214,16 @@ func (m *manager) dagCheck() error { } func checkManagerConfig(c *ManagerConfig) { - if c.NodeEventQueueSize <= 0 { - c.NodeEventQueueSize = defaultNodeEventQueueSize + if c.NodeEventHandleRateMinDelay <= 0 { + c.NodeEventHandleRateMinDelay = defaultNodeEventHandleRateMinDelay + } + if c.NodeEventHandleRateMaxDelay < c.NodeEventHandleRateMinDelay { + c.NodeEventHandleRateMaxDelay = defaultNodeEventHandleRateMaxDelay + } + if c.RelationEventHandleRateMinDelay <= 0 { + c.RelationEventHandleRateMinDelay = defaultRelationEventHandleRateMinDelay } - if c.RelationEventQueueSize <= 0 { - c.RelationEventQueueSize = defaultRelationEventQueueSize + if c.RelationEventHandleRateMaxDelay < c.RelationEventHandleRateMinDelay { + c.RelationEventHandleRateMaxDelay = defaultRelationEventHandleRateMaxDelay } } diff --git a/resourcetopo/node_info.go b/resourcetopo/node_info.go index 72e8bdaa..cebbd573 100644 --- a/resourcetopo/node_info.go +++ b/resourcetopo/node_info.go @@ -35,8 +35,9 @@ type ownerInfo struct { type nodeInfo struct { storageRef *nodeStorage - lock sync.RWMutex + // lock for PreOrders and PostOrders + lock sync.RWMutex // Lists of nodeInfo ref, cached all existed relation for this node. directReferredPreOrders *list.List labelReferredPreOrders *list.List @@ -49,7 +50,9 @@ type nodeInfo struct { name string ownerNodes []ownerInfo labels labels.Set // labels is a ref to object.meta.labels, do not edit! - relations []ResourceRelation + + relations []ResourceRelation + relationsLock sync.RWMutex // objectExisted is added for directRef relation to cache the relation before post object added, // will be updated to true after the object added to cache or the manager is of virtual type. 
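Note on the event_queue.go and manager.go changes above: the buffered channels are replaced by two workqueue.RateLimitingInterface queues fed through AddRateLimited, and handleNodeEvent/handleRelationEvent now drain them with Get. Below is a minimal sketch of the conventional drain loop for such a queue, assuming only the Get/Done/Forget/AddRateLimited methods of k8s.io/client-go/util/workqueue; the function name drainNodeEvents is hypothetical. In the patch as written, the continue branches go back to Get without calling Done, and Forget is never called, so the exponential backoff for a busy key only grows toward the configured max delay; the sketch shows the usual pairing in case that is not intended.

// Sketch only, not code from this patch: the conventional workqueue drain loop.
// Done must be paired with every Get, including early-exit paths, otherwise the
// key stays marked as processing and later AddRateLimited calls for the same key
// are held back until Done runs. Forget resets the per-key exponential backoff
// once the item has been handled; omit it only if the growing delay is
// deliberately used as a debounce.
func (m *manager) drainNodeEvents() {
	for {
		item, shutdown := m.nodeEventQueue.Get()
		if shutdown {
			return
		}
		func() {
			defer m.nodeEventQueue.Done(item)
			key, ok := item.(string)
			if !ok {
				m.nodeEventQueue.Forget(item)
				return
			}
			// decode the key and dispatch to the registered handlers,
			// as handleNodeEvent does in event_queue.go
			_ = key
			m.nodeEventQueue.Forget(item)
		}()
	}
}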
diff --git a/resourcetopo/node_storage.go b/resourcetopo/node_storage.go index 8e46fda9..0e3153a5 100644 --- a/resourcetopo/node_storage.go +++ b/resourcetopo/node_storage.go @@ -301,27 +301,37 @@ func (s *nodeStorage) checkForLabelUpdate(postNode *nodeInfo) { } nodeList := clsNodes.namespaceNodes[ns] for _, n := range nodeList { - if len(n.relations) == 0 { - continue + s.checkLabelUpdateForNode(n, postNode) + } +} + +func (s *nodeStorage) checkLabelUpdateForNode(preNode, postNode *nodeInfo) { + if preNode == nil { + return + } + + preNode.relationsLock.RLock() + defer preNode.relationsLock.RUnlock() + if len(preNode.relations) == 0 { + return + } + for _, relation := range preNode.relations { + if !typeEqual(relation.PostMeta, postNode.storageRef.meta) { + return } - for _, relation := range n.relations { - if !typeEqual(relation.PostMeta, postNode.storageRef.meta) { - continue - } - selector, err := metav1.LabelSelectorAsSelector(relation.LabelSelector) - if err != nil { - klog.Errorf("Failed to resolve selector %v: %s", relation.LabelSelector, err.Error()) - continue + selector, err := metav1.LabelSelectorAsSelector(relation.LabelSelector) + if err != nil { + klog.Errorf("Failed to resolve selector %v: %s", relation.LabelSelector, err.Error()) + return + } + if postNode.matched(selector) { + if _, ok := s.ownerRelation[postNode.storageRef.metaKey]; ok && !postNode.ownerMatched(preNode) { + return } - if postNode.matched(selector) { - if _, ok := s.ownerRelation[postNode.storageRef.metaKey]; ok && !postNode.ownerMatched(n) { - continue - } - rangeAndSetLabelRelation(n, postNode, s.manager) - } else { - if deleteLabelRelation(n, postNode) { - n.postOrderRelationDeleted(postNode) - } + rangeAndSetLabelRelation(preNode, postNode, s.manager) + } else { + if deleteLabelRelation(preNode, postNode) { + preNode.postOrderRelationDeleted(postNode) } } } diff --git a/resourcetopo/node_topology_updater.go b/resourcetopo/node_topology_updater.go index 0d66d514..66ef5277 100644 --- a/resourcetopo/node_topology_updater.go +++ b/resourcetopo/node_topology_updater.go @@ -66,6 +66,7 @@ func (s *nodeStorage) OnUpdate(oldObj, newObj interface{}) { } slices.SortFunc(resolvedRelations, compareResourceRelation) + node.relationsLock.Lock() sortedSlicesCompare(node.relations, resolvedRelations, func(relation ResourceRelation) { s.removeResourceRelation(node, &relation) @@ -75,6 +76,7 @@ func (s *nodeStorage) OnUpdate(oldObj, newObj interface{}) { }, compareResourceRelation) node.relations = resolvedRelations + node.relationsLock.Unlock() if !node.labelEqualed(newTopoObj.GetLabels()) { node.updateNodeMeta(newTopoObj) @@ -137,15 +139,16 @@ func (s *nodeStorage) addNode(obj Object, node *nodeInfo) { klog.Warningf("unexpected relations {%v}", node.relations) node.relations = nil } + node.relationsLock.Lock() for _, resolver := range s.resolvers { relations := resolver.Resolve(obj) node.relations = append(node.relations, relations...) 
- for _, relation := range relations { - s.addResourceRelation(node, &relation) - } } - slices.SortFunc(node.relations, compareResourceRelation) + for _, relation := range node.relations { + s.addResourceRelation(node, &relation) + } + node.relationsLock.Unlock() for _, discoverer := range s.discoverers { preStorage := s.manager.getStorage(discoverer.PreMeta) @@ -198,7 +201,7 @@ func (s *nodeStorage) removeResourceRelation(node *nodeInfo, relation *ResourceR } if len(relation.DirectRefs) > 0 { for _, ref := range relation.DirectRefs { - postNode := postStorage.getNode(node.cluster, ref.Namespace, ref.Name) + postNode := postStorage.getNode(relation.Cluster, ref.Namespace, ref.Name) if deleteDirectRelation(node, postNode) { postNode.preOrderRelationDeleted(node) } diff --git a/resourcetopo/resourcetopo_test.go b/resourcetopo/resourcetopo_test.go index b7533b2c..8fd7da7c 100644 --- a/resourcetopo/resourcetopo_test.go +++ b/resourcetopo/resourcetopo_test.go @@ -308,6 +308,7 @@ var _ = Describe("test suite with ists config(label selector and virtual rersour podHandler.deleteCallExpected() err = fakeClient.CoreV1().Pods(namespaceDefault).Delete(ctx, pod1Name, metav1.DeleteOptions{}) Expect(err).To(BeNil()) + syncStatus(checkAll) Eventually(func(g Gomega) { pod, _ := podStorage.GetNode(types.NamespacedName{Namespace: namespaceDefault, Name: pod1Name}) @@ -690,6 +691,47 @@ var _ = Describe("test suite with cluster role config(cluster role and direct re }).Should(Succeed()) }) + It("create all, delete clusterrolebinding and create again, should match", func() { + ns := "testclusterresource" + crbName := "crbtest" + crName := "crName" + saName := "saName" + + clusterRoleBindingHandler.addCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Create(ctx, newClusterRoleBinding(crbName, crName, []types.NamespacedName{{Name: saName, Namespace: ns}}), metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + saHandler.addCallExpected() + clusterRoleBindingHandler.relatedCallExpected() + saBindingRelation.addCallExpected() + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, newServiceAccount(ns, saName), metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + clusterRoleHandler.addCallExpected() + clusterRoleBindingHandler.relatedCallExpected() + roleBindingRelation.addCallExpected() + Expect(fakeClient.RbacV1().ClusterRoles().Create(ctx, newClusterRole(crName), metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + clusterRoleBindingHandler.deleteCallExpected() + roleBindingRelation.deleteCallExpected() + saBindingRelation.deleteCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Delete(ctx, crbName, metav1.DeleteOptions{})).NotTo(HaveOccurred()) + syncStatus(checkAll) + + clusterRoleBindingHandler.addCallExpected() + saBindingRelation.addCallExpected() + roleBindingRelation.addCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Create(ctx, newClusterRoleBinding(crbName, crName, []types.NamespacedName{{Name: saName, Namespace: ns}}), metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + Eventually(func(g Gomega) { + clusterrolebindingNode, _ := clusterRoleBindingStorage.GetNode(types.NamespacedName{Name: crbName}) + g.Expect(clusterrolebindingNode).NotTo(BeNil()) + postNodes := clusterrolebindingNode.GetPostOrders() + g.Expect(len(postNodes)).To(Equal(2)) + }).Should(Succeed()) + }) It("create all and delete all", func() { ns := "testclusterresource" crbName := "crbtest" @@ -1313,3 +1355,402 @@ var _ = 
Describe("test suite with deploy config(label selector and owner referen }).Should(Succeed()) }) }) + +var _ = Describe("test suite with relations update", func() { + var manager Manager + var fakeClient *fake.Clientset + var clusterRoleBindingHandler, clusterRoleHandler, saHandler *objecthandler + var saBindingRelation, roleBindingRelation *relationHandler + + var clusterRoleBindingStorage, clusterroleStorage, saStorage TopoNodeStorage + var ctx context.Context + var cancel func() + + checkAll := func() bool { + return clusterRoleBindingHandler.matchExpected() && + clusterRoleHandler.matchExpected() && + saHandler.matchExpected() && + saBindingRelation.matchExpected() && + roleBindingRelation.matchExpected() + } + + BeforeEach(func() { + fakeClient = fake.NewSimpleClientset() + k8sInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + var err error + manager, err = NewResourcesTopoManager(*buildManagerConfig(buildClusterTest(k8sInformerFactory))) + Expect(err).NotTo(HaveOccurred()) + + clusterRoleBindingStorage, _ = manager.GetTopoNodeStorage(ClusterRoleBindingMeta) + clusterroleStorage, _ = manager.GetTopoNodeStorage(ClusterRoleMeta) + saStorage, _ = manager.GetTopoNodeStorage(ServiceAccountMeta) + + Expect(clusterRoleBindingStorage).NotTo(BeNil()) + Expect(clusterroleStorage).NotTo(BeNil()) + Expect(saStorage).NotTo(BeNil()) + + ctx, cancel = context.WithCancel((context.Background())) + clusterRoleBindingHandler = &objecthandler{} + Expect(manager.AddNodeHandler(ClusterRoleBindingMeta, clusterRoleBindingHandler)).NotTo(HaveOccurred()) + clusterRoleHandler = &objecthandler{} + Expect(manager.AddNodeHandler(ClusterRoleMeta, clusterRoleHandler)).NotTo(HaveOccurred()) + saHandler = &objecthandler{} + Expect(manager.AddNodeHandler(ServiceAccountMeta, saHandler)).NotTo(HaveOccurred()) + + roleBindingRelation = &relationHandler{} + Expect(manager.AddRelationHandler(ClusterRoleBindingMeta, ClusterRoleMeta, roleBindingRelation)).To(BeNil()) + saBindingRelation = &relationHandler{} + Expect(manager.AddRelationHandler(ClusterRoleBindingMeta, ServiceAccountMeta, saBindingRelation)).To(BeNil()) + + manager.Start(ctx.Done()) + k8sInformerFactory.Start(ctx.Done()) + k8sInformerFactory.WaitForCacheSync(ctx.Done()) + }) + AfterEach(func() { + if !checkAll() { + klog.Infof("end with object [%s, %s, %s]", clusterRoleBindingHandler.string(), clusterRoleHandler.string(), saHandler.string()) + klog.Infof("end with relation [%s, %s]", roleBindingRelation.string(), saBindingRelation.string()) + } + cancel() + }) + + It("create two serviceAccounts, clusterRole and clusterRoleBinding", func() { + ns := "testclusterresource" + crbName := "crbtest" + crName := "crName" + saName := "saName" + saName2 := "saName2" + + saHandler.addCallExpected() + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, newServiceAccount(ns, saName), metav1.CreateOptions{})).NotTo(BeNil()) + saHandler.addCallExpected() + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, newServiceAccount(ns, saName2), metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + clusterRoleHandler.addCallExpected() + Expect(fakeClient.RbacV1().ClusterRoles().Create(ctx, newClusterRole(crName), metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + clusterRoleBindingHandler.addCallExpected() + saBindingRelation.addCallExpected() + saBindingRelation.addCallExpected() + roleBindingRelation.addCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Create(ctx, newClusterRoleBinding(crbName, crName, + 
[]types.NamespacedName{{Name: saName, Namespace: ns}, {Name: saName2, Namespace: ns}}), + metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + }) + + It("create all, clear crb spec, and reset", func() { + ns := "testclusterresource" + crbName := "crbtest" + crName := "crName" + saName := "saName" + saName2 := "saName2" + + saHandler.addCallExpected() + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, newServiceAccount(ns, saName), metav1.CreateOptions{})).NotTo(BeNil()) + saHandler.addCallExpected() + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, newServiceAccount(ns, saName2), metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + clusterRoleHandler.addCallExpected() + Expect(fakeClient.RbacV1().ClusterRoles().Create(ctx, newClusterRole(crName), metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + clusterRoleBindingHandler.addCallExpected() + saBindingRelation.addCallExpected() + saBindingRelation.addCallExpected() + roleBindingRelation.addCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Create(ctx, newClusterRoleBinding(crbName, crName, + []types.NamespacedName{{Name: saName, Namespace: ns}, {Name: saName2, Namespace: ns}}), + metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + saBindingRelation.deleteCallExpected() + saBindingRelation.deleteCallExpected() + clusterRoleBindingHandler.updateCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Update(ctx, newClusterRoleBinding(crbName, crName, + []types.NamespacedName{}), + metav1.UpdateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + saBindingRelation.addCallExpected() + clusterRoleBindingHandler.updateCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Update(ctx, newClusterRoleBinding(crbName, crName, + []types.NamespacedName{{Name: saName, Namespace: ns}}), + metav1.UpdateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + saBindingRelation.addCallExpected() + saBindingRelation.deleteCallExpected() + clusterRoleBindingHandler.updateCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Update(ctx, newClusterRoleBinding(crbName, crName, + []types.NamespacedName{{Name: saName2, Namespace: ns}}), + metav1.UpdateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + saBindingRelation.addCallExpected() + saBindingRelation.deleteCallExpected() + clusterRoleBindingHandler.updateCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Update(ctx, newClusterRoleBinding(crbName, crName, + []types.NamespacedName{{Name: saName, Namespace: ns}}), + metav1.UpdateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + }) + + It("create two serviceAccounts with clusters and clusterRoleBinding", func() { + ns := "testclusterresource" + crbName := "crbtest" + crName := "crName" + saName := "saName" + saName2 := "saName2" + cluster1 := "cluster1" + cluster2 := "cluster2" + + saHandler.addCallExpected() + sa1 := newServiceAccount(ns, saName) + setObjectCluster(sa1, cluster1) + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, sa1, metav1.CreateOptions{})).NotTo(BeNil()) + saHandler.addCallExpected() + sa2 := newServiceAccount(ns, saName2) + setObjectCluster(sa2, cluster2) + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, sa2, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + clusterRoleBindingHandler.addCallExpected() + Expect(fakeClient.RbacV1().ClusterRoleBindings().Create(ctx, newClusterRoleBinding(crbName, crName, + []types.NamespacedName{{Name: saName, Namespace: ns}, {Name: 
saName2, Namespace: ns}}), + metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + }) + + It("create two serviceAccounts with clusters and clusterRoleBinding", func() { + ns := "testclusterresource" + crbName := "crbtest" + crName := "crName" + saName := "saName" + saName2 := "saName2" + cluster1 := "cluster1" + cluster2 := "cluster2" + + saHandler.addCallExpected() + sa1 := newServiceAccount(ns, saName) + setObjectCluster(sa1, cluster1) + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, sa1, metav1.CreateOptions{})).NotTo(BeNil()) + saHandler.addCallExpected() + sa2 := newServiceAccount(ns, saName2) + setObjectCluster(sa2, cluster2) + Expect(fakeClient.CoreV1().ServiceAccounts(ns).Create(ctx, sa2, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + clusterRoleBindingHandler.addCallExpected() + saBindingRelation.addCallExpected() + crb := newClusterRoleBinding(crbName, crName, + []types.NamespacedName{{Name: saName, Namespace: ns}, {Name: saName2, Namespace: ns}}) + setObjectCluster(crb, cluster1) + Expect(fakeClient.RbacV1().ClusterRoleBindings().Create(ctx, crb, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + }) +}) + +var _ = Describe("test suite with mock relation for fed namespaces and local cluster pods", func() { + var manager Manager + var fakeClient *fake.Clientset + var nsHandler *objecthandler + var nsPodRelation *relationHandler + + var nsStorage TopoNodeStorage + var ctx context.Context + var cancel func() + + checkAll := func() bool { + return nsHandler.matchExpected() && + nsPodRelation.matchExpected() + } + + BeforeEach(func() { + fakeClient = fake.NewSimpleClientset() + k8sInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + var err error + manager, err = NewResourcesTopoManager(*buildManagerConfig(buildMultiClustertopoConfig(k8sInformerFactory))) + Expect(err).NotTo(HaveOccurred()) + + nsStorage, _ = manager.GetTopoNodeStorage(NamespaceMeta) + + Expect(nsStorage).NotTo(BeNil()) + + ctx, cancel = context.WithCancel((context.Background())) + nsHandler = &objecthandler{} + Expect(manager.AddNodeHandler(NamespaceMeta, nsHandler)).NotTo(HaveOccurred()) + + nsPodRelation = &relationHandler{} + Expect(manager.AddRelationHandler(NamespaceMeta, PodMeta, nsPodRelation)).To(BeNil()) + + manager.Start(ctx.Done()) + k8sInformerFactory.Start(ctx.Done()) + k8sInformerFactory.WaitForCacheSync(ctx.Done()) + }) + + AfterEach(func() { + if !checkAll() { + klog.Infof("end with object [%s]", nsHandler.string()) + klog.Infof("end with relation [%s]", nsPodRelation.string()) + } + cancel() + }) + + It("create fed ns and local pod ", func() { + nsName := "ns" + podName := "podName" + fedCluster := "fed" + localCluster := "localCluster" + + ns1 := newNamespaceWithCluster(nsName, fedCluster) + pod := newPod(nsName, podName) + setObjectCluster(pod, localCluster) + setMultiClusterDepend(ns1, []MultiClusterDepend{{ + Cluster: localCluster, + Namespace: nsName, + Name: podName, + }}) + + nsHandler.addCallExpected() + Expect(fakeClient.CoreV1().Namespaces().Create(ctx, ns1, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Create(ctx, pod, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + }) + + It("create fed ns and local pod in different clusters, test pod recreation", func() { + nsName := "ns" + podName := "podName" + podName2 := "podName2" + fedCluster := "fed" + localCluster := 
"localCluster" + localCluster2 := "localCluster2" + + ns1 := newNamespaceWithCluster(nsName, fedCluster) + pod := newPod(nsName, podName) + setObjectCluster(pod, localCluster) + pod2 := newPod(nsName, podName2) + setObjectCluster(pod2, localCluster2) + setMultiClusterDepend(ns1, []MultiClusterDepend{{ + Cluster: localCluster, + Namespace: nsName, + Name: podName, + }, + { + Cluster: localCluster2, + Namespace: nsName, + Name: podName2, + }, + }) + + nsHandler.addCallExpected() + Expect(fakeClient.CoreV1().Namespaces().Create(ctx, ns1, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Create(ctx, pod, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.deleteCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Delete(ctx, podName, metav1.DeleteOptions{})).To(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Create(ctx, pod2, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Create(ctx, pod, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + }) + + It("create ns and pod in different cluster, test relation config changed", func() { + nsName := "ns" + podName := "podName" + podName2 := "podName2" + fedCluster := "fed" + localCluster := "localCluster" + localCluster2 := "localCluster2" + + ns1 := newNamespaceWithCluster(nsName, fedCluster) + pod := newPod(nsName, podName) + setObjectCluster(pod, localCluster) + pod2 := newPod(nsName, podName2) + setObjectCluster(pod2, localCluster2) + setMultiClusterDepend(ns1, []MultiClusterDepend{ + { + Cluster: localCluster, + Namespace: nsName, + Name: podName, + }, + { + Cluster: localCluster2, + Namespace: nsName, + Name: podName2, + }, + }) + + nsHandler.addCallExpected() + Expect(fakeClient.CoreV1().Namespaces().Create(ctx, ns1, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Create(ctx, pod, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Create(ctx, pod2, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + setMultiClusterDepend(ns1, []MultiClusterDepend{}) + nsHandler.updateCallExpected() + nsPodRelation.deleteCallExpected() + nsPodRelation.deleteCallExpected() + Expect(fakeClient.CoreV1().Namespaces().Update(ctx, ns1, metav1.UpdateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + setMultiClusterDepend(ns1, []MultiClusterDepend{{ + Cluster: localCluster, + Namespace: nsName, + Name: podName, + }}) + nsHandler.updateCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Namespaces().Update(ctx, ns1, metav1.UpdateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.deleteCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Delete(ctx, pod.Name, metav1.DeleteOptions{})).To(BeNil()) + syncStatus(checkAll) + + nsHandler.relatedCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Pods(nsName).Create(ctx, pod, metav1.CreateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + 
+ setMultiClusterDepend(ns1, []MultiClusterDepend{{ + Cluster: localCluster2, + Namespace: nsName, + Name: podName2, + }}) + nsHandler.updateCallExpected() + nsPodRelation.deleteCallExpected() + nsPodRelation.addCallExpected() + Expect(fakeClient.CoreV1().Namespaces().Update(ctx, ns1, metav1.UpdateOptions{})).NotTo(BeNil()) + syncStatus(checkAll) + }) +}) diff --git a/resourcetopo/resourcetopo_utils_test.go b/resourcetopo/resourcetopo_utils_test.go index a0d4b8fd..9cdfbd58 100644 --- a/resourcetopo/resourcetopo_utils_test.go +++ b/resourcetopo/resourcetopo_utils_test.go @@ -17,6 +17,7 @@ package resourcetopo import ( + "encoding/json" "fmt" "github.com/hashicorp/consul/sdk/testutil/retry" @@ -28,6 +29,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/klog/v2" + + "kusionstack.io/kube-utils/multicluster/clusterinfo" ) const namespaceDefault = "default" @@ -42,6 +45,7 @@ var ( ClusterRoleBindingMeta = metav1.TypeMeta{Kind: "ClusterRoleBinding", APIVersion: "rbac/v1"} ClusterRoleMeta = metav1.TypeMeta{Kind: "ClusterRole", APIVersion: "rbac/v1"} ServiceAccountMeta = metav1.TypeMeta{Kind: "ServiceAccount", APIVersion: "core/v1"} + NamespaceMeta = metav1.TypeMeta{Kind: "Namespace", APIVersion: "core/v1"} ) func GetInformer(meta metav1.TypeMeta, k8sInformerFactory informers.SharedInformerFactory) Informer { @@ -63,6 +67,8 @@ func GetInformer(meta metav1.TypeMeta, k8sInformerFactory informers.SharedInform return k8sInformerFactory.Apps().V1().Deployments().Informer() case ReplicaSetMeta.String(): return k8sInformerFactory.Apps().V1().ReplicaSets().Informer() + case NamespaceMeta.String(): + return k8sInformerFactory.Core().V1().Namespaces().Informer() default: return nil @@ -133,6 +139,7 @@ func buildClusterTest(k8sInformerFactory informers.SharedInformerFactory) *Topol return nil } var saRefs []types.NamespacedName + cluster := getObjectCluster(crbObject) for _, s := range crbObject.Subjects { switch s.Kind { @@ -143,6 +150,7 @@ func buildClusterTest(k8sInformerFactory informers.SharedInformerFactory) *Topol return []ResourceRelation{ { PostMeta: ClusterRoleMeta, + Cluster: cluster, DirectRefs: []types.NamespacedName{ { Name: crbObject.RoleRef.Name, @@ -151,6 +159,7 @@ func buildClusterTest(k8sInformerFactory informers.SharedInformerFactory) *Topol }, { PostMeta: ServiceAccountMeta, + Cluster: cluster, DirectRefs: saRefs, }, } @@ -236,6 +245,39 @@ func buildDeployTopoConfig(k8sInformerFactory informers.SharedInformerFactory) * } } +func buildMultiClustertopoConfig(k8sInformerFactory informers.SharedInformerFactory) *TopologyConfig { + return &TopologyConfig{ + GetInformer: func(meta metav1.TypeMeta) Informer { + return GetInformer(meta, k8sInformerFactory) + }, + Resolvers: []RelationResolver{ + { + PreMeta: NamespaceMeta, + PostMetas: []metav1.TypeMeta{PodMeta}, + Resolve: func(preOrder Object) []ResourceRelation { + preObj, ok := preOrder.(*corev1.Namespace) + if !ok { + return nil + } + depends := getMultiClusterDepend(&preObj.ObjectMeta) + var relations []ResourceRelation + for _, v := range depends { + relations = append(relations, ResourceRelation{ + PostMeta: PodMeta, + Cluster: v.Cluster, + DirectRefs: []types.NamespacedName{{ + Name: v.Name, + Namespace: v.Namespace, + }}, + }) + } + + return relations + }, + }, + }, + } +} func newPod(namespace, name string, labels ...string) *corev1.Pod { return &corev1.Pod{ ObjectMeta: *newObjectMeta(namespace, name, labels), @@ -359,6 +401,20 @@ func newServiceAccount(namespace, name string, labels ...string) 
*corev1.Service } } +func newNamespaceWithCluster(name string, cluster string) *corev1.Namespace { + ns := &corev1.Namespace{ + ObjectMeta: *newObjectMeta("", name, nil), + } + setObjectCluster(ns, cluster) + return ns +} + +type MultiClusterDepend struct { + Cluster string `json:"cluster"` + Namespace string `json:"namespace"` + Name string `json:"name"` +} + func syncStatus(f func() bool) { retry.RunWith(retry.TwoSeconds(), GinkgoT(), func(r *retry.R) { if !f() { @@ -387,6 +443,44 @@ func setOwner(object metav1.Object, meta metav1.TypeMeta, ownerName string) { })) } +func setObjectCluster(obj Object, cluster string) { + if labels := obj.GetLabels(); labels != nil { + labels[clusterinfo.ClusterLabelKey] = cluster + } else { + panic("labels is nil") + } +} + +const multiClusterDependKey = "kusionstack.io/depends-on" + +func setMultiClusterDepend(object metav1.Object, depends []MultiClusterDepend) { + if info, err := json.Marshal(depends); err != nil { + panic(err) + } else { + anno := object.GetAnnotations() + if anno == nil { + anno = make(map[string]string) + object.SetAnnotations(anno) + } + anno[multiClusterDependKey] = string(info) + } +} + +func getMultiClusterDepend(object metav1.Object) []MultiClusterDepend { + if len(object.GetAnnotations()) == 0 { + return nil + } + if info, ok := object.GetAnnotations()[multiClusterDependKey]; ok { + var depends []MultiClusterDepend + if err := json.Unmarshal([]byte(info), &depends); err != nil { + panic(err) + } + return depends + } else { + return nil + } +} + var _ NodeHandler = &objecthandler{} type objecthandler struct { diff --git a/resourcetopo/types.go b/resourcetopo/types.go index 9e16eec4..5950fca7 100644 --- a/resourcetopo/types.go +++ b/resourcetopo/types.go @@ -17,6 +17,8 @@ package resourcetopo import ( + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" toolscache "k8s.io/client-go/tools/cache" @@ -40,10 +42,13 @@ type TopologyConfig struct { } type ManagerConfig struct { - NodeEventQueueSize int // size of NodeEvent cache channel, defaults to defaultNodeEventQueueSize - RelationEventQueueSize int // size of RelationEvent cache channel, defaults to defaultRelationEventQueueSize - TopologyConfig *TopologyConfig + + // Event handle rate limit config for node events and relation events + NodeEventHandleRateMinDelay time.Duration + NodeEventHandleRateMaxDelay time.Duration + RelationEventHandleRateMinDelay time.Duration + RelationEventHandleRateMaxDelay time.Duration } type Manager interface {
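For reference, a minimal sketch of how a caller would fill in the new ManagerConfig fields that replace NodeEventQueueSize and RelationEventQueueSize. The delay values are illustrative, the import path is assumed from the kusionstack.io/kube-utils module layout, and zero or inconsistent values fall back to the 1ms/1s defaults enforced by checkManagerConfig:

package main

import (
	"time"

	"kusionstack.io/kube-utils/resourcetopo" // import path assumed from the module layout
)

func main() {
	topoManager, err := resourcetopo.NewResourcesTopoManager(resourcetopo.ManagerConfig{
		// TopologyConfig is omitted here; it can be supplied in the config or added
		// later via topoManager.AddTopologyConfig, as noted in examples/base_example.go.
		NodeEventHandleRateMinDelay:     5 * time.Millisecond, // illustrative values
		NodeEventHandleRateMaxDelay:     time.Second,
		RelationEventHandleRateMinDelay: 5 * time.Millisecond,
		RelationEventHandleRateMaxDelay: time.Second,
	})
	if err != nil {
		panic(err)
	}
	_ = topoManager // then call topoManager.Start(stopCh) and register handlers as in the tests
}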