Skip to content

Commit

Permalink
Added support for InfluxDB batches of more than 1 and made the emissi…
Browse files Browse the repository at this point in the history
…on channel buffer size configurable.

Signed-off-by: Rudolf Visagie <rudolf.visagie@adp.com>
  • Loading branch information
Rudolf Visagie committed May 29, 2019
1 parent 0a73b40 commit 8c16e04
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 29 deletions.
3 changes: 2 additions & 1 deletion atc/atccmd/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type RunCommand struct {
Metrics struct {
HostName string `long:"metrics-host-name" description:"Host string to attach to emitted metrics."`
Attributes map[string]string `long:"metrics-attribute" description:"A key-value attribute to attach to emitted metrics. Can be specified multiple times." value-name:"NAME:VALUE"`
BufferSize uint32 `long:"metrics-buffer-size" default:"1000" description:"The size of the buffer used in emitting event metrics."`
CaptureErrorMetrics bool `long:"capture-error-metrics" description:"Enable capturing of error log metrics"`
} `group:"Metrics & Diagnostics"`

Expand Down Expand Up @@ -1082,7 +1083,7 @@ func (cmd *RunCommand) configureMetrics(logger lager.Logger) error {
host, _ = os.Hostname()
}

return metric.Initialize(logger.Session("metrics"), host, cmd.Metrics.Attributes)
return metric.Initialize(logger.Session("metrics"), host, cmd.Metrics.Attributes, int(cmd.Metrics.BufferSize))
}

