Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Pass in ProcessErrorArgs to subscribe({ processError }) for extra diagnostics #11927

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
52a6e48
Adding in an errorContext parameter to processError to allow users to…
richardpark-msft Oct 19, 2020
65a16af
No only's!
richardpark-msft Oct 19, 2020
2dcc74b
Fixing incorrect help link
richardpark-msft Oct 19, 2020
1242b9d
Fixing samples to show the context parameter and use it in a basic way
richardpark-msft Oct 19, 2020
e72915f
* userCallback -=> processMessageCallback
richardpark-msft Oct 19, 2020
a8edfeb
Removing the initialize error source in favor of just using receive.
richardpark-msft Oct 19, 2020
7c24a2f
Rerun prettier
richardpark-msft Oct 19, 2020
b89c91f
Adding in tests for the primary error sources (ourselves or their cal…
richardpark-msft Oct 20, 2020
635650c
Add in a changelog entry for the new ProcessErrorContext
richardpark-msft Oct 20, 2020
9ff8552
Updating help text. Need to figure out the right spot to put it, howe…
richardpark-msft Oct 20, 2020
632e26c
Updating help text - the abandon actually happens if they throw, rega…
richardpark-msft Oct 20, 2020
c3ec056
Updating help once more.
richardpark-msft Oct 20, 2020
ec2b809
Swap the errorSource for onInitialize() to be 'receive'. This has som…
richardpark-msft Oct 20, 2020
52ffcfa
* Remove the isAmqpError checks that, for some reason, used to preven…
richardpark-msft Oct 20, 2020
075036d
Updating readme - all the links appeared to have been broken.
richardpark-msft Oct 20, 2020
031402b
Changing processError(err: error, context: ProcessErrorContext) to be…
richardpark-msft Oct 20, 2020
a90f372
Fixing samples to use new ProcessErrorArgs
richardpark-msft Oct 20, 2020
f935f5f
Missed the changelog
richardpark-msft Oct 20, 2020
363824a
Updating changelog, migration guide and readme.
richardpark-msft Oct 20, 2020
daefcc4
Fixing an area where we weren't strictly typing the compared objects …
richardpark-msft Oct 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions sdk/servicebus/service-bus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,13 @@ When you are done, call `receiver.close()` to stop receiving any more messages.
```javascript
const myMessageHandler = async (message) => {
// your code here
console.log(`message.body: ${message.body}`);
};
const myErrorHandler = async (error) => {
console.log(error);
const myErrorHandler = async (error, context) => {
console.log(
`Error occurred with ${context.entityPath} within ${context.fullyQualifiedNamespace}: `,
error
);
};
receiver.subscribe({
processMessage: myMessageHandler,
Expand Down
40 changes: 39 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export interface GetMessageIteratorOptions extends OperationOptionsBase {

// @public
export interface MessageHandlers<ReceivedMessageT> {
processError(err: Error): Promise<void>;
processError(err: Error, context: ProcessErrorContext): Promise<void>;
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
processMessage(message: ReceivedMessageT): Promise<void>;
}

Expand Down Expand Up @@ -223,6 +223,44 @@ export interface PeekMessagesOptions extends OperationOptionsBase {
fromSequenceNumber?: Long;
}

// @public
export interface ProcessErrorContext {
entityPath: string;
errorSource: "complete"
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
/**
* Covers errors that occur when the user (or autoComplete) abandons a message.
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
*/
| "abandon"
/**
* Errors thrown from the user's `processMessage` callback passed to `subscribe`
*/
| "userCallback"
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
/**
* Errors thrown when receiving messages.
*/
| "receive"
/**
* Errors thrown when the user calls renewLock or when the internal lock renewer calls renewLock.
* Automatic lock renewal can be enabled via the `CreateReceiverOptions.maxAutoLockRenewalDurationInMs`
* property passed when calling `ServiceBusClient.createReceiver()`
*/
| "renewLock"
/**
* Broadly covers a series of phases - the actual initialization that occurs when an operation
* is first started as well as errors that occur when reconnecting.
*/
| "initialize"
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
/**
* Errors thrown when accepting a session.
*/
| "acceptSession"
/**
* Errors thrown when closing a session.
*/
| "closeSession";
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
fullyQualifiedNamespace: string;
}

// @public
export interface QueueProperties {
authorizationRules?: AuthorizationRule[];
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/src/core/autoLockRenewer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { receiverLogger as logger } from "../log";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
import { calculateRenewAfterDuration } from "../util/utils";
import { LinkEntity } from "./linkEntity";
import { OnError } from "./messageReceiver";
import { OnErrorNoContext } from "./messageReceiver";

/**
* @internal
Expand Down Expand Up @@ -118,7 +118,7 @@ export class LockRenewer {
*
* @param bMessage The message whose lock renewal we will start.
*/
start(linkEntity: MinimalLink, bMessage: RenewableMessageProperties, onError: OnError) {
start(linkEntity: MinimalLink, bMessage: RenewableMessageProperties, onError: OnErrorNoContext) {
try {
const logPrefix = linkEntity.logPrefix;

Expand Down
16 changes: 15 additions & 1 deletion sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { LinkEntity, ReceiverType } from "./linkEntity";
import { ConnectionContext } from "../connectionContext";
import { DispositionType, ServiceBusMessageImpl } from "../serviceBusMessage";
import { getUniqueName } from "../util/utils";
import { ReceiveMode, SubscribeOptions } from "../models";
import { ProcessErrorContext, ReceiveMode, SubscribeOptions } from "../models";
import { DispositionStatusOptions } from "./managementClient";
import { AbortSignalLike } from "@azure/core-http";
import { onMessageSettled, DeferredPromiseAndTimer } from "./shared";
Expand Down Expand Up @@ -80,7 +80,21 @@ export interface OnMessage {
export interface OnError {
/**
* Handler for any error that occurs while receiving or processing messages.
*
* NOTE: if this signature changes make sure you reflect those same changes in the
* `OnErrorNoContext` definition below.
*/
(error: MessagingError | Error, context: ProcessErrorContext): void;
}

/**
* An onError method but without the context property. Used when wrapping OnError
* with an implicit ProcessErrorContext. Used by LockRenewer.
*
* @internal
* @ignore
*/
export interface OnErrorNoContext {
(error: MessagingError | Error): void;
}

Expand Down
42 changes: 35 additions & 7 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ export class StreamingReceiver extends MessageReceiver {
"retryable, we let the user know about it by calling the user's error handler.",
this.logPrefix
);
this._onError!(sbError);
this._onError!(sbError, {
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
} else {
logger.verbose(
"%s The received error is not retryable. However, the receiver was " +
Expand Down Expand Up @@ -234,7 +238,11 @@ export class StreamingReceiver extends MessageReceiver {
"retryable, we let the user know about it by calling the user's error handler.",
this.logPrefix
);
this._onError!(sbError);
this._onError!(sbError, {
errorSource: "receive",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}
};
Expand Down Expand Up @@ -262,7 +270,11 @@ export class StreamingReceiver extends MessageReceiver {

this._lockRenewer?.start(this, bMessage, (err) => {
if (this._onError) {
this._onError(err);
this._onError(err, {
errorSource: "renewLock",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
});

Expand All @@ -279,7 +291,11 @@ export class StreamingReceiver extends MessageReceiver {
bMessage.messageId,
this.name
);
this._onError!(err);
this._onError!(err, {
errorSource: "userCallback",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}

// Do not want renewLock to happen unnecessarily, while abandoning the message. Hence,
Expand Down Expand Up @@ -314,7 +330,11 @@ export class StreamingReceiver extends MessageReceiver {
bMessage.messageId,
this.name
);
this._onError!(translatedError);
this._onError!(translatedError, {
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
errorSource: "abandon",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}
return;
Expand Down Expand Up @@ -346,7 +366,11 @@ export class StreamingReceiver extends MessageReceiver {
bMessage.messageId,
this.name
);
this._onError!(translatedError);
this._onError!(translatedError, {
errorSource: "complete",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}
};
Expand Down Expand Up @@ -513,7 +537,11 @@ export class StreamingReceiver extends MessageReceiver {
if (typeof this._onError === "function") {
logger.verbose(`${this.logPrefix} Unable to automatically reconnect`);
try {
this._onError(err);
this._onError(err, {
errorSource: "initialize",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
} catch (err) {
logger.logError(
err,
Expand Down
15 changes: 8 additions & 7 deletions sdk/servicebus/service-bus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export {
RetryOptions,
TokenCredential,
TokenType,
WebSocketOptions
WebSocketOptions,
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
} from "@azure/core-amqp";
export { OperationOptions } from "@azure/core-http";
export { Delivery, WebSocketImpl } from "rhea-promise";
Expand All @@ -22,11 +22,12 @@ export {
AcceptSessionOptions,
GetMessageIteratorOptions,
MessageHandlers,
ProcessErrorContext,
PeekMessagesOptions,
ReceiveMessagesOptions,
ReceiveMode,
SubQueue,
SubscribeOptions
SubscribeOptions,
} from "./models";
export { OperationOptionsBase, TryAddOptions } from "./modelsToBeSharedWithEventHubs";
export { ServiceBusReceiver } from "./receivers/receiver";
Expand All @@ -36,18 +37,18 @@ export { NamespaceProperties } from "./serializers/namespaceResourceSerializer";
export {
CreateQueueOptions,
QueueProperties,
QueueRuntimeProperties
QueueRuntimeProperties,
} from "./serializers/queueResourceSerializer";
export { RuleProperties, SqlRuleAction, SqlRuleFilter } from "./serializers/ruleResourceSerializer";
export {
CreateSubscriptionOptions,
SubscriptionProperties,
SubscriptionRuntimeProperties
SubscriptionRuntimeProperties,
} from "./serializers/subscriptionResourceSerializer";
export {
CreateTopicOptions,
TopicProperties,
TopicRuntimeProperties
TopicRuntimeProperties,
} from "./serializers/topicResourceSerializer";
export {
EntitiesResponse,
Expand All @@ -60,7 +61,7 @@ export {
SubscriptionResponse,
SubscriptionRuntimePropertiesResponse,
TopicResponse,
TopicRuntimePropertiesResponse
TopicRuntimePropertiesResponse,
} from "./serviceBusAtomManagementClient";
export { ServiceBusClient } from "./serviceBusClient";
export {
Expand All @@ -70,7 +71,7 @@ export {
DeadLetterOptions,
ServiceBusReceivedMessage,
ServiceBusReceivedMessageWithLock,
ServiceBusMessage
ServiceBusMessage,
} from "./serviceBusMessage";
export { ServiceBusMessageBatch } from "./serviceBusMessageBatch";
export { AuthorizationRule, EntityStatus, EntityAvailabilityStatus } from "./util/utils";
55 changes: 54 additions & 1 deletion sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,59 @@
import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
import Long from "long";

/**
* Context for the error given in processError.
*/
export interface ProcessErrorContext {
/**
* The operation where the error originated.
*/
errorSource: /**
* Covers errors that occur when the user (or autoComplete) completes a message.
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
*/
| "complete"
/**
* Covers errors that occur when the user (or autoComplete) abandons a message.
*/
| "abandon"
/**
* Errors thrown from the user's `processMessage` callback passed to `subscribe`
*/
| "userCallback"
/**
* Errors thrown when receiving messages.
*/
| "receive"
/**
* Errors thrown when the user calls renewLock or when the internal lock renewer calls renewLock.
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
* Automatic lock renewal can be enabled via the `CreateReceiverOptions.maxAutoLockRenewalDurationInMs`
* property passed when calling `ServiceBusClient.createReceiver()`
*/
| "renewLock"
/**
* Broadly covers a series of phases - the actual initialization that occurs when an operation
* is first started as well as errors that occur when reconnecting.
*/
| "initialize"
/**
* Errors thrown when accepting a session.
*/
| "acceptSession"
/**
* Errors thrown when closing a session.
*/
| "closeSession";

/**
* The entity path for the current receiver.
*/
entityPath: string;
/**
* The fully qualified namespace for the Service Bus.
*/
fullyQualifiedNamespace: string;
}

/**
* The general message handler interface (used for streamMessages).
*/
Expand All @@ -18,7 +71,7 @@ export interface MessageHandlers<ReceivedMessageT> {
* Handler that processes errors that occur during receiving.
* @param err An error from Service Bus.
*/
processError(err: Error): Promise<void>;
processError(err: Error, context: ProcessErrorContext): Promise<void>;
}

/**
Expand Down
16 changes: 13 additions & 3 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
InternalMessageHandlers
} from "../models";
import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceivedMessage } from "..";
import { ServiceBusReceivedMessage } from "../serviceBusMessage";
import { ConnectionContext } from "../connectionContext";
import {
getAlreadyReceivingErrorMsg,
Expand Down Expand Up @@ -262,7 +262,11 @@ export class ServiceBusReceiverImpl<
try {
await onInitialize();
} catch (err) {
onError(err);
onError(err, {
errorSource: "userCallback",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}

if (!this.isClosed) {
Expand All @@ -275,7 +279,13 @@ export class ServiceBusReceiverImpl<
return;
})
.catch((err) => {
onError(err);
// TODO: being a bit broad here but the only errors that should filter out this
// far are going to be bootstrapping the subscription.
onError(err, {
errorSource: "initialize",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
});
}

Expand Down
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,11 @@ export class ServiceBusSessionReceiverImpl<
try {
this._messageSession.subscribe(onMessage, onError, options);
} catch (err) {
onError(err);
onError(err, {
errorSource: "initialize",
entityPath: this.entityPath,
fullyQualifiedNamespace: this._context.config.host
});
}
}

Expand Down