Skip to content

Commit

Permalink
Merge branch 'main' into sumologicexporter-dont-send-null-log
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek committed Sep 17, 2021
2 parents 7303bbf + c9a2930 commit 76e03f6
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 4 deletions.
6 changes: 3 additions & 3 deletions pkg/processor/sourceprocessor/attribute_filler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ type attributeFiller struct {
labels []string
}

func createSourceHostFiller() attributeFiller {
func createSourceHostFiller(sourceHostAttrName string) attributeFiller {
return attributeFiller{
name: sourceHostKey,
compiledFormat: "",
compiledFormat: "%s",
dashReplacement: "",
labels: make([]string, 0),
labels: []string{sourceHostAttrName},
prefix: "",
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/sourceprocessor/source_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func newSourceProcessor(cfg *Config) *sourceProcessor {
collector: cfg.Collector,
keys: keys,
source: cfg.Source,
sourceHostFiller: createSourceHostFiller(),
sourceHostFiller: createSourceHostFiller(cfg.SourceHostKey),
sourceCategoryFiller: createSourceCategoryFiller(cfg, keys),
sourceNameFiller: createSourceNameFiller(cfg, keys),
exclude: exclude,
Expand Down
109 changes: 109 additions & 0 deletions pkg/processor/sourceprocessor/source_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ var (
}
)

func newLogsDataWithLogs(resourceAttrs map[string]string, logAttrs map[string]string) pdata.Logs {
ld := pdata.NewLogs()
rs := ld.ResourceLogs().AppendEmpty()
attrs := rs.Resource().Attributes()
for k, v := range resourceAttrs {
attrs.UpsertString(k, v)
}

ills := rs.InstrumentationLibraryLogs().AppendEmpty()
log := ills.Logs().AppendEmpty()
log.Body().SetStringVal("dummy log")
for k, v := range logAttrs {
log.Attributes().InsertString(k, v)
}

return ld
}

func newTraceData(labels map[string]string) pdata.Traces {
td := pdata.NewTraces()
rs := td.ResourceSpans().AppendEmpty()
Expand Down Expand Up @@ -149,6 +167,97 @@ func assertSpansEqual(t *testing.T, t1 pdata.Traces, t2 pdata.Traces) {
}
}

func TestLogsSourceHostKey(t *testing.T) {
resourceAttrs := map[string]string{
"_SYSTEMD_UNIT": `docker.service`,
"_HOSTNAME": `sumologic-kubernetes-collection-hostname`,
}
logAttrs := map[string]string{
"PRIORITY": `6`,
"_BOOT_ID": `7878e140730d4ee89fadd300c929a892`,
"_MACHINE_ID": `1e54ca203e554cda9c944fd7f00e94e1`,
"MESSAGE": `time="2021-09-15T17:31:49.523983251+02:00" level=info msg="ignoring event"module=libcontainerd namespace=moby`,
"_TRANSPORT": `stdout`,
"SYSLOG_FACILITY": `3`,
"_UID": `0`,
"_GID": `0`,
"_CAP_EFFECTIVE": `3fffffffff`,
"_SELINUX_CONTEXT": `unconfined`,
"_SYSTEMD_SLICE": `system.slice`,
"_STREAM_ID": `f16d7d0400a44f228554869816ab1dfa`,
"SYSLOG_IDENTIFIER": `dockerd`,
"_PID": `1061`,
"_COMM": `dockerd`,
"_EXE": `/usr/bin/dockerd`,
"_CMDLINE": `/usr/bin/dockerd -H fd:// --containerd=/run/containerd/containerd.sock`,
"_SYSTEMD_CGROUP": `/system.slice/docker.service`,
"fluent.tag": `host.docker.service`,
"_SYSTEMD_INVOCATION_ID": `1b689467e52f4fc4aa95d3c36fa1d7fa`,
}

t.Run("works using existing resource attribute", func(t *testing.T) {
config := NewFactory().CreateDefaultConfig().(*Config)
config.SourceName = "will-it-work-%{_HOSTNAME}"
config.SourceHostKey = "_HOSTNAME"

pLogs := newLogsDataWithLogs(resourceAttrs, logAttrs)

sp := newSourceProcessor(config)
out, err := sp.ProcessLogs(context.Background(), pLogs)
require.NoError(t, err)

out.ResourceLogs().At(0).Resource().Attributes().Range(func(k string, v pdata.AttributeValue) bool {
t.Logf("k %s : v %v\n", k, v.StringVal())
return true
})

require.Equal(t, out.ResourceLogs().Len(), 1)
resAttrs := out.ResourceLogs().At(0).Resource().Attributes()

{
v, ok := resAttrs.Get("_sourceName")
require.True(t, ok)
assert.Equal(t, "will-it-work-sumologic-kubernetes-collection-hostname", v.StringVal())
}

{
v, ok := resAttrs.Get("_sourceHost")
require.True(t, ok)
assert.Equal(t, "sumologic-kubernetes-collection-hostname", v.StringVal())
}
})

t.Run("does not work using record attribute", func(t *testing.T) {
config := NewFactory().CreateDefaultConfig().(*Config)
config.SourceName = "will-it-work-%{_CMDLINE}"
config.SourceHostKey = "_CMDLINE"

pLogs := newLogsDataWithLogs(resourceAttrs, logAttrs)

sp := newSourceProcessor(config)
out, err := sp.ProcessLogs(context.Background(), pLogs)
require.NoError(t, err)

out.ResourceLogs().At(0).Resource().Attributes().Range(func(k string, v pdata.AttributeValue) bool {
t.Logf("k %s : v %v\n", k, v.StringVal())
return true
})

require.Equal(t, out.ResourceLogs().Len(), 1)
resAttrs := out.ResourceLogs().At(0).Resource().Attributes()

{
_, ok := resAttrs.Get("_sourceName")
require.False(t, ok)
}

{
_, ok := resAttrs.Get("_sourceHost")
require.False(t, ok)
}
})
}

func TestTraceSourceProcessor(t *testing.T) {
want := newTraceData(mergedK8sLabelsWithMeta)
test := newTraceData(k8sLabels)
Expand Down

0 comments on commit 76e03f6

Please sign in to comment.