Skip to content

Commit

Permalink
fixup! Try to asynchronously connect to Fluentd before writing
Browse files Browse the repository at this point in the history
  • Loading branch information
Albin Kerouanton committed Apr 23, 2020
1 parent 9978fb3 commit 565a204
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ func (f *Fluent) connect(ctx context.Context) (err error) {
}

func (f *Fluent) connectAsync(ctx context.Context, stopAsyncConnect <-chan bool) {
f.wg.Add(1)
defer f.wg.Done()

waiter := time.After(time.Duration(0))
Expand All @@ -381,14 +380,14 @@ func (f *Fluent) connectAsync(ctx context.Context, stopAsyncConnect <-chan bool)
err := f.connect(ctx)
if err == nil {
f.ready <- true
break
return
}

if _, ok := err.(*ErrUnknownNetwork); ok {
// No need to retry on unknown network error. Thus false is passed
// to ready channel to let the other end drain the message queue.
f.ready <- false
break
return
}

waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
Expand All @@ -398,7 +397,7 @@ func (f *Fluent) connectAsync(ctx context.Context, stopAsyncConnect <-chan bool)

waiter = time.After(time.Duration(waitTime) * time.Millisecond)
case <-stopAsyncConnect:
break
return
}
}
}
Expand All @@ -414,7 +413,9 @@ func (f *Fluent) run() {
// the select on f.stopRunning to signal its end to this goroutine.
ctx, cancelDialing := context.WithCancel(context.Background())
stopAsyncConnect := make(chan bool)
f.wg.Add(1)
go f.connectAsync(ctx, stopAsyncConnect)

select {
case <-f.stopRunning:
drainEvents = true
Expand Down

0 comments on commit 565a204

Please sign in to comment.