New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Service Bus] Bug fix: batching receiver upon a disconnect #13374
Changes from all commits
f450d8d
19bf713
f3cece0
af78050
4775ed0
85eda12
7e78332
a1135af
e9d199b
8f5f4d6
677fb45
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,20 +6,26 @@ import { packageJsonInfo } from "./util/constants"; | |
import { | ||
ConnectionConfig, | ||
ConnectionContextBase, | ||
Constants, | ||
CreateConnectionContextBaseParameters, | ||
delay | ||
CreateConnectionContextBaseParameters | ||
} from "@azure/core-amqp"; | ||
import { TokenCredential } from "@azure/core-auth"; | ||
import { ServiceBusClientOptions } from "./constructorHelpers"; | ||
import { Connection, ConnectionEvents, EventContext, OnAmqpEvent } from "rhea-promise"; | ||
import { | ||
AmqpError, | ||
Connection, | ||
ConnectionError, | ||
ConnectionEvents, | ||
EventContext, | ||
OnAmqpEvent | ||
} from "rhea-promise"; | ||
import { MessageSender } from "./core/messageSender"; | ||
import { MessageSession } from "./session/messageSession"; | ||
import { MessageReceiver } from "./core/messageReceiver"; | ||
import { ManagementClient } from "./core/managementClient"; | ||
import { formatUserAgentPrefix } from "./util/utils"; | ||
import { getRuntimeInfo } from "./util/runtimeInfo"; | ||
import { SharedKeyCredential } from "./servicebusSharedKeyCredential"; | ||
import { ReceiverType } from "./core/linkEntity"; | ||
|
||
/** | ||
* @internal | ||
|
@@ -130,6 +136,66 @@ type ConnectionContextMethods = Omit< | |
> & | ||
ThisType<ConnectionContextInternalMembers>; | ||
|
||
/**
 * @internal
 * @hidden
 * Walks `connectionContext.messageReceivers` and invokes `onDetached()` on every receiver
 * whose `receiverType` equals the requested type, passing along the supplied error.
 *
 * A rejection from an individual `onDetached()` call is logged and swallowed, so one failing
 * receiver does not prevent the others from being processed.
 *
 * @param connectionContext - The connection context that owns the receivers.
 * @param contextOrConnectionError - The error (if any) handed to each receiver's `onDetached()`.
 * @param receiverType - Only receivers of this type are notified.
 * @returns A promise that settles once every triggered `onDetached()` call has settled.
 */
async function callOnDetachedOnReceivers(
  connectionContext: ConnectionContext,
  contextOrConnectionError: Error | ConnectionError | AmqpError | undefined,
  receiverType: ReceiverType
) {
  const pendingDetaches: Promise<void>[] = [];

  Object.keys(connectionContext.messageReceivers).forEach((name) => {
    // Look the receiver up at the moment it is visited rather than up front.
    const candidate = connectionContext.messageReceivers[name];
    if (!candidate || candidate.receiverType !== receiverType) {
      return;
    }

    logger.verbose(
      "[%s] calling detached on %s receiver '%s'.",
      connectionContext.connection.id,
      candidate.receiverType,
      candidate.name
    );

    const detachAttempt = candidate.onDetached(contextOrConnectionError).catch((err) => {
      // Log and continue; a failure here must not reject the combined promise.
      logger.logError(
        err,
        "[%s] An error occurred while calling onDetached() on the %s receiver '%s'",
        connectionContext.connection.id,
        candidate.receiverType,
        candidate.name
      );
    });
    pendingDetaches.push(detachAttempt);
  });

  return Promise.all(pendingDetaches);
}
|
||
/**
 * @internal
 * @hidden
 * Helper method to get the number of receivers of the specified type from the connectionContext.
 *
 * For the "session" type the count is the number of entries in `messageSessions`; for any other
 * type it is the number of entries in `messageReceivers` whose `receiverType` matches.
 *
 * This function is intentionally synchronous: callers use the result directly in truthiness
 * checks and log messages. Returning a Promise (as an `async` function would) makes those
 * checks always pass, because a Promise object is always truthy.
 *
 * @param connectionContext - Object exposing the `messageReceivers` and `messageSessions` maps.
 * @param receiverType - The type of receiver to count.
 * @returns The number of receivers of the given type.
 */
