/
ticker.go
129 lines (118 loc) 路 2.51 KB
/
ticker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package beacon
import (
"time"
"github.com/drand/drand/chain"
clock "github.com/jonboulle/clockwork"
)
const tickerChanBacklog = 5
type ticker struct {
clock clock.Clock
period time.Duration
genesis int64
newCh chan channelInfo
stop chan bool
}
func newTicker(c clock.Clock, period time.Duration, genesis int64) *ticker {
t := &ticker{
clock: c,
period: period,
genesis: genesis,
newCh: make(chan channelInfo, tickerChanBacklog),
stop: make(chan bool, 1),
}
go t.Start()
return t
}
func (t *ticker) Channel() chan roundInfo {
newCh := make(chan roundInfo, 1)
t.newCh <- channelInfo{
ch: newCh,
startAt: t.clock.Now().Unix(),
}
return newCh
}
func (t *ticker) ChannelAt(start int64) chan roundInfo {
newCh := make(chan roundInfo, 1)
t.newCh <- channelInfo{
ch: newCh,
startAt: start,
}
return newCh
}
func (t *ticker) Stop() {
close(t.stop)
}
func (t *ticker) CurrentRound() uint64 {
return chain.CurrentRound(t.clock.Now().Unix(), t.period, t.genesis)
}
// Start will sleep until the next upcoming round and start sending out the
// ticks asap
func (t *ticker) Start() {
chanTime := make(chan time.Time, 1)
// whole reason of this function is to accept new incoming channels while
// still sleeping until the next time
go func() {
now := t.clock.Now().Unix()
_, ttime := chain.NextRound(now, t.period, t.genesis)
if ttime > now {
t.clock.Sleep(time.Duration(ttime-now) * time.Second)
}
// first tick happens at specified time
chanTime <- t.clock.Now()
ticker := t.clock.NewTicker(t.period)
defer ticker.Stop()
tickChan := ticker.Chan()
for {
select {
case nt := <-tickChan:
chanTime <- nt
case <-t.stop:
return
}
}
}()
var channels []channelInfo
var sendTicks = false
var ttime int64
var tround uint64
for {
if sendTicks {
sendTicks = false
info := roundInfo{
round: tround,
time: ttime,
}
for _, chinfo := range channels {
if chinfo.startAt > ttime {
continue
}
select {
case chinfo.ch <- info:
default:
// pass on, do not send if channel is full
}
}
}
select {
case nt := <-chanTime:
tround = chain.CurrentRound(nt.Unix(), t.period, t.genesis)
ttime = nt.Unix()
sendTicks = true
case newChan := <-t.newCh:
channels = append(channels, newChan)
case <-t.stop:
for _, ch := range channels {
close(ch.ch)
}
return
}
}
}
type roundInfo struct {
round uint64
time int64
}
type channelInfo struct {
ch chan roundInfo
startAt int64
}