Skip to content

Commit

Permalink
feat(sumologicexporter): add source category, host and name overrides…
Browse files Browse the repository at this point in the history
… for otlp (#132)
  • Loading branch information
pmalek committed Aug 6, 2021
1 parent ef9ea79 commit afd1d35
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 25 deletions.
23 changes: 23 additions & 0 deletions pkg/exporter/sumologicexporter/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ const (
headerCategory string = "X-Sumo-Category"
headerFields string = "X-Sumo-Fields"

attributeKeySourceHost = "_sourceHost"
attributeKeySourceName = "_sourceName"
attributeKeySourceCategory = "_sourceCategory"

contentTypeLogs string = "application/x-www-form-urlencoded"
contentTypePrometheus string = "application/vnd.sumologic.prometheus"
contentTypeCarbon2 string = "application/vnd.sumologic.carbon2"
Expand Down Expand Up @@ -273,6 +277,8 @@ func (s *sender) sendOTLPLogs(ctx context.Context, flds fields) ([]pdata.LogReco
record.CopyTo(logs.AppendEmpty())
}

s.addResourceAttributes(rl.Resource().Attributes(), flds)

body, err := logsMarshaler.MarshalLogs(ld)
if err != nil {
return s.logBuffer, err
Expand Down Expand Up @@ -366,6 +372,7 @@ func (s *sender) sendOTLPMetrics(ctx context.Context, flds fields) ([]metricPair
for _, record := range s.metricBuffer {
rm := rms.AppendEmpty()
record.attributes.CopyTo(rm.Resource().Attributes())
s.addResourceAttributes(rm.Resource().Attributes(), flds)
ilm := rm.InstrumentationLibraryMetrics().AppendEmpty()
ms := ilm.Metrics().AppendEmpty()
record.metric.CopyTo(ms)
Expand Down Expand Up @@ -435,6 +442,10 @@ func (s *sender) sendTraces(ctx context.Context, td pdata.Traces, flds fields) e

// sendOTLPTraces sends trace records in OTLP format
func (s *sender) sendOTLPTraces(ctx context.Context, td pdata.Traces, flds fields) error {
for i := 0; i < td.ResourceSpans().Len(); i++ {
s.addResourceAttributes(td.ResourceSpans().At(i).Resource().Attributes(), flds)
}

body, err := tracesMarshaler.MarshalTraces(td)
if err != nil {
return err
Expand Down Expand Up @@ -581,3 +592,15 @@ func (s *sender) addRequestHeaders(req *http.Request, pipeline PipelineType, fld
}
return nil
}

func (s *sender) addResourceAttributes(attrs pdata.AttributeMap, flds fields) {
if s.sources.host.isSet() {
attrs.InsertString(attributeKeySourceHost, s.sources.host.format(flds))
}
if s.sources.name.isSet() {
attrs.InsertString(attributeKeySourceName, s.sources.name.format(flds))
}
if s.sources.category.isSet() {
attrs.InsertString(attributeKeySourceCategory, s.sources.category.format(flds))
}
}
187 changes: 162 additions & 25 deletions pkg/exporter/sumologicexporter/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,62 @@ func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http.
}
}

// prepareOTLPSenderTest prepares sender test environment.
// The enclosed httptest.Server is closed automatically using test.Cleanup.
func prepareOTLPSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http.Request)) *senderTest {
var reqCounter int32
// generate a test server so we can capture and inspect the request
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if len(cb) == 0 {
return
}

if c := int(atomic.LoadInt32(&reqCounter)); assert.Greater(t, len(cb), c) {
cb[c](w, req)
atomic.AddInt32(&reqCounter, 1)
}
}))
t.Cleanup(func() { testServer.Close() })

cfg := createDefaultConfig().(*Config)
cfg.CompressEncoding = NoCompression
cfg.HTTPClientSettings.Endpoint = testServer.URL

f, err := newFilter(cfg.MetadataAttributes)
require.NoError(t, err)

