Skip to content

Commit

Permalink
fix: workflowrun limits problem (#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
cd1989 committed Jan 4, 2019
1 parent f88f8d0 commit b6c7b20
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 32 deletions.
11 changes: 0 additions & 11 deletions pkg/common/constant.go

This file was deleted.

7 changes: 5 additions & 2 deletions pkg/workflow/controller/handlers/workflowrun/handler.go
Expand Up @@ -26,7 +26,7 @@ func (h *Handler) ObjectCreated(obj interface{}) {
log.Warning("unknown resource type")
return
}
log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun.")
log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun create")

// AddOrRefresh adds a WorkflowRun to its corresponding queue, if the queue size exceed the
// maximum size, the oldest one would be deleted. And if the WorkflowRun already exists in
Expand Down Expand Up @@ -65,7 +65,10 @@ func (h *Handler) ObjectUpdated(obj interface{}) {
log.Warning("unknown resource type")
return
}
log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun.")
log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun update")

// Refresh updates 'refresh' time field of the WorkflowRun in the queue.
h.LimitedQueues.Refresh(originWfr)

// Add the WorkflowRun object to GC processor, it will be checked before actually added to
// the GC queue.
Expand Down
85 changes: 66 additions & 19 deletions pkg/workflow/workflowrun/limits.go
Expand Up @@ -42,56 +42,87 @@ func key(wfr *v1alpha1.WorkflowRun) string {
return fmt.Sprintf("%s/%s", wfr.Spec.WorkflowRef.Namespace, wfr.Spec.WorkflowRef.Name)
}

// Refresh refreshes the WorkflowRun in the queue, the refresh time would be updated.
func (w *LimitedQueues) Refresh(wfr *v1alpha1.WorkflowRun) {
q, ok := w.Queues[key(wfr)]
if !ok {
log.WithField("key", key(wfr)).Warn("Queue not exist")
return
}

q.Refresh(wfr)
}

// AddOrRefresh adds a WorkflowRun to its corresponding queue, if the queue size exceed the maximum size, the
// oldest one would be deleted. And if the WorkflowRun already exists in the queue, its 'refresh' time field
// would be refreshed.
func (w *LimitedQueues) AddOrRefresh(wfr *v1alpha1.WorkflowRun) {
q, ok := w.Queues[key(wfr)]
if !ok {
q = NewQueue(w.MaxQueueSize)
q = NewQueue(key(wfr), w.MaxQueueSize)
w.Queues[key(wfr)] = q
}

// PushOrRefresh push the WorkflowRun to the queue. If it's already existed in the queue, its refresh
// time would be updated to now.
q.PushOrRefresh(wfr)

if q.size > w.MaxQueueSize {
for q.size > w.MaxQueueSize {
log.WithField("max", w.MaxQueueSize).Debug("Max WorkflowRun exceeded, delete the oldest one")
old := q.Pop()
err := w.Client.CycloneV1alpha1().WorkflowRuns(old.namespace).Delete(old.wfr, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
log.WithField("wfr", old.wfr).Error("Delete old WorkflowRun error: ", err)
} else {
log.WithField("wfr", old.wfr).Info("Old WorkflowRun deleted")
}
}
}

// AutoScan scans all WorkflowRuns in the queues regularly, remove abnormal ones with old enough
// refresh time.
func (w *LimitedQueues) AutoScan() {
ticker := time.NewTicker(time.Minute * 30)
ticker := time.NewTicker(time.Hour)
for {
select {
case <-ticker.C:
for _, q := range w.Queues {
h := q.head
for h.next != nil {
// If the node's refresh time is old enough compared to the resync time
// (5 minutes by default) of WorkflowRun Controller, it means the WorkflowRun
// is actually removed from etcd somehow, so we will remove it also here.
if h.next.refresh.Add(common.ResyncPeriod * 2).Before(time.Now()) {
h.next = h.next.next
}
}
scanQueue(q)
}
}
}
}

// scanQueue scans all WorkflowRun in the queue, check their refresh time with current time, if refresh
// time is old enough, it means WorkflowRun is actually deleted in k8s, but somehow Workflow Controller
// didn't know (this seldom happen), in this case, remove the WorkflowRun from the queue.
func scanQueue(q *LimitedSortedQueue) {
q.lock.Lock()
defer q.lock.Unlock()

h := q.head
for h.next != nil {
// If the node's refresh time is old enough compared to the resync time
// (5 minutes by default) of WorkflowRun Controller, it means the WorkflowRun
// is actually removed from etcd somehow, so we will remove it also here.
if h.next.refresh.Add(common.ResyncPeriod * 2).Before(time.Now()) {
log.WithField("wfr", h.next.wfr).Info("remove wfr with outdated refresh time from queue")
h.next = h.next.next
q.size--
continue
}

h = h.next
}
}

// LimitedSortedQueue is a sorted fixed length queue implemented with single linked list.
// Note that each queue would have a sentinel node to assist the implementation, it's a
// dummy node, and won't be counted in the queue size. So an empty queue would have head
// pointed to dummy node, with queue size 0.
type LimitedSortedQueue struct {
// Key of the Workflow, it's generated by namespace and workflow name
key string
// Lock to for concurrency control
lock sync.Mutex
// Maximum queue size
Expand All @@ -103,9 +134,10 @@ type LimitedSortedQueue struct {
}

// NewQueue creates a limited sorted queue.
func NewQueue(max int) *LimitedSortedQueue {
func NewQueue(key string, max int) *LimitedSortedQueue {
dummy := &Node{}
return &LimitedSortedQueue{
key: key,
max: max,
size: 0,
head: dummy,
Expand Down Expand Up @@ -149,22 +181,37 @@ func (q *LimitedSortedQueue) PushOrRefresh(wfr *v1alpha1.WorkflowRun) {
refresh: time.Now(),
}

if q.Refresh(wfr) {
return
}

p := q.head
for p.next != nil && p.next.created < node.created {
p = p.next
}

// If the WorkflowRun already existed in the queue, update its refresh time.
if p.next != nil && p.next.wfr == wfr.Name && p.next.namespace == wfr.Namespace {
p.next.refresh = time.Now()
return
}

node.next = p.next
p.next = node
q.size++
}

// Refresh updates refresh time of WorkflowRun in the queue, if the WorkflowRun found in the queue
// and update successfully, return true, otherwise return false.
func (q *LimitedSortedQueue) Refresh(wfr *v1alpha1.WorkflowRun) bool {
p := q.head
for p.next != nil && (p.next.namespace != wfr.Namespace || p.next.wfr != wfr.Name) {
p = p.next
}

if p.next != nil {
log.WithField("queue", q.key).WithField("wfr", wfr.Name).Debug("Update refresh time")
p.next.refresh = time.Now()
return true
}

return false
}

// Pop pops up a WorkflowRun object from the queue, it's the oldest one that will be popped.
func (q *LimitedSortedQueue) Pop() *Node {
if q.size <= 0 {
Expand Down

0 comments on commit b6c7b20

Please sign in to comment.