
feat(dlq): InvalidMessage exception refactored to handle multiple invalid messages #50

Merged — 4 commits merged into main on Apr 8, 2022

Conversation

rahul-kumar-saini (Contributor)

Overview

  • Currently, the DLQ supports only one message being submitted and exception-handled at a time
  • Certain processing strategies batch messages together on submit and do not actually process them until a later submit() or poll() call

Changes

  • Refactored the InvalidMessage exception into InvalidMessages, which can be raised with a sequence of messages instead of just one (see the sketch below)
  • Refactored the policies to handle multiple messages in a single exception
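
A minimal sketch of what the refactored exception amounts to, using the names from this PR; the exact arroyo definition may differ, and Message here is arroyo's generic message type:

```python
from typing import Sequence, TypeVar

from arroyo.types import Message

TPayload = TypeVar("TPayload")


class InvalidMessages(Exception):
    """Raised by a processing step when one or more of the messages it
    accepted turn out to be unprocessable. Carrying all of them lets a
    DLQ policy handle every bad message in a batch, not just the first."""

    def __init__(self, messages: Sequence[Message[TPayload]]) -> None:
        self.messages = messages
        super().__init__(f"{len(messages)} invalid message(s)")
```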

Testing

  • Added an example batching strategy to the tests and tested the count policy to validate expected behavior

@rahul-kumar-saini rahul-kumar-saini requested a review from a team as a code owner March 23, 2022 20:44
@lynnagara (Member)

I am not convinced that we should be trying to solve this problem. The processing step interface is very much designed to operate on a single message at a time.

The scenario that you've described seems to be a processing strategy where each message is actually a group of submessages and we want to reject some arbitrary part of the actual message. Now this is possible only because a message can represent anything you like (including a bunch of nested messages). But this is not really how the processing steps are designed to be used, and I think a better solution to this problem would be to just compose two processing steps in such a way that a single message can easily be rejected at a time.

@evanh (Member)

evanh commented Mar 25, 2022

@lynnagara There are already examples in prod that are doing this type of batching: https://github.com/getsentry/sentry/blob/master/src/sentry/sentry_metrics/multiprocess.py#L225

Was this style of processing ever meant to be supported?

@rahul-kumar-saini (Contributor, Author)

Since there are strategies that work with a batch of messages and we'd like to pass each broken message to the DLQ, I think a single exception with multiple messages might be the easiest way to do so.

> a better solution to this problem would be to just compose two processing steps in such a way that a single message can easily be rejected at a time.

Does this mean the batching strategies would be modified to send each message in a batch to a "next step" for processing so individual messages could throw an exception?

@lynnagara (Member)

lynnagara commented Mar 28, 2022

@rahul-kumar-saini Can you share which other strategies you're referring to?

In the example @evanh linked about ProduceStep, it seems like everything in that processing strategy would work on a single message, except for this line in submit, where it is passed messages in batches and has to loop through them to get the individual messages:

https://github.com/getsentry/sentry/blob/master/src/sentry/sentry_metrics/multiprocess.py#L302

So why couldn't the strategy that is calling this one just call next_step.submit(individual_message) for each message instead of next_step.submit(some_batch_of_messages)? It seems like we wouldn't have this problem then, and the strategy would be simpler, as it doesn't need to know anything about the batches (which it doesn't really do anything with anyway).
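
A sketch of that composition, assuming arroyo's ProcessingStrategy interface (submit/poll/close/terminate/join); the Unbatch step itself is hypothetical:

```python
from typing import Optional, Sequence

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import Message


class Unbatch(ProcessingStrategy[Sequence[Message[bytes]]]):
    """Hypothetical step: receives a message whose payload is a batch of
    inner messages and forwards each inner message downstream one at a
    time, so any downstream step can reject a single message."""

    def __init__(self, next_step: ProcessingStrategy[bytes]) -> None:
        self.__next_step = next_step

    def submit(self, message: Message[Sequence[Message[bytes]]]) -> None:
        # Unpack the batch here so downstream steps never see batches.
        for inner in message.payload:
            self.__next_step.submit(inner)

    def poll(self) -> None:
        self.__next_step.poll()

    def close(self) -> None:
        self.__next_step.close()

    def terminate(self) -> None:
        self.__next_step.terminate()

    def join(self, timeout: Optional[float] = None) -> None:
        self.__next_step.join(timeout)
```

With such a step in front, each downstream step only ever sees, and can only ever reject, one message at a time.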

@rahul-kumar-saini (Contributor, Author)

Okay, I think we might be talking about two different things here. I do not want to do anything like:
dlq.submit(<batch of messages>)

The DLQ is meant to be the first processing step; it receives one message at a time and simply forwards each to the next processing step.

The point is that at any processing step along the way after the DLQ, we can throw an exception and it should bubble back up to the main try/catch in the DLQ code.

The issue is that some processing steps/strategies (like the BatchingStrategy) will start collecting these messages and process a batch at once in parallel, so how do we handle the case where multiple messages in the batch are broken?

I agree that we should only submit one message at a time to the DLQ; however, it should be able to handle an exception thrown with more than one message, so that all of the bad messages in a batch are handled properly.
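
A sketch of the flow being described here: one message in per submit, but potentially many bad messages out per exception. The class and policy method names are illustrative, not the exact arroyo API:

```python
from typing import Any, Sequence


class InvalidMessages(Exception):  # as sketched earlier
    def __init__(self, messages: Sequence[Any]) -> None:
        self.messages = messages


class DeadLetterQueue:
    """Illustrative wrapper: submits one message at a time, but a single
    InvalidMessages raised anywhere downstream may carry several bad
    messages, which the policy handles together."""

    def __init__(self, next_step: Any, policy: Any) -> None:
        self.__next_step = next_step
        self.__policy = policy

    def submit(self, message: Any) -> None:
        try:
            self.__next_step.submit(message)  # one message in...
        except InvalidMessages as e:
            # ...but potentially many bad messages out: a batching step
            # further down may have collected several before raising.
            self.__policy.handle_invalid_messages(e.messages)

    def poll(self) -> None:
        try:
            self.__next_step.poll()  # batches are often flushed here
        except InvalidMessages as e:
            self.__policy.handle_invalid_messages(e.messages)
```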

@lynnagara (Member)

lynnagara commented Mar 29, 2022

@rahul-kumar-saini I don't think we are talking about different things. I know that you wouldn't explicitly submit to a DLQ strategy. My comment was only talking about what happens in the ProduceStep processing strategy that Evan linked. If you look at that strategy it never does anything with the batch of messages - so I don't get why it can't just get passed an individual message instead of a group of them and raise a single exception.

Now I am not opposed to this strategy that handles a group of messages if we really need it, but I would like us to clarify the real use cases where this is needed first. Is there only that one example or do you have others in mind?

@rahul-kumar-saini (Contributor, Author)

rahul-kumar-saini commented Mar 30, 2022

Right, okay, I misunderstood what you were trying to say. I think the real use case here is to avoid failing an entire batch when a single message is broken. If at any point in the processing of a message (in a batch) we throw an exception, we still want the successful messages to be processed and committed, with only the bad ones passed to the DLQ. I think if we don't collect the bad ones and raise them in one exception, the good ones would also fail?

Here's another example linked in the original ticket for this task:

https://github.com/getsentry/sentry/blob/master/src/sentry/sentry_metrics/multiprocess.py#L225
Edit: (wrong link)
https://github.com/getsentry/arroyo/blob/main/arroyo/processing/strategies/streaming/transform.py#L275

@lynnagara (Member)

That's the same example that Evan linked above, and the one my comments were already referring to. Is the problem specific to this one strategy? If so, should we consider either a) putting this strategy next to it in the Sentry codebase instead of in Arroyo, or b) just updating that strategy so it processes and rejects one message at a time instead of a batch?

@fpacifici (Contributor)

I will try to provide some more context as the actual problematic step is being missed.
https://github.com/getsentry/arroyo/blob/main/arroyo/processing/strategies/streaming/transform.py#L275
This step receives individual messages. It does not process them on submit; it batches them. When a batch is ready, it is dispatched to a subprocess, which processes the entire batch. The calls to poll ensure that, once a batch has been processed, each individual message is propagated to the next step.

The relevant use case in Sentry is this one, not the produce step: https://github.com/getsentry/sentry/blob/master/src/sentry/sentry_metrics/multiprocess.py#L481-L488
It is a slight variation of the one above (transform.py). In this case batches are created by a separate step. The following step then takes batches (each message is a batch), processes each batch in one shot (this is mandatory), and delivers it to the producer. Indeed, the last step could receive individual messages rather than batches, but the produce step is not the problem. The problem is the step between batching and producing, which has to process batches.

Now let's take a step back and talk about requirements.
We have processing steps that process one batch per call to the poll method, rather than one message per call to submit or poll.
In these cases (which are very common, as the parallel consumer implies this pattern) we should be able to intercept the individual invalid messages in a batch, isolate them, and still process the rest of the batch.

I think the approach in this PR may have a flaw. The type of message attached to the exception is not the same type of message the submit call takes if the submit call takes the entire batch. We should look into this problem.
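
A sketch of that requirement, with process_one and the batch plumbing as hypothetical stand-ins for the real per-message work:

```python
from typing import Any, Callable, List, Sequence


class InvalidMessages(Exception):  # as sketched earlier
    def __init__(self, messages: Sequence[Any]) -> None:
        self.messages = messages


def process_ready_batch(
    batch: Sequence[Any],
    process_one: Callable[[Any], Any],
    next_step: Any,
) -> None:
    """Process a whole batch (e.g. called from poll() once the batch is
    ready), isolating invalid messages so the rest of the batch is still
    processed, then raise them all together for the DLQ to handle."""
    invalid: List[Any] = []
    for message in batch:
        try:
            result = process_one(message)  # hypothetical per-message work
        except Exception:
            invalid.append(message)  # isolate; don't abort the batch
            continue
        next_step.submit(result)  # good messages still flow downstream
    if invalid:
        raise InvalidMessages(invalid)
```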

@lynnagara (Member)

> The type of message attached to the exception is not the same type of message the submit call takes if the submit call takes the entire batch. We should look into this problem.

This is the part that feels uncanny about this to me. The interface of the strategy exclusively deals with individual messages (they just happen to get executed in a batch... but IMO this detail should stay internal to the strategy). Just a wild idea I'll put out there: could we just maybe throw one at a time and have the DLQ strategy make subsequent calls to poll() until it has collected all the InvalidMessage exceptions?
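
Purely as an illustration of that idea (nothing like this exists in the PR), the DLQ side might look like:

```python
from typing import Any, List


class InvalidMessage(Exception):  # single-message variant of the idea
    def __init__(self, message: Any) -> None:
        self.message = message


def drain_invalid(next_step: Any) -> List[Any]:
    """Hypothetical: poll the wrapped step repeatedly; each poll may
    raise one InvalidMessage for the next bad message in the batch.
    Stop once a poll completes without raising."""
    collected: List[Any] = []
    while True:
        try:
            next_step.poll()
        except InvalidMessage as e:
            collected.append(e.message)
        else:
            return collected
```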

@fpacifici (Contributor)

> The interface of the strategy exclusively deals with individual messages

Not always; see the examples above. We need to cover both cases: when a step receives a batch, and when it receives individual messages and batches them. This case https://github.com/getsentry/sentry/blob/master/src/sentry/sentry_metrics/multiprocess.py#L481-L488 could potentially be turned into one where the processing step also takes care of batching, though that seems like a workaround. The streaming pipeline is designed to allow the message type to change from one step to the next, so we should support that in the dead letter queue.

@lynnagara (Member)

lynnagara commented Apr 4, 2022

My point is just that the concept of a "batch" exists only because a message can be anything you like, including a grouping of messages. But it feels quite specific to those processing strategies, and I am not sure it's reusable and generic enough to live in Arroyo rather than next to the code that actually creates batches of that type. Anyway, I am not going to block this PR on this issue; just my $0.02.

@rahul-kumar-saini (Contributor, Author)

@lynnagara

> could we just maybe throw one at a time and have the DLQ strategy make subsequent calls to poll() until it has collected all the InvalidMessage exceptions

Would the processing of the batch not stop entirely as soon as one exception is thrown that goes over, let's say, a count limit?
I don't know how to tell the DLQ that a batch of messages is being processed, so that it collects until the batch is done processing and only then handles the collected bad messages.

@rahul-kumar-saini (Contributor, Author)

@lynnagara @fpacifici

I've changed the type for the InvalidMessages exception to Sequence[Any] from Sequence[Message[TPayload]], following yesterday's discussion. Let me know if there's anything we still need to consider or if this can be merged.
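
Relative to the earlier sketch, the change amounts to relaxing the annotation (assuming nothing else about the class changed):

```python
from typing import Any, Sequence


class InvalidMessages(Exception):
    # Previously Sequence[Message[TPayload]]; Sequence[Any] lets a step
    # whose input type differs from the original message type (e.g. a
    # whole batch) attach whatever it considers the invalid unit.
    def __init__(self, messages: Sequence[Any]) -> None:
        self.messages = messages
        super().__init__(f"{len(messages)} invalid message(s)")
```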

@lynnagara (Member) left a comment


:shipit:

@rahul-kumar-saini rahul-kumar-saini merged commit f81aa44 into main Apr 8, 2022
@rahul-kumar-saini rahul-kumar-saini deleted the fix/multiple-invalid-messages branch April 8, 2022 16:59