Allow filebeat to only run once #2456

Merged
urso merged 2 commits into elastic:master on Sep 16, 2016

@ruflin
Collaborator

ruflin commented Sep 5, 2016

When the -once flag is used, filebeat starts all configured harvesters and prospectors and runs each prospector until the harvesters are closed. It is recommended to use the flag in combination with close_eof so harvesters close directly when the end of the file is reached. By default, harvesters are closed after close_inactive.

Closes #880

filebeat/beater/filebeat.go
+ if *once {
+ logp.Debug("filebeat", "Running filebeat once. Waiting for completion ...")
+ spooler.Flush()
+ publisher.Publish()

@ruflin

ruflin Sep 5, 2016

Collaborator

@urso What I'm trying to do here is to make sure that all events which are in the queue are sent. Is there a better way to wait / check the publisher if all events were sent?
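
One common way to know that every queued event has been sent is to count events in a `sync.WaitGroup` when they are enqueued and release them on acknowledgement. The following is a minimal sketch of that idea only; `tracker`, `publishAll`, and the simulated output goroutine are hypothetical names and are not taken from the Filebeat code base.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker counts events that were queued but not yet acknowledged.
// It is a hypothetical helper for illustration, not a Filebeat type.
type tracker struct {
	wg sync.WaitGroup
}

// queued must be called before an event is handed to the publisher.
func (t *tracker) queued() { t.wg.Add(1) }

// acked must be called once the output has confirmed an event.
func (t *tracker) acked() { t.wg.Done() }

// waitAllSent blocks until every queued event has been acknowledged.
func (t *tracker) waitAllSent() { t.wg.Wait() }

// publishAll pushes n events through a simulated output and returns the
// number of acknowledgements seen once waitAllSent has returned.
func publishAll(n int) int {
	var t tracker
	acked := 0
	events := make(chan int)

	// Simulated output: acknowledges every event it receives.
	go func() {
		for range events {
			acked++
			t.acked()
		}
	}()

	for i := 0; i < n; i++ {
		t.queued()
		events <- i
	}
	close(events)

	// Every Done call happens before Wait returns, so reading acked is safe.
	t.waitAllSent()
	return acked
}

func main() {
	fmt.Println(publishAll(5)) // prints 5
}
```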

ruflin added a commit to ruflin/beats that referenced this pull request Sep 5, 2016

Cleanup Filebeat publishers
* Separate async and sync publisher into own files
* Add Publish interface method which can be used to manually trigger publish (see elastic#2456)
* Add getDataEvents function

@ruflin ruflin referenced this pull request Sep 5, 2016

Merged

Cleanup Filebeat publishers #2463

ruflin added a commit to ruflin/beats that referenced this pull request Sep 5, 2016

Cleanup Filebeat publishers
* Separate async and sync publisher into own files
* Add Publish interface method which can be used to manually trigger publish (see elastic#2456)
* Add getDataEvents function
* Rename publish to publisher package

ruflin added a commit to ruflin/beats that referenced this pull request Sep 5, 2016

Add uptime value to metric reporting
This metric can be useful in combination with elastic#2456 to track how long the harvesting of a file took.

urso added a commit that referenced this pull request Sep 5, 2016

Cleanup Filebeat publishers (#2463)
* Separate async and sync publisher into own files
* Add Publish interface method which can be used to manually trigger publish (see #2456)
* Add getDataEvents function
* Rename publish to publisher package

ruflin added a commit to ruflin/beats that referenced this pull request Sep 5, 2016

Add uptime value to metric reporting
This metric can be useful in combination with elastic#2456 to track how long the harvesting of a file took. Uptime is reported before shutdown.

tsg added a commit that referenced this pull request Sep 6, 2016

Add uptime value to metric reporting (#2464)
This metric can be useful in combination with #2456 to track how long the harvesting of a file took. Uptime is reported before shutdown.

@ruflin ruflin removed the in progress label Sep 9, 2016

filebeat/beater/filebeat.go
+var once *bool
+
+func init() {
+ once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")

@urso

urso Sep 9, 2016

Collaborator

can be initialized directly at line 19.

@ruflin

ruflin Sep 9, 2016

Collaborator

changed

@@ -28,6 +28,7 @@ type Prospector struct {
done chan struct{}
states *file.States
wg sync.WaitGroup
+ channelWg sync.WaitGroup // Separate waitgroup for channels as not stopped on completion

@urso

urso Sep 9, 2016

Collaborator

I don't understand the necessity of channelWg. Why not reuse wg?

@ruflin

ruflin Sep 9, 2016

Collaborator

channelWg is needed because, when running only once, the prospector does the wait() for the first run to complete. When the prospector's Run returns, the channels are not shut down yet, and shouldn't be, because events are still being processed. Channel shutdown only happens when the prospector is closed with done.

I see that a pattern similar to the one we used with the other channels could apply here, but we can do this in a follow-up PR.
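
The distinction between the two wait groups can be illustrated with a small, self-contained sketch. It assumes a simplified model in which workers send lines to a single forwarding goroutine; `scanner`, `runOnce`, `stop`, and `forwardedAfterStop` are hypothetical names, and the real Prospector is more involved.

```go
package main

import (
	"fmt"
	"sync"
)

// scanner is a hypothetical, simplified stand-in for a prospector.
type scanner struct {
	done      chan struct{}
	out       chan string
	wg        sync.WaitGroup // workers started by a single scan
	channelWg sync.WaitGroup // forwarder goroutine, lives until done is closed
	forwarded []string
}

func newScanner() *scanner {
	s := &scanner{done: make(chan struct{}), out: make(chan string)}
	s.channelWg.Add(1)
	go func() {
		defer s.channelWg.Done()
		for {
			select {
			case line := <-s.out:
				s.forwarded = append(s.forwarded, line)
			case <-s.done:
				return
			}
		}
	}()
	return s
}

// runOnce starts one worker per file and waits only for those workers.
// The forwarder keeps running because events may still be in flight.
func (s *scanner) runOnce(files []string) {
	for _, f := range files {
		s.wg.Add(1)
		go func(name string) {
			defer s.wg.Done()
			s.out <- "line from " + name
		}(f)
	}
	s.wg.Wait()
}

// stop closes done and then waits for the forwarder to exit.
func (s *scanner) stop() {
	close(s.done)
	s.channelWg.Wait()
}

// forwardedAfterStop runs one scan, stops the scanner, and reports how
// many lines the forwarder handled.
func forwardedAfterStop(files []string) int {
	s := newScanner()
	s.runOnce(files)
	s.stop()
	return len(s.forwarded)
}

func main() {
	fmt.Println(forwardedAfterStop([]string{"a.log", "b.log"})) // prints 2
}
```

Reusing a single wait group here would make `runOnce` block until the forwarder exits, which only happens after `done` is closed.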

@urso

urso Sep 9, 2016

Collaborator

TBH I still don't fully understand why this is required. But resource management just became somewhat more complex :(

What bothers me is that we keep adding async/sync code blocks (utils) to manage something, which adds more and more complexity.

@ruflin

ruflin Sep 9, 2016

Collaborator

Yes, complexity currently increases which is not good. I still hope as soon as we have the feature working as we want that we can reduce the complexity again through refactoring.

filebeat/spooler/spooler.go
+ s.flushMutex.Lock()
+ defer s.flushMutex.Unlock()
+
+ count := len(s.spool)

@urso

urso Sep 9, 2016

Collaborator

if count == 0 { return 0 }

@ruflin

ruflin Sep 9, 2016

Collaborator

done, makes the code nicer.
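
As an illustration of the early return discussed here, the sketch below shows a flush method that reports how many events it handed over, which lets a caller loop until nothing is left. `spool`, `add`, `flush`, and `drain` are hypothetical names; the actual spooler in the PR may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

// spool is a hypothetical buffer that hands batches to an output callback.
type spool struct {
	mu     sync.Mutex
	events []string
	out    func(batch []string)
}

// add buffers a single event.
func (s *spool) add(e string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events = append(s.events, e)
}

// flush hands all buffered events to out and returns how many were flushed.
// The out callback runs under the lock, so it must not call add or flush.
func (s *spool) flush() int {
	s.mu.Lock()
	defer s.mu.Unlock()

	count := len(s.events)
	if count == 0 {
		return 0 // nothing buffered, skip the output call entirely
	}

	batch := make([]string, count)
	copy(batch, s.events)
	s.events = s.events[:0]
	s.out(batch)
	return count
}

// drain flushes as often as needed and returns the total number of events.
func drain(s *spool) int {
	total := 0
	for n := s.flush(); n > 0; n = s.flush() {
		total += n
	}
	return total
}

func main() {
	s := &spool{out: func(batch []string) { fmt.Println("publishing", batch) }}
	s.add("a")
	s.add("b")
	fmt.Println(drain(s))
}
```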

filebeat/beater/filebeat.go
+
+ // Flushes spooler as often as needed to send all events to publisher
+ for spooler.Flush() > 0 {
+ }

@urso

urso Sep 9, 2016

Collaborator

We should add a deferred Flush to the spooler. The spooler was changed to drain the receive channel until empty, but the final flush on shutdown is missing.
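
A rough sketch of the suggested behaviour, assuming a spooler loop that batches events from a channel: on shutdown it drains whatever is already queued, and a deferred flush publishes the remainder. `runSpooler`, `publishedOnShutdown`, and the batch size are assumptions made for this example.

```go
package main

import "fmt"

// runSpooler batches events from in and publishes them. When done is
// closed it drains the events already queued and returns; the deferred
// flush publishes whatever is still buffered at that point.
func runSpooler(in <-chan string, done <-chan struct{}, batchSize int, publish func([]string)) {
	var buf []string
	flush := func() {
		if len(buf) == 0 {
			return
		}
		publish(buf)
		buf = nil
	}
	defer flush() // final flush on shutdown

	for {
		select {
		case e := <-in:
			buf = append(buf, e)
			if len(buf) >= batchSize {
				flush()
			}
		case <-done:
			// Drain the receive channel until it is empty.
			for {
				select {
				case e := <-in:
					buf = append(buf, e)
				default:
					return
				}
			}
		}
	}
}

// publishedOnShutdown queues the events, signals shutdown right away, and
// returns how many events were published in total.
func publishedOnShutdown(events []string) int {
	in := make(chan string, len(events))
	for _, e := range events {
		in <- e
	}
	done := make(chan struct{})
	close(done)

	total := 0
	runSpooler(in, done, 2, func(batch []string) { total += len(batch) })
	return total
}

func main() {
	fmt.Println(publishedOnShutdown([]string{"a", "b", "c"})) // prints 3
}
```

Without the deferred flush, events collected while draining would be dropped whenever the last batch is not full.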

filebeat/beater/filebeat.go
+
+ // Wait for registrar to finish writing registry
+ finishedWg.Wait()
+ logp.Info("All data collection completed. Shutting down.")

@urso

urso Sep 9, 2016

Collaborator

I wonder if finishedWG.Wait() is better done via defer statement by modifying the spooler shutdown order in here to:

if *once {
    defer publisherChan.Close() // close publisher channel (optional)
    defer finishedWG.Wait() // wait for all events processed by registrar
    defer spooler.Stop() // stop + flush spooler
} else {
    defer spooler.Stop() // stop spooler
    defer publisherChan.Close() // close publisher channel potentially unblocking the spooler
}
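
The proposal relies on Go running deferred calls in last-in, first-out order when the surrounding function returns, even when the `defer` statements sit inside an `if` block. The sketch below only records the order of execution; the step names mirror the snippet above, while `shutdownOrder` and `record` are hypothetical helpers.

```go
package main

import "fmt"

// shutdownOrder registers the deferred steps in the order proposed above
// and returns the order in which they actually run.
func shutdownOrder(once bool) []string {
	var order []string
	record := func(name string) func() {
		return func() { order = append(order, name) }
	}

	func() {
		// Deferred calls are scoped to the function, not to the if block.
		if once {
			defer record("publisherChan.Close")()
			defer record("finishedWG.Wait")()
			defer record("spooler.Stop")()
		} else {
			defer record("spooler.Stop")()
			defer record("publisherChan.Close")()
		}
	}()

	return order
}

func main() {
	fmt.Println(shutdownOrder(true))  // [spooler.Stop finishedWG.Wait publisherChan.Close]
	fmt.Println(shutdownOrder(false)) // [publisherChan.Close spooler.Stop]
}
```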

@ruflin

ruflin Sep 9, 2016

Collaborator

The advantage of this is that spooler.Stop is still called only once. We must make sure that CTRL-C also still works when run with -once.

@urso

urso Sep 9, 2016

Collaborator

This code already runs on shutdown after a kill signal is received (the crawler is closed before it has finished). So what to do here? Wait for completion, or require CTRL-C to kill the process as is instead of waiting for all data to be published? I wonder if introducing a stop timeout would mitigate this issue.

filebeat/beater/filebeat.go
+ if *once {
+ go func() {
+ logp.Info("Running filebeat once. Waiting for completion ...")
+ crawler.WaitForCompletion()

@urso

urso Sep 9, 2016

Collaborator

The goroutine is not really required. Just wait for completion and call fb.Stop (or close fb.done). Filebeat will then continue with a proper shutdown.

@ruflin

ruflin Sep 9, 2016

Collaborator

If there is no goroutine, CTRL-C shutdown will no longer work when running with -once.
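
The trade-off can be sketched as follows: if the main routine only blocks on a done channel, and completion is awaited in a separate goroutine that then triggers the stop, an interrupt can still close done at any time. `beat`, `newBeat`, and the `waitForCompletion` callback are hypothetical and only model the control flow under discussion.

```go
package main

import (
	"fmt"
	"sync"
)

// beat is a hypothetical stand-in that only models the done channel.
type beat struct {
	done     chan struct{}
	stopOnce sync.Once
}

func newBeat() *beat { return &beat{done: make(chan struct{})} }

// Stop closes done exactly once, whether it is called after completion
// or from a signal handler.
func (b *beat) Stop() { b.stopOnce.Do(func() { close(b.done) }) }

// Run blocks until Stop is called. With once set, a goroutine waits for
// completion and then stops the beat, so the main routine never blocks
// inside waitForCompletion itself.
func (b *beat) Run(once bool, waitForCompletion func()) {
	if once {
		go func() {
			waitForCompletion()
			b.Stop()
		}()
	}
	<-b.done
}

func main() {
	finished := newBeat()
	finished.Run(true, func() {}) // completion arrives right away
	fmt.Println("stopped after completion")

	interrupted := newBeat()
	never := make(chan struct{})
	go interrupted.Stop() // stands in for a CTRL-C handler
	interrupted.Run(true, func() { <-never })
	fmt.Println("stopped by interrupt")
}
```

Calling `waitForCompletion` directly in `Run` would leave the second case blocked until all harvesters finish.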

filebeat/beater/filebeat.go
- wgEvents = &sync.WaitGroup{}
- finishedLogger = newFinishedLogger(wgEvents)
- }
+ //if fb.config.ShutdownTimeout > 0 || *once {

@ruflin

ruflin Sep 13, 2016

Collaborator

@urso do you see an issue with this? As long as we don't use wgEvents nothing will happen.

@urso

urso Sep 13, 2016

Collaborator

It's correct. As long as wgEvents.Wait is not installed in sigWait, it makes no difference to always initialize these. I just didn't want to have all the wgEvents updates by default, but the overhead should be OK.

@ruflin

ruflin Sep 13, 2016

Collaborator

Will leave it in as default for the moment and we can still make it optional in case it becomes a performance issue (I would not assume so)

- defer publisher.Stop()
- defer successLogger.Close()
+ defer func() {
+ // Closes first the registrar logger to make sure not more events arrive at the registrar

@urso

urso Sep 13, 2016

Collaborator

maybe add comment: registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher.

@ruflin

ruflin Sep 13, 2016

Collaborator

Added

filebeat/beater/filebeat.go
@@ -108,22 +113,42 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer func() {
// With harvesters being stopped, optionally wait for all enqueued events being
// published and written by registrar before continuing shutdown.
- fb.sigWait.Wait()
+ if !*once {

@urso

urso Sep 13, 2016

Collaborator

no need to check *once flag here. Wait will return immediately if no signalers have been added.

@ruflin

ruflin Sep 13, 2016

Collaborator

removed, was a leftover from previous changes.

filebeat/beater/filebeat.go
+ crawler.WaitForCompletion()
+
+ // Wait for registrar to finish writing registry
+ wgEvents.Wait()

@urso

urso Sep 13, 2016

Collaborator

don't wait here. just add wgEvents.Wait to sigWait after waiting for shutdown:

...
waitFinished.Wait()

waitPublished := fb.config.ShutdownTimeout > 0 || *once
if waitPublished {
    waitEvents.Add(wgEvents.Wait)
    if fb.config.ShutdownTimeout > 0 {
        waitEvents.AddTimeout(fb.config.ShutdownTimeout)
    }
}
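
Waiting for published events with an optional upper bound can be modelled with plain standard-library primitives. The sketch below is not the `signalWait` helper used in the PR; `waitTimeout` and `publishedBeforeTimeout` are hypothetical, and the 50 ms timeout is an arbitrary value chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg and reports whether it finished before the
// timeout expired. A timeout of zero or less means wait indefinitely.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()

	if timeout <= 0 {
		<-finished
		return true
	}

	select {
	case <-finished:
		return true
	case <-time.After(timeout):
		return false
	}
}

// publishedBeforeTimeout simulates pending events of which only acked
// are ever acknowledged, then waits with a short timeout.
func publishedBeforeTimeout(pending, acked int) bool {
	var wgEvents sync.WaitGroup
	wgEvents.Add(pending)
	for i := 0; i < acked; i++ {
		wgEvents.Done()
	}
	return waitTimeout(&wgEvents, 50*time.Millisecond)
}

func main() {
	fmt.Println(publishedBeforeTimeout(3, 3)) // all events acknowledged
	fmt.Println(publishedBeforeTimeout(3, 2)) // one event never acknowledged
}
```

Note that when the timeout fires, the goroutine blocked in `wg.Wait` stays around until the wait group is released; that is acceptable during process shutdown but not in a long-running loop.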

@ruflin

ruflin Sep 13, 2016

Collaborator

I adjusted it accordingly. The outcome is the same.

filebeat/beater/filebeat.go
+ }
+
+ fb.sigWait.AddChan(fb.done)
+ fb.sigWait.Wait()

@urso

urso Sep 13, 2016

Collaborator

Let's use a separate signalWait that waits for either runOnce or fb.done to finish. This way the waiter for shutdown and the waiter for published events will not get mixed up. The fb.done channel can be installed multiple times. Once the channel is closed, it sends a signal immediately.
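
The reason one done channel can serve several waiters is that closing a channel releases every receiver, and any later receive returns immediately. A minimal sketch, with `releasedByClose` as a hypothetical helper:

```go
package main

import (
	"fmt"
	"sync"
)

// releasedByClose starts n goroutines that all block on the same done
// channel and reports how many were released by a single close.
func releasedByClose(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}

	close(done)
	wg.Wait()

	<-done // receiving from an already closed channel does not block
	return released
}

func main() {
	fmt.Println(releasedByClose(3)) // prints 3
}
```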

Allow filebeat to only run once
When the `-once` flag is used, filebeat starts all configured harvesters and prospectors and runs each prospector until the harvesters are closed. It is recommended to use the flag in combination with `close_eof` so harvesters close directly when the end of the file is reached. By default, harvesters are closed after `close_inactive`.
filebeat/beater/filebeat.go
+ logp.Info("Shutdown output timer started. Waiting for max %v.", timeout)
+ waitEvents.Add(withLog(waitDuration(timeout),
+ "Continue shutdown: Time out waiting for events being published."))
+ }

@urso

urso Sep 16, 2016

Collaborator

Add:

} else {
    waitEvents.AddChan(fb.done)
}

This will ensure CTRL-C will be handled immediately/properly if run-once without shutdown_timeout is configured. If fb.done has been closed already, waitEvents.Wait will return immediately. If fb.done is closed while waiting for wgEvents.Wait, shutdown will continue.
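
To make the behaviour described here concrete, the sketch below reimplements a "wait for the first of several signals" helper from scratch. It is an assumption-laden model, not the PR's `signalWait`: `waitAny`, `waitForEvents`, and `interruptedWait` are hypothetical names, and the real type may differ in its details.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitAny returns from Wait as soon as any registered signaler finishes.
type waitAny struct {
	count   int
	signals chan struct{}
}

func newWaitAny() *waitAny {
	return &waitAny{signals: make(chan struct{}, 1)}
}

// Add registers a blocking function; its return counts as a signal.
func (w *waitAny) Add(fn func()) {
	w.count++
	go func() {
		fn()
		select {
		case w.signals <- struct{}{}:
		default: // a signal is already pending, no need for another
		}
	}()
}

// AddChan signals once c is closed (or delivers a value).
func (w *waitAny) AddChan(c <-chan struct{}) { w.Add(func() { <-c }) }

// AddTimeout signals after d has elapsed.
func (w *waitAny) AddTimeout(d time.Duration) { w.Add(func() { time.Sleep(d) }) }

// Wait returns immediately if nothing was registered.
func (w *waitAny) Wait() {
	if w.count == 0 {
		return
	}
	<-w.signals
}

// waitForEvents waits for pending events, bounded by a timeout if one is
// configured and by the done channel otherwise.
func waitForEvents(pending *sync.WaitGroup, done <-chan struct{}, timeout time.Duration) {
	w := newWaitAny()
	w.Add(pending.Wait)
	if timeout > 0 {
		w.AddTimeout(timeout)
	} else {
		w.AddChan(done)
	}
	w.Wait()
}

// interruptedWait models run-once without a timeout: one event is never
// published, but done is already closed, so the wait still returns.
func interruptedWait() bool {
	var pending sync.WaitGroup
	pending.Add(1)
	done := make(chan struct{})
	close(done)
	waitForEvents(&pending, done, 0)
	return true
}

func main() {
	newWaitAny().Wait() // no signalers registered: returns immediately
	fmt.Println(interruptedWait())
}
```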

@ruflin

ruflin Sep 16, 2016

Collaborator

Good point. Added.

@urso urso merged commit 6b7df0c into elastic:master Sep 16, 2016

4 checks passed

CLA: Commit author has signed the CLA
continuous-integration/appveyor/pr: AppVeyor build succeeded
continuous-integration/travis-ci/pr: The Travis CI build passed
default: Build finished.

suraj-soni added a commit to suraj-soni/beats that referenced this pull request Sep 20, 2016

Cleanup Filebeat publishers (#2463)
* Separate async and sync publisher into own files
* Add Publish interface method which can be used to manually trigger publish (see elastic#2456)
* Add getDataEvents function
* Rename publish to publisher package

suraj-soni added a commit to suraj-soni/beats that referenced this pull request Sep 20, 2016

Add uptime value to metric reporting (#2464)
This metric can be useful in combination with elastic#2456 to track how long the harvesting of a file took. Uptime is reported before shutdown.

suraj-soni added a commit to suraj-soni/beats that referenced this pull request Sep 20, 2016

Allow filebeat to only run once (#2456)
* Allow filebeat to only run once

When the `-once` flag is used, filebeat starts all configured harvesters and prospectors and runs each prospector until the harvesters are closed. It is recommended to use the flag in combination with `close_eof` so harvesters close directly when the end of the file is reached. By default, harvesters are closed after `close_inactive`.

amomchilov pushed a commit to amomchilov/Filebeat that referenced this pull request Apr 19, 2018

Cleanup Filebeat publishers (#2463)
* Separate async and sync publisher into own files
* Add Publish interface method which can be used to manually trigger publish (see elastic/beats#2456)
* Add getDataEvents function
* Rename publish to publisher package