-
Notifications
You must be signed in to change notification settings - Fork 3
/
stats.go
92 lines (80 loc) · 2.42 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package steps
import (
"sort"
"strconv"
"github.com/antongulenko/golib"
"github.com/bitflow-stream/go-bitflow/bitflow"
"github.com/bitflow-stream/go-bitflow/script/reg"
"github.com/go-ini/ini"
log "github.com/sirupsen/logrus"
)
// StoreStats is a pipeline step that accumulates per-field statistics for
// every sample it sees (see Sample) and, when closed, writes them as an
// ini file to TargetFile (see Close / StoreStatistics).
type StoreStats struct {
	// Embedded so that samples can be handed on via NoopProcessor.Sample
	// and the sink can be closed via CloseSink.
	bitflow.NoopProcessor
	// TargetFile is the path the ini-formatted statistics are saved to.
	TargetFile string
	// stats maps each header field name to its accumulated statistics;
	// entries are created lazily on first occurrence of a field.
	stats map[string]*FeatureStats
}
// NewStoreStats returns a StoreStats step that will write its collected
// statistics to targetFile. The per-field statistics map starts empty.
func NewStoreStats(targetFile string) *StoreStats {
	s := new(StoreStats)
	s.TargetFile = targetFile
	s.stats = make(map[string]*FeatureStats)
	return s
}
// RegisterStoreStats registers the "stats" step with the given registry.
// The step takes a required string parameter "file" naming the ini file
// the statistics are written to.
func RegisterStoreStats(b reg.ProcessorRegistry) {
	const description = "Output statistics about processed samples to a given ini-file"

	create := func(p *bitflow.SamplePipeline, params map[string]interface{}) error {
		// "file" is declared Required with reg.String() below; the assertion
		// relies on the registry enforcing that before this is invoked.
		target := params["file"].(string)
		p.Add(NewStoreStats(target))
		return nil
	}

	b.RegisterStep("stats", create, description).Required("file", reg.String())
}
// Sample feeds every value of inSample into the statistics of the header
// field at the same index, creating a FeatureStats entry the first time a
// field name is encountered. The sample is then passed on unchanged to the
// embedded NoopProcessor.
//
// Values are indexed by header.Fields positions, so inSample.Values must
// hold at least len(header.Fields) entries; otherwise the indexing panics.
func (stats *StoreStats) Sample(inSample *bitflow.Sample, header *bitflow.Header) error {
	values := inSample.Values
	for i, name := range header.Fields {
		v := float64(values[i])
		fs, exists := stats.stats[name]
		if !exists {
			fs = NewFeatureStats()
			stats.stats[name] = fs
		}
		fs.Push(v)
	}
	return stats.NoopProcessor.Sample(inSample, header)
}
// Close writes the collected statistics to TargetFile. A failure is
// logged and reported through stats.Error. The downstream sink is closed
// in every case via the deferred CloseSink.
func (stats *StoreStats) Close() {
	defer stats.CloseSink()

	err := stats.StoreStatistics()
	if err == nil {
		return
	}
	log.Println("Error storing feature statistics:", err)
	stats.Error(err)
}
// StoreStatistics builds an ini document with one section per observed
// field, in ascending name order. Each section holds the keys avg, stddev,
// count, min and max, and the document is saved to TargetFile.
//
// Floats are rendered with strconv's 'g' format and precision -1, which
// gives the shortest representation that round-trips. Key-creation errors
// for a section are collected with golib.MultiError. The first section
// that produced any error aborts the write. Otherwise the result of
// saving the file is returned.
func (stats *StoreStats) StoreStatistics() error {
	formatFloat := func(v float64) string {
		return strconv.FormatFloat(v, 'g', -1, 64)
	}

	cfg := ini.Empty()
	for _, featureName := range stats.sortedFeatures() {
		fs := stats.stats[featureName]
		sec := cfg.Section(featureName)

		// Order here determines the key order inside the section.
		entries := []struct{ key, value string }{
			{"avg", formatFloat(fs.Mean())},
			{"stddev", formatFloat(fs.Stddev())},
			{"count", strconv.FormatUint(uint64(fs.Len()), 10)},
			{"min", formatFloat(fs.Min)},
			{"max", formatFloat(fs.Max)},
		}

		var errs golib.MultiError
		for _, e := range entries {
			errs.AddMulti(sec.NewKey(e.key, e.value))
		}
		if err := errs.NilOrError(); err != nil {
			return err
		}
	}
	return cfg.SaveTo(stats.TargetFile)
}
// sortedFeatures returns the names of all fields seen so far, sorted in
// ascending byte-wise order, so that output built from them is
// deterministic regardless of map iteration order.
func (stats *StoreStats) sortedFeatures() []string {
	names := make([]string, len(stats.stats))
	i := 0
	for name := range stats.stats {
		names[i] = name
		i++
	}
	sort.Sort(sort.StringSlice(names))
	return names
}
// String describes this step for pipeline printouts, including the
// target file path.
func (stats *StoreStats) String() string {
	const prefix = "Store Statistics (to "
	return prefix + stats.TargetFile + ")"
}