Skip to content

Commit

Permalink
Configurable buffer size for reading plugin log lines (#265)
Browse files Browse the repository at this point in the history
- Add `PluginLogBufferSize` option to ClientConfig, defaulting to previous 64KiB size
- Added a new test `TestClient_logStderrParseJSON` to verify the parsing of JSON formatted logs.
  • Loading branch information
luoxiaohei committed Nov 13, 2023
1 parent d16cec3 commit 7c313e4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 9 deletions.
15 changes: 12 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ var (
ErrGRPCBrokerMuxNotSupported = errors.New("client requested gRPC broker multiplexing but plugin does not support the feature")
)

// defaultPluginLogBufferSize is the default size of the buffer used to read from stderr for plugin log lines.
const defaultPluginLogBufferSize = 64 * 1024

// Client handles the lifecycle of a plugin application. It launches
// plugins, connects to them, dispenses interface implementations, and handles
// killing the process.
Expand Down Expand Up @@ -220,6 +223,10 @@ type ClientConfig struct {
// it will default to hclog's default logger.
Logger hclog.Logger

// PluginLogBufferSize is the buffer size(bytes) to read from stderr for plugin log lines.
// If this is 0, then the default of 64KB is used.
PluginLogBufferSize int

// AutoMTLS has the client and server automatically negotiate mTLS for
// transport authentication. This ensures that only the original client will
// be allowed to connect to the server, and all other connections will be
Expand Down Expand Up @@ -416,6 +423,10 @@ func NewClient(config *ClientConfig) (c *Client) {
})
}

if config.PluginLogBufferSize == 0 {
config.PluginLogBufferSize = defaultPluginLogBufferSize
}

c = &Client{
config: config,
logger: config.Logger,
Expand Down Expand Up @@ -1146,14 +1157,12 @@ func (c *Client) getGRPCMuxer(addr net.Addr) (*grpcmux.GRPCClientMuxer, error) {
return c.grpcMuxer, nil
}

var stdErrBufferSize = 64 * 1024

func (c *Client) logStderr(name string, r io.Reader) {
defer c.clientWaitGroup.Done()
defer c.stderrWaitGroup.Done()
l := c.logger.Named(filepath.Base(name))

reader := bufio.NewReaderSize(r, stdErrBufferSize)
reader := bufio.NewReaderSize(r, c.config.PluginLogBufferSize)
// continuation indicates the previous line was a prefix
continuation := false

Expand Down
60 changes: 54 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -1483,18 +1484,13 @@ func testClient_logger(t *testing.T, proto string) {

// Test that we continue to consume stderr over long lines.
func TestClient_logStderr(t *testing.T) {
orig := stdErrBufferSize
stdErrBufferSize = 32
defer func() {
stdErrBufferSize = orig
}()

stderr := bytes.Buffer{}
c := NewClient(&ClientConfig{
Stderr: &stderr,
Cmd: &exec.Cmd{
Path: "test",
},
PluginLogBufferSize: 32,
})
c.clientWaitGroup.Add(1)

Expand All @@ -1515,3 +1511,55 @@ this line is short
t.Fatalf("\nexpected output: %q\ngot output: %q", msg, read)
}
}

func TestClient_logStderrParseJSON(t *testing.T) {
logBuf := bytes.Buffer{}
c := NewClient(&ClientConfig{
Stderr: bytes.NewBuffer(nil),
Cmd: &exec.Cmd{Path: "test"},
PluginLogBufferSize: 64,
Logger: hclog.New(&hclog.LoggerOptions{
Name: "test-logger",
Level: hclog.Trace,
Output: &logBuf,
JSONFormat: true,
}),
})
c.clientWaitGroup.Add(1)

msg := `{"@message": "this is a message", "@level": "info"}
{"@message": "this is a large message that is more than 64 bytes long", "@level": "info"}`
reader := strings.NewReader(msg)

c.stderrWaitGroup.Add(1)
c.logStderr(c.config.Cmd.Path, reader)
logs := strings.Split(strings.TrimSpace(logBuf.String()), "\n")

wants := []struct {
wantLevel string
wantMessage string
}{
{"info", "this is a message"},
{"debug", `{"@message": "this is a large message that is more than 64 bytes`},
{"debug", ` long", "@level": "info"}`},
}

if len(logs) != len(wants) {
t.Fatalf("expected %d logs, got %d", len(wants), len(logs))
}

for i, tt := range wants {
l := make(map[string]interface{})
if err := json.Unmarshal([]byte(logs[i]), &l); err != nil {
t.Fatal(err)
}

if l["@level"] != tt.wantLevel {
t.Fatalf("expected level %q, got %q", tt.wantLevel, l["@level"])
}

if l["@message"] != tt.wantMessage {
t.Fatalf("expected message %q, got %q", tt.wantMessage, l["@message"])
}
}
}

0 comments on commit 7c313e4

Please sign in to comment.