Skip to content

Commit

Permalink
Merge pull request #310 from hligit/dynamic-headers
Browse files Browse the repository at this point in the history
Add support for adding http headers extracted from tags
  • Loading branch information
hligit committed Jun 26, 2020
2 parents ef54071 + b0b2f2d commit 3f6dee5
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
UNRELEASED
------
- Added experimental support for adding http headers based on metrics tags

22.0.0
------
- Refactor pprof endpoints to sit under `/debug/pprof`
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ following configuration options:
- `log-raw-metric`: logs raw metrics received from the network. Defaults to `false`.
- `custom-headers` : a map of strings that are added to each request sent to allow for additional network routing / request inspection.
Not required, default is empty. Example: `--custom-headers='{"region" : "us-east-1", "service" : "event-producer"}'`
- `dynamic-headers` : similar with `custom-headers`, but the header values are extracted from metric tags matching the
provided list of string. Tag names are canonicalized by first replacing underscores with hyphens, then converting
first letter and each letter after a hyphen to uppercase, the rest are converted to lower case. If a tag is specified
in both `custom-header` and `dynamic-header`, the vaule set by `custom-header` takes precedence. Not required, default
is empty. Example: `--dynamic-headers='["region", "service"]'`.
This is an experimental feature and it may be removed or changed in future versions.


Metric expiry and persistence
Expand Down
2 changes: 1 addition & 1 deletion examples/cloudproviders/k8s/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ spec:
image: alpine:latest
# Send a metric called `metric.namespace` which is a counter with a value of 1, tagged with some dummy tags,
# to the local gostatsd DaemonSet. Do this once every 10 seconds.
command: ['/bin/sh', '-c', 'while 1; echo "metric.namespace:1|c|#tag1:value1,tag2:value2" | nc -w 1 -u $MY_NODE_IP 8125; sleep 10; done']
command: ['/bin/sh', '-c', 'while 1; do echo "metric.namespace:1|c|#tag1:value1,tag2:value2" | nc -w 1 -u $MY_NODE_IP 8125; sleep 10; done']
env:
- name: MY_NODE_IP
valueFrom:
Expand Down
78 changes: 78 additions & 0 deletions metric_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"strings"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -176,6 +177,83 @@ func (mm *MetricMap) Split(count int) []*MetricMap {
return maps
}

func tagsMatch(tagNames []string, tagsKey string) string {
res := make([]string, 0)
for _, tv := range strings.Split(tagsKey, ",") {
for _, tagName := range tagNames {
if tagName == "" {
break
}
if strings.HasPrefix(tv, tagName) {
res = append(res, tv)
break
}
}
}
return strings.Join(res, ",")
}

func (mm *MetricMap) SplitByTags(tagNames []string) map[string]*MetricMap {
maps := make(map[string]*MetricMap)
if len(tagNames) == 0 {
maps[""] = mm
return maps
}
mm.Counters.Each(func(metricName string, tagsKey string, c Counter) {
key := tagsMatch(tagNames, tagsKey)
if _, ok := maps[key]; !ok {
maps[key] = NewMetricMap()
}
mmSplit := maps[key]
if v, ok := mmSplit.Counters[metricName]; ok {
v[tagsKey] = c
} else {
mmSplit.Counters[metricName] = map[string]Counter{tagsKey: c}
}
})

mm.Gauges.Each(func(metricName string, tagsKey string, g Gauge) {
key := tagsMatch(tagNames, tagsKey)
if _, ok := maps[key]; !ok {
maps[key] = NewMetricMap()
}
mmSplit := maps[key]
if v, ok := mmSplit.Gauges[metricName]; ok {
v[tagsKey] = g
} else {
mmSplit.Gauges[metricName] = map[string]Gauge{tagsKey: g}
}
})

mm.Timers.Each(func(metricName string, tagsKey string, t Timer) {
key := tagsMatch(tagNames, tagsKey)
if _, ok := maps[key]; !ok {
maps[key] = NewMetricMap()
}
mmSplit := maps[key]
if v, ok := mmSplit.Timers[metricName]; ok {
v[tagsKey] = t
} else {
mmSplit.Timers[metricName] = map[string]Timer{tagsKey: t}
}
})

mm.Sets.Each(func(metricName string, tagsKey string, s Set) {
key := tagsMatch(tagNames, tagsKey)
if _, ok := maps[key]; !ok {
maps[key] = NewMetricMap()
}
mmSplit := maps[key]
if v, ok := mmSplit.Sets[metricName]; ok {
v[tagsKey] = s
} else {
mmSplit.Sets[metricName] = map[string]Set{tagsKey: s}
}
})

return maps
}

