-
Notifications
You must be signed in to change notification settings - Fork 3
/
pause_tagger.go
47 lines (39 loc) · 1.29 KB
/
pause_tagger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package steps
import (
"fmt"
"strconv"
"time"
"github.com/bitflow-stream/go-bitflow/bitflow"
"github.com/bitflow-stream/go-bitflow/script/reg"
)
// RegisterPauseTagger registers the "tag-pauses" step on the given registry.
//
// The step takes two parameters, both declared through reg.RequiredParams:
//   - "tag": the tag name handed to the created PauseTagger.
//   - "minPause": a string parsed with time.ParseDuration and used as the
//     PauseTagger's MinimumPause.
//
// If "minPause" cannot be parsed, the step constructor returns the parse
// error wrapped by reg.ParameterError and nothing is added to the pipeline.
func RegisterPauseTagger(b reg.ProcessorRegistry) {
	const description = "Set a given tag to an integer value, that increments whenever the timestamps of two samples are more apart than a given duration"

	b.RegisterAnalysisParamsErr("tag-pauses", func(pipeline *bitflow.SamplePipeline, params map[string]string) error {
		minPause, parseErr := time.ParseDuration(params["minPause"])
		if parseErr != nil {
			return reg.ParameterError("minPause", parseErr)
		}
		tagger := &PauseTagger{
			Tag:          params["tag"],
			MinimumPause: minPause,
		}
		pipeline.Add(tagger)
		return nil
	}, description, reg.RequiredParams("tag", "minPause"))
}
// PauseTagger is a processing step that writes an integer counter into a tag
// of every sample it sees. The counter is incremented whenever the timestamp
// of a sample lies at least MinimumPause after the timestamp of the sample
// processed directly before it.
type PauseTagger struct {
bitflow.NoopProcessor
// MinimumPause is the smallest timestamp difference between two consecutive
// samples that causes the counter to be incremented.
MinimumPause time.Duration
// Tag is the name of the tag that receives the counter value.
Tag string
// counter is the current value written to Tag; it starts at zero.
counter int
// lastTime is the timestamp of the most recently processed sample. It is
// the zero time until the first sample has been seen.
lastTime time.Time
}
// String returns a short human-readable description of this step, naming the
// configured tag and the minimum pause duration.
func (d *PauseTagger) String() string {
	pause := d.MinimumPause.String()
	return fmt.Sprintf("increment tag '%v' after pauses of %v", d.Tag, pause)
}
// Sample tags the incoming sample with the current pause counter and forwards
// it to the sink obtained from GetSink.
//
// The counter is incremented before tagging when a previous sample has been
// seen and the incoming sample's timestamp is at least MinimumPause later than
// the previous one. The very first sample never increments the counter, since
// there is no earlier timestamp to compare against. The error returned by the
// sink is passed through unchanged.
func (d *PauseTagger) Sample(sample *bitflow.Sample, header *bitflow.Header) error {
	// Remember the current timestamp first, keeping the previous one for the
	// comparison below.
	previous, current := d.lastTime, sample.Time
	d.lastTime = current

	if !previous.IsZero() {
		if gap := current.Sub(previous); gap >= d.MinimumPause {
			d.counter++
		}
	}

	sample.SetTag(d.Tag, strconv.Itoa(d.counter))
	return d.GetSink().Sample(sample, header)
}