Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Logstash output #13345

Merged
merged 6 commits into from Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 21 additions & 3 deletions libbeat/cfgfile/cfgfile.go
Expand Up @@ -61,6 +61,16 @@ func init() {
makePathFlag("path.logs", "Logs path")
}

// OverrideChecker checks if a config should be overwritten.
type OverrideChecker func(*common.Config) bool

// ConditionalOverride stores a config which needs to overwrite the existing config based on the
// result of the Check.
type ConditionalOverride struct {
Check OverrideChecker
Config *common.Config
}

// ChangeDefaultCfgfileFlag replaces the value and default value for the `-c`
// flag so that it reflects the beat name.
func ChangeDefaultCfgfileFlag(beatName string) error {
Expand Down Expand Up @@ -122,7 +132,7 @@ func Read(out interface{}, path string) error {
// Load reads the configuration from a YAML file structure. If path is empty
// this method reads from the configuration file specified by the '-c' command
// line flag.
func Load(path string, beatOverrides *common.Config) (*common.Config, error) {
func Load(path string, beatOverrides []ConditionalOverride) (*common.Config, error) {
var config *common.Config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we merge BeatsOverrides with conditionalsOverrides, the only difference between the two is that one of them should be applied all the time. No?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, after taking a walk I realized that they could be in the same list. :D

var err error

Expand All @@ -149,9 +159,17 @@ func Load(path string, beatOverrides *common.Config) (*common.Config, error) {
}

if beatOverrides != nil {
merged := defaults
for _, o := range beatOverrides {
if o.Check(config) {
merged, err = common.MergeConfigs(merged, o.Config)
if err != nil {
return nil, err
}
}
}
config, err = common.MergeConfigs(
defaults,
beatOverrides,
merged,
config,
overwrites,
)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/settings.go
Expand Up @@ -20,7 +20,7 @@ package instance
import (
"github.com/spf13/pflag"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/libbeat/monitoring/report"
Expand All @@ -34,7 +34,7 @@ type Settings struct {
Version string
Monitoring report.Settings
RunFlags *pflag.FlagSet
ConfigOverrides *common.Config
ConfigOverrides []cfgfile.ConditionalOverride

DisableConfigResolver bool

Expand Down
64 changes: 59 additions & 5 deletions x-pack/functionbeat/config/config.go
Expand Up @@ -11,26 +11,60 @@ import (
"fmt"
"regexp"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
)

// ConfigOverrides overrides the defaults provided by libbeat.
var (
functionPattern = "^[A-Za-z][A-Za-z0-9\\-]{0,139}$"
functionRE = regexp.MustCompile(functionPattern)
ConfigOverrides = common.MustNewConfigFrom(map[string]interface{}{
configOverrides = common.MustNewConfigFrom(map[string]interface{}{
"path.data": "/tmp",
"path.logs": "/tmp/logs",
"logging.to_stderr": true,
"logging.to_files": false,
"setup.template.enabled": true,
"queue.mem": map[string]interface{}{
"events": "${output.elasticsearch.bulk_max_size}",
"flush.min_events": 10,
"flush.timeout": "0.01s",
},
})
functionLoggingOverrides = common.MustNewConfigFrom(map[string]interface{}{
"logging.to_stderr": true,
"logging.to_files": false,
})
esOverrides = common.MustNewConfigFrom(map[string]interface{}{
"queue.mem.events": "${output.elasticsearch.bulk_max_size}",
"output.elasticsearch.bulk_max_size": 50,
})
logstashOverrides = common.MustNewConfigFrom(map[string]interface{}{
"queue.mem.events": "${output.logstash.bulk_max_size}",
"output.logstash": map[string]interface{}{
"bulk_max_size": 50,
"pipelining": 0,
},
})

// Overrides overrides the default configuration provided by libbeat.
Overrides = []cfgfile.ConditionalOverride{
cfgfile.ConditionalOverride{
Check: always,
Config: configOverrides,
},
cfgfile.ConditionalOverride{
Check: isLogstash,
Config: logstashOverrides,
},
cfgfile.ConditionalOverride{
Check: isElasticsearch,
Config: esOverrides,
},
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat I haven't thought about doing it this way.


functionOverride = cfgfile.ConditionalOverride{
Check: always,
Config: functionLoggingOverrides,
}
// FunctionOverrides contain logging settings
FunctionOverrides = append(Overrides, functionOverride)
)

// Config default configuration for Functionbeat.
Expand Down Expand Up @@ -58,6 +92,26 @@ var DefaultFunctionConfig = FunctionConfig{
Enabled: true,
}

var always = func(_ *common.Config) bool {
return true
}

var isLogstash = func(cfg *common.Config) bool {
return isOutput(cfg, "logstash")
}

var isElasticsearch = func(cfg *common.Config) bool {
return isOutput(cfg, "elasticsearch")
}

func isOutput(cfg *common.Config, name string) bool {
outputCfg, err := cfg.Child("output", -1)
if err != nil {
return false
}
return outputCfg.HasField(name)
}

type functionName string

func (f *functionName) Unpack(s string) error {
Expand Down
53 changes: 23 additions & 30 deletions x-pack/functionbeat/function/beater/functionbeat.go
Expand Up @@ -11,21 +11,22 @@ import (
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/x-pack/functionbeat/config"
"github.com/elastic/beats/x-pack/functionbeat/function/core"
"github.com/elastic/beats/x-pack/functionbeat/function/provider"
"github.com/elastic/beats/x-pack/libbeat/licenser"
)

var (
graceDelay = 45 * time.Minute
refreshDelay = 15 * time.Minute
graceDelay = 45 * time.Minute
refreshDelay = 15 * time.Minute
supportedOutputs = []string{
"elasticsearch",
"logstash",
}
)

// Functionbeat is a beat designed to run under a serverless environment and listen to external triggers,
Expand Down Expand Up @@ -87,22 +88,16 @@ func getProvider(cfg *common.Config) (provider.Provider, error) {
// Run starts functionbeat.
func (bt *Functionbeat) Run(b *beat.Beat) error {
defer bt.cancel()
bt.log.Info("Functionbeat is running")
defer bt.log.Info("Functionbeat stopped running")

manager, err := licenser.Create(&b.Config.Output, refreshDelay, graceDelay)
if err != nil {
return errors.Wrap(err, "could not create the license manager")
outputName := b.Config.Output.Name()
if !isOutputSupported(outputName) {
return fmt.Errorf("unsupported output type: %s; supported ones: %+v", outputName, supportedOutputs)
}
manager.Start()
defer manager.Stop()

// Wait until we receive the initial license.
if err := licenser.WaitForLicense(bt.ctx, bt.log, manager, licenser.BasicAndAboveOrTrial); err != nil {
return err
}
bt.log.Info("Functionbeat is running")
defer bt.log.Info("Functionbeat stopped running")

clientFactory := makeClientFactory(bt.log, manager, b.Publisher)
clientFactory := makeClientFactory(bt.log, b.Publisher)

enabledFunctions := bt.enabledFunctions()
bt.log.Infof("Functionbeat is configuring enabled functions: %s", strings.Join(enabledFunctions, ", "))
Expand Down Expand Up @@ -140,7 +135,16 @@ func (bt *Functionbeat) Stop() {
bt.cancel()
}

func makeClientFactory(log *logp.Logger, manager *licenser.Manager, pipeline beat.Pipeline) func(*common.Config) (core.Client, error) {
func isOutputSupported(name string) bool {
for _, output := range supportedOutputs {
if name == output {
return true
}
}
return false
}

func makeClientFactory(log *logp.Logger, pipeline beat.Pipeline) func(*common.Config) (core.Client, error) {
// Each function has his own client to the publisher pipeline,
// publish operation will block the calling thread, when the method unwrap we have received the
// ACK for the batch.
Expand All @@ -167,17 +171,6 @@ func makeClientFactory(log *logp.Logger, manager *licenser.Manager, pipeline bea
},
})

if err != nil {
return nil, err
}

// Make the client aware of the current license, the client will accept sending events to the
// pipeline until the client is closed or if the license change and is not valid.
licenseAware := core.NewLicenseAwareClient(client, licenser.BasicAndAboveOrTrial)
if err := manager.AddWatcher(licenseAware); err != nil {
return nil, err
}

return licenseAware, nil
return client, err
}
}
2 changes: 1 addition & 1 deletion x-pack/functionbeat/function/cmd/root.go
Expand Up @@ -28,7 +28,7 @@ func NewFunctionCmd(name string, beatCreator beat.Creator) *FunctionCmd {
settings := instance.Settings{
Name: name,
IndexPrefix: name,
ConfigOverrides: config.ConfigOverrides,
ConfigOverrides: config.FunctionOverrides,
}

err := cfgfile.ChangeDefaultCfgfileFlag(settings.Name)
Expand Down
73 changes: 0 additions & 73 deletions x-pack/functionbeat/function/core/license_client.go

This file was deleted.