func (mm *MetricMap) receiveCounter(m *Metric, tagsKey string) {
value := int64(m.Value / m.Rate)
v, ok := mm.Counters[m.Name]
Expand Down
68 changes: 68 additions & 0 deletions metric_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,71 @@ func TestMetricMapIsEmpty(t *testing.T) {
mm.Sets.Delete("m")
require.True(t, mm.IsEmpty())
}

func TestTagsMatch(t *testing.T) {
tagsKey := "author:bob,env:dev,region:us-east-1,service:monitor,other:abc"

require.Equal(t, tagsMatch([]string{}, tagsKey), "")
require.Equal(t, tagsMatch([]string{"env"}, tagsKey), "env:dev")
require.Equal(t, tagsMatch([]string{"env", "service"}, tagsKey), "env:dev,service:monitor")
}

func TestMetricMapSplitByTags(t *testing.T) {
mmOriginal := NewMetricMap()
mmOriginal.Counters["m"] = map[string]Counter{
"t:x,s:h1": {Tags: Tags{"t:x"}, Hostname: "h1", Value: 10},
"t:x,s:h2": {Tags: Tags{"t:x"}, Hostname: "h2", Value: 20},
"t:x,v:1,s:h3": {Tags: Tags{"t:x", "v:1"}, Hostname: "h3", Value: 30},
"t:y,v:2,s:h4": {Tags: Tags{"t:y", "v:2"}, Hostname: "h4", Value: 40},
"t:y,x,s:h5": {Tags: Tags{"t:y", "x"}, Hostname: "h5", Value: 50},
}
mmOriginal.Gauges["m"] = map[string]Gauge{
"t:x,s:h1": {Tags: Tags{"t:x"}, Hostname: "h1", Value: 10},
"t:x,s:h2": {Tags: Tags{"t:x"}, Hostname: "h2", Value: 20},
"t:x,v:1,s:h3": {Tags: Tags{"t:x", "v:1"}, Hostname: "h3", Value: 30},
"t:y,v:2,s:h4": {Tags: Tags{"t:y", "v:2"}, Hostname: "h4", Value: 40},
"t:y,x,s:h5": {Tags: Tags{"t:y", "x"}, Hostname: "h5", Value: 50},
}
mmOriginal.Timers["m"] = map[string]Timer{
"t:x,s:h1": {Values: []float64{10, 50}, Tags: Tags{"t:x"}, Hostname: "h1"},
"t:x,s:h2": {Values: []float64{20, 40}, Tags: Tags{"t:x"}, Hostname: "h2"},
"t:x,v:1,s:h3": {Values: []float64{30, 30}, Tags: Tags{"t:x", "v:1"}, Hostname: "h3"},
"t:y,v:2,s:h4": {Values: []float64{40, 20}, Tags: Tags{"t:y", "v:2"}, Hostname: "h4"},
"t:y,x,s:h5": {Values: []float64{50, 10}, Tags: Tags{"t:y", "x"}, Hostname: "h5"},
}
mmOriginal.Sets["m"] = map[string]Set{
"t:x,s:h1": {Values: map[string]struct{}{"10": {}, "50": {}}, Tags: Tags{"t:x"}, Hostname: "h1"},
"t:x,s:h2": {Values: map[string]struct{}{"20": {}, "40": {}}, Tags: Tags{"t:x"}, Hostname: "h2"},
"t:x,v:1,s:h3": {Values: map[string]struct{}{"30": {}, "3.0": {}}, Tags: Tags{"t:x", "v:1"}, Hostname: "h3"},
"t:y,v:2,s:h4": {Values: map[string]struct{}{"40": {}, "20": {}}, Tags: Tags{"t:y", "v:2"}, Hostname: "h4"},
"t:y,x,s:h5": {Values: map[string]struct{}{"50": {}, "10": {}}, Tags: Tags{"t:y", "x"}, Hostname: "h5"},
}

// no-op if given empty tagNames
mms := mmOriginal.SplitByTags([]string{})
require.Equal(t, len(mms), 1)

// empty tag name doesn't match
mms = mmOriginal.SplitByTags([]string{""})
require.Equal(t, len(mms), 1)

// non existing tag name doesn't match
mms = mmOriginal.SplitByTags([]string{"key:"})
require.Equal(t, len(mms), 1)

// valueless tag doesn't match
mms = mmOriginal.SplitByTags([]string{"x:"})
require.Equal(t, len(mms), 1)

// each value of tag s is unique
mms = mmOriginal.SplitByTags([]string{"s:"})
require.Equal(t, len(mms), 5)

// two possible values for tag t
mms = mmOriginal.SplitByTags([]string{"t:"})
require.Equal(t, len(mms), 2)

// all value combinations of tag t and v
mms = mmOriginal.SplitByTags([]string{"t:", "v:"})
require.Equal(t, len(mms), 4)
}
2 changes: 1 addition & 1 deletion pkg/backends/datadog/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/atlassian/gostatsd/pkg/util"

"github.com/cenkalti/backoff"
"github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
Expand Down
54 changes: 40 additions & 14 deletions pkg/statsd/handler_http_forwarder_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -54,6 +55,7 @@ type HttpForwarderHandlerV2 struct {
eventWg sync.WaitGroup
compress bool
headers map[string]string
dynHeaderNames []string
}

// NewHttpForwarderHandlerV2FromViper returns a new http API client.
Expand All @@ -77,6 +79,7 @@ func NewHttpForwarderHandlerV2FromViper(logger logrus.FieldLogger, v *viper.Vipe
subViper.GetDuration("max-request-elapsed-time"),
subViper.GetDuration("flush-interval"),
subViper.GetStringMapString("custom-headers"),
subViper.GetStringSlice("dynamic-headers"),
pool,
)
}
Expand All @@ -92,6 +95,7 @@ func NewHttpForwarderHandlerV2(
maxRequestElapsedTime time.Duration,
flushInterval time.Duration,
xheaders map[string]string,
dynHeaderNames []string,
pool *transport.TransportPool,
) (*HttpForwarderHandlerV2, error) {
if apiEndpoint == "" {
Expand Down Expand Up @@ -139,6 +143,18 @@ func NewHttpForwarderHandlerV2(
headers[k] = v
}

dynHeaderNamesWithColon := make([]string, 0)
for _, name := range dynHeaderNames {
if name == "" {
continue
}
if _, ok := xheaders[name]; !ok {
dynHeaderNamesWithColon = append(dynHeaderNamesWithColon, name+":")
} else {
logger.WithField("header-name", name).Warn("static and dynamic header defined, using static")
}
}

metricsSem := make(chan struct{}, maxRequests)
for i := 0; i < maxRequests; i++ {
metricsSem <- struct{}{}
Expand All @@ -156,6 +172,7 @@ func NewHttpForwarderHandlerV2(
consolidatedMetrics: ch,
client: httpClient.Client,
headers: headers,
dynHeaderNames: dynHeaderNamesWithColon,
}, nil
}

Expand Down Expand Up @@ -212,15 +229,18 @@ func (hfh *HttpForwarderHandlerV2) Run(ctx context.Context) {
case <-ctx.Done():
return
case metricMaps := <-hfh.consolidatedMetrics:
if !hfh.acquireSem(ctx) {
return
mergedMetricMap := mergeMaps(metricMaps)
mms := mergedMetricMap.SplitByTags(hfh.dynHeaderNames)
for dynHeaderTags, mm := range mms {
if !hfh.acquireSem(ctx) {
return
}
postId := atomic.AddUint64(&hfh.postId, 1) - 1
go func(postId uint64, metricMap *gostatsd.MetricMap, dynHeaderTags string) {
hfh.postMetrics(ctx, metricMap, dynHeaderTags, postId)
hfh.releaseSem()
}(postId, mm, dynHeaderTags)
}
metricMap := mergeMaps(metricMaps)
postId := atomic.AddUint64(&hfh.postId, 1) - 1
go func(postId uint64) {
hfh.postMetrics(ctx, metricMap, postId)
hfh.releaseSem()
}(postId)
}
}
}
Expand Down Expand Up @@ -305,18 +325,18 @@ func translateToProtobufV2(metricMap *gostatsd.MetricMap) *pb.RawMessageV2 {
return &pbMetricMap
}

func (hfh *HttpForwarderHandlerV2) postMetrics(ctx context.Context, metricMap *gostatsd.MetricMap, batchId uint64) {
func (hfh *HttpForwarderHandlerV2) postMetrics(ctx context.Context, metricMap *gostatsd.MetricMap, dynHeaderTags string, batchId uint64) {
message := translateToProtobufV2(metricMap)
hfh.post(ctx, message, batchId, "metrics", "/v2/raw")
hfh.post(ctx, message, dynHeaderTags, batchId, "metrics", "/v2/raw")
}

func (hfh *HttpForwarderHandlerV2) post(ctx context.Context, message proto.Message, id uint64, endpointType, endpoint string) {
func (hfh *HttpForwarderHandlerV2) post(ctx context.Context, message proto.Message, dynHeaderTags string, id uint64, endpointType, endpoint string) {
logger := hfh.logger.WithFields(logrus.Fields{
"id": id,
"type": endpointType,
})

post, err := hfh.constructPost(ctx, logger, hfh.apiEndpoint+endpoint, message)
post, err := hfh.constructPost(ctx, logger, hfh.apiEndpoint+endpoint, message, dynHeaderTags)
if err != nil {
atomic.AddUint64(&hfh.messagesInvalid, 1)
logger.WithError(err).Error("failed to create request")
Expand Down Expand Up @@ -395,7 +415,7 @@ func (hfh *HttpForwarderHandlerV2) serializeAndCompress(message proto.Message) (
return buf.Bytes(), nil
}

func (hfh *HttpForwarderHandlerV2) constructPost(ctx context.Context, logger logrus.FieldLogger, path string, message proto.Message) (func() error /*doPost*/, error) {
func (hfh *HttpForwarderHandlerV2) constructPost(ctx context.Context, logger logrus.FieldLogger, path string, message proto.Message, dynHeaderTags string) (func() error /*doPost*/, error) {
var body []byte
var err error
var encoding string
Expand All @@ -418,6 +438,12 @@ func (hfh *HttpForwarderHandlerV2) constructPost(ctx context.Context, logger log
return fmt.Errorf("unable to create http.Request: %v", err)
}
req = req.WithContext(ctx)
for _, tv := range strings.Split(dynHeaderTags, ",") {
vs := strings.SplitN(tv, ":", 2)
if len(vs) > 1 {
req.Header.Set(strings.ReplaceAll(vs[0], "_", "-"), vs[1])
}
}
for header, v := range hfh.headers {
req.Header.Set(header, v)
}
Expand Down Expand Up @@ -485,7 +511,7 @@ func (hfh *HttpForwarderHandlerV2) dispatchEvent(ctx context.Context, e *gostats
message.Type = pb.EventV2_Success
}

hfh.post(ctx, message, postId, "event", "/v2/event")
hfh.post(ctx, message, "", postId, "event", "/v2/event")

defer hfh.eventWg.Done()
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/statsd/handler_http_forwarder_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package statsd

import (
"testing"
"time"

"github.com/atlassian/gostatsd"
"github.com/atlassian/gostatsd/pb"
"github.com/atlassian/gostatsd/pkg/transport"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -202,3 +206,28 @@ func BenchmarkHttpForwarderV2TranslateAll(b *testing.B) {
translateToProtobufV2(mm)
}
}

func TestHttpForwarderV2New(t *testing.T) {
logger := logrus.New()
pool := transport.NewTransportPool(logger, viper.New())
cusHeaders := map[string]string{"region": "us", "env": "dev"}

for _, testcase := range []struct {
dynHeaders []string
expected []string
}{
{
dynHeaders: []string{"service", "deploy"},
expected: []string{"service:", "deploy:"},
},
{
dynHeaders: []string{"service", "deploy", "env"},
expected: []string{"service:", "deploy:"},
},
} {
h, err := NewHttpForwarderHandlerV2(logger, "default", "endpoint", 1, 1, false, time.Second, time.Second,
cusHeaders, testcase.dynHeaders, pool)
require.Nil(t, err)
require.Equal(t, h.dynHeaderNames, testcase.expected)
}
}
1 change: 1 addition & 0 deletions pkg/web/http_receiver_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestForwardingEndToEndV2(t *testing.T) {
10*time.Second,
10*time.Millisecond,
nil,
nil,
p,
)
require.NoError(t, err)
Expand Down

0 comments on commit 3f6dee5

Please sign in to comment.