Skip to content

Commit b1bc4a4

Browse files
authored
fix(filebeat): prevent data corruption in TCP/Syslog/Unix input (#47618)
* fix(filebeat): prevent data corruption in TCP/Syslog input Mutating a buffer received from a TCP callback could corrupt data in the underlying bufio.Scanner. Copy the data to a separate buffer before passing it to the callback. * mention unix in changelog file * fix imports
1 parent 86bb29c commit b1bc4a4

File tree

4 files changed

+234
-1
lines changed

4 files changed

+234
-1
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: bug-fix
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: Fix possible data corruption in tcp, syslog and unix inputs
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodates a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: filebeat
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link to the PR that added the changeset.
36+
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
45+
# issue: https://github.com/owner/repo/1234

filebeat/input/net/tcp/input_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,26 @@
2020
package tcp
2121

2222
import (
23+
"bytes"
2324
"context"
2425
"errors"
26+
"fmt"
27+
"net"
28+
"runtime"
2529
"sync"
2630
"testing"
2731
"time"
2832

2933
netinput "github.com/elastic/beats/v7/filebeat/input/net"
3034
"github.com/elastic/beats/v7/filebeat/input/net/nettest"
3135
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
36+
libbeattesting "github.com/elastic/beats/v7/libbeat/testing"
3237
conf "github.com/elastic/elastic-agent-libs/config"
3338
"github.com/elastic/elastic-agent-libs/logp"
3439
"github.com/elastic/elastic-agent-libs/monitoring"
40+
41+
"github.com/stretchr/testify/assert"
42+
"github.com/stretchr/testify/require"
3543
)
3644

3745
func TestInput(t *testing.T) {
@@ -95,3 +103,77 @@ func TestInput(t *testing.T) {
95103
// No more events on the channel, test passed
96104
}
97105
}
106+
107+
func BenchmarkInput(b *testing.B) {
108+
port, err := libbeattesting.AvailableTCP4Port()
109+
if err != nil {
110+
b.Fatalf("cannot find available port: %s", err)
111+
}
112+
serverAddr := net.JoinHostPort("localhost", fmt.Sprintf("%d", port))
113+
114+
inp, err := configure(conf.MustNewConfigFrom(map[string]any{
115+
"host": serverAddr,
116+
"number_of_workers": 2,
117+
}))
118+
if err != nil {
119+
b.Fatalf("cannot create input: %s", err)
120+
}
121+
122+
ctx, cancel := context.WithCancel(context.Background())
123+
defer cancel()
124+
125+
v2Ctx := v2.Context{
126+
ID: b.Name(),
127+
Cancelation: ctx,
128+
Logger: logp.NewNopLogger(),
129+
MetricsRegistry: monitoring.NewRegistry(),
130+
}
131+
132+
metrics := inp.InitMetrics("tcp", v2Ctx.MetricsRegistry, v2Ctx.Logger)
133+
c := make(chan netinput.DataMetadata, 5*runtime.NumCPU())
134+
135+
go func() {
136+
if err := inp.Run(v2Ctx, c, metrics); err != nil {
137+
if !errors.Is(err, context.Canceled) {
138+
b.Errorf("input exited with error: %s", err)
139+
}
140+
}
141+
}()
142+
143+
require.EventuallyWithTf(b, func(ct *assert.CollectT) {
144+
conn, err := net.Dial("tcp", serverAddr)
145+
require.NoError(ct, err)
146+
conn.Close()
147+
}, 30*time.Second, 100*time.Millisecond, "waiting for TCP server to start")
148+
149+
testMessage := bytes.Repeat([]byte("A"), 1001)
150+
testMessage[len(testMessage)-1] = '\n'
151+
b.ResetTimer()
152+
b.RunParallel(func(pb *testing.PB) {
153+
for pb.Next() {
154+
conn, err := net.Dial("tcp", serverAddr)
155+
if err != nil {
156+
b.Errorf("cannot create connection: %s", err)
157+
continue
158+
}
159+
160+
for range 100 {
161+
_, err = conn.Write(testMessage)
162+
if err != nil {
163+
b.Errorf("failed to send data: %s", err)
164+
break
165+
}
166+
}
167+
conn.Close()
168+
169+
// Read the events from the channel to prevent blocking
170+
for range 100 {
171+
select {
172+
case <-c:
173+
case <-time.After(time.Second):
174+
b.Error("timeout waiting for event")
175+
}
176+
}
177+
}
178+
})
179+
}

filebeat/inputsource/common/streaming/handler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,11 @@ func SplitHandlerFactory(family inputsource.Family, logger *logp.Logger, metadat
7878
return fmt.Errorf(string(family)+" split_client error: %w", err)
7979
}
8080
r.Reset()
81-
callback(scanner.Bytes(), metadata)
81+
82+
// Deep copy the data to avoid mutating the underlying scanner buffer.
83+
buf := make([]byte, len(scanner.Bytes()))
84+
_ = copy(buf, scanner.Bytes())
85+
callback(buf, metadata)
8286
}
8387

