Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Osquerybeat: Add missing data_stream to events in order to support logstash configuration better #32543

Merged
merged 1 commit into from Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions x-pack/osquerybeat/internal/config/config.go
Expand Up @@ -8,6 +8,8 @@
package config

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/processors"
)

Expand All @@ -19,9 +21,13 @@ import (
// type: logs
// query: select * from usb_devices

const DefaultNamespace = "default"
const (
DefaultNamespace = "default"
DefaultDataset = "osquery_manager.result"
DefaultType = "logs"
)

const datastreamPrefix = "logs-osquery_manager.result-"
var datastreamPrefix = fmt.Sprintf("%s-%s-", DefaultType, DefaultDataset)

type StreamConfig struct {
ID string `config:"id"`
Expand All @@ -34,6 +40,8 @@ type StreamConfig struct {

type DatastreamConfig struct {
Namespace string `config:"namespace"`
Dataset string `config:"dataset"`
Type string `config:"type"`
}

type InputConfig struct {
Expand Down
33 changes: 28 additions & 5 deletions x-pack/osquerybeat/internal/pub/publisher.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/beats/v7/x-pack/osquerybeat/internal/config"
"github.com/elastic/beats/v7/x-pack/osquerybeat/internal/ecs"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -44,7 +45,7 @@ func (p *Publisher) Configure(inputs []config.InputConfig) error {
p.mx.Lock()
defer p.mx.Unlock()

processors, err := processorsForInputsConfig(inputs)
processors, err := p.processorsForInputsConfig(inputs)
if err != nil {
return err
}
Expand Down Expand Up @@ -90,20 +91,42 @@ func (p *Publisher) Close() {
}
}

func processorsForInputsConfig(inputs []config.InputConfig) (procs *processors.Processors, err error) {
func (p *Publisher) processorsForInputsConfig(inputs []config.InputConfig) (procs *processors.Processors, err error) {
procs = processors.NewList(nil)

// Use only first input processor
// Every input will have a processor that adds the elastic_agent info, we need only one
// Not expecting other processors at the moment and this needs to work for 7.13
for _, input := range inputs {
if len(input.Processors) > 0 {
procs, err = processors.New(input.Processors)
// Attach the data_stream processor. This will append the data_stream attributes to the events.
// This is needed for the proper logstash auto-discovery of the destination datastream for the results.
ds := add_data_stream.DataStream{
Namespace: input.Datastream.Namespace,
Dataset: input.Datastream.Dataset,
Type: input.Datastream.Type,
}
if ds.Namespace == "" {
ds.Namespace = config.DefaultNamespace
}
if ds.Dataset == "" {
ds.Dataset = config.DefaultDataset
}
if ds.Type == "" {
ds.Type = config.DefaultType
}

procs.AddProcessor(add_data_stream.New(ds))

userProcs, err := processors.New(input.Processors)
if err != nil {
return nil, err
}
return procs, nil
procs.AddProcessors(*userProcs)
break
}
}
return nil, nil
return procs, nil
}

func hitToEvent(index, eventType, actionID, responseID string, hit map[string]interface{}, ecsm ecs.Mapping, reqData interface{}) beat.Event {
Expand Down