Skip to content

Commit

Permalink
Deregister pipeline loader callback when inputsRunner is stopped (#7893)
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch authored and adriansr committed Aug 8, 2018
1 parent 10f41be commit 65ef265
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Fix potential data loss on OS X in spool file by using fcntl with F_FULLFSYNC. {pull}7859[7859]
- Improve fsync on linux, by assuming the kernel resets error flags of failed writes. {pull}7859[7859]
- Remove unix-like permission checks on Windows, so files can be opened. {issue}7849[7849]
- Deregister pipeline loader callback when inputsRunner is stopped. {pull}[7893][7893]

*Auditbeat*

Expand Down
21 changes: 18 additions & 3 deletions filebeat/fileset/factory.go
Expand Up @@ -18,6 +18,8 @@
package fileset

import (
uuid "github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/channel"
input "github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
Expand Down Expand Up @@ -46,6 +48,7 @@ type Factory struct {
beatVersion string
pipelineLoaderFactory PipelineLoaderFactory
overwritePipelines bool
pipelineCallbackID uuid.UUID
beatDone chan struct{}
}

Expand All @@ -55,6 +58,7 @@ type inputsRunner struct {
moduleRegistry *ModuleRegistry
inputs []*input.Runner
pipelineLoaderFactory PipelineLoaderFactory
pipelineCallbackID uuid.UUID
overwritePipelines bool
}

Expand All @@ -67,6 +71,7 @@ func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVers
beatVersion: beatVersion,
beatDone: beatDone,
pipelineLoaderFactory: pipelineLoaderFactory,
pipelineCallbackID: uuid.Nil,
overwritePipelines: overwritePipelines,
}
}
Expand Down Expand Up @@ -107,14 +112,20 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
moduleRegistry: m,
inputs: inputs,
pipelineLoaderFactory: f.pipelineLoaderFactory,
pipelineCallbackID: f.pipelineCallbackID,
overwritePipelines: f.overwritePipelines,
}, nil
}

func (p *inputsRunner) Start() {
// Load pipelines
if p.pipelineLoaderFactory != nil {
// Load pipelines instantly and then setup a callback for reconnections:
// Attempt to load pipelines instantly when starting or after reload.
// Thus, if ES was not available previously, it could be loaded this time.
// If the function below fails, it means that ES is not available
// at the moment, so the pipeline loader cannot be created.
// Registering a callback regardless of the availability of ES
// makes it possible to try to load pipeline when ES becomes reachable.
pipelineLoader, err := p.pipelineLoaderFactory()
if err != nil {
logp.Err("Error loading pipeline: %s", err)
Expand All @@ -126,11 +137,11 @@ func (p *inputsRunner) Start() {
}
}

// Callback:
// Register callback to try to load pipelines when connecting to ES.
callback := func(esClient *elasticsearch.Client) error {
return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
}
elasticsearch.RegisterConnectCallback(callback)
p.pipelineCallbackID = elasticsearch.RegisterConnectCallback(callback)
}

for _, input := range p.inputs {
Expand All @@ -143,6 +154,10 @@ func (p *inputsRunner) Start() {
}
}
func (p *inputsRunner) Stop() {
if p.pipelineCallbackID != uuid.Nil {
elasticsearch.DeregisterConnectCallback(p.pipelineCallbackID)
}

for _, input := range p.inputs {
input.Stop()
}
Expand Down
38 changes: 34 additions & 4 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"sync"

uuid "github.com/satori/go.uuid"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
Expand Down Expand Up @@ -49,20 +51,48 @@ var (
ErrResponseRead = errors.New("bulk item status parse failed")
)

// Callbacks must not depend on the result of a previous one,
// because the ordering is not fixed.
type callbacksRegistry struct {
callbacks []connectCallback
callbacks map[uuid.UUID]connectCallback
mutex sync.Mutex
}

// XXX: it would be fantastic to do this without a package global
var connectCallbackRegistry callbacksRegistry
var connectCallbackRegistry = newCallbacksRegistry()

func newCallbacksRegistry() callbacksRegistry {
return callbacksRegistry{
callbacks: make(map[uuid.UUID]connectCallback),
}
}

// RegisterConnectCallback registers a callback for the elasticsearch output
// The callback is called each time the client connects to elasticsearch.
func RegisterConnectCallback(callback connectCallback) {
// It returns the key of the newly added callback, so it can be deregistered later.
func RegisterConnectCallback(callback connectCallback) uuid.UUID {
connectCallbackRegistry.mutex.Lock()
defer connectCallbackRegistry.mutex.Unlock()

// find the next unique key
var key uuid.UUID
exists := true
for exists {
key = uuid.NewV4()
_, exists = connectCallbackRegistry.callbacks[key]
}

connectCallbackRegistry.callbacks[key] = callback
return key
}

// DeregisterConnectCallback deregisters a callback for the elasticsearch output
// specified by its key. If a callback does not exist, nothing happens.
func DeregisterConnectCallback(key uuid.UUID) {
connectCallbackRegistry.mutex.Lock()
defer connectCallbackRegistry.mutex.Unlock()
connectCallbackRegistry.callbacks = append(connectCallbackRegistry.callbacks, callback)

delete(connectCallbackRegistry.callbacks, key)
}

func makeES(
Expand Down
39 changes: 39 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch_test.go
@@ -0,0 +1,39 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package elasticsearch

import (
"fmt"
"testing"
)

func TestConnectCallbacksManagement(t *testing.T) {
f0 := func(client *Client) error { fmt.Println("i am function #0"); return nil }
f1 := func(client *Client) error { fmt.Println("i am function #1"); return nil }
f2 := func(client *Client) error { fmt.Println("i am function #2"); return nil }

_ = RegisterConnectCallback(f0)
id1 := RegisterConnectCallback(f1)
id2 := RegisterConnectCallback(f2)

t.Logf("removing second callback")
DeregisterConnectCallback(id1)
if _, ok := connectCallbackRegistry.callbacks[id2]; !ok {
t.Fatalf("third callback cannot be retrieved")
}
}

0 comments on commit 65ef265

Please sign in to comment.