8488
// We are out of the scanner, either we reached EOF or another fatal error occurred.
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package streaming
19+
20+
import (
21+
"bufio"
22+
"net"
23+
"strings"
24+
"testing"
25+
"time"
26+
27+
"github.com/stretchr/testify/assert"
28+
"github.com/stretchr/testify/require"
29+
30+
"github.com/elastic/beats/v7/filebeat/inputsource"
31+
"github.com/elastic/beats/v7/libbeat/common/cfgtype"
32+
"github.com/elastic/elastic-agent-libs/logp"
33+
)
34+
35+
// TestSplitHandlerDeepCopy tests that the SplitHandlerFactory creates a handler
36+
// that provides deep copies of the scanned data to the consumer callback.
37+
func TestSplitHandlerDeepCopy(t *testing.T) {
38+
logger := logp.NewNopLogger()
39+
40+
var receivedMessages []string
41+
var bufferSnapshots [][]byte
42+
43+
// Simulate a consumer callback:
44+
// 1. Stores the received message as a string for verification.
45+
// 2. Keeps a reference to the raw byte slice.
46+
// 3. Overwrites the slice to detect whether future callbacks receive
47+
// shared memory instead of independent copies.
48+
networkFunc := func(data []byte, metadata inputsource.NetworkMetadata) {
49+
receivedMessages = append(receivedMessages, string(data))
50+
bufferSnapshots = append(bufferSnapshots, data)
51+
52+
for i := range data {
53+
data[i] = 'X'
54+
}
55+
}
56+
57+
metadataFunc := func(conn net.Conn) inputsource.NetworkMetadata {
58+
return inputsource.NetworkMetadata{}
59+
}
60+
61+
server, client := net.Pipe()
62+
defer server.Close()
63+
defer client.Close()
64+
65+
header := "PREFIX:"
66+
smallMsg := header + "small_event_data"
67+
largeMsg := header + strings.Repeat("large_event_data_that_fills_buffer_", 10) + "end"
68+
expectedMessages := []string{smallMsg, largeMsg, smallMsg}
69+
70+
input := strings.Join(expectedMessages, "\n") + "\n"
71+
72+
go func() {
73+
defer client.Close()
74+
_, err := client.Write([]byte(input))
75+
require.NoError(t, err)
76+
}()
77+
78+
factory := SplitHandlerFactory(inputsource.FamilyTCP, logger, metadataFunc, networkFunc, bufio.ScanLines)
79+
config := ListenerConfig{
80+
MaxMessageSize: cfgtype.ByteSize(2048),
81+
Timeout: 1 * time.Second,
82+
}
83+
handler := factory(config)
84+
85+
err := handler(t.Context(), server)
86+
assert.NoError(t, err)
87+
88+
require.Len(t, receivedMessages, len(expectedMessages), "should receive all messages")
89+
90+
for i, received := range receivedMessages {
91+
assert.Equal(t, expectedMessages[i], received, "message %d should be received intact", i)
92+
}
93+
94+
// Verify that each callback received an independent copy of the buffer.
95+
for i := range bufferSnapshots {
96+
assert.Equal(t,
97+
strings.Repeat("X", len(expectedMessages[i])),
98+
string(bufferSnapshots[i]),
99+
"buffer %d should be fully mutated inside the callback", i,
100+
)
101+
}
102+
}

0 commit comments

Comments
 (0)