Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Auditbeat] Update process metricset #9139

Merged
merged 7 commits into from Dec 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/auditbeat/include/list.go
Expand Up @@ -9,7 +9,7 @@ import (
// factories with the global registry.
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/host"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/packages"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/processes"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/process"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/socket"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/user"
)
2 changes: 1 addition & 1 deletion x-pack/auditbeat/module/system/_meta/config.yml.tmpl
Expand Up @@ -6,7 +6,7 @@
metricsets:
- host
- packages
- processes
- process
{{ if eq .GOOS "linux" -}}
- socket
- user
Expand Down
25 changes: 25 additions & 0 deletions x-pack/auditbeat/module/system/process/_meta/data.json
@@ -0,0 +1,25 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"event": {
"action": "existing_process",
"dataset": "process",
"id": "203e3d86-6b94-4e36-b906-930187073b93",
"module": "system",
"type": "state"
},
"process": {
"args": [
"/sbin/init"
],
"executable": "/lib/systemd/systemd",
"name": "systemd",
"pid": 1,
"ppid": 0,
"start": "2018-12-03T23:49:23.08Z",
"working_directory": "/"
}
}
@@ -1,4 +1,4 @@
The System `processes` metricset provides ... TODO.
The System `process` metricset provides ... TODO.

The module is implemented for Linux, macOS (Darwin), and Windows.

Expand Down
Expand Up @@ -2,18 +2,30 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package processes
package process

import (
"time"
)

// Config defines the host metricset's configuration options.
type Config struct {
ReportChanges bool `config:"processes.report_changes"`
StatePeriod time.Duration `config:"state.period"`
ProcessStatePeriod time.Duration `config:"process.state.period"`
}

// Validate validates the host metricset config.
func (c *Config) Validate() error {
return nil
}

func (c *Config) effectiveStatePeriod() time.Duration {
if c.ProcessStatePeriod != 0 {
return c.ProcessStatePeriod
}
return c.StatePeriod
}

var defaultConfig = Config{
ReportChanges: true,
StatePeriod: 12 * time.Hour,
}
262 changes: 262 additions & 0 deletions x-pack/auditbeat/module/system/process/process.go
@@ -0,0 +1,262 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package process

import (
"strconv"
"time"

"github.com/OneOfOne/xxhash"
"github.com/gofrs/uuid"
"github.com/pkg/errors"

"github.com/elastic/beats/auditbeat/datastore"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/metric/system/process"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/x-pack/auditbeat/cache"
"github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
)

const (
moduleName = "system"
metricsetName = "process"

bucketName = "auditbeat.process.v1"
bucketKeyStateTimestamp = "state_timestamp"

eventTypeState = "state"
eventTypeEvent = "event"

eventActionExistingProcess = "existing_process"
eventActionProcessStarted = "process_started"
eventActionProcessStopped = "process_stopped"
)

func init() {
mb.Registry.MustAddMetricSet(moduleName, metricsetName, New,
mb.DefaultMetricSet(),
)
}

// MetricSet collects data about the host.
type MetricSet struct {
mb.BaseMetricSet
config Config
cache *cache.Cache
log *logp.Logger
bucket datastore.Bucket
lastState time.Time
}

// ProcessInfo wraps the process information and implements cache.Cacheable.
type ProcessInfo struct {
cwurm marked this conversation as resolved.
Show resolved Hide resolved
types.ProcessInfo
}

// Hash creates a hash for ProcessInfo.
func (pInfo ProcessInfo) Hash() uint64 {
h := xxhash.New64()
h.WriteString(strconv.Itoa(pInfo.PID))
h.WriteString(pInfo.StartTime.String())
return h.Sum64()
}

func (pInfo ProcessInfo) toMapStr() common.MapStr {
return common.MapStr{
// https://github.com/elastic/ecs#-process-fields
"name": pInfo.Name,
"args": pInfo.Args,
"pid": pInfo.PID,
"ppid": pInfo.PPID,
"working_directory": pInfo.CWD,
"executable": pInfo.Exe,
"start": pInfo.StartTime,
}
}

// New constructs a new MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName)

config := defaultConfig
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName)
}

bucket, err := datastore.OpenBucket(bucketName)
if err != nil {
return nil, errors.Wrap(err, "failed to open persistent datastore")
}

ms := &MetricSet{
BaseMetricSet: base,
config: config,
log: logp.NewLogger(metricsetName),
cache: cache.New(),
bucket: bucket,
}

// Load from disk: Time when state was last sent
err = bucket.Load(bucketKeyStateTimestamp, func(blob []byte) error {
if len(blob) > 0 {
return ms.lastState.UnmarshalBinary(blob)
}
return nil
})
if err != nil {
return nil, err
}
if !ms.lastState.IsZero() {
ms.log.Debugf("Last state was sent at %v. Next state update by %v.", ms.lastState, ms.lastState.Add(ms.config.effectiveStatePeriod()))
} else {
ms.log.Debug("No state timestamp found")
}

return ms, nil
}

