forked from ha/doozerd
/
timer.go
124 lines (102 loc) · 2.27 KB
/
timer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package timer
import (
"container/heap"
"container/vector"
"doozer/store"
"doozer/util"
"math"
"strconv"
"time"
)
const (
OneMicrosecond = 1e3 // ns
OneMillisecond = 1e6 // ns
OneSecond = 1e9 // ns
)
type Tick struct {
Path string
Cas int64
At int64
}
func (t Tick) Less(y interface{}) bool {
return t.At < y.(Tick).At
}
type Timer struct {
Glob *store.Glob
// Ticks are sent here
C <-chan Tick
wt *store.Watch
ticks *vector.Vector
ticker *time.Ticker
}
func New(glob *store.Glob, interval int64, st *store.Store) *Timer {
c := make(chan Tick)
t := &Timer{
Glob: glob,
C: c,
wt: store.NewWatch(st, glob),
ticks: new(vector.Vector),
ticker: time.NewTicker(interval),
}
go t.process(c)
return t
}
func (t *Timer) process(c chan Tick) {
defer close(c)
defer t.ticker.Stop()
logger := util.NewLogger("timer (%s)", t.Glob.Pattern)
peek := func() Tick {
if t.ticks.Len() == 0 {
return Tick{At: math.MaxInt64}
}
return t.ticks.At(0).(Tick)
}
for {
select {
case e := <-t.wt.C:
if closed(t.wt.C) {
return
}
logger.Printf("recvd: %v", e)
// TODO: Handle/Log the next error
// I'm not sure if we should notify the client
// on Set. That seems like it would be difficult
// with the currect way the code functions. Dunno.
at, _ := strconv.Atoi64(e.Body)
x := Tick{e.Path, e.Cas, at}
switch {
default:
break
case e.IsSet():
// First remove it if it's already there.
for i := 0; i < t.ticks.Len(); i++ {
if t.ticks.At(i).(Tick).Path == x.Path {
heap.Remove(t.ticks, i)
i = 0 // have to start over; heap could be reordered
}
}
heap.Push(t.ticks, x)
case e.IsDel():
logger.Println("deleting", e.Path, e.Body)
// This could be optimize since t.ticks is sorted; I can't
// find a way without implementing our own quick-find.
for i := 0; i < t.ticks.Len(); i++ {
if t.ticks.At(i).(Tick).Path == x.Path {
heap.Remove(t.ticks, i)
i = 0 // have to start over; heap could be reordered
}
}
}
case ns := <-t.ticker.C:
for next := peek(); next.At <= ns; next = peek() {
logger.Printf("ticked %#v", next)
heap.Pop(t.ticks)
c <- next
}
}
}
}
func (t *Timer) Close() {
t.wt.Stop()
close(t.wt.C)
}