Skip to content
This repository has been archived by the owner on Oct 12, 2021. It is now read-only.

Commit

Permalink
Revert "Remove varz shim from metron"
Browse files Browse the repository at this point in the history
This reverts commit dee5e0f.

[#97731982]
  • Loading branch information
geapi committed Jun 24, 2015
1 parent dee5e0f commit eb19a5e
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 1 deletion.
26 changes: 26 additions & 0 deletions src/integration_tests/metron/varz_healthz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,32 @@ var _ = Describe("/varz endpoint", func() {
return getMetricFromContext(agentListenerContext, "receivedMessageCount").Value
}).Should(BeNumerically(">=", expectedValue))
})

It("includes value metrics from sources", func() {
connection, _ := net.Dial("udp", "localhost:51161")
connection.Write(basicValueMessage())

Eventually(func() *instrumentation.Context { return getContext("forwarder") }).ShouldNot(BeNil())

metric := getMetricFromContext(getContext("forwarder"), "fake-origin-2.fake-metric-name")
Expect(metric).ToNot(BeNil())
Expect(metric.Name).To(Equal("fake-origin-2.fake-metric-name"))
Expect(metric.Value).To(BeNumerically("==", 42))
Expect(metric.Tags["component"]).To(Equal("test-component"))
})

It("includes counter event metrics from sources", func() {
connection, _ := net.Dial("udp", "localhost:51161")
connection.Write(basicCounterEventMessage())
connection.Write(basicCounterEventMessage())

Eventually(func() *instrumentation.Context { return getContext("forwarder") }).ShouldNot(BeNil())

metric := getMetricFromContext(getContext("forwarder"), "fake-origin-2.fake-counter-event-name")
Expect(metric.Name).To(Equal("fake-origin-2.fake-counter-event-name"))
Expect(metric.Value).To(BeNumerically("==", 2))
Expect(metric.Tags["component"]).To(Equal("test-component"))
})
})

