-
Notifications
You must be signed in to change notification settings - Fork 3
/
sort.go
64 lines (54 loc) · 1.59 KB
/
sort.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package steps
import (
"sort"
"strings"
"github.com/bitflow-stream/go-bitflow/bitflow"
"github.com/bitflow-stream/go-bitflow/script/reg"
log "github.com/sirupsen/logrus"
)
// Sort based on given Tags, use Timestamp as last sort criterion
type SampleSorter struct {
Tags []string
}
type SampleSlice struct {
samples []*bitflow.Sample
sorter *SampleSorter
}
func (s SampleSlice) Len() int {
return len(s.samples)
}
func (s SampleSlice) Less(i, j int) bool {
a := s.samples[i]
b := s.samples[j]
for _, tag := range s.sorter.Tags {
tagA := a.Tag(tag)
tagB := b.Tag(tag)
if tagA == tagB {
continue
}
return tagA < tagB
}
return a.Time.Before(b.Time)
}
func (s SampleSlice) Swap(i, j int) {
s.samples[i], s.samples[j] = s.samples[j], s.samples[i]
}
func (sorter *SampleSorter) ProcessBatch(header *bitflow.Header, samples []*bitflow.Sample) (*bitflow.Header, []*bitflow.Sample, error) {
log.Println("Sorting", len(samples), "samples")
sort.Sort(SampleSlice{samples, sorter})
return header, samples, nil
}
func (sorter *SampleSorter) String() string {
all := make([]string, len(sorter.Tags)+1)
copy(all, sorter.Tags)
all[len(all)-1] = "Timestamp"
return "Sort: " + strings.Join(all, ", ")
}
func RegisterSampleSorter(b reg.ProcessorRegistry) {
b.RegisterBatchStep("sort",
func(params map[string]interface{}) (bitflow.BatchProcessingStep, error) {
return &SampleSorter{params["tags"].([]string)}, nil
},
"Sort a batch of samples based on the values of the given comma-separated tags. The default criterion is the timestamp.").
Optional("tags", reg.List(reg.String()), []string{})
}