From c1c047aa609e0d220f196b202833aed1e3e63d3e Mon Sep 17 00:00:00 2001 From: QxBytes <39818795+QxBytes@users.noreply.github.com> Date: Fri, 21 Jun 2024 11:42:27 -0700 Subject: [PATCH 1/2] fix: move bufio reader creation out of for loop to fix telemetry unmarshal errors (#2789) * move bufio reader creation out of for loop if the bufio reader is created in the for loop we get unmarshaling errors * fix linter issue * add fixed ut * fix existing unit test flake due to closing pipe on error a previous fix ensured the socket closed on error, but this caused an existing ut to nondeterministically fail without the previous fix, the socket wouldn't have been closed on error * make read inline --- telemetry/telemetrybuffer.go | 18 +++++----------- telemetry/telemetrybuffer_test.go | 34 +++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 0ea13f4e0a..7fc49e7eb3 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -123,12 +123,14 @@ func (tb *TelemetryBuffer) StartServer() error { tb.connections = remove(tb.connections, index) } }() - + reader := bufio.NewReader(conn) for { - reportStr, err := read(conn) - if err != nil { + reportStr, readErr := reader.ReadBytes(Delimiter) + if readErr != nil { return } + reportStr = reportStr[:len(reportStr)-1] + var tmp map[string]interface{} err = json.Unmarshal(reportStr, &tmp) if err != nil { @@ -195,16 +197,6 @@ func (tb *TelemetryBuffer) PushData(ctx context.Context) { } } -// read - read from the file descriptor -func read(conn net.Conn) (b []byte, err error) { - b, err = bufio.NewReader(conn).ReadBytes(Delimiter) - if err == nil { - b = b[:len(b)-1] - } - - return -} - // Write - write to the file descriptor. func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) { buf := make([]byte, len(b)) diff --git a/telemetry/telemetrybuffer_test.go b/telemetry/telemetrybuffer_test.go index f226c6a87b..35ea34ef7e 100644 --- a/telemetry/telemetrybuffer_test.go +++ b/telemetry/telemetrybuffer_test.go @@ -67,6 +67,36 @@ func TestClientConnClose(t *testing.T) { tbClient.Close() } +func TestCloseOnWriteError(t *testing.T) { + tbServer, closeTBServer := createTBServer(t) + defer closeTBServer() + + tbClient := NewTelemetryBuffer(nil) + err := tbClient.Connect() + require.NoError(t, err) + defer tbClient.Close() + + data := []byte("{\"good\":1}") + _, err = tbClient.Write(data) + require.NoError(t, err) + // need to wait for connection to populate in server + time.Sleep(1 * time.Second) + tbServer.mutex.Lock() + conns := tbServer.connections + tbServer.mutex.Unlock() + require.Len(t, conns, 1) + + // the connection should be automatically closed on failure + badData := []byte("} malformed json }}}") + _, err = tbClient.Write(badData) + require.NoError(t, err) + time.Sleep(1 * time.Second) + tbServer.mutex.Lock() + conns = tbServer.connections + tbServer.mutex.Unlock() + require.Empty(t, conns) +} + func TestWrite(t *testing.T) { _, closeTBServer := createTBServer(t) defer closeTBServer() @@ -84,8 +114,8 @@ func TestWrite(t *testing.T) { }{ { name: "write", - data: []byte("testdata"), - want: len("testdata") + 1, // +1 due to Delimiter('\n) + data: []byte("{\"testdata\":1}"), + want: len("{\"testdata\":1}") + 1, // +1 due to Delimiter('\n) wantErr: false, }, { From 4e80d0c400591c5e15062d946202c07468119a25 Mon Sep 17 00:00:00 2001 From: QxBytes Date: Tue, 25 Jun 2024 10:42:02 -0700 Subject: [PATCH 2/2] make ut compatible with 1.4.x --- telemetry/telemetrybuffer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telemetry/telemetrybuffer_test.go b/telemetry/telemetrybuffer_test.go index 35ea34ef7e..cdb79849f7 100644 --- a/telemetry/telemetrybuffer_test.go +++ b/telemetry/telemetrybuffer_test.go @@ -71,7 +71,7 @@ func TestCloseOnWriteError(t *testing.T) { tbServer, closeTBServer := createTBServer(t) defer closeTBServer() - tbClient := NewTelemetryBuffer(nil) + tbClient := NewTelemetryBuffer() err := tbClient.Connect() require.NoError(t, err) defer tbClient.Close()