Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WithContext methods to the Fluent struct including handling of context deadlines #86

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
143 changes: 107 additions & 36 deletions fluent/fluent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fluent

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -78,17 +79,24 @@ func NewErrUnknownNetwork(network string) error {
}

// msgToSend is one encoded message travelling from the Post/Encode methods to
// the code that writes it to the connection.
type msgToSend struct {
// ctx is the caller's context. Its Done channel and deadline are consulted
// while queueing, connecting and writing this message.
ctx context.Context
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ctx is referred only in f.write(), and its argument is only the msg. Putting contexts into msgToSend only for passing it to f.write() looks too much to me (because msgToSend is a message, not any other internal structure!)
How about containing ctx in bufferInput? (And add an argument ctx to f.write()?)
It's a structure of msg and internal objects, so it seems a better struct to have the context.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do so, EncodeDataWithContext is not needed anymore, right?
That function looks only for putting contexts to msgToSend structure.

// data is the serialized payload; it is written to the connection unchanged.
data []byte
// ack is presumably the chunk identifier used when Config.RequestAck is
// enabled — TODO confirm against EncodeDataWithContext.
ack string
}

// bufferInput is the element type of the Fluent.pending channel: a message
// plus an optional channel on which the write outcome is reported.
type bufferInput struct {
// msg is the message to be written.
msg *msgToSend
// result receives the error returned by the write. It is left nil by
// appendBuffer (no feedback wanted) and set by appendBufferBlocking.
result chan<- error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be syncWriteResult or a similar one to show it's only about sync-write.

}

type Fluent struct {
Config

dialer dialer
stopRunning chan bool
pending chan *msgToSend
pending chan bufferInput
wg sync.WaitGroup
resultPool sync.Pool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want a different name for this too, like result above. The current name looks so confusing to me.


muconn sync.Mutex
conn net.Conn
Expand All @@ -108,6 +116,10 @@ type dialer interface {
Dial(string, string) (net.Conn, error)
}

// dialerWithContext is an optional companion to dialer for implementations
// that can honour a context while establishing the connection. connect
// type-asserts the configured dialer against it and falls back to Dial when
// the assertion fails.
type dialerWithContext interface {
	DialContext(ctx context.Context, network, address string) (net.Conn, error)
}

func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
if config.FluentNetwork == "" {
config.FluentNetwork = defaultNetwork
Expand Down Expand Up @@ -140,22 +152,24 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
config.Async = config.Async || config.AsyncConnect
}

if config.Async {
f = &Fluent{
Config: config,
dialer: d,
pending: make(chan *msgToSend, config.BufferLimit),
}
f.wg.Add(1)
go f.run()
} else {
f = &Fluent{
Config: config,
dialer: d,
f = &Fluent{
Config: config,
dialer: d,
pending: make(chan bufferInput, config.BufferLimit),
resultPool: sync.Pool{
New: func() interface{} {
return make(chan error, 1)
},
},
}
if !config.Async {
if err = f.connect(context.Background()); err != nil {
return
}
err = f.connect()
}

f.wg.Add(1)
go f.run()
Comment on lines +171 to +172
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those should be only for Async, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return
}

Expand Down Expand Up @@ -185,17 +199,25 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
// f.Post("tag_name", structData)
//
func (f *Fluent) Post(tag string, message interface{}) error {
	// No caller-supplied context here: use the background context, which is
	// never cancelled and carries no deadline.
	background := context.Background()
	return f.PostWithContext(background, tag, message)
}

// PostWithContext sends message under tag, timestamped with the current time.
// ctx is handed to PostWithTimeAndContext unchanged, so its cancellation and
// deadline apply to the whole send.
func (f *Fluent) PostWithContext(ctx context.Context, tag string, message interface{}) error {
	// The pre-change call to PostWithTime must not be kept here: it would
	// return first and silently discard ctx.
	return f.PostWithTimeAndContext(ctx, tag, time.Now(), message)
}

// PostWithTime sends message under tag with the caller-supplied timestamp tm.
// It is PostWithTimeAndContext called with the background context.
func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) error {
	background := context.Background()
	return f.PostWithTimeAndContext(background, tag, tm, message)
}

func (f *Fluent) PostWithTimeAndContext(ctx context.Context, tag string, tm time.Time, message interface{}) error {
if len(f.TagPrefix) > 0 {
tag = f.TagPrefix + "." + tag
}

if m, ok := message.(msgp.Marshaler); ok {
return f.EncodeAndPostData(tag, tm, m)
return f.EncodeAndPostDataWithContext(ctx, tag, tm, m)
}

msg := reflect.ValueOf(message)
Expand All @@ -215,7 +237,7 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
}
kv[name] = msg.FieldByIndex(field.Index).Interface()
}
return f.EncodeAndPostData(tag, tm, kv)
return f.EncodeAndPostDataWithContext(ctx, tag, tm, kv)
}

