-
Notifications
You must be signed in to change notification settings - Fork 5
/
group_trigger.go
295 lines (268 loc) · 10.6 KB
/
group_trigger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
package dastard
import (
"fmt"
"log"
"sort"
"time"
"github.com/davecgh/go-spew/spew"
)
// TriggerCounter is a per-channel struct that counts triggers over an interval of
// FrameIndex values and stores a slice of messages about the count. It does not
// send these messages anywhere; that's the job of the TriggerBroker.
// It takes advantage of the fact that TriggerBroker provides a synchronization point
// so several TriggerCounters can count triggers for all channels in sync.
// Counts triggers between the FrameIndex values of [lo, hi] to learn trigger rate.
type TriggerCounter struct {
channelIndex int
hi FrameIndex // the highest FrameIndex for which we should count triggers
lo FrameIndex // count trigs starting at this FrameIndex (earlier are errors)
hiTime time.Time // expected real-world time corresponding to hi
countsSeen int
stepDuration time.Duration // how long each trigger counting step should last
sampleRate float64
keyTime time.Time // the time of one recent correspondence between time and FrameIndex
keyFrame FrameIndex // keyFrame occured at keyTime to the best of our knowledge
initialized bool
messages []triggerCounterMessage
}
type triggerCounterMessage struct {
hiTime time.Time
duration time.Duration
countsSeen int
}
// TriggerRateMessage is used to publish trigger rate info over zmq
type TriggerRateMessage struct {
HiTime time.Time
Duration time.Duration
CountsSeen []int
}
// NewTriggerCounter returns a TriggerCounter
func NewTriggerCounter(channelIndex int, stepDuration time.Duration) TriggerCounter {
return TriggerCounter{channelIndex: channelIndex, stepDuration: stepDuration, messages: make([]triggerCounterMessage, 0)}
}
// initialize initializes the counter by starting the trigger-count "integration period"
func (tc *TriggerCounter) initialize() {
// Set hiTime (end of the integration period) to the first multiple of stepDuration after keyTime
hiTime := tc.keyTime.Round(tc.stepDuration)
for hiTime.Before(tc.keyTime) {
hiTime = hiTime.Add(tc.stepDuration)
}
tc.hiTime = hiTime
tc.hi = tc.keyFrame + FrameIndex(roundint(tc.sampleRate*tc.hiTime.Sub(tc.keyTime).Seconds()))
tc.lo = tc.hi - FrameIndex(roundint(tc.sampleRate*tc.stepDuration.Seconds())) + 1
tc.initialized = true
}
// messageAndReset appends a new triggerCounterMessage to our slice of them and
// resets to count triggers in the subsequent interval.
func (tc *TriggerCounter) messageAndReset() {
// Generate a message
message := triggerCounterMessage{hiTime: tc.hiTime, duration: tc.stepDuration, countsSeen: tc.countsSeen}
tc.messages = append(tc.messages, message)
// Reset counters and lo/hi times.
tc.countsSeen = 0
tc.hiTime = tc.hiTime.Add(tc.stepDuration)
tc.lo = tc.hi + 1
hi_minus_key := tc.hiTime.Sub(tc.keyTime).Seconds()
tc.hi = tc.keyFrame + FrameIndex(roundint(tc.sampleRate*hi_minus_key))
}
// countNewTriggers increments the relevant per-channel counters.
// It also generates a set of messages in `tc.messages` at the
// chosen message rate (i.e., each `tc.stepDuration`).
func (tc *TriggerCounter) countNewTriggers(tList *triggerList) error {
// Update keyFrame and keyTime to have a new, recent correspondence between
// the real-world time and frame number.
tc.keyFrame = tList.keyFrame
tc.keyTime = tList.keyTime
tc.sampleRate = tList.sampleRate
if tc.sampleRate <= 0 {
// Counting trigger rates makes no sense if the counter has no understanding of the
// data sample rate. Give up.
return nil
}
if !tc.initialized {
tc.initialize()
}
for _, frame := range tList.frames {
// The following loop might appear infinite, but it isn't, because tc.hi increases in each
// call to tc.messageAndReset().
for frame > tc.hi {
tc.messageAndReset()
}
if frame >= tc.lo {
tc.countsSeen++
}
}
// The following loop might appear infinite; again, tc.messageAndReset() ensures it isn't.
for tList.firstFrameThatCannotTrigger > tc.hi {
tc.messageAndReset()
}
return nil
}
// GroupTriggerState contains all the state that controls all group trigger connections.
// It is also used to communicate with clients about connections to add or remove.
type GroupTriggerState struct {
Connections map[int][]int // Map sense is connections[source] = []int{rxA, rxB, ...}
}
// TriggerBroker communicates with DataChannel objects to allow them to operate independently
// yet still share group triggering information.
type TriggerBroker struct {
nchannels int
nconnections int
sources []map[int]bool // sources[rx] is a map whose non-empty entries are the sources for that rx
latestPrimaries [][]FrameIndex
triggerCounters []TriggerCounter
}
// NewTriggerBroker creates a new TriggerBroker object for nchan channels to share group triggers.
func NewTriggerBroker(nchan int) *TriggerBroker {
broker := new(TriggerBroker)
broker.nchannels = nchan
broker.sources = make([]map[int]bool, nchan)
for i := 0; i < nchan; i++ {
broker.sources[i] = make(map[int]bool)
}
broker.latestPrimaries = make([][]FrameIndex, nchan)
broker.triggerCounters = make([]TriggerCounter, nchan)
triggerReportingPeriod := time.Second // could be programmable in future
for i := 0; i < nchan; i++ {
broker.triggerCounters[i] = NewTriggerCounter(i, triggerReportingPeriod)
}
return broker
}
// AddConnection connects source -> receiver for group triggers.
// It is safe to add connections that already exist.
func (broker *TriggerBroker) AddConnection(source, receiver int) error {
// Don't connect a channel to itself. (Silently ignore this request.)
if source == receiver {
return nil
}
if receiver < 0 || receiver >= broker.nchannels {
return fmt.Errorf("could not add channel %d as a group receiver (nchannels=%d)",
receiver, broker.nchannels)
}
if !broker.sources[receiver][source] {
broker.nconnections++
}
broker.sources[receiver][source] = true
return nil
}
// DeleteConnection disconnects source -> receiver for group triggers.
// It is safe to delete connections whether they exist or not.
func (broker *TriggerBroker) DeleteConnection(source, receiver int) error {
if receiver < 0 || receiver >= broker.nchannels {
return fmt.Errorf("could not remove channel %d as a group receiver (nchannels=%d)",
receiver, broker.nchannels)
}
if broker.sources[receiver][source] {
broker.nconnections--
}
delete(broker.sources[receiver], source)
return nil
}
// StopTriggerCoupling ends all trigger coupling: both group triggering and TDM-style FB-Err coupling.
func (broker *TriggerBroker) StopTriggerCoupling() error {
for i := range broker.sources {
broker.sources[i] = make(map[int]bool)
}
broker.nconnections = 0
return nil
}
// isConnected returns whether source->receiver is connected.
func (broker *TriggerBroker) isConnected(source, receiver int) bool {
if receiver < 0 || receiver >= broker.nchannels {
return false
}
_, ok := broker.sources[receiver][source]
return ok
}
// SourcesForReceiver returns a set of all sources for the given receiver.
func (broker *TriggerBroker) SourcesForReceiver(receiver int) map[int]bool {
if receiver < 0 || receiver >= broker.nchannels {
return nil
}
sources := broker.sources[receiver]
return sources
}
func (broker *TriggerBroker) computeGroupTriggerState() (gts GroupTriggerState) {
conns := make(map[int][]int)
for rx, sources := range broker.sources {
for source := range sources {
conns[source] = append(conns[source], rx)
}
}
gts.Connections = conns
return gts
}
// FrameIdxSlice attaches the methods of sort.Interface to []FrameIndex, sorting in increasing order.
type FrameIdxSlice []FrameIndex
func (p FrameIdxSlice) Len() int { return len(p) }
func (p FrameIdxSlice) Less(i, j int) bool { return p[i] < p[j] }
func (p FrameIdxSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Distribute runs one pass of brokering trigger frame #s from sources to receivers given
// the map of primary triggers as a map[int]triggerList..
func (broker *TriggerBroker) Distribute(primaries map[int]triggerList) (map[int][]FrameIndex, error) {
// Store all primary trigger indices
nprimaries := 0
for idx, tlist := range primaries {
broker.latestPrimaries[idx] = tlist.frames
nprimaries += len(tlist.frames)
err := broker.triggerCounters[idx].countNewTriggers(&tlist)
if err != nil {
msg := fmt.Sprintf("Triggering assumptions broken!\n%v\n%v\n%v", err,
spew.Sdump(tlist), spew.Sdump(broker.triggerCounters[idx]))
log.Print(msg)
}
}
// Stop now if there are obviously no secondary triggers (either b/c no primaries to
// cause them, or b/c no trigger connections are set).
secondaryMap := make(map[int][]FrameIndex)
if nprimaries == 0 || broker.nconnections == 0 {
return secondaryMap, nil
}
// Loop over all receivers. If any, make list of all triggers they receive, sort, and store.
for idx := 0; idx < broker.nchannels; idx++ {
sources := broker.SourcesForReceiver(idx)
if len(sources) > 0 {
var trigs []FrameIndex
for source := range sources {
trigs = append(trigs, broker.latestPrimaries[source]...)
}
sort.Sort(FrameIdxSlice(trigs))
secondaryMap[idx] = trigs
}
}
return secondaryMap, nil
}
// GenerateTriggerMessages makes one or more trigger rate message. It combines all channels' trigger
// rate info into a single message, and it sends that message onto `clientMessageChan`.
// There might be more than one count stored in the triggerCounters[].messages, so this might
// generate multiple messages.
func (broker *TriggerBroker) GenerateTriggerMessages() {
var hiTime time.Time
var duration time.Duration
nMessages := len(broker.triggerCounters[0].messages)
for j := 1; j < broker.nchannels; j++ {
if len(broker.triggerCounters[j].messages) != nMessages {
msg := fmt.Sprintf("triggerCounter[%d] has %d messages, want %d", j, len(broker.triggerCounters[j].messages), nMessages)
panic(msg)
}
}
for i := 0; i < nMessages; i++ {
// It's a data race if we don't make a new slice for each message:
countsSeen := make([]int, broker.nchannels)
for j := 0; j < broker.nchannels; j++ {
message := broker.triggerCounters[j].messages[i]
if j == 0 { // first channel
hiTime = message.hiTime
duration = message.duration
}
if message.hiTime.Nanosecond() != hiTime.Nanosecond() || message.duration.Nanoseconds() != duration.Nanoseconds() {
panic("trigger messages not in sync")
}
countsSeen[j] = message.countsSeen
}
clientMessageChan <- ClientUpdate{tag: "TRIGGERRATE", state: TriggerRateMessage{HiTime: hiTime, Duration: duration, CountsSeen: countsSeen}}
}
for j := 0; j < broker.nchannels; j++ {
broker.triggerCounters[j].messages = make([]triggerCounterMessage, 0) // release all memory
}
}