-
Notifications
You must be signed in to change notification settings - Fork 3
/
pick_samples.go
115 lines (109 loc) · 3.64 KB
/
pick_samples.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package steps
import (
"fmt"
"log"
"strconv"
"github.com/bitflow-stream/go-bitflow/bitflow"
"github.com/bitflow-stream/go-bitflow/script/reg"
)
// RegisterPickPercent registers the "pick" step on the given registry.
//
// The step forwards only a fraction of the incoming samples. The fraction is
// read from the "percent" parameter, which is parsed as a float and is
// documented to lie in the range 0..1. Selection is deterministic: an
// accumulator grows by the fraction for every incoming sample, and a sample is
// forwarded each time the accumulator reaches 1.0.
//
// A "percent" value that cannot be parsed as a float yields a parameter error.
func RegisterPickPercent(b reg.ProcessorRegistry) {
	b.RegisterAnalysisParamsErr("pick",
		func(p *bitflow.SamplePipeline, params map[string]string) error {
			pickPercentage, err := strconv.ParseFloat(params["percent"], 64)
			if err != nil {
				return reg.ParameterError("percent", err)
			}
			counter := float64(0)
			p.Add(&SampleFilter{
				Description: bitflow.String(fmt.Sprintf("Pick %.2f%%", pickPercentage*100)),
				IncludeFilter: func(_ *bitflow.Sample, _ *bitflow.Header) (bool, error) {
					counter += pickPercentage
					// Compare with >= (not >) so that the accumulator triggers as soon
					// as it reaches 1.0. Otherwise percent=1 would drop the very first
					// sample and every other fraction would start one sample late.
					if counter >= 1.0 {
						counter -= 1.0
						return true, nil
					}
					return false, nil
				},
			})
			return nil
		},
		// "percent" is always parsed above and has no default, so it is declared
		// as required rather than optional.
		"Forward only a percentage of samples, parameter is in the range 0..1", reg.RequiredParams("percent"))
}
// RegisterPickHead registers the "head" step on the given registry.
//
// The step forwards the first "num" samples it receives and drops every sample
// after that. If the optional "close" parameter is true, the step additionally
// calls proc.Error(nil) for each sample arriving after the limit, which stops
// processing without reporting an error.
//
// NOTE(review): "close" is read via reg.BoolParam(params, "close", false, true, &err);
// this presumably means "default false" — confirm against the reg package.
func RegisterPickHead(b reg.ProcessorRegistry) {
	b.RegisterAnalysisParamsErr("head",
		func(p *bitflow.SamplePipeline, params map[string]string) (err error) {
			doClose := reg.BoolParam(params, "close", false, true, &err)
			num := reg.IntParam(params, "num", 0, false, &err)
			if err != nil {
				return err
			}

			processed := 0
			proc := &bitflow.SimpleProcessor{
				Description: "Pick first " + strconv.Itoa(num) + " samples",
			}
			// Process is assigned after construction because the closure needs to
			// reference proc itself.
			proc.Process = func(sample *bitflow.Sample, header *bitflow.Header) (*bitflow.Sample, *bitflow.Header, error) {
				if processed < num {
					processed++
					return sample, header, nil
				}
				if doClose {
					proc.Error(nil) // Stop processing without an error
				}
				return nil, nil, nil
			}
			p.Add(proc)
			return nil
		},
		"Forward only a number of the first processed samples. If close=true is given, the whole pipeline is closed afterwards.", reg.RequiredParams("num"), reg.OptionalParams("close"))
}
// RegisterSkipHead registers the "skip" step on the given registry.
//
// The step drops the first "num" samples it receives and forwards every sample
// after that unchanged.
func RegisterSkipHead(b reg.ProcessorRegistry) {
	b.RegisterAnalysisParamsErr("skip",
		func(p *bitflow.SamplePipeline, params map[string]string) (err error) {
			num := reg.IntParam(params, "num", 0, false, &err)
			if err != nil {
				return err
			}

			dropped := 0
			p.Add(&bitflow.SimpleProcessor{
				Description: "Drop first " + strconv.Itoa(num) + " samples",
				Process: func(sample *bitflow.Sample, header *bitflow.Header) (*bitflow.Sample, *bitflow.Header, error) {
					if dropped < num {
						dropped++
						return nil, nil, nil
					}
					return sample, header, nil
				},
			})
			return nil
		},
		// "num" is read exactly like in the "head" and "tail" steps, which declare
		// it as required; declare it the same way here for consistency.
		"Drop a number of samples in the beginning", reg.RequiredParams("num"))
}
// RegisterPickTail registers the "tail" step on the given registry.
//
// The step forwards nothing while the stream is running: every incoming sample
// is pushed into a sample ring created with bitflow.NewSampleRing(num). When
// the step is closed, the contents of the ring are forwarded to the subsequent
// step in order. If forwarding a sample fails, the error is reported through
// proc.Error and the flush is aborted.
func RegisterPickTail(b reg.ProcessorRegistry) {
	b.RegisterAnalysisParamsErr("tail",
		func(p *bitflow.SamplePipeline, params map[string]string) (err error) {
			num := reg.IntParam(params, "num", 0, false, &err)
			if err != nil {
				return err
			}

			ring := bitflow.NewSampleRing(num)
			proc := &bitflow.SimpleProcessor{
				Description: "Read until end of stream, and forward only the last " + strconv.Itoa(num) + " samples",
				Process: func(sample *bitflow.Sample, header *bitflow.Header) (*bitflow.Sample, *bitflow.Header, error) {
					// Buffer the sample and forward nothing for now.
					ring.Push(sample, header)
					return nil, nil, nil
				},
			}
			// OnClose is assigned after construction because the closure needs to
			// reference proc itself.
			proc.OnClose = func() {
				flush := ring.Get()
				log.Printf("%v: Reached end of stream, now flushing %v samples", proc, len(flush))
				for _, sample := range flush {
					// Call the embedded NoopProcessor directly so the buffered samples
					// go to the next step instead of back through Process.
					if err := proc.NoopProcessor.Sample(sample.Sample, sample.Header); err != nil {
						proc.Error(err)
						break
					}
				}
			}
			p.Add(proc)
			return nil
		},
		"Forward only a number of the last processed samples. Samples are buffered and forwarded once the end of the stream is reached.", reg.RequiredParams("num"))
}