if msgtype.Kind() != reflect.Map {
Expand All @@ -229,13 +251,17 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
kv[k.String()] = msg.MapIndex(k).Interface()
}

return f.EncodeAndPostData(tag, tm, kv)
return f.EncodeAndPostDataWithContext(ctx, tag, tm, kv)
}

// EncodeAndPostData encodes message for tag and tm and posts the result.
// It is EncodeAndPostDataWithContext called with the background context.
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
	background := context.Background()
	return f.EncodeAndPostDataWithContext(background, tag, tm, message)
}

func (f *Fluent) EncodeAndPostDataWithContext(ctx context.Context, tag string, tm time.Time, message interface{}) error {
var msg *msgToSend
var err error
if msg, err = f.EncodeData(tag, tm, message); err != nil {
if msg, err = f.EncodeDataWithContext(ctx, tag, tm, message); err != nil {
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
}
return f.postRawData(msg)
Expand All @@ -251,7 +277,7 @@ func (f *Fluent) postRawData(msg *msgToSend) error {
return f.appendBuffer(msg)
}
// Synchronous write
return f.write(msg)
return f.appendBufferBlocking(msg)
}

// For sending forward protocol adopted JSON
Expand Down Expand Up @@ -296,8 +322,12 @@ func getUniqueID(timeUnix int64) (string, error) {
}

// EncodeData builds the message to send for tag, tm and message.
// It is EncodeDataWithContext called with the background context.
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
	background := context.Background()
	msg, err = f.EncodeDataWithContext(background, tag, tm, message)
	return msg, err
}