var _ = Describe("/healthz", func() {
Expand Down
7 changes: 6 additions & 1 deletion src/metron/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"metron/writers/messageaggregator"
"metron/writers/signer"
"metron/writers/tagger"
"metron/writers/varzforwarder"

"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/gunk/workpool"
Expand All @@ -33,6 +34,8 @@ var (
debug = flag.Bool("debug", false, "Debug logging")
)

var metricTTL = time.Second * 5

func main() {
flag.Parse()
config, logger := parseConfig(*debug, *configFilePath, *logFilePath)
Expand All @@ -42,7 +45,8 @@ func main() {
dopplerForwarder := dopplerforwarder.New(dopplerClientPool, logger)
byteSigner := signer.New(config.SharedSecret, dopplerForwarder)
marshaller := eventmarshaller.New(byteSigner, logger)
messageTagger := tagger.New(config.Deployment, config.Job, config.Index, marshaller)
varzShim := varzforwarder.New(config.Job, metricTTL, marshaller, logger)
messageTagger := tagger.New(config.Deployment, config.Job, config.Index, varzShim)
aggregator := messageaggregator.New(messageTagger, logger)

dropsondeUnmarshaller := eventunmarshaller.New(aggregator, logger)
Expand All @@ -58,6 +62,7 @@ func main() {
legacyUnmarshaller,
dropsondeUnmarshaller,
aggregator,
varzShim,
marshaller,
}

Expand Down
55 changes: 55 additions & 0 deletions src/metron/writers/varzforwarder/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package varzforwarder

import (
"time"

"github.com/cloudfoundry/sonde-go/events"
)

type metrics struct {
metricsByName map[string]float64
timer *time.Timer
}

func (metrics *metrics) processMetric(metric *events.Envelope) {
switch metric.GetEventType() {
case events.Envelope_ValueMetric:
metrics.processValueMetric(metric)
case events.Envelope_CounterEvent:
metrics.processCounterEvent(metric)
case events.Envelope_HttpStartStop:
metrics.processHTTPStartStop(metric)
}
}

func (metrics *metrics) processValueMetric(metric *events.Envelope) {
metrics.metricsByName[metric.GetValueMetric().GetName()] = metric.GetValueMetric().GetValue()
}

func (metrics *metrics) processCounterEvent(metric *events.Envelope) {
eventName := metric.GetCounterEvent().GetName()
count := metrics.metricsByName[eventName]
metrics.metricsByName[eventName] = count + float64(metric.GetCounterEvent().GetDelta())
}

func (metrics *metrics) processHTTPStartStop(metric *events.Envelope) {
eventName := "requestCount"
count := metrics.metricsByName[eventName]
metrics.metricsByName[eventName] = count + 1

startStop := metric.GetHttpStartStop()
status := startStop.GetStatusCode()
switch {
case status >= 100 && status < 200:
metrics.metricsByName["responseCount1XX"] = metrics.metricsByName["responseCount1XX"] + 1
case status >= 200 && status < 300:
metrics.metricsByName["responseCount2XX"] = metrics.metricsByName["responseCount2XX"] + 1
case status >= 300 && status < 400:
metrics.metricsByName["responseCount3XX"] = metrics.metricsByName["responseCount3XX"] + 1
case status >= 400 && status < 500:
metrics.metricsByName["responseCount4XX"] = metrics.metricsByName["responseCount4XX"] + 1
case status >= 500 && status < 600:
metrics.metricsByName["responseCount5XX"] = metrics.metricsByName["responseCount5XX"] + 1
default:
}
}
100 changes: 100 additions & 0 deletions src/metron/writers/varzforwarder/varz_forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package varzforwarder

import (
"fmt"
"sync"
"time"

"metron/writers"

"github.com/cloudfoundry/gosteno"
"github.com/cloudfoundry/loggregatorlib/cfcomponent/instrumentation"
"github.com/cloudfoundry/sonde-go/events"
)

type VarzForwarder struct {
metricsByOrigin map[string]*metrics
componentName string
ttl time.Duration

logger *gosteno.Logger
outputWriter writers.EnvelopeWriter
lock sync.RWMutex
}

func New(componentName string, ttl time.Duration, outputWriter writers.EnvelopeWriter, logger *gosteno.Logger) *VarzForwarder {
return &VarzForwarder{
metricsByOrigin: make(map[string]*metrics),
componentName: componentName,
ttl: ttl,
logger: logger,
outputWriter: outputWriter,
}
}

func (vf *VarzForwarder) Write(envelope *events.Envelope) {
vf.addMetric(envelope)
vf.resetTimer(envelope.GetOrigin())

vf.outputWriter.Write(envelope)
}

func (vf *VarzForwarder) Emit() instrumentation.Context {
vf.lock.RLock()
defer vf.lock.RUnlock()

c := instrumentation.Context{Name: "forwarder"}
metrics := []instrumentation.Metric{}
tags := map[string]interface{}{
"component": vf.componentName,
}

for origin, originMetrics := range vf.metricsByOrigin {
for name, value := range originMetrics.metricsByName {
metricName := fmt.Sprintf("%s.%s", origin, name)
metrics = append(metrics, instrumentation.Metric{Name: metricName, Value: value, Tags: tags})
}
}

c.Metrics = metrics
return c
}

func (vf *VarzForwarder) addMetric(metric *events.Envelope) {
vf.lock.Lock()
defer vf.lock.Unlock()

originMetrics, ok := vf.metricsByOrigin[metric.GetOrigin()]
if !ok {
vf.metricsByOrigin[metric.GetOrigin()] = vf.createMetrics(metric.GetOrigin())
originMetrics = vf.metricsByOrigin[metric.GetOrigin()]
}

originMetrics.processMetric(metric)
}

func (vf *VarzForwarder) createMetrics(origin string) *metrics {
vf.logger.Debugf("creating metrics for origin %v", origin)
return &metrics{
metricsByName: make(map[string]float64),
timer: time.AfterFunc(vf.ttl, func() { vf.deleteMetrics(origin) }),
}
}

func (vf *VarzForwarder) deleteMetrics(origin string) {
vf.logger.Debugf("deleting metrics for origin %v", origin)
vf.lock.Lock()
defer vf.lock.Unlock()

delete(vf.metricsByOrigin, origin)
}

func (vf *VarzForwarder) resetTimer(origin string) {
vf.lock.RLock()
defer vf.lock.RUnlock()

metrics, ok := vf.metricsByOrigin[origin]
if ok {
metrics.timer.Reset(vf.ttl)
}
}
13 changes: 13 additions & 0 deletions src/metron/writers/varzforwarder/varz_forwarder_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package varzforwarder_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"
)

func TestVarzForwarder(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "VarzForwarder Suite")
}

0 comments on commit eb19a5e

Please sign in to comment.