Skip to content

Commit

Permalink
[Filebeat] Reduce memory usage of multiline (#14068) (#14073)
Browse files Browse the repository at this point in the history
The use of time.After when multiline is enabled and lines don't match
the multiline pattern increases the memory usage (from 30MB to 1GB).

This extra memory is attributed to unexpired timers allocated internally
by the Go runtime when time.After(duration) is used. According to the
docs: "If efficiency is a concern, use NewTimer instead and call Timer.Stop
if the timer is no longer needed.".

It's not clear to me why this problem only appears when many lines on
the input file don't match the pattern.

(cherry picked from commit ce651e0)
  • Loading branch information
adriansr committed Oct 15, 2019
1 parent 4087522 commit 12ee6cd
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -47,6 +47,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*

- Fixed increased memory usage with large files when multiline pattern does not match. {issue}14068[14068]

*Heartbeat*

Expand Down
9 changes: 5 additions & 4 deletions libbeat/reader/readfile/timeout.go
Expand Up @@ -28,7 +28,7 @@ var (
errTimeout = errors.New("timeout")
)

// timeoutProcessor will signal some configurable timeout error if no
// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
reader reader.Reader
Expand All @@ -43,7 +43,7 @@ type lineMessage struct {
err error
}

// New returns a new timeout reader from an input line reader.
// NewTimeoutReader returns a new timeout reader from an input line reader.
func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *TimeoutReader {
if signal == nil {
signal = errTimeout
Expand Down Expand Up @@ -75,14 +75,15 @@ func (r *TimeoutReader) Next() (reader.Message, error) {
}
}()
}

timer := time.NewTimer(r.timeout)
select {
case msg := <-r.ch:
if msg.err != nil {
r.running = false
}
timer.Stop()
return msg.line, msg.err
case <-time.After(r.timeout):
case <-timer.C:
return reader.Message{}, r.signal
}
}

0 comments on commit 12ee6cd

Please sign in to comment.