Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #4506 to 5.x: Filebeat modules: Machine learning jobs #4588

Merged
merged 6 commits into from Jul 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -70,6 +70,8 @@ https://github.com/elastic/beats/compare/v5.4.1...master[Check the HEAD diff]

*Filebeat*

- Add support for loading X-Pack Machine Learning configurations from the modules, and add sample configurations for the Nginx module. {pull}4506[4506]

*Heartbeat*

*Metricbeat*
Expand Down
46 changes: 40 additions & 6 deletions filebeat/beater/filebeat.go
@@ -1,11 +1,12 @@
package beater

import (
"errors"
"flag"
"fmt"
"sync"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -54,8 +55,16 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// Add prospectors created by the modules
config.Prospectors = append(config.Prospectors, moduleProspectors...)

if !config.ProspectorReload.Enabled() && len(config.Prospectors) == 0 {
return nil, errors.New("No prospectors defined. What files do you want me to watch?")
haveEnabledProspectors := false
for _, prospector := range config.Prospectors {
if prospector.Enabled() {
haveEnabledProspectors = true
break
}
}

if !config.ProspectorReload.Enabled() && !haveEnabledProspectors {
return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?")
}

if *once && config.ProspectorReload.Enabled() {
Expand All @@ -67,12 +76,19 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: &config,
moduleRegistry: moduleRegistry,
}

// register `setup` callback for ML jobs
if !moduleRegistry.Empty() {
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
}
}
return fb, nil
}

