/
api.go
127 lines (113 loc) · 3.96 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Package config includes utilities for handling configuration files.
package config
import (
"errors"
"github.com/Symantec/scotty/lib/yamlutil"
"github.com/Symantec/scotty/pstore"
"github.com/Symantec/scotty/pstore/config/influx"
"github.com/Symantec/scotty/pstore/config/kafka"
"github.com/Symantec/scotty/pstore/config/lmm"
"github.com/Symantec/scotty/pstore/config/mock"
"github.com/Symantec/scotty/pstore/config/tsdb"
"io"
"time"
)
// NewConsumerBuilders reads a list of persistent store configurations from
// reader using yamlutil.Read and returns the consumer builders created from
// that list. If reading fails, it returns a nil result together with the
// read error.
func NewConsumerBuilders(reader io.Reader) (
	result []*pstore.ConsumerWithMetricsBuilder, err error) {
	var configs ConfigList
	err = yamlutil.Read(reader, &configs)
	if err != nil {
		return nil, err
	}
	return configs.CreateConsumerBuilders()
}
// WriterFactory creates pstore.LimitedRecordWriter instances. It is the
// argument type of ConsumerConfig.NewConsumerBuilder; ConfigPlus passes its
// Kafka, Influx, OpenTSDB, LMM, and Mock configurations as WriterFactory
// values.
type WriterFactory interface {
// NewWriter returns a pstore.LimitedRecordWriter or an error.
NewWriter() (pstore.LimitedRecordWriter, error)
}
// ConsumerConfig holds the YAML-configurable settings of a single consumer
// and creates a consumer builder from them (see NewConsumerBuilder).
type ConsumerConfig struct {
// The name of the consumer. Required.
Name string `yaml:"name"`
// The number of goroutines doing writing. Optional.
// A zero value means 1.
Concurrency uint `yaml:"concurrency"`
// The number of values written each time. Optional.
// A zero value means 1000.
BatchSize uint `yaml:"batchSize"`
// The length of time for rolling up values when writing. Optional.
// Zero means write every value and do no rollup.
RollUpSpan time.Duration `yaml:"rollUpSpan"`
// Maximum records to write per second. Optional.
// 0 means no limit.
RecordsPerSecond uint `yaml:"recordsPerSecond"`
// Metrics whose name matches DebugMetricRegex AND whose host matches
// DebugHostRegex are written to the debug file. Empty values in
// both of these fields mean no debugging. An empty value in
// only one of these fields means that field is ignored when deciding
// if a metric matches.
DebugMetricRegex string `yaml:"debugMetricRegex"`
DebugHostRegex string `yaml:"debugHostRegex"`
// The full path of the debug file. Optional.
// If empty, debug goes to stdout.
DebugFilePath string `yaml:"debugFilePath"`
// If true, this consumer is paused.
Paused bool `yaml:"paused"`
// Regular expressions of metrics to exclude.
// NOTE(review): presumably matched against metric names, with matching
// metrics not written by this consumer; confirm against
// newConsumerBuilder, which is not visible in this section.
RegexesOfMetricsToExclude []string `yaml:"regexesOfMetricsToExclude"`
}
// UnmarshalYAML decodes c from YAML by way of yamlutil.StrictUnmarshalYAML.
// The decode target is c converted to a local type that has the same fields
// as ConsumerConfig but none of its methods, so the decoder does not call
// back into this method.
func (c *ConsumerConfig) UnmarshalYAML(
	unmarshal func(interface{}) error) error {
	type consumerConfigFields ConsumerConfig
	fields := (*consumerConfigFields)(c)
	return yamlutil.StrictUnmarshalYAML(unmarshal, fields)
}
// NewConsumerBuilder creates a new consumer builder using the given
// WriterFactory. It is the exported entry point for the unexported
// newConsumerBuilder method, which does the actual work and is defined
// elsewhere in this package.
func (c *ConsumerConfig) NewConsumerBuilder(wf WriterFactory) (
*pstore.ConsumerWithMetricsBuilder, error) {
return c.newConsumerBuilder(wf)
}
// Reset overwrites c with the zero value of ConsumerConfig, clearing every
// field.
func (c *ConsumerConfig) Reset() {
	var zero ConsumerConfig
	*c = zero
}
// ConfigPlus represents one persistent store: the configuration of one
// writer plus the consumer settings used with it.
type ConfigPlus struct {
// One of these pointer fields must be non-nil. If more than one is
// non-nil, NewConsumerBuilder uses the first non-nil field in the order
// listed here.
Kafka *kafka.Config `yaml:"kafka"`
Influx *influx.Config `yaml:"influx"`
OpenTSDB *tsdb.Config `yaml:"openTSDB"`
LMM *lmm.Config `yaml:"lmm"`
Mock *mock.Config `yaml:"mock"`
// The consumer settings; NewConsumerBuilder builds the consumer from
// these together with the chosen writer configuration above.
Consumer ConsumerConfig `yaml:"consumer"`
}
// UnmarshalYAML decodes c from YAML by way of yamlutil.StrictUnmarshalYAML.
// The decode target is c converted to a local type that has the same fields
// as ConfigPlus but none of its methods, so the decoder does not call back
// into this method.
func (c *ConfigPlus) UnmarshalYAML(unmarshal func(interface{}) error) error {
	type configPlusFields ConfigPlus
	fields := (*configPlusFields)(c)
	return yamlutil.StrictUnmarshalYAML(unmarshal, fields)
}
// NewConsumerBuilder creates a consumer builder for this persistent store.
// It hands the first non-nil writer configuration, checked in the order
// Kafka, Influx, OpenTSDB, LMM, Mock, to the Consumer settings'
// NewConsumerBuilder. If every writer configuration is nil, it returns a nil
// builder and an error.
func (c *ConfigPlus) NewConsumerBuilder() (
	*pstore.ConsumerWithMetricsBuilder, error) {
	consumer := &c.Consumer
	if c.Kafka != nil {
		return consumer.NewConsumerBuilder(c.Kafka)
	}
	if c.Influx != nil {
		return consumer.NewConsumerBuilder(c.Influx)
	}
	if c.OpenTSDB != nil {
		return consumer.NewConsumerBuilder(c.OpenTSDB)
	}
	if c.LMM != nil {
		return consumer.NewConsumerBuilder(c.LMM)
	}
	if c.Mock != nil {
		return consumer.NewConsumerBuilder(c.Mock)
	}
	// No writer configuration was supplied.
	return nil, errors.New("One writer field must be defined.")
}
// Reset overwrites c with the zero value of ConfigPlus, so every writer
// pointer becomes nil and the Consumer settings are cleared.
func (c *ConfigPlus) Reset() {
	var zero ConfigPlus
	*c = zero
}
// ConfigList represents a list of persistent stores, one ConfigPlus per
// store.
type ConfigList []ConfigPlus
// CreateConsumerBuilders creates consumer builders from the persistent
// stores in c. It is the exported entry point for the unexported
// createConsumerBuilders method, which does the actual work and is defined
// elsewhere in this package.
// NOTE(review): presumably yields one builder per entry of c; confirm
// against createConsumerBuilders.
func (c ConfigList) CreateConsumerBuilders() (
list []*pstore.ConsumerWithMetricsBuilder, err error) {
return c.createConsumerBuilders()
}
// Reset overwrites c with the zero value of ConfigList, which is a nil
// slice.
func (c *ConfigList) Reset() {
	var empty ConfigList
	*c = empty
}