Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Fix MaxListenersExceeded for management client #11738

62 changes: 38 additions & 24 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,20 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
target: { address: this.replyTo },
onSessionError: (context: EventContext) => {
const id = context.connection.options.id;
const ehError = translate(context.session!.error!);
const sbError = translate(context.session!.error!);
logError(
ehError,
sbError,
"[%s] An error occurred on the session for request/response links for " +
"$management: %O",
id,
ehError
sbError
);
}
};
const sropt: SenderOptions = { target: { address: this.address } };

// If multiple parallel requests reach here, the initLink secures a lock
// which ensures that there won't be multiple initializations
await this.initLink(
{
senderOptions: sropt,
Expand All @@ -248,31 +250,43 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
abortSignal
);

this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => {
const id = context.connection.options.id;
const ehError = translate(context.sender!.error!);
logError(
ehError,
"[%s] An error occurred on the $management sender link.. %O",
id,
ehError
);
});
this.link!.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const id = context.connection.options.id;
const ehError = translate(context.receiver!.error!);
logError(
ehError,
"[%s] An error occurred on the $management receiver link.. %O",
id,
ehError
);
});
// Attach listeners for the `sender_error` and `receiver_error` events to log the errors.
// - It is possible that the previous "_init" call had already added the listeners
// (example: parallel _init calls can cause this),
// hence checking the count of the listeners and adding them only if they're not present.
const senderErrorListenerCount = this.link?.sender.listenerCount(SenderEvents.senderError);
const receiverErrorListenerCount = this.link?.receiver.listenerCount(
ReceiverEvents.receiverError
);
if (senderErrorListenerCount && senderErrorListenerCount < 1) {
this.link!.sender.on(SenderEvents.senderError, (context: EventContext) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are options that you can pass to initLink for callbacks that are intended to be only set once - if the ones you're setting aren't there we should add them and register them there.

That should eliminate the need to do this kind of checking.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did look into that. There are complications, however.

"rhea" doesn't allow setting only the "onError" handler in the options if it is not accompanied by an "onMessage" handler.
image

Our onMessage handler("message" event) is set at core-amqp and we are trying to pass the onError handler in the options.
If we want to pass both onError and onMessage through core-amqp, we'd need to update the core-amqp's logic drastically, which would also impact event-hubs I guess.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have the lock to protect you here so you can do one-time init-only modification to the receiver.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, updated createRheaLink instead to add the listener and thus, this would happen as a one-time init-only modification.

const id = context.connection.options.id;
const sbError = translate(context.sender!.error!);
logError(
sbError,
"[%s] An error occurred on the $management sender link.. %O",
id,
sbError
);
});
}
if (receiverErrorListenerCount && receiverErrorListenerCount < 1) {
this.link!.receiver.on(ReceiverEvents.receiverError, (context: EventContext) => {
const id = context.connection.options.id;
const sbError = translate(context.receiver!.error!);
logError(
sbError,
"[%s] An error occurred on the $management receiver link.. %O",
id,
sbError
);
});
}
} catch (err) {
err = translate(err);
logError(
err,
"[%s] An error occured while establishing the $management links: %O",
"[%s] An error occurred while establishing the $management links: %O",
this._context.connectionId,
err
);
Expand Down