
Support ACK when receiving malformed events #906

Merged: 2 commits into cloudevents:main from mattdowdell:ack-malformed-events on Aug 3, 2023

Conversation

@mattdowdell (Contributor) commented Jun 4, 2023

My team faced much the same issue as outlined in #757: malformed events are sent to a Kafka topic and clients endlessly fail to read them. While this is hard to induce when the sender uses the Go CloudEvents SDK, there are a good number of Python clients across our services which unfortunately lack the validation that might prevent this.

I've elected to make this behaviour configurable via client options, as suggested in #757. This would be appropriate to use when no protocol.Responder implementation is available, as is the case with the kafka_sarama module. I explored wrapping the existing protocol.Receiver implementation to allow it to behave like a protocol.Responder, but that ended up being a lot of code compared to the light touch that could be applied here.
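For illustration, opting in might look something like the sketch below. The option name client.WithAckMalformedEvent is assumed from this PR's ackMalformedEvent field and may not match the final API; broker address, group, and topic are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
	"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/client"
)

func main() {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	// kafka_sarama only provides a protocol.Receiver (no protocol.Responder),
	// which is the case the new option targets.
	consumer, err := kafka_sarama.NewConsumer(
		[]string{"127.0.0.1:9092"}, saramaConfig, "my-group", "my-topic")
	if err != nil {
		log.Fatalf("failed to create consumer: %v", err)
	}
	defer consumer.Close(context.Background())

	// WithAckMalformedEvent is an assumed name for the new option; it ACKs
	// undecodable messages instead of letting the broker redeliver them forever.
	c, err := client.New(consumer, client.WithAckMalformedEvent())
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	if err := c.StartReceiver(context.Background(), func(e cloudevents.Event) {
		log.Printf("received event: %s", e.ID())
	}); err != nil {
		log.Fatalf("receiver stopped: %v", err)
	}
}
```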

@mattdowdell marked this pull request as ready for review June 5, 2023 06:01
@embano1 (Member) commented Jun 5, 2023

Thx for the contribution @mattdowdell!

Generally this LGTM. However, taking a step back for a second, I wonder whether there are more (similar) error scenarios which can lead to invokers/responders becoming stuck, which we should generalize instead of implementing different error scenarios with additional fields/options?

@duglin @lionelvillard can we think of more error cases where we'd not ACK and cause such infinite loops? Obviously this is dependent on the protocol implementation, i.e., how it interprets NACK, but IIRC we had another issue with HTTP where the sender was stuck in an endless loop due to a NACK from the consumer/receiver.

@@ -14,7 +14,7 @@ import (
 )

 func NewHTTPReceiveHandler(ctx context.Context, p *thttp.Protocol, fn interface{}) (*EventReceiver, error) {
-	invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil) //TODO(slinkydeveloper) maybe not nil?
+	invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil, nil, false /*ackMalformedEvents*/) //TODO(slinkydeveloper) maybe not nil?
Review comment (Contributor):

Minor thing, but are people ok with removing the /*ackMalformedEvents*/ thingy? Either do it for all params on all function calls or remove it here. Let's be consistent.

Reply from @mattdowdell (Author):

Removed as requested 👍

@duglin (Contributor) commented Jun 5, 2023

can we get a test (or two) for this?

@duglin (Contributor) commented Jun 5, 2023

> Thx for the contribution @mattdowdell!
>
> Generally this LGTM. However, taking a step back for a second, I wonder whether there are more (similar) error scenarios which can lead to invokers/responders becoming stuck, which we should generalize instead of implementing different error scenarios with additional fields/options?
>
> @duglin @lionelvillard can we think of more error cases where we'd not ACK and cause such infinite loops? Obviously this is dependent on the protocol implementation, i.e., how it interprets NACK, but IIRC we had another issue with HTTP where the sender was stuck in an endless loop due to a NACK from the consumer/receiver.

When do we normally ACK messages? It seems to me that the only time we wouldn't ACK a message is if 1) the client specifically told us not to, or 2) we think re-processing the exact same message later on would change the (unhappy) result. For example, a malformed event should always yield the same result, so retrying later (and not ACK'ing) makes no sense to me. Do we have a mechanism by which the client can tell us not to ACK? (sorry, I'm not as familiar with the code yet as I should be)

@mattdowdell (Contributor, Author):

> can we get a test (or two) for this?

Added a couple of unit tests. I'm more used to testify's assert/require helpers, so I might not have used the most idiomatic implementation when using just the stdlib.

@mattdowdell (Contributor, Author):

> Generally this LGTM. However, taking a step back for a second, I wonder whether there are more (similar) error scenarios which can lead to invokers/responders becoming stuck, which we should generalize instead of implementing different error scenarios with additional fields/options?

I'm on board with a more general solution if there's a proposed design. Let me know what you guys agree on and I'll take a shot at an implementation.

> For example, a malformed event should always yield the same result, so retrying later (and not ACK'ing) makes no sense to me. Do we have a mechanism by which the client can tell us not to ACK?

I haven't seen anything that would support a configurable ACK/NACK in these conditions from digging through the code so far. But I guess being told to not ACK would be the inverse of the option I've added here. If you're ok with the behaviour-breaking change on the basis that the current behaviour is very unlikely to be what users need, I'm happy to flip it.
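For context, once an event does decode successfully, the receiver fn can already steer ACK/NACK via its return value; a malformed event never reaches that fn, which is why this has to be a client option. A minimal sketch (process is a hypothetical handler):

```go
import (
	"context"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/protocol"
)

// process is a hypothetical application handler.
func process(e cloudevents.Event) error { return nil }

// receive steers the transport's ACK/NACK via the protocol.Result it returns.
// A malformed event fails decoding before this function is ever called, so
// this mechanism cannot help there.
func receive(ctx context.Context, e cloudevents.Event) protocol.Result {
	if err := process(e); err != nil {
		// NACK: the transport may redeliver the message later.
		return protocol.NewReceipt(false, "processing failed: %s", err)
	}
	// ACK: the message is considered consumed.
	return protocol.ResultACK
}
```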

@embano1 (Member) commented Jun 7, 2023

> When do we normally ACK messages? It seems to me that the only time we wouldn't ACK a message is if 1) the client specifically told us not to, or 2) we think re-processing the exact same message later on would change the (unhappy) result. For example, a malformed event should always yield the same result, so retrying later (and not ACK'ing) makes no sense to me. Do we have a mechanism by which the client can tell us not to ACK? (sorry, I'm not as familiar with the code yet as I should be)

I went through the code, not sure I 100% follow, but my understanding is that we (N)ACK:

@mattdowdell With your new implementation, I'm assuming this leads to an HTTP 200 if ackMalformedEvent is true, hence no retries. Are there any warnings/errors logged besides the optional tracing in observabilityService? From the current code in ServeHTTP I see that we ACK for an invalid event (when ackMalformedEvent is true), and I was wondering whether we should add some logging/debugging here? Just curious what you think.

@mattdowdell (Contributor, Author) commented Jun 7, 2023

> With your new implementation, I'm assuming this leads to an HTTP 200 if ackMalformedEvent is true, hence no retries.

I'm not overly familiar with how the HTTP protocol works, but based on a scan over the response func in https://github.com/cloudevents/sdk-go/blob/main/v2/protocol/http/protocol.go#L350-L398, returning a 200 for this particular ACK looks very likely. However, I'd note that the HTTP protocol seems to function based on a protocol.Responder implementation rather than the protocol.Receiver we see in the Kafka protocol. I could restrict ackMalformedEvent to only apply when a responder is not being used, if you'd consider that less confusing for consumers? That would enforce what the docs on the new option already imply.
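To illustrate the assumption (this is not the SDK's actual response func, just a sketch of the mapping):

```go
import (
	"net/http"

	"github.com/cloudevents/sdk-go/v2/protocol"
)

// respond is a simplified, hypothetical sketch of how a Responder-style HTTP
// protocol could map the invoker's receipt onto a status code.
func respond(w http.ResponseWriter, result protocol.Result) {
	if protocol.IsACK(result) {
		// ACK (including an ACK'd malformed event): 200, so the sender won't retry.
		w.WriteHeader(http.StatusOK)
		return
	}
	// NACK: an error status, which a retrying sender may treat as "try again later".
	w.WriteHeader(http.StatusBadRequest)
}
```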

> Are there any warnings/errors logged besides the optional tracing in observabilityService?

There's a call to the observabilityService as you say, but there's also a log call when the invoker exits in https://github.com/cloudevents/sdk-go/blob/main/v2/client/client.go#L254. That would be triggered whether the message is ACK'd or NACK'd by virtue of protocol.NewReceipt being a non-nil value. It does assume that there's a logger in the context, which does not appear to consistently be the case per https://github.com/cloudevents/sdk-go/blob/main/v2/context/logger.go#L25. But presumably failing to create a logger is exceedingly rare in practice.
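For completeness, injecting a logger via the v2/context helpers looks roughly like this (the zap production config is just an example; any *zap.SugaredLogger works). The resulting context would then be passed to StartReceiver.

```go
import (
	"context"

	cecontext "github.com/cloudevents/sdk-go/v2/context"
	"go.uber.org/zap"
)

// loggerContext returns a context carrying a logger, so the ACK/NACK exit log
// in client.go is emitted with this logger rather than relying on a fallback.
func loggerContext() (context.Context, error) {
	logger, err := zap.NewProduction()
	if err != nil {
		return nil, err
	}
	return cecontext.WithLogger(context.Background(), logger.Sugar()), nil
}
```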

@mattdowdell (Contributor, Author):

@duglin I'm having trouble figuring out if the integration test failure is something I've caused or a latent issue that pre-dates this change. Could you advise on whether I need to investigate further?

@duglin (Contributor) commented Jun 22, 2023

I reran the tests, it's a random race condition. Looks ok now.

@mattdowdell (Contributor, Author):

@duglin @embano1 Is there anything else I can/should do to get this ready to merge? I'd love to see this (or equivalent functionality) available if possible.

@embano1 (Member) commented Jul 11, 2023

Sorry @mattdowdell, I got distracted. Can you please rebase on main to run against the latest workflows? I'll take a look at the PR later this week.

@mattdowdell force-pushed the ack-malformed-events branch 2 times, most recently from cb6109a to f5110d4 on July 11, 2023 07:49
@mattdowdell (Contributor, Author):

@embano1 No problem, rebased (and squashed) as requested.


c, err := client.New(receiver, tc.opts...)
if err != nil {
t.Errorf("failed to construct client: %v", err)
Review comment (Member):

nit: t.Fatalf (fail immediately)

Reply from @mattdowdell (Author):

Good spot, fixed.

})

// wait for receive to occur
time.Sleep(time.Millisecond)
Review comment (Member):

what does this do? also: sleeps like this are a prime source of flakiness

Reply from @mattdowdell (Author):

Removed.

I think I added it in a previous iteration that lacked the select below. The go test -race run by CI then failed because the test was indeed a bit flaky, and I forgot to remove the sleep afterwards.
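For reference, the select-based wait looks roughly like this (the results channel and the timeout value are illustrative, not the exact test code):

```go
import (
	"testing"
	"time"
)

func TestReceive(t *testing.T) {
	results := make(chan string, 1) // hypothetical channel the receiver writes to

	// ... start the receiver and send the test event here ...

	// Wait for the receiver to signal completion instead of sleeping; this
	// stays deterministic under go test -race.
	select {
	case got := <-results:
		t.Logf("received: %v", got)
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for receive")
	}
}
```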

@@ -490,3 +557,38 @@ func isImportantHeader(h string) bool {
 	}
 	return true
 }
+
+type mockMessage struct{}
Review comment (Member):

nit: might want to use pre-defined mocks in v2/binding/test

Reply from @mattdowdell (Author):

Thanks for the hint - v2/binding/test.UnknownMessage was exactly what I needed.
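For anyone following along, the gist of how that mock exercises the malformed-event path (binding.ToEvent is the standard conversion; the demo wrapper is illustrative):

```go
import (
	"context"

	"github.com/cloudevents/sdk-go/v2/binding"
	bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"
)

// demo shows the failure mode under test: UnknownMessage has no recognizable
// encoding, so converting it to an event fails, which is exactly the
// condition the ackMalformedEvent option guards.
func demo(ctx context.Context) error {
	_, err := binding.ToEvent(ctx, bindingtest.UnknownMessage)
	return err // non-nil: the malformed-event path
}
```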

@embano1 (Member) previously approved these changes Jul 16, 2023:

LGTM

@duglin (Contributor) commented Jul 17, 2023

Quick question (overall it looks good), when would we ever NOT want to ACK the event? Why do we think that it'll suddenly become valid later on and unblock things? It seems to me that we should only not ACK an incoming message (event) if we didn't read it all for some reason. But once it's been read fully, why would we not always ACK it?

@mattdowdell (Contributor, Author):

> Quick question (overall it looks good), when would we ever NOT want to ACK the event? Why do we think that it'll suddenly become valid later on and unblock things? It seems to me that we should only not ACK an incoming message (event) if we didn't read it all for some reason. But once it's been read fully, why would we not always ACK it?

I can't think of a good reason to not always ACK, but I'm also not overly familiar with the nuances of all the protocol implementations. One example I can think of: I believe AWS SQS will track the number of NACKs and send events that cross a NACK threshold to a DLQ for further investigation. Consistently ACK'ing would likely inhibit that feature, which some teams might rely on. I don't know if SQS is supported by CloudEvents currently, but I saw a discussion somewhere in the cloudevents org about it.

From an observability side, it may also introduce difficult-to-debug issues. Whilst there are logs when this occurs, metrics or tracing might be the preferred mechanism for observing these failures. That can be implemented via an ObservabilityService, but that's an additional step that might not have been adopted yet.

In general, I defer to your more informed judgement on the expected usage. Perhaps this might be something to reflect upon if you reach v3 and take the opportunity for a breaking change. But there's always a risk someone is (sub)consciously depending on the current behaviour.

@duglin (Contributor) commented Jul 20, 2023

LGTM
I'd still like to explore the option of always ACK'ing, but we can do that later.
@lionelvillard for the merge

@duglin (Contributor) commented Aug 2, 2023

rebase needed

@embano1 (Member) commented Aug 3, 2023

> Quick question (overall it looks good), when would we ever NOT want to ACK the event? Why do we think that it'll suddenly become valid later on and unblock things? It seems to me that we should only not ACK an incoming message (event) if we didn't read it all for some reason. But once it's been read fully, why would we not always ACK it?

See bullet points 3 and 4 in this comment for reasons where NACK can be invoked: #906 (comment)

My team faced much the same issue as outlined in cloudevents#757: malformed events are sent to a Kafka topic
and clients endlessly fail to read them. While this is hard to induce when the sender uses
the Go CloudEvents SDK, there are a good number of Python clients across our services which
unfortunately lack the validation that might prevent this.

I've elected to make this behaviour configurable via client options, as suggested in cloudevents#757. This
would be appropriate to use when no `protocol.Responder` implementation is available, as is the
case with the `kafka_sarama` module. I explored wrapping the existing `protocol.Receiver`
implementation to allow it to behave like `protocol.Responder`, but that ended up being a lot of
code compared to the light touch that could be applied here.

Signed-off-by: Matthew Dowdell <matthew.dowdell@hpe.com>
- Fix typo in error message.
- Remove redundant sleep.
- Use Fatalf to stop the test early.
- Use pre-existing unknown message in v2/binding/test.

Signed-off-by: Matthew Dowdell <matthew.dowdell@hpe.com>
@embano1 (Member) left a comment:

LGTM

@embano1 merged commit 4cc8c4a into cloudevents:main Aug 3, 2023
9 checks passed
@mattdowdell deleted the ack-malformed-events branch August 3, 2023 06:57