Skip to content

Commit

Permalink
Flush reloc metrics before exiting
Browse files Browse the repository at this point in the history
  • Loading branch information
pushrax authored and hkdsun committed Mar 15, 2018
1 parent 971fbdb commit 4ca3f75
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 0 deletions.
16 changes: 16 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ghostferry

import (
"fmt"
"sync"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -29,6 +30,8 @@ type Metrics struct {
Prefix string
DefaultTags []MetricTag
Sink chan interface{}

wg sync.WaitGroup
}

func SetGlobalMetrics(prefix string, sink chan interface{}) *Metrics {
Expand Down Expand Up @@ -93,6 +96,19 @@ func (m *Metrics) Measure(key string, tags []MetricTag, sampleRate float64, f fu
m.Timer(key, time.Since(start), m.mergeWithDefaultTags(tags), sampleRate)
}

func (m *Metrics) AddConsumer() {
m.wg.Add(1)
}

func (m *Metrics) DoneConsumer() {
m.wg.Done()
}

func (m *Metrics) StopAndFlush() {
close(m.Sink)
m.wg.Wait()
}

func (m *Metrics) sendMetric(metric interface{}) {
if m.Sink == nil {
return
Expand Down
2 changes: 2 additions & 0 deletions reloc/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func main() {
}

ferry.Run()

reloc.StopAndFlushMetrics()
}

func errorAndExit(msg string) {
Expand Down
8 changes: 8 additions & 0 deletions reloc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func InitializeMetrics(prefix string, config *Config) error {
{Name: "TargetDB", Value: config.TargetDB},
}

metrics.AddConsumer()
go consumeMetrics(client, metricsChan)

return nil
Expand All @@ -39,7 +40,12 @@ func SetGlobalMetrics(prefix string, metricsChan chan interface{}) {
metrics = ghostferry.SetGlobalMetrics(prefix, metricsChan)
}

func StopAndFlushMetrics() {
metrics.StopAndFlush()
}

func consumeMetrics(client *dogstatsd.Client, metricsChan chan interface{}) {
defer metrics.DoneConsumer()
for {
switch metric := (<-metricsChan).(type) {
case ghostferry.CountMetric:
Expand All @@ -48,6 +54,8 @@ func consumeMetrics(client *dogstatsd.Client, metricsChan chan interface{}) {
handleErr(client.Gauge(metric.Key, metric.Value, tagsToStrings(metric.Tags), metric.SampleRate), metric)
case ghostferry.TimerMetric:
handleErr(client.Timer(metric.Key, metric.Value, tagsToStrings(metric.Tags), metric.SampleRate), metric)
case nil:
return
}
}
}
Expand Down

0 comments on commit 4ca3f75

Please sign in to comment.