forked from Mintegral-official/mtggokit
/
schedule.go
84 lines (72 loc) · 1.53 KB
/
schedule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package streamer
import (
"container/heap"
"context"
"time"
)
type SchedInfo struct {
TimeInterval int
}
type SchedUnit struct {
name string
streamer Streamer
deadline int
index int
}
type Sched []*SchedUnit
func (s Sched) Len() int { return len(s) }
func (s Sched) Less(i, j int) bool {
return s[i].deadline < s[j].deadline
}
func (s Sched) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
s[i].index = i
s[j].index = j
}
func (s *Sched) Push(x interface{}) {
item := x.(*SchedUnit)
n := len(*s)
item.index = n
*s = append(*s, item)
}
func (s *Sched) Top() *SchedUnit {
if len(*s) == 0 {
return nil
}
return (*s)[0]
}
func (s *Sched) Pop() interface{} {
old := *s
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*s = old[0 : n-1]
return item
}
func (s *Sched) AddStreamer(name string, dataStreamer Streamer) {
s.Push(&SchedUnit{
name: name,
streamer: dataStreamer,
deadline: int(time.Now().Unix()) + dataStreamer.GetSchedInfo().TimeInterval,
})
}
func (s *Sched) Schedule(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
if time.Now().Unix() < int64(s.Top().deadline) {
time.Sleep(time.Second * time.Duration(int(time.Now().Unix())-s.Top().deadline))
continue
}
x := heap.Pop(s)
su := x.(*SchedUnit)
//TODO handler error
_ = su.streamer.UpdateData(context.Background())
su.deadline += su.streamer.GetSchedInfo().TimeInterval
heap.Push(s, su)
time.Sleep(time.Second * time.Duration(s.Top().deadline-int(time.Now().Unix())))
}
}
}