Skip to content

Commit

Permalink
Osquerybeat: Add missing data_stream to events in order to support lo…
Browse files Browse the repository at this point in the history
…gstash default configuration better (#32543)
  • Loading branch information
aleksmaus committed Aug 1, 2022
1 parent db251f7 commit b26092f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
12 changes: 10 additions & 2 deletions x-pack/osquerybeat/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package config

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/processors"
)

Expand All @@ -19,9 +21,13 @@ import (
// type: logs
// query: select * from usb_devices

const DefaultNamespace = "default"
const (
DefaultNamespace = "default"
DefaultDataset = "osquery_manager.result"
DefaultType = "logs"
)

const datastreamPrefix = "logs-osquery_manager.result-"
var datastreamPrefix = fmt.Sprintf("%s-%s-", DefaultType, DefaultDataset)

type StreamConfig struct {
ID string `config:"id"`
Expand All @@ -34,6 +40,8 @@ type StreamConfig struct {

type DatastreamConfig struct {
Namespace string `config:"namespace"`
Dataset string `config:"dataset"`
Type string `config:"type"`
}

type InputConfig struct {
Expand Down
33 changes: 28 additions & 5 deletions x-pack/osquerybeat/internal/pub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/beats/v7/x-pack/osquerybeat/internal/config"
"github.com/elastic/beats/v7/x-pack/osquerybeat/internal/ecs"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -44,7 +45,7 @@ func (p *Publisher) Configure(inputs []config.InputConfig) error {
p.mx.Lock()
defer p.mx.Unlock()

processors, err := processorsForInputsConfig(inputs)
processors, err := p.processorsForInputsConfig(inputs)
if err != nil {
return err
}
Expand Down Expand Up @@ -90,20 +91,42 @@ func (p *Publisher) Close() {
}
}

func processorsForInputsConfig(inputs []config.InputConfig) (procs *processors.Processors, err error) {
func (p *Publisher) processorsForInputsConfig(inputs []config.InputConfig) (procs *processors.Processors, err error) {
procs = processors.NewList(nil)

// Use only first input processor
// Every input will have a processor that adds the elastic_agent info, we need only one
// Not expecting other processors at the moment and this needs to work for 7.13
for _, input := range inputs {
if len(input.Processors) > 0 {
procs, err = processors.New(input.Processors)
// Attach the data_stream processor. This will append the data_stream attributes to the events.
// This is needed for the proper logstash auto-discovery of the destination datastream for the results.
ds := add_data_stream.DataStream{
Namespace: input.Datastream.Namespace,
Dataset: input.Datastream.Dataset,
Type: input.Datastream.Type,
}
if ds.Namespace == "" {
ds.Namespace = config.DefaultNamespace
}
if ds.Dataset == "" {
ds.Dataset = config.DefaultDataset
}
if ds.Type == "" {
ds.Type = config.DefaultType
}

procs.AddProcessor(add_data_stream.New(ds))

userProcs, err := processors.New(input.Processors)
if err != nil {
return nil, err
}
return procs, nil
procs.AddProcessors(*userProcs)
break
}
}
return nil, nil
return procs, nil
}

func hitToEvent(index, eventType, actionID, responseID string, hit map[string]interface{}, ecsm ecs.Mapping, reqData interface{}) beat.Event {
Expand Down

0 comments on commit b26092f

Please sign in to comment.