Permalink
Browse files

Heartbeat autodiscover (#8415)

Autodiscover support for Heartbeat + tests
  • Loading branch information...
andrewvc committed Oct 5, 2018
1 parent 694e010 commit cff3e40cfedb9521d7249caaa569668de203dc59
@@ -21,6 +21,8 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
*Heartbeat*
- Added autodiscovery support {pull}8415[8415]
*Metricbeat*
*Packetbeat*
@@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/heartbeat/config"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/scheduler"
"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
@@ -39,6 +40,8 @@ type Heartbeat struct {
config config.Config
scheduler *scheduler.Scheduler
monitorReloader *cfgfile.Reloader
dynamicFactory *monitors.RunnerFactory
autodiscover *autodiscover.Autodiscover
}
// New creates a new heartbeat.
@@ -64,6 +67,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
done: make(chan struct{}),
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(scheduler, false),
}
return bt, nil
}
@@ -81,12 +86,22 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
defer bt.monitorReloader.Stop()
err := bt.RunDynamicMonitors(b)
err := bt.RunReloadableMonitors(b)
if err != nil {
return err
}
}
if bt.config.Autodiscover != nil {
bt.autodiscover, err = bt.makeAutodiscover(b)
if err != nil {
return err
}
bt.autodiscover.Start()
defer bt.autodiscover.Stop()
}
if err := bt.scheduler.Start(); err != nil {
return err
}
@@ -112,21 +127,31 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
return nil
}
// RunDynamicMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunDynamicMonitors(b *beat.Beat) (err error) {
factory := monitors.NewFactory(bt.scheduler, false)
// RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
// Check monitor configs
if err := bt.monitorReloader.Check(factory); err != nil {
if err := bt.monitorReloader.Check(bt.dynamicFactory); err != nil {
return err
}
// Execute the monitor
go bt.monitorReloader.Run(factory)
go bt.monitorReloader.Run(bt.dynamicFactory)
return nil
}
// makeAutodiscover creates an autodiscover object ready to be started.
func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, error) {
adapter := autodiscover.NewFactoryAdapter(bt.dynamicFactory)
ad, err := autodiscover.NewAutodiscover("heartbeat", b.Publisher, adapter, bt.config.Autodiscover)
if err != nil {
return nil, err
}
return ad, nil
}
// Stop stops the beat.
func (bt *Heartbeat) Stop() {
close(bt.done)
@@ -21,15 +21,17 @@
package config
import (
"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/common"
)
// Config defines the structure of heartbeat.yml.
type Config struct {
// Modules is a list of module specific configuration data.
Monitors []*common.Config `config:"monitors"`
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Monitors []*common.Config `config:"monitors"`
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
}
// Scheduler defines the syntax of a heartbeat.yml scheduler block.
@@ -1,3 +1,5 @@
logging.level: debug
heartbeat.monitors:
{% for monitor in monitors -%}
- type: {{ monitor.type }}
@@ -40,6 +42,19 @@ heartbeat.config.monitors:
reload.enabled: {{ reload|default("false")}}
{% endif -%}
{% if autodiscover %}
heartbeat.autodiscover:
providers:
{%- for provider, settings in autodiscover.items() %}
- type: {{provider}}
{%- if settings %}
{%- for k, v in settings.items() %}
{{k}}: {{v | default([])}}
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
{%- if shipper_name %}
name: {{ shipper_name }}
{% endif %}
@@ -0,0 +1,64 @@
import os
from heartbeat import BaseTest
import unittest
import re
from beat.beat import INTEGRATION_TESTS
class TestAutodiscover(BaseTest):
"""
Test heartbeat autodiscover
"""
@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_docker(self):
"""
Test docker autodiscover starts modules from templates
"""
import docker
docker_client = docker.from_env()
self.render_config_template(
autodiscover={
'docker': {
'templates': '''
- condition:
contains.docker.container.image: redis
config:
- type: tcp
hosts: ["${data.host}:${data.port}"]
schedule: "@every 1s"
timeout: 1s
''',
},
},
)
proc = self.start_beat()
self.wait_until(lambda: self.log_contains(
re.compile('autodiscover.+Got a start event:', re.I)))
self.wait_until(lambda: self.output_count(lambda x: x >= 1))
output = self.read_output_json()
proc.check_kill_and_wait()
matched = False
matcher = re.compile("redis", re.I)
for i, container in enumerate(docker_client.containers.list()):
for tag in container.image.tags:
if matcher.search(tag):
network_settings = container.attrs['NetworkSettings']
host = network_settings['Networks'].values()[
0]['IPAddress']
port = network_settings['Ports'].keys()[0].split("/")[0]
# Check metadata is added
expected = 'tcp-tcp@%s:%s' % (host, port)
actual = output[0]['monitor']['id']
if expected == actual:
matched = True
assert matched
@@ -26,20 +26,20 @@ import (
"github.com/elastic/beats/libbeat/common/bus"
)
// AutodiscoverAdapter for Metricbeat modules
type AutodiscoverAdapter struct {
// FactoryAdapter is an adapter that works with any cfgfile.RunnerFactory.
type FactoryAdapter struct {
factory cfgfile.RunnerFactory
}
// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Metricbeat modules
func NewAutodiscoverAdapter(factory cfgfile.RunnerFactory) *AutodiscoverAdapter {
return &AutodiscoverAdapter{
// NewFactoryAdapter builds and returns an autodiscover adapter that works with any cfgfile.RunnerFactory.
func NewFactoryAdapter(factory cfgfile.RunnerFactory) *FactoryAdapter {
return &FactoryAdapter{
factory: factory,
}
}
// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter`
func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) {
func (m *FactoryAdapter) CreateConfig(e bus.Event) ([]*common.Config, error) {
config, ok := e["config"].([]*common.Config)
if !ok {
return nil, errors.New("Got a wrong value in event `config` key")
@@ -48,16 +48,16 @@ func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error
}
// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
func (m *FactoryAdapter) CheckConfig(c *common.Config) error {
return m.factory.CheckConfig(c)
}
// Create a module or prospector from the given config
func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (m *FactoryAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
return m.factory.Create(p, c, meta)
}
// EventFilter returns the bus filter to retrieve runner start/stop triggering events
func (m *AutodiscoverAdapter) EventFilter() []string {
func (m *FactoryAdapter) EventFilter() []string {
return []string{"config"}
}
@@ -10,6 +10,7 @@
import time
import yaml
import hashlib
import re
from datetime import datetime, timedelta
from .compose import ComposeMixin
@@ -22,6 +23,8 @@
yaml_cache = {}
REGEXP_TYPE = type(re.compile("t"))
class TimeoutError(Exception):
pass
@@ -359,6 +362,7 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False):
"""
Returns the number of appearances of the given string in the log file
"""
is_regexp = type(msg) == REGEXP_TYPE
counter = 0
if ignore_case:
@@ -371,6 +375,10 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False):
try:
with open(os.path.join(self.working_dir, logfile), "r") as f:
for line in f:
if is_regexp:
if msg.search(line) is not None:
counter = counter + 1
continue
if ignore_case:
line = line.lower()
if line.find(msg) >= 0:
@@ -32,10 +32,12 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
mbautodiscover "github.com/elastic/beats/metricbeat/autodiscover"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"
// Add autodiscover builders / appenders
_ "github.com/elastic/beats/metricbeat/autodiscover"
// Add metricbeat default processors
_ "github.com/elastic/beats/metricbeat/processor/add_kubernetes_metadata"
)
@@ -172,7 +174,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
if config.Autodiscover != nil {
var err error
factory := module.NewFactory(metricbeat.moduleOptions...)
adapter := mbautodiscover.NewAutodiscoverAdapter(factory)
adapter := autodiscover.NewFactoryAdapter(factory)
metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover)
if err != nil {
return nil, err

0 comments on commit cff3e40

Please sign in to comment.