
Filebeat Crawler and Prospector Refactoring #720

Merged
merged 2 commits into from Jan 14, 2016

Conversation

@ruflin (Collaborator) commented Jan 14, 2016

The goal of this PR is to ensure that the crawler and prospectors shut down properly.

  • Start registrar first
  • Introduce waitgroup for started prospectors. Keep list of prospectors in crawler
  • Code cleanup
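The waitgroup-based prospector tracking described above can be sketched roughly as follows. This is an illustrative stand-in, not the actual filebeat code; all names are made up:

```go
package main

import (
	"fmt"
	"sync"
)

// Prospector is a simplified stand-in for filebeat's prospector type.
type Prospector struct {
	done chan struct{}
}

// Run blocks until the done channel is closed, then signals the waitgroup.
func (p *Prospector) Run(wg *sync.WaitGroup) {
	defer wg.Done()
	<-p.done // wait for the shutdown signal
}

// Crawler keeps the list of started prospectors plus a waitgroup so that
// Stop can wait until all of them have shut down.
type Crawler struct {
	prospectors []*Prospector
	wg          sync.WaitGroup
}

func (c *Crawler) Start(n int) {
	for i := 0; i < n; i++ {
		p := &Prospector{done: make(chan struct{})}
		c.prospectors = append(c.prospectors, p)
		c.wg.Add(1)
		go p.Run(&c.wg)
	}
}

func (c *Crawler) Stop() {
	for _, p := range c.prospectors {
		close(p.done)
	}
	c.wg.Wait() // block until every prospector has finished
}

func main() {
	c := &Crawler{}
	c.Start(3)
	c.Stop()
	fmt.Println("all prospectors stopped")
}
```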

@ruflin ruflin changed the title First stage of refactoring. PR #626 need to get this clean First stage of refactoring. Jan 14, 2016

@ruflin ruflin changed the title First stage of refactoring. Filebeat Crawler and Prospector Refactoring Jan 14, 2016

@ruflin ruflin force-pushed the ruflin:prospector-stopping branch 2 times, most recently from 973cd3d to 1728607 Jan 14, 2016

@@ -17,7 +17,6 @@ func readLine(reader processor.LineProcessor) (time.Time, string, int, error) {

// Full line read to be returned
if l.Bytes != 0 && err == nil {
logp.Debug("harvester", "full line read")

@ruflin (Author, Collaborator) commented Jan 14, 2016
I removed this line because it generated too much debug output.

@@ -88,8 +95,12 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Publishes event to output
go Publish(b, fb)

// registrar records last acknowledged positions in all files.
fb.registrar.Run()
for {

@urso (Collaborator) commented Jan 14, 2016
Why do we need a for loop around the done channel?

@ruflin (Author, Collaborator) commented Jan 14, 2016
We don't need it. Will remove it.

// Stop prospectors
logp.Info("Stopping filebeat")
// Stop crawler -> stop prospectors -> stop harvesters
fb.crawler.Stop()

@urso (Collaborator) commented Jan 14, 2016
what happens if harvester is blocked by channel forwarding events to the spooler and spooler itself is blocked by libbeat due to ES/LS not responding?

@ruflin (Author, Collaborator) commented Jan 14, 2016
Good point. I just tried it out locally, and unfortunately exactly what you described happens. Any suggestion on how we should solve it?

@urso (Collaborator) commented Jan 14, 2016
Stopping must ripple through the layers. The done channel can either be shared globally, or we can use one done channel per layer.

In harvester there are effectively 2 places the harvester can block:

  1. reading from file
  2. forwarding event to spooler

Part 2 is quite simple: use a select when publishing. Part 1 should be possible by closing the file from another goroutine. You can check the actual stack trace if the filebeat shutdown is hanging by running filebeat with -httpprof :6060 and pointing your browser at http://localhost:6060/debug/pprof
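The select-based publish for part 2 could look something like this sketch. The event type and channel names are invented for illustration and are not libbeat's actual API:

```go
package main

import "fmt"

// Event stands in for a filebeat event; the real type differs.
type Event struct{ Line string }

// publish forwards an event to the spooler channel, but gives up as soon
// as the done channel is closed, so a blocked spooler (e.g. because ES/LS
// is not responding downstream) cannot hang shutdown.
func publish(spooler chan<- Event, done <-chan struct{}, e Event) bool {
	select {
	case spooler <- e:
		return true
	case <-done:
		return false
	}
}

func main() {
	spooler := make(chan Event, 1)
	done := make(chan struct{})

	// Buffered spooler has room, done is open: the send succeeds.
	fmt.Println(publish(spooler, done, Event{Line: "a"})) // true

	// Spooler is now full and shutdown was requested: publish must not block.
	close(done)
	fmt.Println(publish(spooler, done, Event{Line: "b"})) // false
}
```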

@urso (Collaborator) commented Jan 14, 2016
We're basically missing a function in libbeat to close the publisher.Client asynchronously.
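One possible shape for such an asynchronous close is the usual Go pattern of running the blocking call in its own goroutine and handing the caller a result channel. This is purely illustrative (the comment's point is exactly that libbeat's publisher.Client did not offer this at the time):

```go
package main

import "fmt"

// closer is any client whose Close may block, e.g. while flushing.
type closer interface{ Close() error }

// asyncClose starts Close in its own goroutine and returns a channel that
// receives the result, so the caller can decide how long to wait for it.
func asyncClose(c closer) <-chan error {
	errc := make(chan error, 1) // buffered: the goroutine never leaks
	go func() { errc <- c.Close() }()
	return errc
}

// fakeClient is a trivial stand-in used to demonstrate the pattern.
type fakeClient struct{}

func (fakeClient) Close() error { return nil }

func main() {
	errc := asyncClose(fakeClient{})
	fmt.Println(<-errc) // <nil>
}
```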

running bool
Registrar *Registrar
prospectors []*Prospector
wg *sync.WaitGroup

@urso (Collaborator) commented Jan 14, 2016
No need to use a pointer for wg.


defer func() {
p.crawlerWg.Done()
}()

@urso (Collaborator) commented Jan 14, 2016
just do defer p.crawlerWg.Done()

func (p *Prospector) Run(wg *sync.WaitGroup) {

// TODO: Defer the call to block shutdown
wg.Done()

@ruflin (Author, Collaborator) commented Jan 14, 2016
@urso Would you agree to this temporary hack, so that shutdown of the prospector is non-blocking? This would be resolved as soon as proper harvester shutdown is implemented. The current behaviour is almost identical to before, but all the structure is now in place to implement it properly.

@urso (Collaborator) commented Jan 14, 2016
Yes, if you add the reason for the TODO in the comment.

@ruflin (Author, Collaborator) commented Jan 14, 2016
👍

ruflin added some commits Jan 14, 2016

Make clean shutdown of prospector and crawler.
* Start registrar first
* Introduce waitgroup for started prospectors. Keep list of prospectors in crawler
* Code cleanup

@ruflin ruflin force-pushed the ruflin:prospector-stopping branch from e3cd3c7 to 2d5d214 Jan 14, 2016

urso added a commit that referenced this pull request Jan 14, 2016

Merge pull request #720 from ruflin/prospector-stopping
Filebeat Crawler and Prospector Refactoring

@urso urso merged commit 0129ab4 into elastic:master Jan 14, 2016

3 of 4 checks passed

  • default: Merged build finished.
  • CLA: Commit author has signed the CLA
  • continuous-integration/appveyor/pr: AppVeyor build succeeded
  • continuous-integration/travis-ci/pr: The Travis CI build passed