
Filestream include_message does not correctly track the offset of a file #39653

Closed
belimawr opened this issue May 21, 2024 · 1 comment · Fixed by #39873
Labels
bug Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

Comments

@belimawr
Contributor

For confirmed bugs, please report:

  • Version: main
  • Operating System: All

When the include_message parser is used, the offset is updated based only on the length of the message that is published; the bytes that were read but discarded by the filter are not accounted for.

How to reproduce

  1. Create a file (/tmp/foo.log) with the following content:
    TEST
    A
    
  2. Create a filebeat.yml with the following content:
    filebeat.inputs:
      - type: filestream
        parsers:
          - include_message.patterns:
              - ^A$
        id: my-filestream-id
        enabled: true
        paths:
          - /tmp/foo.log
    
    output:
      console:
        codec.json:
          pretty: true
  3. Run Filebeat and wait for the event to be printed in the console
  4. Stop Filebeat
  5. Look at the registry log file: the offset will be 2, corresponding to the size of the message, not the number of bytes the reader advanced in the file.
    {"k":"filestream::my-filestream-id::native::26550-34","v":{"cursor":{"offset":2},"meta":{"source":"/tmp/foo.log","identifier_name":"native"},"ttl":1800000000000,"updated":[280445103831836,1716310115]}}
  6. Stop Filebeat
  7. Start Filebeat
  8. Wait until the same message gets published/printed to the console
  9. Look at the registry file once more: the offset has been increased by 2 again, for a total of 4.
    {"k":"filestream::my-filestream-id::native::26550-34","v":{"cursor":{"offset":4},"meta":{"source":"/tmp/foo.log","identifier_name":"native"},"ttl":1800000000000,"updated":[280444802347591,1716310433]}}

The problem happens because the parser.FilterParser does not account for the size of the lines it discards.

func (p *FilterParser) Next() (reader.Message, error) {
	for p.ctx.Err() == nil {
		message, err := p.r.Next()
		if err != nil {
			return message, err
		}
		if p.matchAny(string(message.Content)) {
			return message, err
		}
		p.logger.Debug("dropping message because it does not match any of the provided patterns [%v]: %s", p.matchers, string(message.Content))
	}
	return reader.Message{}, io.EOF
}
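One possible shape for a fix is to fold the size of every discarded message into the next message that is returned, so the cursor still advances past the dropped lines. This is only a sketch with hypothetical stand-in types (Message, Reader, FilterNext, sliceReader), not the code from PR #39873:

```go
package main

import (
	"fmt"
	"io"
)

// Message and Reader are hypothetical stand-ins for reader.Message and the
// wrapped reader in Beats.
type Message struct {
	Content []byte
	Bytes   int // bytes consumed from the file to produce this message
}

type Reader interface {
	Next() (Message, error)
}

// FilterNext drops non-matching messages but remembers their size, adding
// it to the Bytes of the message that is eventually returned.
func FilterNext(r Reader, match func([]byte) bool) (Message, error) {
	skipped := 0
	for {
		msg, err := r.Next()
		if err != nil {
			return msg, err
		}
		if match(msg.Content) {
			msg.Bytes += skipped // account for the discarded lines
			return msg, nil
		}
		skipped += msg.Bytes // size of the line we are about to drop
	}
}

// sliceReader is a test double feeding pre-split messages.
type sliceReader struct{ msgs []Message }

func (s *sliceReader) Next() (Message, error) {
	if len(s.msgs) == 0 {
		return Message{}, io.EOF
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, nil
}

func main() {
	r := &sliceReader{msgs: []Message{
		{Content: []byte("TEST"), Bytes: 5}, // "TEST\n", dropped
		{Content: []byte("A"), Bytes: 2},    // "A\n", published
	}}
	m, _ := FilterNext(r, func(b []byte) bool { return string(b) == "A" })
	fmt.Println(m.Bytes) // 7: the dropped "TEST\n" is no longer lost
}
```

With this accounting, `s.Offset += int64(message.Bytes)` downstream would land on the correct position even though only one of the two lines was published.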

Then, when Filestream gets the message, it increases the file offset by the message's size instead of by the number of bytes the reader actually advanced in the file.

	message, err := r.Next()
	if err != nil {
		if errors.Is(err, ErrFileTruncate) {
			log.Infof("File was truncated, nothing to read. Path='%s'", path)
		} else if errors.Is(err, ErrClosed) {
			log.Infof("Reader was closed. Closing. Path='%s'", path)
		} else if errors.Is(err, io.EOF) {
			log.Debugf("EOF has been reached. Closing. Path='%s'", path)
		} else {
			log.Errorf("Read line error: %v", err)
			metrics.ProcessingErrors.Inc()
		}
		return nil
	}
	s.Offset += int64(message.Bytes)
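The numbers from the reproduction make the divergence concrete. A tiny Go sketch, where advanceBuggy and advanceFixed are hypothetical helpers that only model the cursor arithmetic, not Beats code:

```go
package main

import "fmt"

// advanceBuggy models what the loop above effectively does today: the
// cursor moves only by the size of the published message.
func advanceBuggy(offset, messageBytes, droppedBytes int64) int64 {
	return offset + messageBytes
}

// advanceFixed models what it should do: also skip over the data the
// filter discarded before this message.
func advanceFixed(offset, messageBytes, droppedBytes int64) int64 {
	return offset + messageBytes + droppedBytes
}

func main() {
	// For /tmp/foo.log ("TEST\nA\n"): the published message "A\n" is
	// 2 bytes, the dropped line "TEST\n" is 5 bytes.
	fmt.Println(advanceBuggy(0, 2, 5)) // 2: the offset seen in the registry
	fmt.Println(advanceFixed(0, 2, 5)) // 7: the full size of the file
}
```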

@belimawr belimawr added bug Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team labels May 21, 2024
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
