-
Notifications
You must be signed in to change notification settings - Fork 358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pubsub client ordering-keys #3099
Conversation
Integration tests using the emulator still TODO.
Integration tests using the emulator still TODO.
Emulator-based testing currently required, as the ordering-keys feature is not yet available on any live server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just comments for the first commit. Mostly nits of requests for comment clarifications, but I'd hesitate to say I'm confident about this. It's tricksy stuff.
@@ -182,5 +208,116 @@ public void SettingsValidation() | |||
new PublisherClient.Settings { BatchingSettings = new BatchingSettings(null, 1, null) }.Validate(); | |||
new PublisherClient.Settings { BatchingSettings = new BatchingSettings(null, PublisherClient.ApiMaxBatchingSettings.ByteCountThreshold, null) }.Validate(); | |||
} | |||
|
|||
[Fact] | |||
public void OrderingKeyMustBeEnabled() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test name is a bit confusing. How about "PublishingMessageWithOrderingKeyRequiresOrderingEnabled"?
(I think that's what it's testing... I could be wrong.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
And sets the record the TheLongestTestMethodNameSoFar().
// Other ordering-keys publish OK. | ||
await taskHelper.ConfigureAwait(pub.PublishAsync("ok-key", "ok")); | ||
// Including a recoverable error. | ||
await taskHelper.ConfigureAwait(pub.PublishAsync(recoverableKey, "not-an-error")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The message name is a bit odd here. "Not an error" and "recoverable error" don't feel quite the same. Would "recoverable-error" be reasonable here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yes, I meant to go back and improve these strings.
Done.
/// <summary> | ||
/// Exception thrown when publishing a message with an ordering key that is in an error state. | ||
/// </summary> | ||
public class OrderingKeyInErrorStateException : Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't suppose we've got any other types that would be a better base type here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to derive from InvalidOperationException
.
@@ -340,6 +397,42 @@ public int AddMessage(PubsubMessage message, int byteCount) | |||
} | |||
} | |||
|
|||
private enum OrderingKeyState { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: brace on next line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -340,6 +397,42 @@ public int AddMessage(PubsubMessage message, int byteCount) | |||
} | |||
} | |||
|
|||
private enum OrderingKeyState { | |||
Normal = 0, | |||
InFlight, // In the "batches-ready" queue, or actually in-flight to/from server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make these XML doc comments so they're easier to use when hovering in VS?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
foreach (var batch in batchesToCancel) | ||
{ | ||
batch.BatchCompletion.SetCanceled(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume it's deliberate that this is outside the lock, when it was inside before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was a bug (albeit fairly benign) before.
(SetCanceled()
may run continuations synchronously, so it's generally better to be called outside of locks, especially in this case where arbitrary user code will be in the continuation)
// Force queuing of the current batch, whatever the size. | ||
// There will always be at least one message in the batch. | ||
QueueCurrentBatch(); | ||
// Check for cancellation inside lock to avoid race-condition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment worries me given the part earlier about cancellation happening outside the lock...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous cancellation is a Task
., which when completed may run arbitrary user code.
This cancellation is a cancellation-token which will only be running code within this file. Cancellations of this token may occur when _lock
is locked, but Monitor
is re-entrant so it's OK if this continuation executes synchronously.
public List<PubsubMessage> Messages { get; } = new List<PubsubMessage>(); | ||
public CancellationTokenSource TimerCts { get; } = new CancellationTokenSource(); | ||
public List<PubsubMessage> Messages { get; private set; } = new List<PubsubMessage>(); | ||
public CancellationTokenSource ProcessedCts { get; } = new CancellationTokenSource(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does "processed" mean here? I've seen it multiple times, and it's unclear to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
XmlDoc summary added to Batch
to explain this.
if (state.HasOrderingKey) | ||
{ | ||
// For batches with an ordering-key: Retry transient errors forever; | ||
// otherwise fail all unsent messages with the same ordering-key, then refuse any more until `ResumePublish()` is called. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The semi-colon in the first line is confusing, it makes me expect that the second line is about messages without an ordering key, but presumably it's about messages with an ordering key, but with a permanent error.
What about messages without an ordering key? That's covered later, but I think a single comment above the if statement that checks for the ordering key would be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarified.
} | ||
else | ||
{ | ||
// Prepare to fail all batches, clear all batches, and mark ordering-key as in error state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"All batches for this ordering key"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, comments on the other two commits. There are a few concerns, but not red flags - I suspect you'll be able to merge eventually without any massive changes, but it would be good to go through the ones I raised.
/// <param name="fn">The function to execute.</param> | ||
/// <returns>A Task that completes once <paramref name="fn"/> has been scheduled for execution.</returns> | ||
public async Task Process(long byteCount, Func<Task> fn) | ||
public async Task Process(long byteCount, string orderingKey, Func<Task> fn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if this were internal - I hadn't spotted that it was in a private class for a bit, and worried that it would be a breaking change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All changed to internal
.
_byteCount += byteCount; | ||
_elementCount += 1; | ||
if (orderingKey.Length > 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's odd that there's no mention of what happens when there isn't an ordering key. Extra comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarified.
if (nextFn.Fn != null) | ||
{ | ||
// Execute user code for the next message of this ordering-key. | ||
ExecuteFunction(orderingKey, nextFn.ByteCount, nextFn.Fn); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly add a comment to explain that this isn't really recursing due to Task.Run? Otherwise it looks dangerous from a stack perspective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done :)
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>netcoreapp2.2</TargetFramework> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set Packable=False?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
namespace Google.Cloud.PubSub.V1.OrderingKeyTester | ||
{ | ||
class Program |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be worth having a description here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary added
public string Message { get; } | ||
} | ||
|
||
static int Main(string[] args) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be good to break this into a bunch of methods, then make Main just call the methods in turn.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, split into local methods.
@jskeet PTAL |
Will look first thing in the morning, when I'm more awake. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still don't fully understand all of this, but I have no specific concerns.
@@ -526,6 +526,10 @@ internal async Task Process(long byteCount, string orderingKey, Func<Task> fn) | |||
// Add to stats for this element. | |||
_byteCount += byteCount; | |||
_elementCount += 1; | |||
// If there's no ordering-key then the user callback function can always immediately be executed | |||
// because there's no ordering constraint to meet. | |||
// If there is an ordering-key then the user callback function must be exeuted sequentially per |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: exeuted => executed
(I realize this was just a deliberate typo to check that I'm actually reviewing... ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done :)
Thanks, will merge on green |
No description provided.