func (f *Fluent) EncodeDataWithContext(ctx context.Context, tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
option := make(map[string]string)
msg = &msgToSend{}
msg = &msgToSend{ctx: ctx}
timeUnix := tm.Unix()
if f.Config.RequestAck {
var err error
Expand Down Expand Up @@ -338,13 +368,37 @@ func (f *Fluent) Close() (err error) {
// appendBuffer queues msg on the pending channel without blocking.
//
// The entry carries no result channel, so the caller gets no feedback about
// the eventual write. If the channel cannot accept the entry right now, an
// error naming the configured BufferLimit is returned and msg is not queued.
func (f *Fluent) appendBuffer(msg *msgToSend) error {
	select {
	case f.pending <- bufferInput{msg: msg}:
		return nil
	default:
		return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
	}
}

// appendBufferBlocking appends data to the buffer and waits for the result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

appendBufferWithFeedback — did you rename the function after writing this comment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did, will change the comment :)

// appendBufferBlocking queues msg on the pending channel together with a
// result channel taken from resultPool, then waits for the write outcome to
// arrive on that channel and returns it.
//
// If msg.ctx is done before the entry could be queued, or before a result
// arrived, msg.ctx.Err() is returned instead.
func (f *Fluent) appendBufferBlocking(msg *msgToSend) error {
result := f.resultPool.Get().(chan error)
// write the data to the buffer and block if the buffer is full
select {
case f.pending <- bufferInput{msg: msg, result: result}:
// queued; fall through and wait for the write result below
case <-msg.ctx.Done():
// because the result channel is not used, it can safely be returned to the sync pool.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love this kind of comment 👍

f.resultPool.Put(result)
return msg.ctx.Err()
}

select {
case err := <-result:
f.resultPool.Put(result)
return err
case <-msg.ctx.Done():
// the context was cancelled or its deadline passed, but there is no result yet. So the result channel cannot
// be returned to the pool, as it might still be written to later.
return msg.ctx.Err()
}
}

// close closes the connection.
func (f *Fluent) close(c net.Conn) {
f.muconn.Lock()
Expand All @@ -356,19 +410,23 @@ func (f *Fluent) close(c net.Conn) {
}

// connect establishes a new connection using the specified transport.
func (f *Fluent) connect() (err error) {
func (f *Fluent) connect(ctx context.Context) (err error) {
var address string
switch f.Config.FluentNetwork {
case "tcp":
f.conn, err = f.dialer.Dial(
f.Config.FluentNetwork,
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
address = f.Config.FluentHost + ":" + strconv.Itoa(f.Config.FluentPort)
case "unix":
f.conn, err = f.dialer.Dial(
f.Config.FluentNetwork,
f.Config.FluentSocketPath)
address = f.Config.FluentSocketPath
default:
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
return
}
if d, ok := f.dialer.(dialerWithContext); ok {
f.conn, err = d.DialContext(ctx, f.Config.FluentNetwork, address)
} else {
f.conn, err = f.dialer.Dial(f.Config.FluentNetwork, address)
}

return err
}

Expand All @@ -386,7 +444,11 @@ func (f *Fluent) run() {
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
continue
}
err := f.write(entry)
err := f.write(entry.msg)
if entry.result != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any cases to get nil on entry.result?

Copy link
Author

@arnogeurts arnogeurts Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, so in this change the sync flow and the async flow get very similar. Both write to the pending channel and have a separate go-routine taking entries of that channel and writing the data to the TCP connection.

In case of the async flow no feedback is required, so there will be no result channel in the entry. As you can see in appendBuffer an entry is written to the channel without a result channel.

In case of the sync flow, a result channel is added to the entry, so the feedback can be returned to the caller. And as you can see appendBufferBlocking puts the result channel in the entry and waits on the feedback (hence the "blocking").

entry.result <- err
continue
}
if err != nil {
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
}
Expand All @@ -413,7 +475,7 @@ func (f *Fluent) write(msg *msgToSend) error {
if c == nil {
f.muconn.Lock()
if f.conn == nil {
err := f.connect()
err := f.connect(msg.ctx)
if err != nil {
f.muconn.Unlock()

Expand All @@ -425,7 +487,13 @@ func (f *Fluent) write(msg *msgToSend) error {
if waitTime > f.Config.MaxRetryWait {
waitTime = f.Config.MaxRetryWait
}
time.Sleep(time.Duration(waitTime) * time.Millisecond)
waitDuration := time.Duration(waitTime) * time.Millisecond
if deadline, hasDeadLine := msg.ctx.Deadline(); hasDeadLine && deadline.Before(time.Now().Add(waitDuration)) {
// the context deadline is within the wait time, so after the sleep the deadline will have been
// exceeded. It is a waste of time to wait on that.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it wait until the deadline?
This code returns DeadlineExceeded as soon as now + waitDuration > deadline, but DeadlineExceeded should only occur once the current time is after the deadline.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If deadline.Before(time.Now()), it should return DeadlineExceeded immediately.
If deadline.Before(time.Now().Add(waitDuration)), it should sleep until the deadline.
MaxRetryWait may be configured in minutes or hours, so the current code could return errors unexpectedly early.

return context.DeadlineExceeded
}
time.Sleep(waitDuration)
continue
}
}
Expand All @@ -435,11 +503,14 @@ func (f *Fluent) write(msg *msgToSend) error {

// We're connected, write msg
t := f.Config.WriteTimeout
var deadline time.Time
if time.Duration(0) < t {
c.SetWriteDeadline(time.Now().Add(t))
} else {
c.SetWriteDeadline(time.Time{})
deadline = time.Now().Add(t)
}
if ctxDeadline, hasDeadline := msg.ctx.Deadline(); hasDeadline && (deadline.IsZero() || ctxDeadline.Before(deadline)) {
deadline = ctxDeadline
}
c.SetWriteDeadline(deadline)
_, err := c.Write(msg.data)
if err != nil {
f.close(c)
Expand Down
54 changes: 54 additions & 0 deletions fluent/fluent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fluent

import (
"bytes"
"context"
"encoding/json"
"errors"
"io/ioutil"
Expand Down Expand Up @@ -449,6 +450,59 @@ func TestPostWithTime(t *testing.T) {
}
}

// TestPostWithTimeAndContext posts two events through PostWithTimeAndContext
// using a context with a deadline, once with Async enabled and once without.
// It checks both payloads as received by the test connection and that the
// connection's write deadline equals the context deadline.
func TestPostWithTimeAndContext(t *testing.T) {
	testcases := map[string]Config{
		"with Async": {
			Async:         true,
			MarshalAsJSON: true,
			TagPrefix:     "acme",
		},
		"without Async": {
			Async:         false,
			MarshalAsJSON: true,
			TagPrefix:     "acme",
		},
	}

	for tcname := range testcases {
		t.Run(tcname, func(t *testing.T) {
			// Read the config before t.Parallel() so the lookup happens while
			// the loop is still on this iteration.
			tc := testcases[tcname]
			t.Parallel()

			d := newTestDialer()
			// NOTE(review): f is assigned by the goroutine below and read by
			// this deferred func without synchronization — confirm this is
			// acceptable under -race.
			var f *Fluent
			defer func() {
				if f != nil {
					f.Close()
				}
			}()
			deadline := time.Now().Add(time.Second)

			go func() {
				var err error
				if f, err = newWithDialer(tc, d); err != nil {
					t.Errorf("Unexpected error: %v", err)
				}
				ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
				defer cancelFunc()

				_ = f.PostWithTimeAndContext(ctx, "tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
				_ = f.PostWithTimeAndContext(ctx, "tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"})
			}()

			conn := d.waitForNextDialing(true)
			assertReceived(t,
				conn.waitForNextWrite(true, ""),
				"[\"acme.tag_name\",1482493046,{\"foo\":\"bar\"},{}]")

			assertReceived(t,
				conn.waitForNextWrite(true, ""),
				"[\"acme.tag_name\",1482493050,{\"fluentd\":\"is awesome\"},{}]")
			// Expected value first, observed value second, so a failure
			// message labels the two correctly.
			assert.Equal(t, deadline, conn.writeDeadline)
		})
	}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add another test case for the context timeout, via sync and async logger?

func TestReconnectAndResendAfterTransientFailure(t *testing.T) {
testcases := map[string]Config{
"with Async": {
Expand Down