Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Add publish pipeline timeout to run_once #35721

Merged
merged 21 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 37 additions & 22 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

// Heartbeat represents the root datastructure of this beat.
Expand Down Expand Up @@ -117,17 +116,8 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {

sched := scheduler.Create(limit, hbregistry.SchedulerRegistry, location, jobConfig, parsedConfig.RunOnce)

pipelineClientFactory := func(p beat.Pipeline) (pipeline.ISyncClient, error) {
if parsedConfig.RunOnce {
client, err := pipeline.NewSyncClient(logp.L(), p, beat.ClientConfig{})
if err != nil {
return nil, fmt.Errorf("could not create pipeline sync client for run_once: %w", err)
}
return client, nil
} else {
client, err := p.Connect()
return monitors.SyncPipelineClientAdaptor{C: client}, err
}
pipelineClientFactory := func(p beat.Pipeline) (beat.Client, error) {
return p.Connect()
}

bt := &Heartbeat{
Expand Down Expand Up @@ -160,23 +150,34 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
bt.trace.Start()
defer bt.trace.Close()

// Adapt the local pipeline to synchronized mode if run_once is enabled;
// otherwise, the pipeline is left unchanged.
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
pipeline := b.Publisher
var pipelineWrapper monitors.PipelineWrapper = &monitors.NoopPipelineWrapper{}
if bt.config.RunOnce {
sync := &monitors.SyncPipelineWrapper{}

pipeline = monitors.WithSyncPipelineWrapper(pipeline, sync)
pipelineWrapper = sync
}

logp.L().Info("heartbeat is running! Hit CTRL-C to stop it.")
groups, _ := syscall.Getgroups()
logp.L().Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)

waitMonitors := monitors.NewSignalWait()

// It is important this appear before we check for run once mode
// In run once mode we depend on these monitors being loaded, but not other more
// dynamic types.
stopStaticMonitors, err := bt.RunStaticMonitors(b)
stopStaticMonitors, err := bt.RunStaticMonitors(b, pipeline)
if err != nil {
return err
}
defer stopStaticMonitors()

if bt.config.RunOnce {
bt.scheduler.WaitForRunOnce()
logp.L().Info("Ending run_once run")
return nil
waitMonitors.Add(monitors.WithLog(bt.scheduler.WaitForRunOnce, "Ending run_once run."))
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
}

if b.Manager.Enabled() {
Expand Down Expand Up @@ -211,20 +212,34 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {

defer bt.scheduler.Stop()

<-bt.done
// Wait until run_once ends or bt is being shut down
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
waitMonitors.AddChan(bt.done)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember if we ever waited for Kill signal from the OS or any interrupts? Not blocking the PR, just want to understand if we are handling anything differently here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, any interrupt is handled by libbeat which in turn triggers heartbeat.Stop() and finally <-bt.done. This is just replicating the same condition using the wait signal approach and listening both for interrupts OR run once finished event, if in run once mode.

waitMonitors.Wait()

if err != nil {
logp.L().Errorf("could not write trace stop event: %s", err)
logp.L().Info("Shutting down, waiting for output to complete")

// Due to defer's LIFO execution order, waitPublished.Wait() has to be
// located _after_ b.Manager.Stop() or else it will exit early
waitPublished := monitors.NewSignalWait()
defer waitPublished.Wait()

// Three possible events: global beat, run_once pipeline done and publish timeout
waitPublished.AddChan(bt.done)
waitPublished.Add(monitors.WithLog(pipelineWrapper.Wait, "shutdown: finished publishing events."))
if bt.config.PublishTimeout > 0 {
logp.Info("shutdown: output timer started. Waiting for max %v.", bt.config.PublishTimeout)
waitPublished.Add(monitors.WithLog(monitors.WaitDuration(bt.config.PublishTimeout),
"shutdown: time out waiting for pipeline to publish events."))
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
}
logp.L().Info("Shutting down.")

return nil
}

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat, pipeline beat.Pipeline) (stop func(), err error) {
runners := make([]cfgfile.Runner, 0, len(bt.config.Monitors))
for _, cfg := range bt.config.Monitors {
created, err := bt.monitorFactory.Create(b.Publisher, cfg)
created, err := bt.monitorFactory.Create(pipeline, cfg)
if err != nil {
if errors.Is(err, monitors.ErrMonitorDisabled) {
logp.L().Info("skipping disabled monitor: %s", err)
Expand Down
1 change: 1 addition & 0 deletions heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type LocationWithID struct {
// Config defines the structure of heartbeat.yml.
type Config struct {
RunOnce bool `config:"run_once"`
PublishTimeout time.Duration `config:"publish_timeout"`
vigneshshanmugam marked this conversation as resolved.
Show resolved Hide resolved
Monitors []*conf.C `config:"monitors"`
ConfigMonitors *conf.C `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Expand Down
3 changes: 1 addition & 2 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/processors/util"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)

Expand All @@ -56,7 +55,7 @@ type RunnerFactory struct {
beatLocation *config.LocationWithID
}

type PipelineClientFactory func(pipeline beat.Pipeline) (pipeline.ISyncClient, error)
type PipelineClientFactory func(pipeline beat.Pipeline) (beat.Client, error)

type publishSettings struct {
// Fields and tags to add to monitor.
Expand Down
5 changes: 4 additions & 1 deletion heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,11 @@ func TestDuplicateMonitorIDs(t *testing.T) {
}
}

c, err := mockPipeline.Connect()
require.NoError(t, err)

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, mockPipeline.ConnectSync(), sched.Add, nil, nil)
_, m0Err := newMonitor(badConf, reg, c, sched.Add, nil, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
Expand Down
12 changes: 2 additions & 10 deletions heartbeat/monitors/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
beatversion "github.com/elastic/beats/v7/libbeat/version"
)

Expand Down Expand Up @@ -80,9 +79,8 @@ func makeMockFactory(pluginsReg *plugin.PluginsReg) (factory *RunnerFactory, sch
AddTask: sched.Add,
StateLoader: monitorstate.NilStateLoader,
PluginsReg: pluginsReg,
PipelineClientFactory: func(pipeline beat.Pipeline) (pipeline.ISyncClient, error) {
c, _ := pipeline.Connect()
return SyncPipelineClientAdaptor{C: c}, nil
PipelineClientFactory: func(pipeline beat.Pipeline) (beat.Client, error) {
return pipeline.Connect()
},
}),
sched,
Expand Down Expand Up @@ -164,12 +162,6 @@ func (pc *MockPipeline) ConnectWith(cc beat.ClientConfig) (beat.Client, error) {
return c, nil
}

// ConnectSync is a convenience helper for tests. It connects to the mock
// pipeline and returns the resulting client wrapped in a
// SyncPipelineClientAdaptor. The error returned by Connect is discarded.
func (pc *MockPipeline) ConnectSync() pipeline.ISyncClient {
c, _ := pc.Connect()
return SyncPipelineClientAdaptor{C: c}
}

func (pc *MockPipeline) PublishedEvents() []*beat.Event {
pc.mtx.Lock()
defer pc.mtx.Unlock()
Expand Down
11 changes: 5 additions & 6 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"

"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"

"github.com/mitchellh/hashstructure"

Expand Down Expand Up @@ -63,9 +62,9 @@ type Monitor struct {
internalsMtx sync.Mutex
close func() error

// pubClient accepts an ISyncClient as the lowest common denominator of client
// since async clients are a subset of sync clients
pubClient pipeline.ISyncClient
// pubClient accepts a generic beat.Client. Pipeline synchronicity is implemented
// at client wrapper-level
pubClient beat.Client

// stats is the countersRecorder used to record lifecycle events
// for global metrics + telemetry
Expand All @@ -89,7 +88,7 @@ func checkMonitorConfig(config *conf.C, registrar *plugin.PluginsReg) error {
func newMonitor(
config *conf.C,
registrar *plugin.PluginsReg,
pubClient pipeline.ISyncClient,
pubClient beat.Client,
taskAdder scheduler.AddTask,
stateLoader monitorstate.StateLoader,
onStop func(*Monitor),
Expand All @@ -106,7 +105,7 @@ func newMonitor(
func newMonitorUnsafe(
config *conf.C,
registrar *plugin.PluginsReg,
pubClient pipeline.ISyncClient,
pubClient beat.Client,
addTask scheduler.AddTask,
stateLoader monitorstate.StateLoader,
onStop func(*Monitor),
Expand Down
8 changes: 6 additions & 2 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func testMonitorConfig(t *testing.T, conf *conf.C, eventValidator validator.Vali
sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, false)
defer sched.Stop()

mon, err := newMonitor(conf, reg, pipel.ConnectSync(), sched.Add, nil, nil)
c, err := pipel.Connect()
require.NoError(t, err)
mon, err := newMonitor(conf, reg, c, sched.Add, nil, nil)
require.NoError(t, err)

mon.Start()
Expand Down Expand Up @@ -116,7 +118,9 @@ func TestCheckInvalidConfig(t *testing.T) {
sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, false)
defer sched.Stop()

m, err := newMonitor(serverMonConf, reg, pipel.ConnectSync(), sched.Add, nil, nil)
c, err := pipel.Connect()
require.NoError(t, err)
m, err := newMonitor(serverMonConf, reg, c, sched.Add, nil, nil)
require.Error(t, err)
// This could change if we decide the contract for newMonitor should always return a monitor
require.Nil(t, m, "For this test to work we need a nil value for the monitor.")
Expand Down
91 changes: 91 additions & 0 deletions heartbeat/monitors/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package monitors

import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/elastic-agent-libs/logp"
)

// PipelineWrapper is the interface for pipeline wrappers that expose a
// blocking Wait. In this file it is implemented by NoopPipelineWrapper, whose
// Wait returns immediately, and by SyncPipelineWrapper, whose Wait blocks on
// its internal event counter.
type PipelineWrapper interface {
// Wait blocks until the wrapper considers its pipeline work finished.
Wait()
}

// NoopPipelineWrapper is a pipeline wrapper that has nothing to wait for.
// It carries no state.
type NoopPipelineWrapper struct{}

// Wait does nothing and returns immediately.
func (*NoopPipelineWrapper) Wait() {}

// SyncPipelineWrapper adds synchronous behaviour to a pipeline wrapped with
// WithSyncPipelineWrapper. Calling Wait blocks until every event published
// through that pipeline (by any client created from it) has been accounted
// for by the ACK callback.
//
// NOTE(review): intended to be safe for concurrent use; sync.WaitGroup
// requires that an Add which starts from a zero counter happens before Wait —
// confirm callers respect this.
type SyncPipelineWrapper struct {
// wg counts events that were published but not yet ACKed: wrapped clients
// increment it on Publish/PublishAll and onACK decrements it.
wg sync.WaitGroup
}

// Used to wrap every client and track emmitted vs acked events.
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
type wrappedClient struct {
wg *sync.WaitGroup
client beat.Client
}

// WithSyncPipelineWrapper returns a pipeline derived from the given one in
// which published events are tracked by pw, so that pw.Wait can block until
// they have been acknowledged.
//
// Two decorators are applied, in this order:
//  1. an ACKer built from acker.TrackingCounter whose callback forwards its
//     second argument (total) to pw.onACK;
//  2. a client wrapper that turns every connected client into a
//     wrappedClient sharing pw's WaitGroup.
func WithSyncPipelineWrapper(pipeline beat.Pipeline, pw *SyncPipelineWrapper) beat.Pipeline {
	// ACK side: release as many pending events as the callback reports.
	// NOTE(review): total presumably includes events dropped by the pipeline —
	// confirm against the acker.TrackingCounter documentation.
	release := func(_, total int) {
		logp.L().Debugf("ack callback receives with events count of %d", total)
		pw.onACK(total)
	}

	// Publish side: every client counts its events on the shared WaitGroup.
	track := func(inner beat.Client) beat.Client {
		return &wrappedClient{client: inner, wg: &pw.wg}
	}

	withACKer := pipetool.WithACKer(pipeline, acker.TrackingCounter(release))
	return pipetool.WithClientWrapper(withACKer, track)
}

// Publish registers one pending event on the shared WaitGroup and then hands
// the event to the wrapped client.
func (c *wrappedClient) Publish(event beat.Event) {
// Count the event before publishing it; SyncPipelineWrapper.onACK later
// subtracts acknowledged events from this same counter.
c.wg.Add(1)
c.client.Publish(event)
}

// PublishAll registers len(events) pending events on the shared WaitGroup and
// then hands the whole batch to the wrapped client.
func (c *wrappedClient) PublishAll(events []beat.Event) {
// Count the batch before publishing it; SyncPipelineWrapper.onACK later
// subtracts acknowledged events from this same counter.
c.wg.Add(len(events))
c.client.PublishAll(events)
}

func (c *wrappedClient) Close() error {
return c.client.Close()
emilioalvap marked this conversation as resolved.
Show resolved Hide resolved
}

// Wait blocks until the internal WaitGroup counter reaches zero, that is,
// until onACK has subtracted as many events as the wrapped clients added.
func (s *SyncPipelineWrapper) Wait() {
s.wg.Wait()
}

// onACK marks n events as acknowledged by subtracting n from the
// pending-event counter that Wait blocks on.
func (s *SyncPipelineWrapper) onACK(n int) {
	delta := -n
	s.wg.Add(delta)
}