Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add registry for prospectors #4980

Merged
merged 2 commits into from Aug 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions filebeat/channel/interface.go
Expand Up @@ -5,8 +5,8 @@ import (
"github.com/elastic/beats/libbeat/common"
)

// OutletFactory is used to create a new Outlet instance
type OutleterFactory func(*common.Config) (Outleter, error)
// Factory is used to create a new Outlet instance
type Factory func(*common.Config) (Outleter, error)

// Outleter is the outlet for a prospector
type Outleter interface {
Expand Down
10 changes: 6 additions & 4 deletions filebeat/crawler/crawler.go
Expand Up @@ -13,12 +13,14 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"

_ "github.com/elastic/beats/filebeat/include"
)

type Crawler struct {
prospectors map[uint64]*prospector.Prospector
prospectorConfigs []*common.Config
out channel.OutleterFactory
out channel.Factory
wg sync.WaitGroup
modulesReloader *cfgfile.Reloader
prospectorsReloader *cfgfile.Reloader
Expand All @@ -27,7 +29,7 @@ type Crawler struct {
beatDone chan struct{}
}

func New(out channel.OutleterFactory, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) {
func New(out channel.Factory, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) {
return &Crawler{
out: out,
prospectors: map[uint64]*prospector.Prospector{},
Expand Down Expand Up @@ -56,9 +58,9 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config
cfgwarn.Beta("Loading separate prospectors is enabled.")

c.prospectorsReloader = cfgfile.NewReloader(configProspectors)
prospectorsFactory := prospector.NewFactory(c.out, r, c.beatDone)
registrarContext := prospector.NewRegistrarContext(c.out, r, c.beatDone)
go func() {
c.prospectorsReloader.Run(prospectorsFactory)
c.prospectorsReloader.Run(registrarContext)
}()
}

Expand Down
12 changes: 6 additions & 6 deletions filebeat/fileset/factory.go
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/prospector"
pros "github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -14,7 +14,7 @@ import (

// Factory for modules
type Factory struct {
outlet channel.OutleterFactory
outlet channel.Factory
registrar *registrar.Registrar
beatVersion string
pipelineLoaderFactory PipelineLoaderFactory
Expand All @@ -25,12 +25,12 @@ type Factory struct {
type prospectorsRunner struct {
id uint64
moduleRegistry *ModuleRegistry
prospectors []*prospector.Prospector
prospectors []*pros.Prospector
pipelineLoaderFactory PipelineLoaderFactory
}

// NewFactory instantiates a new Factory
func NewFactory(outlet channel.OutleterFactory, registrar *registrar.Registrar, beatVersion string,
func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVersion string,
pipelineLoaderFactory PipelineLoaderFactory, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
Expand Down Expand Up @@ -62,9 +62,9 @@ func (f *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
return nil, err
}

prospectors := make([]*prospector.Prospector, len(pConfigs))
prospectors := make([]*pros.Prospector, len(pConfigs))
for i, pConfig := range pConfigs {
prospectors[i], err = prospector.NewProspector(pConfig, f.outlet, f.beatDone, f.registrar.GetStates())
prospectors[i], err = pros.NewProspector(pConfig, f.outlet, f.beatDone, f.registrar.GetStates())
if err != nil {
logp.Err("Error creating prospector: %s", err)
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions filebeat/include/list.go
@@ -0,0 +1,8 @@
package include

import (
_ "github.com/elastic/beats/filebeat/prospector/log"
_ "github.com/elastic/beats/filebeat/prospector/redis"
_ "github.com/elastic/beats/filebeat/prospector/stdin"
_ "github.com/elastic/beats/filebeat/prospector/udp"
)
21 changes: 14 additions & 7 deletions filebeat/prospector/log/prospector.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand All @@ -28,6 +29,13 @@ var (
harvesterSkipped = monitoring.NewInt(nil, "filebeat.harvester.skipped")
)

func init() {
err := prospector.Register("log", NewProspector)
if err != nil {
panic(err)
}
}

// Prospector contains the prospector and its config
type Prospector struct {
cfg *common.Config
Expand All @@ -42,10 +50,9 @@ type Prospector struct {
// NewProspector instantiates a new Log
func NewProspector(
cfg *common.Config,
states []file.State,
outlet channel.OutleterFactory,
done, beatDone chan struct{},
) (*Prospector, error) {
outlet channel.Factory,
context prospector.Context,
) (prospector.Prospectorer, error) {

// Note: underlying output.
// The prospector and harvester do have different requirements
Expand All @@ -61,7 +68,7 @@ func NewProspector(
// stateOut will only be unblocked if the beat is shut down.
// otherwise it can block on a full publisher pipeline, so state updates
// can be forwarded correctly to the registrar.
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), beatDone)
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone)

p := &Prospector{
config: defaultConfig,
Expand All @@ -70,7 +77,7 @@ func NewProspector(
outlet: out,
stateOutlet: stateOut,
states: &file.States{},
done: done,
done: context.Done,
}

if err := cfg.Unpack(&p.config); err != nil {
Expand All @@ -92,7 +99,7 @@ func NewProspector(
return nil, fmt.Errorf("each prospector must have at least one path defined")
}

err = p.loadStates(states)
err = p.loadStates(context.States)
if err != nil {
return nil, err
}
Expand Down
57 changes: 19 additions & 38 deletions filebeat/prospector/prospector.go
@@ -1,23 +1,24 @@
package prospector

import (
"fmt"
"sync"
"time"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector/log"
"github.com/elastic/beats/filebeat/prospector/redis"
"github.com/elastic/beats/filebeat/prospector/stdin"
"github.com/elastic/beats/filebeat/prospector/udp"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// Prospectorer is the interface common to all prospectors
type Prospectorer interface {
Run()
Stop()
Wait()
}

// Prospector contains the prospector
type Prospector struct {
config prospectorConfig
Expand All @@ -29,17 +30,10 @@ type Prospector struct {
beatDone chan struct{}
}

// Prospectorer is the interface common to all prospectors
type Prospectorer interface {
Run()
Stop()
Wait()
}

// NewProspector instantiates a new prospector
func NewProspector(
conf *common.Config,
outlet channel.OutleterFactory,
outlet channel.Factory,
beatDone chan struct{},
states []file.State,
) (*Prospector, error) {
Expand All @@ -63,38 +57,25 @@ func NewProspector(
return nil, err
}

err = prospector.initProspectorer(outlet, states, conf)
var f Factory
f, err = GetFactory(prospector.config.Type)
if err != nil {
return prospector, err
}

return prospector, nil
}

func (p *Prospector) initProspectorer(outlet channel.OutleterFactory, states []file.State, config *common.Config) error {
var prospectorer Prospectorer
var err error

switch p.config.Type {
case harvester.StdinType:
prospectorer, err = stdin.NewProspector(config, outlet)
case harvester.RedisType:
prospectorer, err = redis.NewProspector(config, outlet)
case harvester.LogType:
prospectorer, err = log.NewProspector(config, states, outlet, p.done, p.beatDone)
case harvester.UdpType:
prospectorer, err = udp.NewProspector(config, outlet)
default:
return fmt.Errorf("invalid prospector type: %v. Change type", p.config.Type)
context := Context{
States: states,
Done: prospector.done,
BeatDone: prospector.beatDone,
}

var prospectorer Prospectorer
prospectorer, err = f(conf, outlet, context)
if err != nil {
return err
return prospector, err
}
prospector.prospectorer = prospectorer

p.prospectorer = prospectorer

return nil
return prospector, nil
}

// Start starts the prospector
Expand Down
10 changes: 9 additions & 1 deletion filebeat/prospector/redis/prospector.go
Expand Up @@ -8,11 +8,19 @@ import (
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
)

func init() {
err := prospector.Register("redis", NewProspector)
if err != nil {
panic(err)
}
}

// Prospector is a prospector for redis
type Prospector struct {
started bool
Expand All @@ -23,7 +31,7 @@ type Prospector struct {
}

// NewProspector creates a new redis prospector
func NewProspector(cfg *common.Config, outletFactory channel.OutleterFactory) (*Prospector, error) {
func NewProspector(cfg *common.Config, outletFactory channel.Factory, context prospector.Context) (prospector.Prospectorer, error) {
cfgwarn.Experimental("Redis slowlog prospector is enabled.")

config := defaultConfig
Expand Down
Expand Up @@ -8,24 +8,24 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

// Factory is a factory for registrars
type Factory struct {
outlet channel.OutleterFactory
// RegistrarContext is a factory for registrars
type RegistrarContext struct {
outlet channel.Factory
registrar *registrar.Registrar
beatDone chan struct{}
}

// NewFactory instantiates a new Factory
func NewFactory(outlet channel.OutleterFactory, registrar *registrar.Registrar, beatDone chan struct{}) *Factory {
return &Factory{
// NewRegistrarContext instantiates a new RegistrarContext
func NewRegistrarContext(outlet channel.Factory, registrar *registrar.Registrar, beatDone chan struct{}) *RegistrarContext {
return &RegistrarContext{
outlet: outlet,
registrar: registrar,
beatDone: beatDone,
}
}

// Create creates a prospector based on a config
func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
func (r *RegistrarContext) Create(c *common.Config) (cfgfile.Runner, error) {
p, err := NewProspector(c, r.outlet, r.beatDone, r.registrar.GetStates())
if err != nil {
logp.Err("Error creating prospector: %s", err)
Expand Down
45 changes: 45 additions & 0 deletions filebeat/prospector/registry.go
@@ -0,0 +1,45 @@
package prospector

import (
"fmt"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type Context struct {
States []file.State
Done chan struct{}
BeatDone chan struct{}
}

type Factory func(config *common.Config, outletFactory channel.Factory, context Context) (Prospectorer, error)

var registry = make(map[string]Factory)

func Register(name string, factory Factory) error {
logp.Info("Registering prospector factory")
if name == "" {
return fmt.Errorf("Error registering prospector: name cannot be empty")
}
if factory == nil {
return fmt.Errorf("Error registering prospector '%v': factory cannot be empty", name)
}
if _, exists := registry[name]; exists {
return fmt.Errorf("Error registering prospector '%v': already registered", name)
}

registry[name] = factory
logp.Info("Succesfully registered prospector")

return nil
}

func GetFactory(name string) (Factory, error) {
if _, exists := registry[name]; !exists {
return nil, fmt.Errorf("Error retrieving factory for prospector '%v'", name)
}
return registry[name], nil
}