func (cmd *RunCommand) constructDBConn(
Expand Down
19 changes: 17 additions & 2 deletions atc/metric/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ var (
emissions chan eventEmission
)

func Initialize(logger lager.Logger, host string, attributes map[string]string) error {
func Initialize(logger lager.Logger, host string, attributes map[string]string, bufferSize int) error {
logger.Debug("metric-initialize", lager.Data{
"host": host,
"attributes": attributes,
"buffer-size": bufferSize,
})

var (
emitterDescriptions []string
err error
Expand Down Expand Up @@ -97,7 +103,7 @@ func Initialize(logger lager.Logger, host string, attributes map[string]string)
emitter = emitter
eventHost = host
eventAttributes = attributes
emissions = make(chan eventEmission, 1000)
emissions = make(chan eventEmission, bufferSize)

go emitLoop()

Expand All @@ -110,6 +116,9 @@ func Deinitialize(logger lager.Logger) {
}

func emit(logger lager.Logger, event Event) {
logger.Debug("emit-event", lager.Data{
"event": event, "emitter": emitter,
})
if emitter == nil {
return
}
Expand All @@ -132,13 +141,19 @@ func emit(logger lager.Logger, event Event) {

select {
case emissions <- eventEmission{logger: logger, event: event}:
logger.Debug("emit-event-write-to-channel", lager.Data{
"event": event,
})
default:
logger.Error("queue-full", nil)
}
}

func emitLoop() {
for emission := range emissions {
emission.logger.Debug("emit-event-loop", lager.Data{
"event": emission.event,
})
emitter.Emit(emission.logger.Session("emit"), emission.event)
}
}
89 changes: 66 additions & 23 deletions atc/metric/emitter/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type InfluxDBEmitter struct {
client influxclient.Client
database string
batchSize int
batchDuration time.Duration
}

type InfluxDBConfig struct {
Expand All @@ -24,9 +26,22 @@ type InfluxDBConfig struct {
Password string `long:"influxdb-password" description:"InfluxDB server password."`

InsecureSkipVerify bool `long:"influxdb-insecure-skip-verify" description:"Skip SSL verification when emitting to InfluxDB."`

// https://github.com/influxdata/docs.influxdata.com/issues/454
// https://docs.influxdata.com/influxdb/v0.13/write_protocols/write_syntax/#write-a-batch-of-points-with-curl
// 5000 seems to be the batch size recommended by the InfluxDB team
BatchSize uint32 `long:"influxdb-batch-size" default:"5000" description:"Number of points to batch together when emitting to InfluxDB."`
BatchDuration time.Duration `long:"influxdb-batch-duration" default:"300s" description:"The duration to wait before emitting a batch of points to InfluxDB, disregarding influxdb-batch-size."`
}

var (
batch []metric.Event
lastBatchTime time.Time
)

func init() {
batch = make([]metric.Event, 0)
lastBatchTime = time.Now()
metric.RegisterEmitter(&InfluxDBConfig{})
}

Expand All @@ -48,10 +63,16 @@ func (config *InfluxDBConfig) NewEmitter() (metric.Emitter, error) {
return &InfluxDBEmitter{
client: client,
database: config.Database,
batchSize: int(config.BatchSize),
batchDuration: config.BatchDuration,
}, nil
}

func (emitter *InfluxDBEmitter) Emit(logger lager.Logger, event metric.Event) {
func emitBatch(emitter *InfluxDBEmitter, logger lager.Logger, events []metric.Event) {

logger.Debug("influxdb-emit-batch", lager.Data{
"size": len(events),
})
bp, err := influxclient.NewBatchPoints(influxclient.BatchPointsConfig{
Database: emitter.database,
})
Expand All @@ -60,34 +81,56 @@ func (emitter *InfluxDBEmitter) Emit(logger lager.Logger, event metric.Event) {
return
}

tags := map[string]string{
"host": event.Host,
}

for k, v := range event.Attributes {
tags[k] = v
for _, event := range events {
tags := map[string]string{
"host": event.Host,
}

for k, v := range event.Attributes {
tags[k] = v
}

point, err := influxclient.NewPoint(
event.Name,
tags,
map[string]interface{}{
"value": event.Value,
"state": string(event.State),
},
event.Time,
)
if err != nil {
logger.Error("failed-to-construct-point", err)
continue
}

bp.AddPoint(point)
}

point, err := influxclient.NewPoint(
event.Name,
tags,
map[string]interface{}{
"value": event.Value,
"state": string(event.State),
},
event.Time,
)
if err != nil {
logger.Error("failed-to-construct-point", err)
return
}

bp.AddPoint(point)

err = emitter.client.Write(bp)
if err != nil {
logger.Error("failed-to-send-points",
errors.Wrap(metric.ErrFailedToEmit, err.Error()))
return
}
logger.Info("influxdb-emitter-fork-influxdb-batch-emitted", lager.Data{
"size": len(events),
})
}


func (emitter *InfluxDBEmitter) Emit(logger lager.Logger, event metric.Event) {
batch = append(batch, event)
duration := time.Since(lastBatchTime)
if len(batch) > emitter.batchSize || duration > emitter.batchDuration {
logger.Debug("influxdb-pre-emit-batch", lager.Data{
"influxdb-batch-size": emitter.batchSize,
"current-batch-size": len(batch),
"influxdb-batch-duration": emitter.batchDuration,
"current-duration": duration,
})
go emitBatch(emitter, logger, batch)
batch = make([]metric.Event, 0)
lastBatchTime = time.Now()
}
}
2 changes: 1 addition & 1 deletion atc/metric/error_sink_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ = Describe("ErrorSinkCollector", func() {
metric.RegisterEmitter(emitterFactory)
emitterFactory.IsConfiguredReturns(true)
emitterFactory.NewEmitterReturns(emitter, nil)
metric.Initialize(testLogger, "test", map[string]string{})
metric.Initialize(testLogger, "test", map[string]string{}, 1000)
})

AfterEach(func() {
Expand Down
2 changes: 1 addition & 1 deletion atc/metric/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var _ = Describe("MetricsHandler", func() {
emitterFactory.IsConfiguredReturns(true)
emitterFactory.NewEmitterReturns(emitter, nil)

metric.Initialize(dummyLogger, "test", map[string]string{})
metric.Initialize(dummyLogger, "test", map[string]string{}, 1000)

ts = httptest.NewServer(
WrapHandler(dummyLogger, "ApiEndpoint", http.HandlerFunc(noopHandler)))
Expand Down
4 changes: 3 additions & 1 deletion atc/metric/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var _ = Describe("Periodic emission of metrics", func() {
)

BeforeEach(func() {
testLogger := lager.NewLogger("test")

emitterFactory := &metricfakes.FakeEmitterFactory{}
emitter = &metricfakes.FakeEmitter{}

Expand All @@ -35,7 +37,7 @@ var _ = Describe("Periodic emission of metrics", func() {
b := &dbfakes.FakeConn{}
b.NameReturns("B")
metric.Databases = []db.Conn{a, b}
metric.Initialize(nil, "test", map[string]string{})
metric.Initialize(testLogger, "test", map[string]string{}, 1000)

process = ifrit.Invoke(metric.PeriodicallyEmit(lager.NewLogger("dont care"), 250*time.Millisecond))
})
Expand Down

0 comments on commit 8c16e04

Please sign in to comment.