-
Notifications
You must be signed in to change notification settings - Fork 3
/
fork.go
92 lines (82 loc) · 3.12 KB
/
fork.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package steps
import (
"fmt"
"sort"
"strconv"
"github.com/bitflow-stream/go-bitflow/bitflow"
"github.com/bitflow-stream/go-bitflow/bitflow/fork"
"github.com/bitflow-stream/go-bitflow/script/reg"
)
// RegisterForks registers the fork steps "rr", "fork_tag" and "fork_tag_template"
// on the given processor registry.
//
// This function is placed in this package to avoid circular dependency between the fork and the query package.
func RegisterForks(b reg.ProcessorRegistry) {
	// Human-readable descriptions handed to the registry together with each fork.
	const (
		roundRobinDoc  = "The round-robin fork distributes the samples to the subpipelines based on weights. The pipeline selector keys must be positive integers denoting the weight of the respective pipeline."
		tagDoc         = "Fork based on the values of the given tag"
		tagTemplateDoc = "Fork based on a template string, placeholders like ${xxx} are replaced by tag values."
	)

	// The round-robin fork is registered without any parameter declarations.
	b.RegisterFork("rr", fork_round_robin, roundRobinDoc)

	// Both tag-based forks declare one required parameter and the same two optional ones.
	b.RegisterFork("fork_tag", fork_tag, tagDoc,
		reg.RequiredParams("tag"), reg.OptionalParams("regex", "exact"))
	b.RegisterFork("fork_tag_template", fork_tag_template, tagTemplateDoc,
		reg.RequiredParams("template"), reg.OptionalParams("regex", "exact"))
}
// fork_round_robin creates a fork.RoundRobinDistributor from the given subpipelines.
//
// Every key of a subpipeline must parse as a positive integer. The weight stored
// for a subpipeline is the sum of all of its keys. The parameter map is ignored.
//
// An error is returned when a key cannot be parsed as an integer, when a key is
// zero or negative, or when building one of the subpipelines fails.
func fork_round_robin(subpipelines []reg.Subpipeline, _ map[string]string) (fork.Distributor, error) {
	count := len(subpipelines)
	dist := &fork.RoundRobinDistributor{
		Weights:      make([]int, count),
		Subpipelines: make([]*bitflow.SamplePipeline, count),
	}

	for idx, sub := range subpipelines {
		// Accumulate the weights of all keys that select this subpipeline.
		total := 0
		for _, key := range sub.Keys() {
			parsed, parseErr := strconv.Atoi(key)
			switch {
			case parseErr != nil:
				return nil, fmt.Errorf("Failed to parse Round Robin subpipeline key '%v' to integer: %v", key, parseErr)
			case parsed <= 0:
				return nil, fmt.Errorf("Round robin subpipeline keys must be positive (wrong key: %v)", parsed)
			}
			total += parsed
		}
		dist.Weights[idx] = total

		// Build the actual pipeline only after all of its keys were validated.
		built, buildErr := sub.Build()
		if buildErr != nil {
			return nil, buildErr
		}
		dist.Subpipelines[idx] = built
	}
	return dist, nil
}
// fork_tag creates a distributor that forks on the value of a single tag.
//
// It rewrites the given parameter map in place: the "tag" entry is removed and
// replaced by a "template" entry of the form "${<tag>}". The actual construction
// is then delegated to fork_tag_template, whose result is returned unchanged.
func fork_tag(subpipelines []reg.Subpipeline, params map[string]string) (fork.Distributor, error) {
	// A template consisting of exactly one placeholder for the requested tag.
	template := "${" + params["tag"] + "}"

	delete(params, "tag")
	params["template"] = template
	return fork_tag_template(subpipelines, params)
}
// fork_tag_template creates a fork.TagDistributor that uses params["template"]
// as its tag template.
//
// Each key of each subpipeline is mapped to a builder function for that
// subpipeline. A key that occurs more than once results in an error. The
// "exact" and "regex" parameters are read through reg.BoolParam. If reading
// them produced no error, the distributor is initialized via Init().
//
// The distributor is returned together with any parameter or Init error.
func fork_tag_template(subpipelines []reg.Subpipeline, params map[string]string) (fork.Distributor, error) {
	builders := make(map[string]func() ([]*bitflow.SamplePipeline, error))
	var keys []string
	for _, sub := range subpipelines {
		for _, key := range sub.Keys() {
			if _, seen := builders[key]; seen {
				return nil, fmt.Errorf("Subpipeline key occurs multiple times: %v", key)
			}
			// Every key gets its own wrapper around the subpipeline it selects.
			wrapper := &wildcardSubpipeline{p: sub}
			builders[key] = wrapper.build
			keys = append(keys, key)
		}
	}
	// NOTE(review): the sorted key list is not read again inside this function.
	sort.Strings(keys)

	// Both flags report problems through the same error variable; "exact" is read first.
	var paramErr error
	exact := reg.BoolParam(params, "exact", false, true, &paramErr)
	regex := reg.BoolParam(params, "regex", false, true, &paramErr)

	dist := &fork.TagDistributor{
		TagTemplate: bitflow.TagTemplate{
			Template: params["template"],
		},
		RegexDistributor: fork.RegexDistributor{
			Pipelines:  builders,
			ExactMatch: exact,
			RegexMatch: regex,
		},
	}

	// Skip initialization when the parameters could not be read.
	if paramErr != nil {
		return dist, paramErr
	}
	paramErr = dist.Init()
	return dist, paramErr
}
// wildcardSubpipeline wraps a single reg.Subpipeline so that its build method
// can be stored as a pipeline-building callback (see fork_tag_template).
type wildcardSubpipeline struct {
	// p is the wrapped subpipeline; build delegates to p.Build().
	p reg.Subpipeline
}
// build builds the wrapped subpipeline and returns it as the only element of a
// slice, together with the error reported by Build(). The slice is returned even
// when Build() fails; in that case its element is whatever Build() returned.
func (w wildcardSubpipeline) build() ([]*bitflow.SamplePipeline, error) {
	built, buildErr := w.p.Build()
	pipelines := []*bitflow.SamplePipeline{built}
	return pipelines, buildErr
}