// Close cleans up the MetricSet when it finishes.
func (ms *MetricSet) Close() error {
if ms.bucket != nil {
return ms.bucket.Close()
}
return nil
}

// Fetch collects process information. It is invoked periodically.
func (ms *MetricSet) Fetch(report mb.ReporterV2) {
needsStateUpdate := time.Since(ms.lastState) > ms.config.effectiveStatePeriod()
if needsStateUpdate || ms.cache.IsEmpty() {
ms.log.Debugf("State update needed (needsStateUpdate=%v, cache.IsEmpty()=%v)", needsStateUpdate, ms.cache.IsEmpty())
err := ms.reportState(report)
if err != nil {
ms.log.Error(err)
report.Error(err)
}
ms.log.Debugf("Next state update by %v", ms.lastState.Add(ms.config.effectiveStatePeriod()))
}

err := ms.reportChanges(report)
if err != nil {
ms.log.Error(err)
report.Error(err)
}
}

// reportState reports all running processes on the system.
func (ms *MetricSet) reportState(report mb.ReporterV2) error {
// Only update lastState if this state update was regularly scheduled,
// i.e. not caused by an Auditbeat restart (when the cache would be empty).
if !ms.cache.IsEmpty() {
ms.lastState = time.Now()
}

processInfos, err := ms.getProcessInfos()
if err != nil {
return errors.Wrap(err, "failed to get process infos")
}
ms.log.Debugf("Found %v processes", len(processInfos))

stateID, err := uuid.NewV4()
if err != nil {
return errors.Wrap(err, "error generating state ID")
}
for _, pInfo := range processInfos {
event := processEvent(pInfo, eventTypeState, eventActionExistingProcess)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
}

if ms.cache != nil {
// This will initialize the cache with the current processes
ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos))
}

// Save time so we know when to send the state again (config.StatePeriod)
timeBytes, err := ms.lastState.MarshalBinary()
if err != nil {
return err
}
err = ms.bucket.Store(bucketKeyStateTimestamp, timeBytes)
if err != nil {
return errors.Wrap(err, "error writing state timestamp to disk")
}

return nil
}

// reportChanges detects and reports any changes to processes on this system since the last call.
func (ms *MetricSet) reportChanges(report mb.ReporterV2) error {
processInfos, err := ms.getProcessInfos()
if err != nil {
return errors.Wrap(err, "failed to get process infos")
}
ms.log.Debugf("Found %v processes", len(processInfos))

started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos))

for _, pInfo := range started {
report.Event(processEvent(pInfo.(*ProcessInfo), eventTypeEvent, eventActionProcessStarted))
}

for _, pInfo := range stopped {
report.Event(processEvent(pInfo.(*ProcessInfo), eventTypeEvent, eventActionProcessStopped))
}

return nil
}

func processEvent(pInfo *ProcessInfo, eventType string, eventAction string) mb.Event {
return mb.Event{
RootFields: common.MapStr{
"event": common.MapStr{
"type": eventType,
"action": eventAction,
},
"process": pInfo.toMapStr(),
},
}
}

func convertToCacheable(processInfos []*ProcessInfo) []cache.Cacheable {
c := make([]cache.Cacheable, 0, len(processInfos))

for _, p := range processInfos {
c = append(c, p)
}

return c
}

func (ms *MetricSet) getProcessInfos() ([]*ProcessInfo, error) {
// TODO: Implement Processes() in go-sysinfo
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we merge to master we should collect a list of the open TODOs in the module and make sure we don't forget to address them. This one would be caught be CI once we have it running on darwin.

Copy link
Contributor Author

@cwurm cwurm Dec 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean the CI will fail when there are TODO comments in the code? I've been using them quite liberally, also for minor things that we can address anytime (maybe never, to be honest).

Kind of as breadcrumbs meaning "If somebody ever touches this code again and has some spare cycles, maybe do this".

// e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41
pids, err := process.Pids()
if err != nil {
return nil, errors.Wrap(err, "failed to fetch the list of PIDs")
}

var processInfos []*ProcessInfo

for _, pid := range pids {
process, err := sysinfo.Process(pid)
if err != nil {
return nil, errors.Wrap(err, "failed to load process")
}

pInfo, err := process.Info()
if err != nil {
return nil, errors.Wrap(err, "failed to load process information")
}

processInfos = append(processInfos, &ProcessInfo{pInfo})
}

return processInfos, nil
}
Expand Up @@ -2,25 +2,33 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package processes
package process

import (
"testing"

"github.com/elastic/beats/auditbeat/core"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
)

func TestData(t *testing.T) {
f := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(f, t, "")
if err != nil {
t.Fatal("write", err)
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("received error: %+v", errs[0])
}

if len(events) == 0 {
t.Fatal("no events were generated")
}

fullEvent := mbtest.StandardizeEvent(f, events[0], core.AddDatasetToEvent)
mbtest.WriteEventToDataJSON(t, fullEvent, "")
}

func getConfig() map[string]interface{} {
return map[string]interface{}{
"module": "system",
"metricsets": []string{"processes"},
"metricsets": []string{"process"},
}
}