Skip to content

Commit

Permalink
Fixed the issue of ServiceBusReceiverClient recreating the link after…
Browse files Browse the repository at this point in the history
… closing the client (#34665)

* Fixed the issue of ServiceBusReceiverClient recreating the link after closing the client

* Changelog update

* unit tests to assert expected error from Iterable iteration.
  • Loading branch information
anuchandy committed Apr 28, 2023
1 parent 4dc93b8 commit 6db1030
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 2 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

- Fixed issue where receiving messages from `ServiceBusSessionReceiverAsyncClient` would never complete. ([#34597](https://github.com/Azure/azure-sdk-for-java/issues/34597))
- Fixed issue causing some messages to not be returned when calling peek on receiver client.
- Fixed the issue of `ServiceBusReceiverClient` recreating the link after closing the client. ([#34664](https://github.com/Azure/azure-sdk-for-java/issues/34664))

### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ public void rollbackTransaction(ServiceBusTransactionContext transactionContext)
*/
@Override
public void close() {
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null);
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
messageSubscriber.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ protected void hookOnError(Throwable throwable) {
dispose("Errors occurred upstream", throwable);
}

@Override
protected void hookOnComplete() {
dispose("Upstream signaled completion", null);
}

@Override
protected void hookOnCancel() {
this.dispose();
Expand Down Expand Up @@ -397,4 +402,3 @@ int getWorkQueueSize() {
return this.workQueue.size();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
Expand All @@ -17,6 +19,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opentest4j.AssertionFailedError;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;
Expand All @@ -43,6 +46,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -859,4 +863,98 @@ void setSessionState() {
// Assert
verify(asyncClient).setSessionState(SESSION_ID, contents);
}

@Test
void iterationReplaysUpstreamTerminalError() {
// Arrange
final int messageCount = 10;
final AmqpException terminalError = new AmqpException(false, "non-retriable terminal error.", new AmqpErrorContext("contoso.com"));
final AtomicInteger state = new AtomicInteger(0);

Flux<ServiceBusReceivedMessage> messageSink = Flux.create(sink -> {
sink.onRequest(r -> {
final int s = state.getAndIncrement();
if (s == 0) {
for (int m = 0; m < r; m++) {
sink.next(mock(ServiceBusReceivedMessage.class));
if (m >= messageCount) {
sink.error(new AssertionFailedError(String.format("Received request for %d when expected to emit %d messages.", r, messageCount)));
}
}
} else if (s == 1) {
sink.error(terminalError);
} else {
sink.error((new AssertionFailedError("Unexpected request after termination.")));
}
});

sink.onCancel(() -> {
LOGGER.info("Cancelled. Completing sink.");
sink.complete();
});
});
when(asyncClient.receiveMessagesNoBackPressure()).thenReturn(messageSink);

// Assert the first receive get messages.
final IterableStream<ServiceBusReceivedMessage> messages0 = client.receiveMessages(messageCount);
assertNotNull(messages0);
final long collected = messages0.stream().count();
assertEquals(messageCount, collected);

// Assert the second receive iteration get terminal error.
final IterableStream<ServiceBusReceivedMessage> messages1 = client.receiveMessages(messageCount);
assertNotNull(messages1);
final AmqpException e1 = assertThrows(AmqpException.class, () -> messages1.stream().count());
assertEquals(terminalError, e1);

// Assert the same terminal error 'replayed' to further receive iterations.
final IterableStream<ServiceBusReceivedMessage> messages2 = client.receiveMessages(messageCount);
assertNotNull(messages2);
final AmqpException e2 = assertThrows(AmqpException.class, () -> messages2.stream().count());
assertEquals(terminalError, e2);
}

@Test
void iterationAfterCloseEmitsError() {
// Arrange
final int messageCount = 10;
final AtomicInteger state = new AtomicInteger(0);

Flux<ServiceBusReceivedMessage> messageSink = Flux.create(sink -> {
sink.onRequest(r -> {
final int s = state.getAndIncrement();
if (s == 0) {
for (int m = 0; m < r; m++) {
sink.next(mock(ServiceBusReceivedMessage.class));
if (m >= messageCount) {
sink.error(new AssertionFailedError(String.format("Received request for %d when expected to emit %d messages.", r, messageCount)));
}
}
} else {
sink.error((new AssertionFailedError("Unexpected request after termination.")));
}
});

sink.onCancel(() -> {
LOGGER.info("Cancelled. Completing sink.");
sink.complete();
});
});
when(asyncClient.receiveMessagesNoBackPressure()).thenReturn(messageSink);
doNothing().when(asyncClient).close();

// Assert the first receive get messages.
final IterableStream<ServiceBusReceivedMessage> messages0 = client.receiveMessages(messageCount);
assertNotNull(messages0);
final long collected = messages0.stream().count();
assertEquals(messageCount, collected);

client.close();

// Assert any iteration on iterable obtained after client close get error.
final IterableStream<ServiceBusReceivedMessage> messages1 = client.receiveMessages(messageCount);
assertNotNull(messages1);
final RuntimeException e = assertThrows(RuntimeException.class, () -> messages1.stream().count());
assertEquals("The receiver client is terminated. Re-create the client to continue receive attempt.", e.getMessage());
}
}

0 comments on commit 6db1030

Please sign in to comment.