Skip to content

Commit

Permalink
filebeat/input/tcp - Fix panic in linux input metrics (#35068) (#35074)
Browse files Browse the repository at this point in the history
A panic occurred while parsing /proc/net/tcp and reaching the final empty line.
This occurred on Linux when the desired socket was not found in the list.

Fixes #35064
  • Loading branch information
andrewkroh committed Apr 12, 2023
1 parent ce8216a commit a550cf8
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 9 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Expand Up @@ -106,7 +106,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix the ignore_inactive option being ignored in Filebeat's filestream input {pull}34770[34770]
- Fix TestMultiEventForEOFRetryHandlerInput unit test of CometD input {pull}34903[34903]
- Add input instance id to request trace filename for httpjson and cel inputs {pull}35024[35024]
- Fix panic in UDP input on Linux when collecting socket metrics from OS. {issue}35064[35064]
- Fix panic in TCP and UDP inputs on Linux when collecting socket metrics from OS. {issue}35064[35064]

*Heartbeat*

Expand Down
12 changes: 6 additions & 6 deletions filebeat/input/tcp/input.go
Expand Up @@ -246,7 +246,7 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger)
for {
select {
case <-t.C:
rx, err := procNetTCP(addr)
rx, err := procNetTCP("/proc/net/tcp", addr)
if err != nil {
log.Warnf("failed to get tcp stats from /proc: %v", err)
continue
Expand All @@ -263,18 +263,18 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger)
// socket on the provided address formatted in hex, xxxxxxxx:xxxx.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity.
func procNetTCP(addr []string) (rx int64, err error) {
b, err := os.ReadFile("/proc/net/tcp")
func procNetTCP(path string, addr []string) (rx int64, err error) {
b, err := os.ReadFile(path)
if err != nil {
return 0, err
}
lines := bytes.Split(b, []byte("\n"))
if len(lines) < 2 {
return 0, fmt.Errorf("/proc/net/tcp entry not found for %s (no line)", addr)
return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
}
for _, l := range lines[1:] {
f := bytes.Fields(l)
if contains(f[1], addr) {
if len(f) > 4 && contains(f[1], addr) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, errors.New("no rx_queue field " + string(f[4]))
Expand All @@ -286,7 +286,7 @@ func procNetTCP(addr []string) (rx int64, err error) {
return rx, nil
}
}
return 0, fmt.Errorf("/proc/net/tcp entry not found for %s", addr)
return 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

func contains(b []byte, addr []string) bool {
Expand Down
41 changes: 41 additions & 0 deletions filebeat/input/tcp/input_test.go
@@ -0,0 +1,41 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package tcp

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestProcNetTCP(t *testing.T) {
t.Run("with_match", func(t *testing.T) {
rx, err := procNetTCP("testdata/proc_net_tcp.txt", []string{"0100007F:17AC"})
if err != nil {
t.Fatal(err)
}
assert.EqualValues(t, 1, rx)
})

t.Run("without_match", func(t *testing.T) {
_, err := procNetTCP("testdata/proc_net_tcp.txt", []string{"FOO:BAR", "BAR:BAZ"})
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})
}
2 changes: 2 additions & 0 deletions filebeat/input/tcp/testdata/proc_net_tcp.txt
@@ -0,0 +1,2 @@
sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
1: 0100007F:17AC 00000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 104724420 1 0000000000000000 100 0 0 10 0
4 changes: 2 additions & 2 deletions filebeat/input/udp/input.go
Expand Up @@ -264,7 +264,7 @@ func procNetUDP(path string, addr []string) (rx, drops int64, err error) {
}
lines := bytes.Split(b, []byte("\n"))
if len(lines) < 2 {
return 0, 0, fmt.Errorf("/proc/net/udp entry not found for %s (no line)", addr)
return 0, 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
}
for _, l := range lines[1:] {
f := bytes.Fields(l)
Expand All @@ -284,7 +284,7 @@ func procNetUDP(path string, addr []string) (rx, drops int64, err error) {
return rx, drops, nil
}
}
return 0, 0, fmt.Errorf("/proc/net/udp entry not found for %s", addr)
return 0, 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

func contains(b []byte, addr []string) bool {
Expand Down

0 comments on commit a550cf8

Please sign in to comment.