// modulesSetup is called when modules are configured to do the initial
// loadModulesPipelines is called when modules are configured to do the initial
// setup.
func (fb *Filebeat) modulesSetup(b *beat.Beat) error {
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
logp.Warn("Filebeat is unable to load the Ingest Node pipelines for the configured" +
Expand All @@ -95,13 +111,31 @@ func (fb *Filebeat) modulesSetup(b *beat.Beat) error {
return nil
}

// loadModulesML sets up the X-Pack Machine Learning job configurations
// declared by the enabled modules. It is registered as the beat's
// SetupMLCallback and is best effort: when the Elasticsearch output is not
// configured/enabled it logs a warning and returns nil instead of failing.
func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
	logp.Debug("machine-learning", "Setting up ML jobs for modules")

	// ML configurations can only be installed through the Elasticsearch output.
	esConfig := b.Config.Output["elasticsearch"]
	if esConfig == nil || !esConfig.Enabled() {
		logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" +
			" modules because the Elasticsearch output is not configured/enabled.")
		return nil
	}

	// NewConnectedClient also verifies connectivity, so a bad ES config
	// surfaces here rather than during the import calls below.
	esClient, err := elasticsearch.NewConnectedClient(esConfig)
	if err != nil {
		return errors.Errorf("Error creating Elasticsearch client: %v", err)
	}

	// Delegate the actual job/datafeed import to the module registry.
	return fb.moduleRegistry.LoadML(esClient)
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config

if !fb.moduleRegistry.Empty() {
err = fb.modulesSetup(b)
err = fb.loadModulesPipelines(b)
if err != nil {
return err
}
Expand Down
30 changes: 25 additions & 5 deletions filebeat/fileset/fileset.go
Expand Up @@ -17,6 +17,7 @@ import (
"text/template"

"github.com/elastic/beats/libbeat/common"
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
)

// Fileset struct is the representation of a fileset.
Expand Down Expand Up @@ -74,11 +75,16 @@ func (fs *Fileset) Read(beatVersion string) error {
// manifest structure is the representation of the manifest.yml file from the
// fileset.
type manifest struct {
ModuleVersion string `config:"module_version"`
Vars []map[string]interface{} `config:"var"`
IngestPipeline string `config:"ingest_pipeline"`
Prospector string `config:"prospector"`
Requires struct {
ModuleVersion string `config:"module_version"`
Vars []map[string]interface{} `config:"var"`
IngestPipeline string `config:"ingest_pipeline"`
Prospector string `config:"prospector"`
MachineLearning []struct {
Name string `config:"name"`
Job string `config:"job"`
Datafeed string `config:"datafeed"`
} `config:"machine_learning"`
Requires struct {
Processors []ProcessorRequirement `config:"processors"`
} `config:"requires"`
}
Expand Down Expand Up @@ -310,3 +316,17 @@ func removeExt(path string) string {
func (fs *Fileset) GetRequiredProcessors() []ProcessorRequirement {
return fs.manifest.Requires.Processors
}

// GetMLConfigs returns the machine-learning configurations (anomaly-detector
// job and datafeed definition paths) that this fileset declares in its
// manifest, each identified as "filebeat-<module>-<fileset>-<name>".
func (fs *Fileset) GetMLConfigs() []mlimporter.MLConfig {
	var configs []mlimporter.MLConfig
	// All ML definition files live inside the fileset's directory.
	filesetDir := filepath.Join(fs.modulePath, fs.name)
	for _, entry := range fs.manifest.MachineLearning {
		cfg := mlimporter.MLConfig{
			ID:           fmt.Sprintf("filebeat-%s-%s-%s", fs.mcfg.Module, fs.name, entry.Name),
			JobPath:      filepath.Join(filesetDir, entry.Job),
			DatafeedPath: filepath.Join(filesetDir, entry.Datafeed),
		}
		configs = append(configs, cfg)
	}
	return configs
}
29 changes: 28 additions & 1 deletion filebeat/fileset/modules.go
Expand Up @@ -2,15 +2,17 @@ package fileset

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
"github.com/elastic/beats/libbeat/paths"
)

Expand Down Expand Up @@ -418,6 +420,31 @@ func interpretError(initialErr error, body []byte) error {
return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

// LoadML loads the machine-learning configurations into Elasticsearch, if X-Pack is available.
// It is a no-op (with a warning) when the connected cluster does not have
// X-Pack Machine Learning enabled.
func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error {
	haveXpack, err := mlimporter.HaveXpackML(esClient)
	if err != nil {
		return errors.Errorf("Error checking if xpack is available: %v", err)
	}
	if !haveXpack {
		logp.Warn("Xpack Machine Learning is not enabled")
		return nil
	}

	// Import every job/datafeed pair declared by each registered fileset;
	// the first failure aborts the whole load with module/fileset context.
	for module, filesets := range reg.registry {
		for name, fileset := range filesets {
			for _, mlConfig := range fileset.GetMLConfigs() {
				err = mlimporter.ImportMachineLearningJob(esClient, &mlConfig)
				if err != nil {
					return errors.Errorf("Error loading ML config from %s/%s: %v", module, name, err)
				}
			}
		}
	}

	return nil
}

func (reg *ModuleRegistry) Empty() bool {
count := 0
for _, filesets := range reg.registry {
Expand Down
@@ -0,0 +1,44 @@
{
"job_id": "JOB_ID",
"query_delay": "60s",
"frequency": "60s",
"indexes": [
"filebeat-*"
],
"types": [
"_default_",
"log"
],
"query": {
"match_all": {
"boost": 1
}
},
"aggregations": {
"buckets": {
"date_histogram": {
"field": "@timestamp",
"interval": 3600000,
"offset": 0,
"order": {
"_key": "asc"
},
"keyed": false,
"min_doc_count": 0
},
"aggregations": {
"@timestamp": {
"max": {
"field": "@timestamp"
}
},
"nginx.access.response_code": {
"terms": {
"field": "nginx.access.response_code",
"size": 10000
}
}
}
}
}
}
23 changes: 23 additions & 0 deletions filebeat/module/nginx/access/machine_learning/response_code.json
@@ -0,0 +1,23 @@
{
"description" : "Anomaly detector for changes in event rates of nginx.access.response_code responses",
"analysis_config" : {
"bucket_span": "1h",
"summary_count_field_name": "doc_count",
"detectors": [
{
"detector_description": "Event rate for nginx.access.response_code",
"function": "count",
"detector_rules": [],
"partition_field_name": "nginx.access.response_code"
}
],
"influencers": ["nginx.access.response_code"]
},
"data_description": {
"time_field": "@timestamp",
"time_format": "epoch_ms"
},
"model_plot_config": {
"enabled": true
}
}
5 changes: 5 additions & 0 deletions filebeat/module/nginx/access/manifest.yml
Expand Up @@ -12,6 +12,11 @@ var:
ingest_pipeline: ingest/default.json
prospector: config/nginx-access.yml

machine_learning:
- name: response_code
job: machine_learning/response_code.json
datafeed: machine_learning/datafeed_response_code.json

requires.processors:
- name: user_agent
plugin: ingest-user-agent
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/files/logs/nginx.log
@@ -0,0 +1 @@
127.0.0.1 - - [07/Mar/2017:11:23:14 -0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"
46 changes: 46 additions & 0 deletions filebeat/tests/system/test_modules.py
Expand Up @@ -183,3 +183,49 @@ def search_objects():
assert len(objects) == 1
o = objects[0]
assert o["x-pipeline"] == "test-pipeline"

    @unittest.skipIf(not INTEGRATION_TESTS or
                     os.getenv("TESTING_ENVIRONMENT") == "2x",
                     "integration test not available on 2.x")
    def test_setup_machine_learning_nginx(self):
        """
        Tests that running filebeat with -setup and the nginx module loads
        the module's Machine Learning job and datafeed configurations
        into Elasticsearch.
        """
        self.init()

        # generate a minimal configuration
        cfgfile = os.path.join(self.working_dir, "filebeat.yml")
        self.render_config_template(
            template="filebeat_modules.yml.j2",
            output=cfgfile,
            index_name=self.index_name,
            elasticsearch_url=self.elasticsearch_url)

        # provide a sample nginx access log for the module to pick up
        os.mkdir(self.working_dir + "/log/")
        self.copy_files(["logs/nginx.log"],
                        source_dir="../files",
                        target_dir="log")

        # run filebeat once with -setup so module setup (including the ML
        # configurations) runs; nginx.error is disabled — presumably because
        # no error-log sample is provided (TODO confirm)
        cmd = [
            self.filebeat, "-systemTest",
            "-e", "-d", "*", "-once",
            "-c", cfgfile,
            "-setup", "-modules=nginx",
            "-E", "dashboards.directory=../../_meta/kibana",
            "-M", "*.*.prospector.close_eof=true",
            "-M", "nginx.error.enabled=false",
            "-M", "nginx.access.var.paths=[{}/log/nginx.log]".format(self.working_dir)]

        # capture the filebeat output for debugging and wait for it to exit
        output = open(os.path.join(self.working_dir, "output.log"), "ab")
        output.write(" ".join(cmd) + "\n")
        subprocess.Popen(cmd,
                         stdin=None,
                         stdout=output,
                         stderr=subprocess.STDOUT,
                         bufsize=0).wait()

        # verify that both the anomaly-detector job and its datafeed were
        # created under the "filebeat-<module>-<fileset>-<name>" ID
        jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")
        assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"])

        datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")
        assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"])
8 changes: 1 addition & 7 deletions filebeat/tests/system/test_prospector.py
Expand Up @@ -276,14 +276,8 @@ def test_shutdown_no_prospectors(self):

filebeat = self.start_beat()

# wait for first "Start next scan" log message
self.wait_until(
lambda: self.log_contains(
"No prospectors defined"),
max_timeout=10)

self.wait_until(
lambda: self.log_contains("No prospectors defined"),
lambda: self.log_contains("No modules or prospectors enabled"),
max_timeout=10)

filebeat.check_wait(exit_code=1)
Expand Down
13 changes: 13 additions & 0 deletions libbeat/beat/beat.go
Expand Up @@ -84,6 +84,10 @@ type Beater interface {
// the beat its run-loop.
type Creator func(*Beat, *common.Config) (Beater, error)

// SetupMLCallback can be used by the Beat to register MachineLearning configurations
// for the enabled modules.
type SetupMLCallback func(*Beat) error

// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
Expand All @@ -93,6 +97,9 @@ type Beat struct {
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher publisher.Publisher // Publisher

SetupMLCallback SetupMLCallback // setup callback for ML job configs
InSetupCmd bool // this is set to true when the `setup` command is called
}

// BeatConfig struct contains the basic configuration of every beat
Expand Down Expand Up @@ -217,6 +224,12 @@ func (b *Beat) launch(bt Creator) error {
if err != nil {
return err
}
if b.SetupMLCallback != nil && *setup {
err = b.SetupMLCallback(b)
if err != nil {
return err
}
}

logp.Info("%s start running.", b.Name)
defer logp.Info("%s stopped.", b.Name)
Expand Down