Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 150 additions & 71 deletions resourcetopo/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package resourcetopo

import (
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
Expand All @@ -33,103 +35,180 @@ const (
)

const (
defaultNodeEventQueueSize = 1000
defaultRelationEventQueueSize = 1000
defaultNodeEventHandlePeriod = time.Second
defaultRelationEventHandlePeriod = time.Second
defaultNodeEventHandleRateMinDelay = time.Millisecond
defaultNodeEventHandleRateMaxDelay = time.Second
defaultRelationEventHandleRateMinDelay = time.Millisecond
defaultRelationEventHandleRateMaxDelay = time.Second
defaultNodeEventHandlePeriod = time.Second
defaultRelationEventHandlePeriod = time.Second
)

type nodeEvent struct {
eventType eventType
node *nodeInfo
}

type relationEvent struct {
eventType eventType
preNode *nodeInfo
postNode *nodeInfo
}

func (m *manager) startHandleEvent(stopCh <-chan struct{}) {
go wait.Until(m.handleNodeEvent, defaultNodeEventHandlePeriod, stopCh)
go wait.Until(m.handleRelationEvent, defaultRelationEventHandlePeriod, stopCh)
}

func (m *manager) handleNodeEvent() {
for {
select {
case e := <-m.nodeEventQueue:
storage := e.node.storageRef
if storage == nil {
klog.Errorf("Unexpected nil nodeStorage for nodeEvent node %v", e.node)
continue
}
item, shutdown := m.nodeEventQueue.Get()
if shutdown {
return
}
info, ok := item.(string)
if !ok {
klog.Errorf("Unexpected node event queue item %v", item)
continue
}
eventType, node := m.decodeString2NodeEvent(info)
if node == nil {
continue
}

switch e.eventType {
case EventTypeAdd:
for _, h := range storage.nodeUpdateHandler {
h.OnAdd(e.node)
}
case EventTypeUpdate:
for _, h := range storage.nodeUpdateHandler {
h.OnUpdate(e.node)
}
case EventTypeDelete:
for _, h := range storage.nodeUpdateHandler {
h.OnDelete(e.node)
}
case EventTypeRelatedUpdate:
for _, h := range storage.nodeUpdateHandler {
h.OnRelatedUpdate(e.node)
}
storage := node.storageRef
if storage == nil {
klog.Errorf("Unexpected nil nodeStorage for nodeEvent node %v", node)
continue
}

switch eventType {
case EventTypeAdd:
for _, h := range storage.nodeUpdateHandler {
h.OnAdd(node)
}
case EventTypeUpdate:
for _, h := range storage.nodeUpdateHandler {
h.OnUpdate(node)
}
case EventTypeDelete:
for _, h := range storage.nodeUpdateHandler {
h.OnDelete(node)
}
case EventTypeRelatedUpdate:
for _, h := range storage.nodeUpdateHandler {
h.OnRelatedUpdate(node)
}
default:
break
}
m.nodeEventQueue.Done(item)
}
}

func (m *manager) handleRelationEvent() {
for {
select {
case e := <-m.relationEventQueue:
storage := e.preNode.storageRef
if storage == nil {
klog.Errorf("Unexpected nil nodeStorage for relaltion event preNode node %v", e.preNode)
continue
}
handlers := storage.relationUpdateHandler[e.postNode.storageRef.metaKey]
if handlers == nil {
continue
}
item, shutdown := m.relationEventQueue.Get()
if shutdown {
return
}

switch e.eventType {
case EventTypeAdd:
for _, handler := range handlers {
handler.OnAdd(e.preNode, e.postNode)
}
case EventTypeDelete:
for _, handler := range handlers {
handler.OnDelete(e.preNode, e.postNode)
}
info, ok := item.(string)
if !ok {
klog.Errorf("Unexpected relation event queue item %v", item)
continue
}
eventType, preNode, postNode := m.decodeString2RelationEvent(info)
if preNode == nil || postNode == nil {
continue
}

storage := preNode.storageRef
if storage == nil {
klog.Errorf("Unexpected nil nodeStorage for relaltion event preNode node %v", preNode)
continue
}
handlers := storage.relationUpdateHandler[postNode.storageRef.metaKey]
if handlers == nil {
continue
}

switch eventType {
case EventTypeAdd:
for _, handler := range handlers {
handler.OnAdd(preNode, postNode)
}
case EventTypeDelete:
for _, handler := range handlers {
handler.OnDelete(preNode, postNode)
}
default:
break
}
m.relationEventQueue.Done(item)
}
}

func (m *manager) newNodeEvent(info *nodeInfo, eType eventType) {
m.nodeEventQueue <- nodeEvent{
eventType: eType,
node: info,
}
key := m.encodeNodeEvent2String(eType, info)
m.nodeEventQueue.AddRateLimited(key)
}

func (m *manager) newRelationEvent(preNode, postNode *nodeInfo, eType eventType) {
m.relationEventQueue <- relationEvent{
eventType: eType,
preNode: preNode,
postNode: postNode,
key := m.encodeRelationEvent2String(eType, preNode, postNode)
m.relationEventQueue.AddRateLimited(key)
}

const keySpliter = "%"

func (m *manager) encodeNodeEvent2String(eType eventType, node *nodeInfo) string {
return strings.Join([]string{
node.storageRef.meta.APIVersion,
node.storageRef.meta.Kind,
node.cluster,
node.namespace,
node.name,
string(eType),
}, keySpliter)
}

func (m *manager) decodeString2NodeEvent(key string) (eventType, *nodeInfo) {
info := strings.Split(key, keySpliter)
if len(info) != 6 {
klog.Errorf("Unexpected event key %v", key)
return "", nil
}

node := m.getOrCreateMockNode(info[0], info[1], info[2], info[3], info[4])
return eventType(info[5]), node
}

func (m *manager) encodeRelationEvent2String(eType eventType, preNode, postNode *nodeInfo) string {
return strings.Join([]string{
preNode.storageRef.meta.APIVersion,
preNode.storageRef.meta.Kind,
preNode.cluster, preNode.namespace, preNode.name,
postNode.storageRef.meta.APIVersion,
postNode.storageRef.meta.Kind,
postNode.cluster, postNode.namespace, postNode.name,
string(eType),
}, keySpliter)
}

func (m *manager) decodeString2RelationEvent(key string) (eventType, *nodeInfo, *nodeInfo) {
info := strings.Split(key, keySpliter)
if len(info) != 11 {
klog.Errorf("Unexpected relation event key %v", key)
return "", nil, nil
}
preNode := m.getOrCreateMockNode(info[0], info[1], info[2], info[3], info[4])
postNode := m.getOrCreateMockNode(info[5], info[6], info[7], info[8], info[9])

return eventType(info[10]), preNode, postNode
}

func (m *manager) getOrCreateMockNode(apiVersion, kind, cluster, namespace, name string) *nodeInfo {
s := m.getStorage(metav1.TypeMeta{
APIVersion: apiVersion,
Kind: kind,
})
if s == nil {
klog.Errorf("Unexpected nil nodeStorage for %s %s", apiVersion, kind)
return nil
}
node := s.getNode(cluster, namespace, name)
if node == nil {
// node may be deleted, reconstruct it for event handle
node = &nodeInfo{
storageRef: s,
cluster: cluster,
namespace: namespace,
name: name,
}
}
return node
}
4 changes: 1 addition & 3 deletions resourcetopo/examples/base_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ func main() {

topoManager, err := resourcetopo.NewResourcesTopoManager(
resourcetopo.ManagerConfig{
NodeEventQueueSize: 1024,
RelationEventQueueSize: 1024,
TopologyConfig: buildExampleTopologyConfig(), // could also be set later by topoManager.AddTopologyConfig
TopologyConfig: buildExampleTopologyConfig(), // could also be set later by topoManager.AddTopologyConfig
},
)
if err != nil {
Expand Down
27 changes: 18 additions & 9 deletions resourcetopo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

var _ Manager = &manager{}

type manager struct {
relationEventQueue chan relationEvent
nodeEventQueue chan nodeEvent
relationEventQueue workqueue.RateLimitingInterface
nodeEventQueue workqueue.RateLimitingInterface
configLock sync.Mutex
started bool

Expand All @@ -40,9 +40,12 @@ type manager struct {

func NewResourcesTopoManager(cfg ManagerConfig) (Manager, error) {
checkManagerConfig(&cfg)

nodeRatelimiter := workqueue.NewItemExponentialFailureRateLimiter(cfg.NodeEventHandleRateMinDelay, cfg.NodeEventHandleRateMaxDelay)
relationRatelimiter := workqueue.NewItemExponentialFailureRateLimiter(cfg.RelationEventHandleRateMinDelay, cfg.RelationEventHandleRateMaxDelay)
m := &manager{
nodeEventQueue: make(chan nodeEvent, cfg.NodeEventQueueSize),
relationEventQueue: make(chan relationEvent, cfg.RelationEventQueueSize),
nodeEventQueue: workqueue.NewNamedRateLimitingQueue(nodeRatelimiter, "resourcetopoNodeEventQueue"),
relationEventQueue: workqueue.NewNamedRateLimitingQueue(relationRatelimiter, "resourcetopoRelaltionEventQueue"),
storages: make(map[string]*nodeStorage),
}

Expand Down Expand Up @@ -211,10 +214,16 @@ func (m *manager) dagCheck() error {
}

func checkManagerConfig(c *ManagerConfig) {
if c.NodeEventQueueSize <= 0 {
c.NodeEventQueueSize = defaultNodeEventQueueSize
if c.NodeEventHandleRateMinDelay <= 0 {
c.NodeEventHandleRateMinDelay = defaultNodeEventHandleRateMinDelay
}
if c.NodeEventHandleRateMaxDelay < c.NodeEventHandleRateMinDelay {
c.NodeEventHandleRateMaxDelay = defaultNodeEventHandleRateMaxDelay
}
if c.RelationEventHandleRateMinDelay <= 0 {
c.RelationEventHandleRateMinDelay = defaultRelationEventHandleRateMinDelay
}
if c.RelationEventQueueSize <= 0 {
c.RelationEventQueueSize = defaultRelationEventQueueSize
if c.RelationEventHandleRateMaxDelay < c.RelationEventHandleRateMinDelay {
c.RelationEventHandleRateMaxDelay = defaultRelationEventHandleRateMaxDelay
}
}
7 changes: 5 additions & 2 deletions resourcetopo/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ type ownerInfo struct {

type nodeInfo struct {
storageRef *nodeStorage
lock sync.RWMutex

// lock for PreOrders and PostOrders
lock sync.RWMutex
// Lists of nodeInfo ref, cached all existed relation for this node.
directReferredPreOrders *list.List
labelReferredPreOrders *list.List
Expand All @@ -49,7 +50,9 @@ type nodeInfo struct {
name string
ownerNodes []ownerInfo
labels labels.Set // labels is a ref to object.meta.labels, do not edit!
relations []ResourceRelation

relations []ResourceRelation
relationsLock sync.RWMutex

// objectExisted is added for directRef relation to cache the relation before post object added,
// will be updated to true after the object added to cache or the manager is of virtual type.
Expand Down
Loading
Loading