
feat(router) Add a router strategy to route messages to multiple strategies. #368

Open · wants to merge 14 commits into base: main
Conversation

fpacifici (Contributor)

This PR introduces a Router processing strategy that delivers messages to multiple independent strategies, keeps track of commits that can happen out of order, and commits offsets to Kafka in the right order.

Why a router strategy?
There are multiple use cases. Specifically, this one is about routing messages to a low priority second topic, to put old messages aside and prioritize new ones. It is meant to automate the process we currently employ to route messages to ingest-events-2.
There are other scenarios as well: sending messages to different topics in the indexer, dividing processing into multiple classes on different multiprocess pools, etc.

How does it work?

  • The strategy is provided a selector and a list of strategies to route messages to.
  • The selector inspects a message and routes it. Ideally this is based on a message header, to avoid parsing the message payload.
  • The strategy wraps the commit function passed to the destinations in order to intercept all commits.
  • All commits are registered in an object that keeps track of the watermark to commit.
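A minimal sketch of that flow, assuming hypothetical names (`Router`, `selector`, and the per-route callables are illustrative, not the PR's actual API):

```python
# Illustrative sketch of the routing flow described above; the class and
# method names are hypothetical and do not mirror the PR's actual API.
from typing import Callable, Mapping


class Router:
    def __init__(
        self,
        selector: Callable[[dict], str],               # picks a route, ideally from headers
        routes: Mapping[str, Callable[[dict], None]],  # route name -> strategy submit
    ) -> None:
        self.__selector = selector
        self.__routes = routes

    def submit(self, message: dict) -> None:
        # Inspect the message (headers only, to avoid parsing the payload)
        # and hand it to the chosen destination strategy.
        route = self.__selector(message)
        self.__routes[route](message)


# Usage: route on a header so the payload never has to be parsed.
received = []
router = Router(
    selector=lambda m: "low_priority" if m["headers"].get("old") else "default",
    routes={
        "default": lambda m: received.append(("default", m["offset"])),
        "low_priority": lambda m: received.append(("low_priority", m["offset"])),
    },
)
router.submit({"headers": {}, "offset": 1})
router.submit({"headers": {"old": True}, "offset": 2})
```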

What's the commit policy?
Each strategy can have its own commit policy: each can commit whenever it wants. Each strategy is expected to commit offsets in order per partition, which is no different from standard Kafka behavior. Different parallel destination strategies can commit out of order with respect to each other.
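As a toy illustration of the bookkeeping this implies, here is a hypothetical tracker (not the PR's `PartitionWatermark`) where each route commits in order internally while the routes interleave freely:

```python
# Toy watermark tracker; hypothetical names, not the PR's PartitionWatermark.
from collections import deque
from typing import Deque, Dict, List, Optional


class Watermark:
    def __init__(self, routes: List[str]) -> None:
        self.__uncommitted: Dict[str, Deque[int]] = {r: deque() for r in routes}
        self.__seen: List[int] = []  # every consumed offset, in order

    def add_message(self, route: str, offset: int) -> None:
        self.__uncommitted[route].append(offset)
        self.__seen.append(offset)

    def commit(self, route: str, offset: int) -> None:
        # Each route commits in order per partition (standard Kafka behavior),
        # so everything up to `offset` on this route is done.
        q = self.__uncommitted[route]
        while q and q[0] <= offset:
            q.popleft()

    def committable(self) -> Optional[int]:
        # Highest consumed offset strictly below every route's first
        # still-uncommitted offset; None if nothing is safe yet.
        blocked = [q[0] for q in self.__uncommitted.values() if q]
        limit = min(blocked) if blocked else float("inf")
        eligible = [o for o in self.__seen if o < limit]
        return max(eligible) if eligible else None


wm = Watermark(["fast", "slow"])
wm.add_message("fast", 1)
wm.add_message("slow", 2)
wm.add_message("fast", 3)
wm.commit("fast", 3)     # "fast" committed 1 and 3, but 2 is still in flight
low = wm.committable()   # only 1 is safe to commit on Kafka
wm.commit("slow", 2)
high = wm.committable()  # now everything up to 3 is safe
```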

How will this be used ?
The first use case will be to automate the ingest-events-2 process of putting older messages aside during ingestion when we are trying to burn down a backlog.
Today that process requires people to manually change the Relay configuration to route new messages to a separate topic and start a consumer there.
This strategy will be used in the ingest consumer so that really old messages are sent to a strategy that produces to a backup topic, letting the consumer reach newer messages sooner.
If this works well we will expand the usage to all consumers where in-order delivery is not strictly needed. We could consider adding it inside the StreamProcessor to ensure it is always there.

@fpacifici fpacifici requested review from a team as code owners May 19, 2024 03:04
@untitaker (Member)

> Different parallel destination strategies can commit out of order with respect to each other.

this basically means that parallel strategies cannot work on the same topic, right? they have to commit to entirely different topics or consumer groups.

@untitaker (Member) commented May 22, 2024

@fpacifici i've read the code, it looks like it would work. but what do you think of changing Message so that it carries a set of ranges per partition to commit? then there would be less complexity in figuring out what is safe to commit, and invocations of the commit callback would not have to carry information about which route was taken. it would therefore make the router a simple strategy instead of a strategy-factory that also has to wrap the commit callback.

the downside is that it is a more fundamental change and requires changes to all existing strategies that merge/unmerge messages

@fpacifici (Contributor, Author)

> this basically means that parallel strategies cannot work on the same topic, right? they have to commit to entirely different topics or consumer groups.

If I understand your question correctly, you are asking whether the routes (the strategies we route messages to) would have to commit on different topics.
That would not be the case. The router is meant exactly to track commits from parallel strategies on the same topic and reorder them.

@fpacifici (Contributor, Author)

> @fpacifici i've read the code, it looks like it would work. but what do you think of changing Message so that it carries a set of ranges per partition to commit? then there would be less complexity in figuring out what is safe to commit, and invocations of the commit callback would not have to carry information about which route was taken. it would therefore make the router a simple strategy instead of a strategy-factory that also has to wrap the commit callback.

I am not sure I grasp how this would work. I have a few questions:

Do I understand correctly that the messages that reach the strategy would already know up to where they can commit, so the strategy could just go ahead and commit when it wants?

If that is the case, how would we deal with a scenario where two parallel strategies commit at different intervals: the first commits each message while the second commits in large batches? It is only when the commit is issued by the strategy that we can know which range we can commit.

```
Route 2: ------------------------- 25 --- 30 --- 35
```

Review comment (Member) on `Route1 offsets are processed by a different strategy independently from`:

I think we should add docstrings in several functions to show what the expected/desired outcome is.

```python
logger = logging.getLogger(__name__)


class PartitionWatermark:
```

Review comment (Member): There is no thread-safety in here. Is this intended?

"""
self.__uncommitted[route].append(offset)

def rewind(self, route: str) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rewind requires iteration of all committed messages, and there is no limit to that array, which makes it a O(N) operation. does it make sense to store highest committed value in a separate data structure to access it in O(1)? ofc, this optimization is worth pursuing if you expect it to be called often

```python
        An uncommitted offset is added when the message is consumed: it is
        being processed but its offset has not been committed yet.
        """
        self.__uncommitted[route].append(offset)
```

Review comment (Member): A small idea (nit): instead of growing/shrinking the list, pre-allocate a buffer and use it as storage with two pointers, growing it as needed (like an arena allocator).


```python
        self.__highest_committed: Optional[int] = None

    def add_message(self, route: str, offset: int) -> None:
```

Review comment (Member): Nit: we could use string interning for the route keys instead of plain str.


```python
        return high_watermark

    @property
```

Review comment (Member): Nit: consider using `cached_property` for `uncommitted_offsets`.

```python
        raise

    def close(self) -> None:
        self.__closed = True
```

Review comment (Member): Nit: checking `self.__closed` on close and terminate might be nice.

@untitaker (Member)

> If that is the case, how would we deal with a scenario where two parallel strategies commit at different intervals: the first commits each message while the second commits in large batches? It is only when the commit is issued by the strategy that we can know which range we can commit.

the message struct would contain a set of ranges per partition, and the commit callback would commit the highest watermark that is "covered" by a set of contiguous ranges.

essentially this means the first message emits commit requests for 1..1, 2..2, 3..3, while the second strategy emits requests for 4..10, 10..20. it's then the job of the commit callback to reassemble those ranges, figure out that all of those can be constructed into a larger range 1..20, and since that range covers all offsets since the last commit, 20 can be committed.
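A sketch of the reassembly described above (hypothetical `committable` helper, not code from this PR; note it assumes a contiguous offset space):

```python
def committable(ranges, last_committed):
    # Merge (start, end) commit requests and return the highest offset
    # covered by a contiguous run starting right after last_committed.
    high = last_committed
    for start, end in sorted(ranges):
        if start > high + 1:
            break  # gap below `start`: those offsets are still in flight
        high = max(high, end)
    return high


# The example from the comment: one strategy emits 1..1, 2..2, 3..3,
# the other 4..10 and 10..20; together they cover 1..20.
requests = [(1, 1), (2, 2), (3, 3), (4, 10), (10, 20)]
result = committable(requests, 0)  # 20
```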

@fpacifici (Contributor, Author)

> essentially this means the first message emits commit requests for 1..1, 2..2, 3..3, while the second strategy emits requests for 4..10, 10..20. it's then the job of the commit callback to reassemble those ranges, figure out that all of those can be constructed into a larger range 1..20, and since that range covers all offsets since the last commit, 20 can be committed.

I don't think that is enough. There is no guarantee that the offset space is contiguous.

  • Kafka only guarantees that offsets are strictly increasing: if message b is produced after message a on the same partition, then offset b is greater than offset a. It does not imply that offset b = offset a + 1; in fact Kafka can skip offsets.
  • We may have a batching step before the router that coalesces messages, so the sequence of messages the router receives has holes.

In order for the range idea to work, the commit callback still needs to track each offset.

The other part I am not sure I understand is: how would the router generate the range to attach to the routed message?
Let's say the router receives offsets 1, 3, 5, 7 in sequence and routes them all to the same route, and the strategy only commits after the last offset.
Would we attach these ranges to the messages: 1-1, 1-3, 1-5, 1-7? In that case, when we receive the commit we would only see commit 1-7, which would be good. Though what if, after sending range 1-7, we receive a commit for 1-5 and then a commit for 1-7? Now the callback needs to deal with overlapping ranges.

@untitaker (Member) commented May 30, 2024

> I don't think that is enough. There is no guarantee that the offset space is contiguous.

the commit callback would buffer up all offsets until there is a contiguous range that can be committed. the memory usage would be somewhat hard to control but I think it can be done.

> Would we attach these ranges to the messages? 1-1, 1-3, 1-5, 1-7

that is the idea, we change the message struct to contain ranges. but the initial messages extracted from kafka don't contain 1-1, 1-2, 1-3, they contain 1-1, 2-2, 3-3. each message contains the individual offsets it represents, just compressed into a set of ranges. later, in reduce for example, messages 1-1 and 2-2 are folded into a message with 1-2
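The folding step could be sketched like this (hypothetical `fold` helper, assuming the range-set representation discussed here):

```python
def fold(ranges_a, ranges_b):
    # Merge two sets of (start, end) offset ranges, coalescing adjacent or
    # overlapping ones, as a reduce step would when folding messages together.
    merged = []
    for start, end in sorted(ranges_a + ranges_b):
        if merged and start <= merged[-1][1] + 1:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


combined = fold([(1, 1)], [(2, 2)])  # [(1, 2)]
```

Non-adjacent ranges stay separate, which is what preserves the holes fpacifici mentions: `fold([(1, 1)], [(3, 3)])` keeps two ranges.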
