-
Notifications
You must be signed in to change notification settings - Fork 14
/
scheduler.go
108 lines (97 loc) · 2.39 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package server
import (
"sync"
"time"
)
// <AvailableSamplingRates>
// <SamplingRate>0</SamplingRate>
// <SamplingRate>50</SamplingRate>
// <SamplingRate>100</SamplingRate>
// <SamplingRate>250</SamplingRate>
// <SamplingRate>500</SamplingRate>
// <SamplingRate>1000</SamplingRate>
// <SamplingRate>2000</SamplingRate>
// <SamplingRate>5000</SamplingRate>
// <SamplingRate>10000</SamplingRate>
// </AvailableSamplingRates>
//
// Scheduler hands out PollGroups keyed by sampling interval, so that callers
// asking for the same interval share a single group.
type Scheduler struct {
	// Mutex guards tickers; GetPollGroup holds it for the whole lookup.
	sync.Mutex

	// cancellationCh is passed on to every PollGroup the scheduler creates.
	cancellationCh chan struct{}

	// tickers holds the poll groups created so far, keyed by their interval.
	tickers map[time.Duration]*PollGroup

	// minSamplingInterval is the shortest interval GetPollGroup will use;
	// shorter requests are raised to this value.
	minSamplingInterval time.Duration
}
// NewScheduler builds a Scheduler for the given server.
//
// The scheduler shares the server's closing channel as its cancellation
// channel, starts with no poll groups, and takes its minimum sampling
// interval from the server capabilities' MinSupportedSampleRate, interpreted
// as a number of milliseconds.
func NewScheduler(server *Server) *Scheduler {
	minRate := server.ServerCapabilities().MinSupportedSampleRate
	return &Scheduler{
		Mutex:               sync.Mutex{},
		cancellationCh:      server.closing,
		tickers:             make(map[time.Duration]*PollGroup),
		minSamplingInterval: time.Duration(minRate) * time.Millisecond,
	}
}
// GetPollGroup returns the PollGroup for the requested interval, creating and
// registering it on first use.
//
// Intervals shorter than the scheduler's minimum sampling interval are raised
// to that minimum before the lookup, so all such requests share one group.
// The scheduler's lock is held for the duration of the call.
func (s *Scheduler) GetPollGroup(interval time.Duration) *PollGroup {
	s.Lock()
	defer s.Unlock()

	effective := interval
	if s.minSamplingInterval > effective {
		effective = s.minSamplingInterval
	}

	group, found := s.tickers[effective]
	if !found {
		group = NewPollGroup(effective, s.cancellationCh)
		s.tickers[effective] = group
	}
	return group
}
// PollGroup is a set of PollListeners that are polled together at a fixed
// interval.
type PollGroup struct {
	// Mutex guards subs.
	sync.Mutex

	// cancellationCh stops the group's run loop once it becomes readable.
	cancellationCh chan struct{}

	// interval is the period between two polling rounds.
	interval time.Duration

	// subs is the set of currently subscribed listeners.
	subs map[PollListener]struct{}
}
// NewPollGroup creates a PollGroup with no subscribers for the given interval
// and starts its run loop in a new goroutine before returning. The loop ends
// when cancellationCh becomes readable.
func NewPollGroup(interval time.Duration, cancellationCh chan struct{}) *PollGroup {
	group := &PollGroup{
		Mutex:          sync.Mutex{},
		cancellationCh: cancellationCh,
		interval:       interval,
		subs:           make(map[PollListener]struct{}),
	}
	// log.Printf("Opening PollGroup %d ms\n", b.interval.Nanoseconds()/1000000)
	go group.run()
	return group
}
// run is the poll group's loop; NewPollGroup starts it in its own goroutine.
//
// On every tick it copies the current subscribers while holding the lock and
// then calls Poll on each of them with the lock released, so a listener may
// call Subscribe or Unsubscribe from inside Poll without deadlocking. When
// cancellationCh becomes readable, all subscribers are removed and run
// returns.
func (b *PollGroup) run() {
	// time.NewTicker panics on a non-positive interval, and a panic in this
	// goroutine would take the whole process down. Fall back to a short
	// positive period instead of crashing.
	const fallbackInterval = time.Millisecond
	interval := b.interval
	if interval <= 0 {
		interval = fallbackInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-b.cancellationCh:
			b.Lock()
			// log.Printf("Closing PollGroup %d ms with %d subs\n", b.interval.Nanoseconds()/1000000, len(b.subs))
			for sub := range b.subs {
				delete(b.subs, sub)
			}
			b.Unlock()
			return
		case <-ticker.C:
			// Snapshot under the lock, poll outside of it.
			b.Lock()
			listeners := make([]PollListener, 0, len(b.subs))
			for sub := range b.subs {
				listeners = append(listeners, sub)
			}
			b.Unlock()
			for _, listener := range listeners {
				listener.Poll()
			}
		}
	}
}
// Subscribe adds listener to the group's subscriber set. Adding a listener
// that is already subscribed leaves the set unchanged.
func (b *PollGroup) Subscribe(listener PollListener) {
	b.Lock()
	defer b.Unlock()
	b.subs[listener] = struct{}{}
}
// Unsubscribe removes listener from the group's subscriber set. Removing a
// listener that is not subscribed is a no-op.
func (b *PollGroup) Unsubscribe(listener PollListener) {
	b.Lock()
	defer b.Unlock()
	delete(b.subs, listener)
}
// PollListener is implemented by anything that can be subscribed to a
// PollGroup.
type PollListener interface {
	// Poll is called once per tick of the group the listener is subscribed to.
	Poll()
}