-
Notifications
You must be signed in to change notification settings - Fork 107
feat(pubsub): add ability to resume publishing for Publisher #4338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #4338 +/- ##
=======================================
Coverage 94.87% 94.87%
=======================================
Files 188 188
Lines 7237 7241 +4
=======================================
+ Hits 6866 6870 +4
Misses 371 371 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| /// # Ok(()) | ||
| /// # } | ||
| /// ``` | ||
| pub async fn resume_publish<T: std::convert::Into<std::string::String>>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a synchronous operation. We don't need for the resume to complete. Any messages that are put into the channel afterward will reach the unpaused worker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While messages that are put into the channel after will reach the unpaused worker, I think there are cases where the application would want a signal that the worker is now unpaused and publishing is back to the normal behavior.
This is achievable with async or potentially a long blocking sync. My preference is for it to be async and the application choose to .await on it if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand the use case. Could you explain it a bit more? If you call a synchronous (fast) resume_publish(), all publishes that occur after it will be batched and sent (until the next error of course). Why do we need to wait for the signal to reach the background worker? What additional benefit is there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Image if generating a message is expensive with a short TTL. In this case, the application may choose to await until the worker is ready again instead of having the messages potentially expire due to some other on going processes (i.e., flush).
Of course, this use case is completely imaginary but I also think that giving the application the option is beneficial. Is there a reason why sync fast resume is preferred over async version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, the application may choose to await until the worker is ready again instead of having the messages potentially expire due to some other on going processes (i.e., flush).
This is interesting, but I think maybe they should be awaiting on the flush not the resume_publish in this case.
One reason is to keep resume_publish cheap, its basically flipping a bit and we can immediately know that future publishes will get batched so there is just no need to delay in saying that its done.
Also from discussing this with Alex early in the design process he thought this should be a synchronous operation (if I am remembering correctly).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline, the issue with sync is that the worker may be blocked on a long flush operation causing a large delay in the resume_publish being processed due to it awaiting on other ordering keys. The ideal case would be for resume_publish to be sync and for flush not block other operations from being processed.
I'll update resume_publish to be sync and we can update flush in a later PR.
| let got = publisher | ||
| .publish( | ||
| PubsubMessage::new() | ||
| .set_ordering_key("ordering key with error") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be the same as the line 1318? without error? I'm not sure the name is that much more descriptive than just doing key1 and key2 if we want a second one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch.
Updated to use a variable instead.
I'll follow up this PR with a cleanup of the tests.
| got_err = publisher | ||
| .publish( | ||
| PubsubMessage::new() | ||
| .set_ordering_key("ordering key with error 0") | ||
| .set_data("msg 2"), | ||
| ) | ||
| .await | ||
| .unwrap_err(); | ||
| let source = got_err | ||
| .source() | ||
| .and_then(|e| e.downcast_ref::<crate::error::PublishError>()); | ||
| assert!( | ||
| matches!( | ||
| source, | ||
| Some(crate::error::PublishError::OrderingKeyPaused(())) | ||
| ), | ||
| "{got_err:?}" | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe this could be in a helper, its kind of verbose and these tests become hard to skim.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Acknowledge.
I'll follow up this PR with a cleanup of the tests.
Co-authored-by: Suzy Mueller <suzmue@google.com>
| @@ -35,8 +35,8 @@ pub(crate) enum ToBatchWorker { | |||
| Publish(BundledMessage), | |||
| /// A request to flush all outstanding messages. | |||
| Flush(oneshot::Sender<()>), | |||
| // TODO(#4015): Add a resume function to allow resume Publishing on a ordering key after a | |||
| // failure. | |||
| /// A request to resume publishing. | |||
| ResumePublish(), | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: I'm only now noticing that ToWorker and ToBatchWorker are basically the same thing. I don't know if we can save anything performance wise from using the same type, but could be a place to consider refactoring in the future.
Fixes: #4015