c, err := newCompressor(cfg.CompressEncoding)
require.NoError(t, err)

pf, err := newPrometheusFormatter()
require.NoError(t, err)

gf, err := newGraphiteFormatter(cfg.GraphiteTemplate)
require.NoError(t, err)

return &senderTest{
srv: testServer,
s: newSender(
cfg,
&http.Client{
Timeout: cfg.HTTPClientSettings.Timeout,
},
f,
sourceFormats{
host: getTestSourceFormat(t, "source_host"),
category: getTestSourceFormat(t, "source_category"),
name: getTestSourceFormat(t, "source_name"),
},
c,
pf,
gf,
testServer.URL,
testServer.URL,
testServer.URL,
),
}
}

func extractBody(t *testing.T, req *http.Request) string {
buf := new(strings.Builder)
_, err := io.Copy(buf, req.Body)
Expand Down Expand Up @@ -174,6 +230,9 @@ func exampleTrace() pdata.Traces {
td := pdata.NewTraces()
rs := td.ResourceSpans().AppendEmpty()
rs.Resource().Attributes().UpsertString("hostname", "testHost")
rs.Resource().Attributes().UpsertString("_sourceHost", "source_host")
rs.Resource().Attributes().UpsertString("_sourceName", "source_name")
rs.Resource().Attributes().UpsertString("_sourceCategory", "source_category")
span := rs.InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty()
span.SetTraceID(pdata.NewTraceID([16]byte{0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x3, 0x81, 0x3, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, 0xC6, 0xC}))
span.SetSpanID(pdata.NewSpanID([8]byte{0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x73}))
Expand Down Expand Up @@ -432,7 +491,7 @@ func TestSendLogsOTLP(t *testing.T) {
func(w http.ResponseWriter, req *http.Request) {
body := extractBody(t, req)
//nolint:lll
assert.Equal(t, "\n\x80\x01\n\x00\x12|\n\x00\x127*\r\n\vExample log2\x10\n\x04key1\x12\b\n\x06value12\x10\n\x04key2\x12\b\n\x06value2J\x00R\x00\x12?*\x15\n\x13Another example log2\x10\n\x04key1\x12\b\n\x06value12\x10\n\x04key2\x12\b\n\x06value2J\x00R\x00", body)
assert.Equal(t, "\n\xe2\x01\nb\n\x1c\n\v_sourceHost\x12\r\n\vsource_host\n\x1c\n\v_sourceName\x12\r\n\vsource_name\n$\n\x0f_sourceCategory\x12\x11\n\x0fsource_category\x12|\n\x00\x127*\r\n\vExample log2\x10\n\x04key1\x12\b\n\x06value12\x10\n\x04key2\x12\b\n\x06value2J\x00R\x00\x12?*\x15\n\x13Another example log2\x10\n\x04key1\x12\b\n\x06value12\x10\n\x04key2\x12\b\n\x06value2J\x00R\x00", body)
assert.Equal(t, "key1=value, key2=value2", req.Header.Get("X-Sumo-Fields"))
assert.Equal(t, "otelcol", req.Header.Get("X-Sumo-Client"))
assert.Equal(t, "application/x-protobuf", req.Header.Get("Content-Type"))
Expand All @@ -447,45 +506,123 @@ func TestSendLogsOTLP(t *testing.T) {
}

func TestOverrideSourceName(t *testing.T) {
test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "Test source name/test_name", req.Header.Get("X-Sumo-Name"))
},
t.Run("text format", func(t *testing.T) {
test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "Test source name/test_name", req.Header.Get("X-Sumo-Name"))
},
})

test.s.sources.name = getTestSourceFormat(t, "Test source name/%{key1}")
test.s.logBuffer = exampleLog()

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
})

