forked from goodrain/rainbond
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
145 lines (119 loc) · 3.35 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2019 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package envoy
import (
"sync"
"time"
"github.com/Sirupsen/logrus"
)
// Event enumerates the kinds of object change (add, update, delete) that a
// Handler can be invoked for.
type Event int
// Event values, assigned consecutively from zero by iota.
const (
// EventAdd is sent when an object is added.
EventAdd Event = iota
// EventUpdate is sent when an object is modified.
// Captures the modified object.
EventUpdate
// EventDelete is sent when an object is deleted.
// Captures the object at the last known state.
EventDelete
)
// Queue is a queue of work tasks drained by a processing loop. With the
// implementation in this file, a task whose handler returns an error is
// pushed back onto the queue after a fixed delay.
type Queue interface {
// Push enqueues a task for processing.
Push(Task)
// Run executes the processing loop on the calling goroutine. A receive on
// the channel (a sent value or a close) requests shutdown.
Run(<-chan struct{})
}
// Handler specifies a function to apply on an object for a given event type.
// Returning a non-nil error marks the attempt as failed; the queue's Run loop
// in this file then schedules the task to be pushed again after a delay.
type Handler func(obj interface{}, event Event) error
// Task is one unit of queue work: a handler bundled with the object and event
// it will be invoked with. The queue in this file re-queues a task whenever
// its handler returns an error.
type Task struct {
// handler is invoked with obj and event when the task is processed.
handler Handler
// obj is the object passed to handler.
obj interface{}
// event is the change type passed to handler.
event Event
}
// NewTask bundles a handler with the object and event it should receive into
// a Task that can be pushed onto a Queue.
func NewTask(handler Handler, obj interface{}, event Event) Task {
	var task Task
	task.handler = handler
	task.obj = obj
	task.event = event
	return task
}
// queueImpl is the Queue implementation: a slice of pending tasks guarded by
// the lock of a condition variable.
type queueImpl struct {
// delay is how long Run waits before re-pushing a task whose handler failed.
delay time.Duration
// queue holds pending tasks; Push appends at the tail, Run removes from the head.
queue []Task
// cond parks Run while the queue is empty; cond.L guards queue and closing.
cond *sync.Cond
// closing is set once Run's stop channel fires; Push drops tasks from then on.
closing bool
}
// NewQueue returns an empty, open Queue. errorDelay is how long a task waits
// before being pushed back after its handler returns an error.
func NewQueue(errorDelay time.Duration) Queue {
	q := new(queueImpl)
	q.delay = errorDelay
	q.queue = make([]Task, 0)
	q.closing = false
	q.cond = sync.NewCond(&sync.Mutex{})
	return q
}
// Push appends item to the tail of the queue unless the queue is closing, in
// which case the item is dropped. In both cases one goroutine waiting on the
// condition variable is signalled before the lock is released.
func (q *queueImpl) Push(item Task) {
	mu := q.cond.L
	mu.Lock()
	// Deferred calls run last-in first-out: Signal fires first, then Unlock,
	// so the waiter is signalled while the lock is still held.
	defer mu.Unlock()
	defer q.cond.Signal()

	if q.closing {
		return
	}
	q.queue = append(q.queue, item)
}
// Run drains the queue on the calling goroutine, invoking each task's handler
// in FIFO order. A task whose handler returns an error is pushed back after
// q.delay.
//
// Run returns once stop has fired (a value was received or the channel was
// closed) and no tasks remain queued. Tasks already queued when stop fires are
// still processed; retries that fire after shutdown began are dropped by Push.
func (q *queueImpl) Run(stop <-chan struct{}) {
	go func() {
		<-stop
		q.cond.L.Lock()
		q.closing = true
		// Wake the loop if it is parked in Wait on an empty queue. Without
		// this wake-up it would stay blocked until some later Push happened
		// to signal it, so shutdown could hang indefinitely.
		q.cond.Broadcast()
		q.cond.L.Unlock()
	}()

	for {
		q.cond.L.Lock()
		for !q.closing && len(q.queue) == 0 {
			q.cond.Wait()
		}

		if len(q.queue) == 0 {
			q.cond.L.Unlock()
			// We must be shutting down.
			return
		}

		item := q.queue[0]
		// Zero the vacated slot so the backing array does not keep the
		// processed task's object and handler reachable.
		q.queue[0] = Task{}
		q.queue = q.queue[1:]
		q.cond.L.Unlock()

		// The handler runs outside the lock so Push is never blocked by it.
		if err := item.handler(item.obj, item.event); err != nil {
			logrus.Infof("Work item handle failed (%v), retry after delay %v", err, q.delay)
			time.AfterFunc(q.delay, func() {
				q.Push(item)
			})
		}
	}
}
// ChainHandler applies handlers in a sequence. Its Apply method has the
// Handler signature.
type ChainHandler struct {
// funcs are the chained handlers, in the order they were appended.
funcs []Handler
}
// Apply invokes every handler in the chain, in order, with the same object and
// event. It stops at the first handler that fails and returns that error;
// it returns nil when all handlers succeed or the chain is empty.
func (ch *ChainHandler) Apply(obj interface{}, event Event) error {
	// Snapshot the slice header once so the iteration bounds are fixed up
	// front, exactly as a range loop would fix them.
	handlers := ch.funcs
	for i := 0; i < len(handlers); i++ {
		err := handlers[i](obj, event)
		if err != nil {
			return err
		}
	}
	return nil
}
// Append adds h to the end of the chain, so it is applied after every handler
// appended before it.
func (ch *ChainHandler) Append(h Handler) {
	extended := append(ch.funcs, h)
	ch.funcs = extended
}