Add more checks for buffer corruption on startup #3970

Open
fujimotos opened this issue Nov 25, 2022 · 7 comments

@fujimotos
Member

Is your feature request related to a problem? Please describe.

Currently when starting up Fluentd outputs, we try to check if each buffer chunk
is non-empty, and if it has some bytes, we assume it contains valid data.

It turned out that this operation model has a few issues:

  • If the data was indeed corrupted, the behavior is undefined. It likely causes
    many kinds of errors in various parts of the pipeline.
  • It is also hard to tell which chunks were corrupted from td-agent.log.
    This is important because users probably want to recover the lost data.

We should perform more rigorous buffer checks on startup,
so that Fluentd can handle corrupted chunks gracefully.

Describe the solution you'd like

  • Perform more sanity checks on buffer chunks on startup.
  • Emit more error logs regarding the corrupted chunks.
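
As a rough illustration of the two points above, here is a minimal sketch of a startup scan that flags suspicious chunk files and logs their full paths. It is not Fluentd's actual loading code: `scan_chunks` is a hypothetical helper, and the file layout (a `*.log` body with a sibling `*.log.meta` file) and the three "suspicious" conditions are assumptions made for the example.

```ruby
require "logger"

# Hypothetical helper (not part of Fluentd): scans a buffer directory and
# reports chunk files that look suspicious.
# Assumption: chunk bodies match "*.log" and each has a "<body>.meta" sibling.
def scan_chunks(dir, logger: Logger.new($stderr))
  suspicious = []
  Dir.glob(File.join(dir, "*.log")).sort.each do |body|
    meta = "#{body}.meta"
    reason =
      if File.zero?(body)
        "empty body"
      elsif !File.exist?(meta)
        "missing metadata file"
      elsif File.zero?(meta)
        "empty metadata file"
      end
    next unless reason

    # Log the full path so that users can locate the file and try to recover it.
    logger.warn("possibly corrupted chunk: path=#{body} reason=#{reason}")
    suspicious << [body, reason]
  end
  suspicious
end
```

Returning the list in addition to logging would let the caller decide what to do with each file (skip it, move it aside, and so on).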

Describe alternatives you've considered

N/A

Additional context

No response

@daipom
Contributor

daipom commented Nov 25, 2022

Thanks for summarizing this issue!
I'm willing to consider this issue in December.
Here are my impressions at present.

> It is also hard to tell which chunks were corrupted from td-agent.log.

I feel the next log should be the info level since these chunks could be a problem in the case of abnormal system termination, such as a machine power failure.

> Perform more sanity checks on buffer chunks on startup.

There is a check in FileChunk initialization, but it can only detect corruption of the metadata.
If the chunk body file is corrupt, the check still passes and processing continues.

In MessagePackEventStream, the size (the number of records) is initially taken from the metadata, and it is updated after unpacking.
I'm wondering if this size could be used to detect corruption, but it seems from the description that the metadata values are not very reliable, so this depends on that point.

  • fluentd/lib/fluent/event.rb

    Lines 236 to 247 in 981decb

    def ensure_unpacked!(unpacker: nil)
      return if @unpacked_times && @unpacked_records
      @unpacked_times = []
      @unpacked_records = []
      (unpacker || Fluent::MessagePackFactory.msgpack_unpacker).feed_each(@data) do |time, record|
        @unpacked_times << time
        @unpacked_records << record
      end
      # @size should be updated always right after unpack.
      # The real size of unpacked objects are correct, rather than given size.
      @size = @unpacked_times.size
    end
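
To make the size idea concrete, here is a minimal sketch of comparing the count stored in the metadata with the count obtained by unpacking. `size_mismatch?` is a hypothetical helper, and `entries` stands in for the `[time, record]` pairs yielded by the unpacker. As noted above, this is only useful if the metadata value can be trusted.

```ruby
# Hypothetical check (not part of Fluentd): compares the record count stored
# in the chunk metadata with the number of entries actually unpacked.
# `entries` stands in for the [time, record] pairs yielded by the unpacker.
def size_mismatch?(metadata_size, entries)
  return false if metadata_size.nil? # nothing to compare against

  metadata_size != entries.count
end
```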

in_forward has the feature of checking the stream, but it is disabled by default, so we may not notice that corrupted data is being sent.

    def check_and_skip_invalid_event(tag, es, remote_host)
      new_es = Fluent::MultiEventStream.new
      es.each { |time, record|
        if invalid_event?(tag, time, record)
          log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
          next
        end
        new_es.add(time, record)
      }
      new_es
    end

@daipom daipom self-assigned this Dec 28, 2022
@daipom
Contributor

daipom commented Dec 28, 2022

I am examining this issue.

The most important thing is to detect file corruption at abnormal system termination, such as a machine power failure.
To handle this, we should improve the process of loading the existing chunk files at startup.

> I feel the next log should be the info level since these chunks could be a problem in the case of abnormal system termination, such as a machine power failure.

This log level should be info at least when flush_at_shutdown is true.
When flush_at_shutdown is true, the level can be warn.

Without this log, even if we notice that we have received corrupted data on the destination server, we can not know which chunks may have been corrupted.

And, if possible, we should check for the corruption of the chunk's body when loading existing chunks.

I am currently making this modification.

@daipom
Contributor

daipom commented Jan 26, 2023

I have created some PRs for this issue.

About adding more checks for buffer corruption, I consider the following:

  • It is difficult to determine if the content of the chunk file is broken or not.
    • Its format depends on the plugin, so we can not define what should be considered broken in general.
  • I guess we can check if we can unpack the data as MessagePack only when the output plugin does not use a custom format.
    • There is a way of corruption such that unpacking does not result in an error, but returns `nil` records.
    • So we probably should `each` all records and check for `nil` or other incorrect records.
    • This impact on performance would be ignorable if only for `flush_at_shutdown true` and `buffer::resume()`.
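
The "each all records and check" idea could be sketched roughly as follows. This is only an illustration: `valid_entry?` and `find_invalid_entries` are hypothetical helpers, and the validity rule (numeric time, Hash record) is an assumption for the example. Real Fluentd events may carry other time types, so the rule would need to be adapted.

```ruby
# Hypothetical validity rule (an assumption, not Fluentd's own definition):
# the time must be numeric and the record must be a Hash.
def valid_entry?(time, record)
  time.is_a?(Numeric) && record.is_a?(Hash)
end

# Walks every unpacked [time, record] pair and returns the indexes of the
# entries that fail the rule. A nil entry is treated as invalid as well.
def find_invalid_entries(entries)
  invalid = []
  entries.each_with_index do |(time, record), index|
    invalid << index unless valid_entry?(time, record)
  end
  invalid
end
```

If the returned list is non-empty, the chunk could be reported as possibly corrupted together with its path.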

@daipom
Contributor

daipom commented Feb 17, 2023

> I have created some PRs for this issue.

All PRs are merged, thanks for the reviews!
I will add documents and a release note about #4025 soon.

> About adding more checks for buffer corruption, I consider the following:
>
> * It is difficult to determine if the content of the chunk file is broken or not.
>   * Its format depends on the plugin, so we can not define what should be considered broken in general.
> * I guess we can check if we can unpack the data as MessagePack only when the output plugin does not use a custom format.
>   * There is a way of corruption such that unpacking does not result in an error, but returns `nil` records.
>   * So we probably should `each` all records and check for `nil` or other incorrect records.
>   * This impact on performance would be ignorable if only for `flush_at_shutdown true` and `buffer::resume()`.

I want to work on other issues now, so I won't be able to work on this for a while.

@daipom
Contributor

daipom commented Mar 29, 2023

Added documentation.

The following feature would be helpful, but I will not be able to work on it for a while.

> About adding more checks for buffer corruption, I consider the following:
>
> * It is difficult to determine if the content of the chunk file is broken or not.
>   * Its format depends on the plugin, so we can not define what should be considered broken in general.
> * I guess we can check if we can unpack the data as MessagePack only when the output plugin does not use a custom format.
>   * There is a way of corruption such that unpacking does not result in an error, but returns `nil` records.
>   * So we probably should `each` all records and check for `nil` or other incorrect records.
>   * This impact on performance would be ignorable if only for `flush_at_shutdown true` and `buffer::resume()`.

@cosmo0920
Contributor

> The following feature would be helpful, but I will not be able to work on it for a while.
>
> About adding more checks for buffer corruption, I consider the following:
>
> * It is difficult to determine if the content of the chunk file is broken or not.
>   * Its format depends on the plugin, so we can not define what should be considered broken in general.
> * I guess we can check if we can unpack the data as MessagePack only when the output plugin does not use a custom format.
>   * There is a way of corruption such that unpacking does not result in an error, but returns `nil` records.
>   * So we probably should `each` all records and check for `nil` or other incorrect records.
>   * This impact on performance would be ignorable if only for `flush_at_shutdown true` and `buffer::resume()`.

Not sure it's possible but if we could add checksums for the buffer contents, it would be helpful to verify the correctness of the buffers. This is already implemented in the chunkio which is used in Fluent Bit's filesystem buffering mechanism.

The main issue of the current implementation is that there is no mechanism to detect buffer corruption.
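
To illustrate the checksum idea, here is a minimal sketch using CRC32 from Ruby's standard `zlib` library. It is neither the chunkio on-disk format nor an existing Fluentd feature: `seal` and `unseal` are hypothetical helpers, and storing the checksum as a 4-byte trailer is an assumption made to keep the example short.

```ruby
require "zlib"

# Hypothetical helper: appends a 4-byte big-endian CRC32 of the payload
# as a trailer. The trailer layout is an assumption for this sketch.
def seal(payload)
  payload.b + [Zlib.crc32(payload)].pack("N")
end

# Hypothetical helper: returns the payload if the stored checksum matches,
# or nil if the data is too short or the checksum does not match.
def unseal(data)
  return nil if data.nil? || data.bytesize < 4

  data = data.b
  payload = data.byteslice(0, data.bytesize - 4)
  stored = data.byteslice(-4, 4).unpack1("N")
  Zlib.crc32(payload) == stored ? payload : nil
end
```

With something like this, the check at startup would not depend on the chunk format, so it could also cover output plugins that use a custom format.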

@daipom
Contributor

daipom commented Apr 8, 2024

> Not sure it's possible but if we could add checksums for the buffer contents, it would be helpful to verify the correctness of the buffers. This is already implemented in the chunkio which is used in Fluent Bit's filesystem buffering mechanism.
>
> The main issue of the current implementation is that there is no mechanism to detect buffer corruption.

I agree.

I remember that when I previously made some improvements to this issue, I did not consider such a new mechanism because it would be expensive to implement and impactful to existing logic.
However, if we could add such a checksum mechanism, it would be helpful!