function getNumberOfReceivers(
  connectionContext: Pick<ConnectionContext, "messageReceivers" | "messageSessions">,
  receiverType: ReceiverType
): number {
  if (receiverType === "session") {
    return Object.keys(connectionContext.messageSessions).length;
  }

  const receivers = connectionContext.messageReceivers;
  let count = 0;
  for (const name of Object.keys(receivers)) {
    const receiver = receivers[name];
    // Skip empty slots instead of throwing on a property access of undefined.
    if (receiver && receiver.receiverType === receiverType) {
      count++;
    }
  }
  return count;
}
|
||
/** | ||
* @internal | ||
* @hidden | ||
|
@@ -325,7 +391,6 @@ export namespace ConnectionContext { | |
// by cleaning up the timers and closing the links. | ||
// We don't call onDetached for sender after `refreshConnection()` | ||
// because any new send calls that potentially initialize links would also get affected if called later. | ||
// TODO: do the same for batching receiver | ||
logger.verbose( | ||
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` + | ||
`senders. We should not reconnect.` | ||
|
@@ -354,47 +419,51 @@ export namespace ConnectionContext { | |
await Promise.all(detachCalls); | ||
} | ||
|
||
// Calling onDetached on batching receivers for the same reasons as sender | ||
const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching"); | ||
if (!state.wasConnectionCloseCalled && numBatchingReceivers) { | ||
logger.verbose( | ||
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` + | ||
`batching receivers. We should reconnect.` | ||
); | ||
|
||
// Call onDetached() on the batching receivers so that they can gracefully close any ongoing batch operation | ||
await callOnDetachedOnReceivers( | ||
connectionContext, | ||
connectionError || contextError, | ||
"batching" | ||
); | ||
|
||
// TODO: | ||
// `callOnDetachedOnReceivers` handles "connectionContext.messageReceivers". | ||
// ...What to do for sessions (connectionContext.messageSessions) ?? | ||
HarshaNalluru marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
await refreshConnection(connectionContext); | ||
waitForConnectionRefreshResolve(); | ||
waitForConnectionRefreshPromise = undefined; | ||
// The connection should always be brought back up if the sdk did not call connection.close() | ||
// and there was at least one receiver link on the connection before it went down. | ||
logger.verbose("[%s] state: %O", connectionContext.connectionId, state); | ||
if (!state.wasConnectionCloseCalled && state.numReceivers) { | ||
|
||
// Calling onDetached on streaming receivers | ||
const numStreamingReceivers = getNumberOfReceivers(connectionContext, "streaming"); | ||
if (!state.wasConnectionCloseCalled && numStreamingReceivers) { | ||
logger.verbose( | ||
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numReceivers} ` + | ||
`receivers. We should reconnect.` | ||
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numStreamingReceivers} ` + | ||
`streaming receivers. We should reconnect.` | ||
); | ||
await delay(Constants.connectionReconnectDelay); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed the delay for all kinds of receivers. |
||
|
||
const detachCalls: Promise<void>[] = []; | ||
|
||
// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation | ||
// and streaming receivers can decide whether to reconnect or not. | ||
for (const receiverName of Object.keys(connectionContext.messageReceivers)) { | ||
const receiver = connectionContext.messageReceivers[receiverName]; | ||
if (receiver) { | ||
logger.verbose( | ||
"[%s] calling detached on %s receiver '%s'.", | ||
connectionContext.connection.id, | ||
receiver.receiverType, | ||
receiver.name | ||
); | ||
detachCalls.push( | ||
receiver.onDetached(connectionError || contextError).catch((err) => { | ||
logger.logError( | ||
err, | ||
"[%s] An error occurred while calling onDetached() on the %s receiver '%s'", | ||
connectionContext.connection.id, | ||
receiver.receiverType, | ||
receiver.name | ||
); | ||
}) | ||
); | ||
} | ||
} | ||
|
||
await Promise.all(detachCalls); | ||
// Calling `onDetached()` on streaming receivers after the refreshConnection() since `onDetached()` would | ||
// recover the streaming receivers and that would only be possible after the connection is refreshed. | ||
// | ||
// This is different from the batching receiver since `onDetached()` for the batching receiver would | ||
// return the outstanding messages and close the receive link. | ||
await callOnDetachedOnReceivers( | ||
connectionContext, | ||
connectionError || contextError, | ||
"streaming" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Calling onDetached on streaming receivers after the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Walking through this mentally it seems like it should be fine. I believe the sequence we'll end up with is something like this: 'connection refreshing promise started'
Does that match your analysis? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One correction.. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for sharing this reasoning! Any chance you can put it as a comment as well for our future selves? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added more comments. :) |
||
); | ||
} | ||
}; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,7 +93,7 @@ export class BatchingReceiver extends MessageReceiver { | |
); | ||
} | ||
|
||
await this._batchingReceiverLite.close(connectionError); | ||
this._batchingReceiverLite.close(connectionError); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at this now it seems 'close' is a bit misnamed - it's more like "cancel" or something of that nature. (ie, it doesn't change the link state at all). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (not something you need to fix in this PR, just curious if you agree) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmm... maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logged #13390 |
||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this affect session receivers at all? Just want to make sure we're still calling onDetached on all receivers like we used to now that we're checking receiverType as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onDetached
was (and is) only called for the streaming and batching receivers. (Session receivers can only be accessed through
connectionContext.messageSessions
. We never called onDetached
for sessions; there is no onDetached
method implemented for sessions.) Previously,
onDetached
was called for "batching" and "streaming" receivers after the connection refresh. Now,
onDetached
is called for "batching" receivers before the connection refresh, unlike for the streaming receivers. I left a TODO to investigate further on the "sessions" part; I'll log an issue for it too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is this related issue #8875 which intends to deal with the work related to disconnects for sessions, that issue should take care of the remaining work for sessions, not logging a new issue.
Added a comment with questions - #8875 (comment)