test.s.sources.name = getTestSourceFormat(t, "Test source name/%{key1}")
test.s.logBuffer = exampleLog()
t.Run("otlp", func(t *testing.T) {
test := prepareOTLPSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
unmarshaller := otlp.NewProtobufLogsUnmarshaler()
b, err := io.ReadAll(req.Body)
require.NoError(t, err)
l, err := unmarshaller.UnmarshalLogs(b)
require.NoError(t, err)

require.Equal(t, l.ResourceLogs().Len(), 1)
sourceCategory, ok := l.ResourceLogs().At(0).Resource().Attributes().Get("_sourceName")
require.True(t, ok)
require.Equal(t, pdata.AttributeValueTypeString, sourceCategory.Type())
require.Equal(t, "Test source name/test_name", sourceCategory.StringVal())
},
})

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
test.s.sources.name = getTestSourceFormat(t, "Test source name/%{key1}")
test.s.logBuffer = exampleLog()

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
})
}

func TestOverrideSourceCategory(t *testing.T) {
test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "Test source category/test_name", req.Header.Get("X-Sumo-Category"))
},
t.Run("text format", func(t *testing.T) {
test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "Test source category/test_name", req.Header.Get("X-Sumo-Category"))
},
})

test.s.sources.category = getTestSourceFormat(t, "Test source category/%{key1}")
test.s.logBuffer = exampleLog()

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
})

test.s.sources.category = getTestSourceFormat(t, "Test source category/%{key1}")
test.s.logBuffer = exampleLog()
t.Run("otlp", func(t *testing.T) {
test := prepareOTLPSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
unmarshaller := otlp.NewProtobufLogsUnmarshaler()
b, err := io.ReadAll(req.Body)
require.NoError(t, err)
l, err := unmarshaller.UnmarshalLogs(b)
require.NoError(t, err)

require.Equal(t, l.ResourceLogs().Len(), 1)
sourceCategory, ok := l.ResourceLogs().At(0).Resource().Attributes().Get("_sourceCategory")
require.True(t, ok)
require.Equal(t, pdata.AttributeValueTypeString, sourceCategory.Type())
require.Equal(t, "Test source category/test_name", sourceCategory.StringVal())
},
})

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
test.s.sources.category = getTestSourceFormat(t, "Test source category/%{key1}")
test.s.logBuffer = exampleLog()

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
})
}

func TestOverrideSourceHost(t *testing.T) {
test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "Test source host/test_name", req.Header.Get("X-Sumo-Host"))
},
t.Run("text format", func(t *testing.T) {
test := prepareSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "Test source host/test_name", req.Header.Get("X-Sumo-Host"))
},
})

test.s.sources.host = getTestSourceFormat(t, "Test source host/%{key1}")
test.s.logBuffer = exampleLog()

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
})

test.s.sources.host = getTestSourceFormat(t, "Test source host/%{key1}")
test.s.logBuffer = exampleLog()
t.Run("otlp", func(t *testing.T) {
test := prepareOTLPSenderTest(t, []func(w http.ResponseWriter, req *http.Request){
func(w http.ResponseWriter, req *http.Request) {
unmarshaller := otlp.NewProtobufLogsUnmarshaler()
b, err := io.ReadAll(req.Body)
require.NoError(t, err)
l, err := unmarshaller.UnmarshalLogs(b)
require.NoError(t, err)

require.Equal(t, l.ResourceLogs().Len(), 1)
sourceHost, ok := l.ResourceLogs().At(0).Resource().Attributes().Get("_sourceHost")
require.True(t, ok)
require.Equal(t, pdata.AttributeValueTypeString, sourceHost.Type())
require.Equal(t, "Test source host/test_name", sourceHost.StringVal())
},
})

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
test.s.sources.host = getTestSourceFormat(t, "Test source host/%{key1}")
test.s.logBuffer = exampleLog()

_, err := test.s.sendLogs(context.Background(), fieldsFromMap(map[string]string{"key1": "test_name"}))
assert.NoError(t, err)
})
}

func TestLogsBuffer(t *testing.T) {
Expand Down

0 comments on commit afd1d35

Please sign in to comment.