Make messages with identical timestamps sortable by ULID #6711

Merged
merged 33 commits into from Nov 30, 2022

Conversation

mpfz0r
Member

@mpfz0r mpfz0r commented Nov 3, 2019

The ULID format in the gl2_message_id field is composed of a 48 bit timestamp followed by 80 bits
of randomness.
Use the first 16 bits of the random field to embed a sequence number
for each message.
If a batch of messages was received with identical timestamps
(the same millisecond), the original receive order is kept by the
encoded sequence number which directly follows the timestamp.

This allows us to sort messages by gl2_message_id, which should preserve the original
receive order in most cases.
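
To illustrate the layout described above, here is a minimal sketch, not the code from this PR. The class name `SequencedUlid` and its `encode` method are hypothetical. It packs a 48-bit timestamp, a 16-bit sequence number and the remaining 64 random bits into 128 bits, then renders them in the 26-character Crockford base32 form that ULIDs use. Because that alphabet is in ascending ASCII order, comparing the strings compares timestamp first and sequence number second.

```java
/** Hypothetical sketch: a ULID whose first 16 "random" bits carry a sequence number. */
final class SequencedUlid {
    // Crockford base32 alphabet used by ULIDs (no I, L, O, U); ascending in ASCII.
    private static final char[] CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ".toCharArray();

    private SequencedUlid() {
    }

    /**
     * Encodes 48 bits of timestamp, 16 bits of sequence and 64 random bits
     * as a 26-character ULID string.
     */
    static String encode(long timestampMillis, int sequence, long random64) {
        if (timestampMillis < 0 || timestampMillis >= (1L << 48)) {
            throw new IllegalArgumentException("timestamp does not fit into 48 bits");
        }
        if (sequence < 0 || sequence > 0xFFFF) {
            throw new IllegalArgumentException("sequence does not fit into 16 bits");
        }
        // Upper 64 bits: timestamp followed directly by the sequence number.
        long msb = (timestampMillis << 16) | sequence;
        // Lower 64 bits: the rest of the random field.
        long lsb = random64;

        char[] out = new char[26];
        // Emit 5 bits per character, starting with the least significant ones.
        for (int i = 25; i >= 0; i--) {
            out[i] = CROCKFORD[(int) (lsb & 0x1F)];
            // Shift the 128-bit value right by 5 bits.
            lsb = (lsb >>> 5) | (msb << 59);
            msb >>>= 5;
        }
        return new String(out);
    }
}
```

With the same timestamp, `encode(ts, 1, r1)` sorts before `encode(ts, 2, r2)` regardless of the random bits, which is the property the description relies on.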

CAVEATS

This is a best effort approach to a complicated problem. It's not a silver bullet.
Here are reasons why the gl2_message_id sort order might not always be correct:

  • The sequence number is generated per node and input.
    This means that sorting will not work if an input is load balanced over multiple nodes.

  • There is only space for 60535 messages with the same timestamp and input.

  • Also, there is a small chance that, if too many batches of messages with the same timestamp and input get processed
    in parallel, the sort order might be wrong.

Performance Impact

I ran a benchmark that ingests 8 million messages.
Four parallel curl loops send batches of 5000 messages with identical timestamps.

Without this change: 3m 19s
With this change: 3m 25s

That is approx. 40k msg/sec either way (about 40.2k vs. 39.0k), with no measurable performance impact.

Fixes #2741

@mpfz0r
Member Author

mpfz0r commented Nov 21, 2019

/rebase

bernd
bernd previously requested changes Dec 13, 2019
Member

@bernd bernd left a comment

A few comments:

Member Author

@mpfz0r mpfz0r left a comment

Thanks for the review. I've updated the PR

@bernd
Member

bernd commented Jan 27, 2020

Checking for an existing "gl2_message_id" and using the message timestamp is done in #7290.

@bernd bernd modified the milestones: 3.3.0, 4.0.0 May 12, 2020
@bernd bernd removed this from the 4.0.0 milestone Jan 27, 2022
@bernd
Member

bernd commented Oct 12, 2022

@mpfz0r Does this implementation still work when a non-local message journal implementation (e.g., Kafka) is used? This just crossed my mind.

@mpfz0r
Member Author

mpfz0r commented Oct 12, 2022

> @mpfz0r Does this implementation still work when a non-local message journal implementation (e.g., Kafka) is used? This just crossed my mind.

It does. I had the same thought at first, but only my first iteration depended on the journal 😅

Contributor

@patrickmann patrickmann left a comment

LGTM
Tested with syslog input - sorting by gl2_message_id works as expected

@mpfz0r mpfz0r added inputs and removed in progress labels Nov 25, 2022
Contributor

@thll thll left a comment

Very nice improvement! 👍

@mpfz0r mpfz0r requested a review from thll November 29, 2022 17:54
@bernd bernd dismissed their stale review November 30, 2022 13:42

No time to review it again.

@mpfz0r mpfz0r merged commit 18678f1 into master Nov 30, 2022
@mpfz0r mpfz0r deleted the ulid-sorted-messages branch November 30, 2022 13:47
bernd pushed a commit that referenced this pull request Dec 9, 2022
* Make messages with identical timestamps sortable by ULID

The ULID format is composed of a 48 bit timestamp followed by 80 bits
of randomness.
Use the first 16 bits of the random field to embed a sequence number
for each message.
If a batch of messages was received with identical timestamps
(the same millisecond), the original receive order is kept by the
encoded sequence number which directly follows the timestamp.

* Add license

* Use a MessageInput sequenceNr instead of journalOffset

Introduce a sequence number on each input which gets incremented
and embedded in every received message.
This allows sorting to work without depending on a KafkaJournal.

Regenerate the protobuf class JournalMessages so it can pass the
sequence number from a RawMessage to a Message.

Don't overwrite already existing GL2_MESSAGE_IDs.
They might already be set if we are receiving messages from a Graylog forwarder.
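
A minimal sketch of the two ideas in this commit, under stated assumptions: the class `InputSequenceSketch` and both method names are hypothetical, and wrapping back to 0 at `Integer.MAX_VALUE` is only one possible way to avoid negative values. Each input holds its own counter, and an ID that arrived with the message (for example from a forwarder) is left alone.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a per-input sequence counter; not the actual MessageInput code. */
final class InputSequenceSketch {
    // One counter per input instance, shared by all threads receiving on that input.
    private final AtomicInteger sequenceNr = new AtomicInteger(0);

    /** Returns the current sequence number and advances it, wrapping to 0 instead of overflowing. */
    int nextSequenceNr() {
        return sequenceNr.getAndUpdate(n -> n == Integer.MAX_VALUE ? 0 : n + 1);
    }

    /** Sets gl2_message_id only if the message does not already carry one. */
    static void assignMessageId(Map<String, Object> fields, String generatedId) {
        fields.putIfAbsent("gl2_message_id", generatedId);
    }
}
```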

* final some variables

* Fix review comments

* Use protoc 2.5.0 instead of 3.0.0

* Handle sequenceNr wrap and exceptions

* update license

* better log messages; bump offset gap

* increase cache size

* Cleanup and fix test

* Fix some comments and add changelog

* Bump OFFSET_GAP to 5000 and rename a few constants

With an OFFSET_GAP of 5000, I couldn't reproduce negative sequence
numbers anymore.
It's a tradeoff, but ordering only 60535 messages for the same timestamp
is reasonable.
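
The 60535 figure is the 16-bit maximum minus the gap: 65535 - 5000. The sketch below shows one plausible reading of how such a gap could work; it is an assumption and not taken from the merged code. `OffsetGapSketch`, `relativeSequence` and the `-1` return value are hypothetical. The idea is to number messages relative to the first sequence number seen for a timestamp, shifted up by the gap, so that messages processed slightly out of order still map to a non-negative value.

```java
/** Hypothetical sketch of the OFFSET_GAP arithmetic; names and behavior are assumptions. */
final class OffsetGapSketch {
    static final int ULID_SEQUENCE_MAX = 0xFFFF; // largest value of the 16-bit sequence field
    static final int OFFSET_GAP = 5000;          // slots reserved for late, lower sequence numbers

    private OffsetGapSketch() {
    }

    /** Messages that can be ordered after the first one seen for a timestamp. */
    static int orderableMessages() {
        return ULID_SEQUENCE_MAX - OFFSET_GAP;
    }

    /**
     * Maps an input-wide sequence number to the 16-bit ULID field, relative to the
     * first sequence number seen for the same timestamp. Returns -1 if it does not fit.
     */
    static int relativeSequence(int firstSeenSequenceNr, int sequenceNr) {
        int relative = sequenceNr - firstSeenSequenceNr + OFFSET_GAP;
        return (relative < 0 || relative > ULID_SEQUENCE_MAX) ? -1 : relative;
    }
}
```

A larger gap tolerates more reordering between parallel batches but leaves fewer slots for messages that share a timestamp, which is the tradeoff mentioned above.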

* improve java doc

* Always add gl2_message_id as a second sort order, if sorting by timestamp is requested.

* Don't assume that sort list is mutable

* Fix BackendStartupIT test

Every message should have a gl2_message_id now

* Clarify when gl2_message_id might not be empty

* Add an index mapping for gl2_message_id

This enables us to sort by gl2_message_id, even if the field does not
exist.
This might be the case with older indices, restored from archives.

* Fix IndexMappingTest

* fix FieldTypePollerIT

* Fix FieldAliasForEvents IT

This test should not be using the default index template

* Handle Messages with sequenceNr that are received from Forwarders

* Include the nodeId into the sequenceNr lookup cache

Multiple forwarders can run the same input, that's why we need
to differentiate them in the cache.

* Provide unmapped_type for gl2_message_id sort

This allows us to sort on older indices that might not have
that field yet.

https://www.elastic.co/guide/en/elasticsearch/reference/7.17/sort-search-results.html#_ignoring_unmapped_fields

* Only add gl2_message_id sort if not already present

Also add some tests
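
The sort-related commits above can be sketched roughly as follows. This is an illustration under assumptions, not the merged code: `MessageIdSortSketch` and `withMessageIdTiebreaker` are hypothetical, sort clauses are modeled as plain maps shaped like the Elasticsearch JSON, and `keyword` as the `unmapped_type` is a guess. The input list is never modified, the tiebreaker follows the timestamp's order, and nothing is added when gl2_message_id is already part of the sort.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: add gl2_message_id as a tiebreaker to a timestamp sort. */
final class MessageIdSortSketch {
    static final String TIMESTAMP = "timestamp";
    static final String MESSAGE_ID = "gl2_message_id";

    private MessageIdSortSketch() {
    }

    /** Each clause looks like {"field": {"order": "desc"}}. Returns the input as-is if nothing needs adding. */
    static List<Map<String, Map<String, String>>> withMessageIdTiebreaker(
            List<Map<String, Map<String, String>>> sorts) {
        int timestampIndex = -1;
        for (int i = 0; i < sorts.size(); i++) {
            Map<String, Map<String, String>> clause = sorts.get(i);
            if (clause.containsKey(MESSAGE_ID)) {
                return sorts; // already sorted by message ID, leave untouched
            }
            if (timestampIndex < 0 && clause.containsKey(TIMESTAMP)) {
                timestampIndex = i;
            }
        }
        if (timestampIndex < 0) {
            return sorts; // no timestamp sort requested
        }

        String order = sorts.get(timestampIndex).get(TIMESTAMP).getOrDefault("order", "asc");
        Map<String, String> options = new LinkedHashMap<>();
        options.put("order", order);
        // Lets the sort work on indices that have no mapping for the field yet.
        options.put("unmapped_type", "keyword");
        Map<String, Map<String, String>> tiebreaker = new LinkedHashMap<>();
        tiebreaker.put(MESSAGE_ID, options);

        // Copy instead of mutating: the caller's list may be immutable.
        List<Map<String, Map<String, String>>> result = new ArrayList<>(sorts);
        result.add(timestampIndex + 1, tiebreaker);
        return result;
    }
}
```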

* Simply reset the sequenceNrCache in case we exceed the ULID limit

This is simpler and probably leads to fewer log messages.

* Improve log message
Development

Successfully merging this pull request may close these issues.

Overcoming log ordering issues with millisecond precision
4 participants