From b6c7b20c21abd8e99a336fdba0084eef584e5968 Mon Sep 17 00:00:00 2001 From: De Chen Date: Fri, 4 Jan 2019 14:50:33 +0800 Subject: [PATCH] fix: workflowrun limits problem (#714) --- pkg/common/constant.go | 11 --- .../handlers/workflowrun/handler.go | 7 +- pkg/workflow/workflowrun/limits.go | 85 ++++++++++++++----- 3 files changed, 71 insertions(+), 32 deletions(-) delete mode 100644 pkg/common/constant.go diff --git a/pkg/common/constant.go b/pkg/common/constant.go deleted file mode 100644 index 669de6187..000000000 --- a/pkg/common/constant.go +++ /dev/null @@ -1,11 +0,0 @@ -package common - -const ( - // CloneDir represents the dir which the repo clone to. - CloneDir = "/tmp/code" - - // GoTestReport represents the file name of golang test report. - // If user configured the golang test report in unit-test commands (eg: go test -coverprofile=coverage.out), - // We will cp the file(coverage.out) to go_test_report.cyclone, and use it at code-scan stage. - GoTestReport = "go_test_report.cyclone" -) diff --git a/pkg/workflow/controller/handlers/workflowrun/handler.go b/pkg/workflow/controller/handlers/workflowrun/handler.go index 591aea875..8aca6196f 100644 --- a/pkg/workflow/controller/handlers/workflowrun/handler.go +++ b/pkg/workflow/controller/handlers/workflowrun/handler.go @@ -26,7 +26,7 @@ func (h *Handler) ObjectCreated(obj interface{}) { log.Warning("unknown resource type") return } - log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun.") + log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun create") // AddOrRefresh adds a WorkflowRun to its corresponding queue, if the queue size exceed the // maximum size, the oldest one would be deleted. And if the WorkflowRun already exists in @@ -65,7 +65,10 @@ func (h *Handler) ObjectUpdated(obj interface{}) { log.Warning("unknown resource type") return } - log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun.") + log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun update") + + // Refresh updates 'refresh' time field of the WorkflowRun in the queue. + h.LimitedQueues.Refresh(originWfr) // Add the WorkflowRun object to GC processor, it will be checked before actually added to // the GC queue. diff --git a/pkg/workflow/workflowrun/limits.go b/pkg/workflow/workflowrun/limits.go index ed586fe3c..c55bd1c98 100644 --- a/pkg/workflow/workflowrun/limits.go +++ b/pkg/workflow/workflowrun/limits.go @@ -42,13 +42,24 @@ func key(wfr *v1alpha1.WorkflowRun) string { return fmt.Sprintf("%s/%s", wfr.Spec.WorkflowRef.Namespace, wfr.Spec.WorkflowRef.Name) } +// Refresh refreshes the WorkflowRun in the queue, the refresh time would be updated. +func (w *LimitedQueues) Refresh(wfr *v1alpha1.WorkflowRun) { + q, ok := w.Queues[key(wfr)] + if !ok { + log.WithField("key", key(wfr)).Warn("Queue not exist") + return + } + + q.Refresh(wfr) +} + // AddOrRefresh adds a WorkflowRun to its corresponding queue, if the queue size exceed the maximum size, the // oldest one would be deleted. And if the WorkflowRun already exists in the queue, its 'refresh' time field // would be refreshed. func (w *LimitedQueues) AddOrRefresh(wfr *v1alpha1.WorkflowRun) { q, ok := w.Queues[key(wfr)] if !ok { - q = NewQueue(w.MaxQueueSize) + q = NewQueue(key(wfr), w.MaxQueueSize) w.Queues[key(wfr)] = q } @@ -56,11 +67,14 @@ func (w *LimitedQueues) AddOrRefresh(wfr *v1alpha1.WorkflowRun) { // time would be updated to now. q.PushOrRefresh(wfr) - if q.size > w.MaxQueueSize { + for q.size > w.MaxQueueSize { + log.WithField("max", w.MaxQueueSize).Debug("Max WorkflowRun exceeded, delete the oldest one") old := q.Pop() err := w.Client.CycloneV1alpha1().WorkflowRuns(old.namespace).Delete(old.wfr, &metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { log.WithField("wfr", old.wfr).Error("Delete old WorkflowRun error: ", err) + } else { + log.WithField("wfr", old.wfr).Info("Old WorkflowRun deleted") } } } @@ -68,30 +82,47 @@ func (w *LimitedQueues) AddOrRefresh(wfr *v1alpha1.WorkflowRun) { // AutoScan scans all WorkflowRuns in the queues regularly, remove abnormal ones with old enough // refresh time. func (w *LimitedQueues) AutoScan() { - ticker := time.NewTicker(time.Minute * 30) + ticker := time.NewTicker(time.Hour) for { select { case <-ticker.C: for _, q := range w.Queues { - h := q.head - for h.next != nil { - // If the node's refresh time is old enough compared to the resync time - // (5 minutes by default) of WorkflowRun Controller, it means the WorkflowRun - // is actually removed from etcd somehow, so we will remove it also here. - if h.next.refresh.Add(common.ResyncPeriod * 2).Before(time.Now()) { - h.next = h.next.next - } - } + scanQueue(q) } } } } +// scanQueue scans all WorkflowRun in the queue, check their refresh time with current time, if refresh +// time is old enough, it means WorkflowRun is actually deleted in k8s, but somehow Workflow Controller +// didn't know (this seldom happen), in this case, remove the WorkflowRun from the queue. +func scanQueue(q *LimitedSortedQueue) { + q.lock.Lock() + defer q.lock.Unlock() + + h := q.head + for h.next != nil { + // If the node's refresh time is old enough compared to the resync time + // (5 minutes by default) of WorkflowRun Controller, it means the WorkflowRun + // is actually removed from etcd somehow, so we will remove it also here. + if h.next.refresh.Add(common.ResyncPeriod * 2).Before(time.Now()) { + log.WithField("wfr", h.next.wfr).Info("remove wfr with outdated refresh time from queue") + h.next = h.next.next + q.size-- + continue + } + + h = h.next + } +} + // LimitedSortedQueue is a sorted fixed length queue implemented with single linked list. // Note that each queue would have a sentinel node to assist the implementation, it's a // dummy node, and won't be counted in the queue size. So an empty queue would have head // pointed to dummy node, with queue size 0. type LimitedSortedQueue struct { + // Key of the Workflow, it's generated by namespace and workflow name + key string // Lock to for concurrency control lock sync.Mutex // Maximum queue size @@ -103,9 +134,10 @@ type LimitedSortedQueue struct { } // NewQueue creates a limited sorted queue. -func NewQueue(max int) *LimitedSortedQueue { +func NewQueue(key string, max int) *LimitedSortedQueue { dummy := &Node{} return &LimitedSortedQueue{ + key: key, max: max, size: 0, head: dummy, @@ -149,22 +181,37 @@ func (q *LimitedSortedQueue) PushOrRefresh(wfr *v1alpha1.WorkflowRun) { refresh: time.Now(), } + if q.Refresh(wfr) { + return + } + p := q.head for p.next != nil && p.next.created < node.created { p = p.next } - // If the WorkflowRun already existed in the queue, update its refresh time. - if p.next != nil && p.next.wfr == wfr.Name && p.next.namespace == wfr.Namespace { - p.next.refresh = time.Now() - return - } - node.next = p.next p.next = node q.size++ } +// Refresh updates refresh time of WorkflowRun in the queue, if the WorkflowRun found in the queue +// and update successfully, return true, otherwise return false. +func (q *LimitedSortedQueue) Refresh(wfr *v1alpha1.WorkflowRun) bool { + p := q.head + for p.next != nil && (p.next.namespace != wfr.Namespace || p.next.wfr != wfr.Name) { + p = p.next + } + + if p.next != nil { + log.WithField("queue", q.key).WithField("wfr", wfr.Name).Debug("Update refresh time") + p.next.refresh = time.Now() + return true + } + + return false +} + // Pop pops up a WorkflowRun object from the queue, it's the oldest one that will be popped. func (q *LimitedSortedQueue) Pop() *Node { if q.size <= 0 {