Skip to content

Commit

Permalink
Add session idle timeout (#34700)
Browse files Browse the repository at this point in the history
* Add session idle timeout config option
  • Loading branch information
lmolkova committed May 3, 2023
1 parent fde9319 commit f079ad7
Show file tree
Hide file tree
Showing 27 changed files with 498 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static void addAttribute(io.opentelemetry.api.common.AttributesBuilder attribute
} else if (value instanceof Byte) {
attributesBuilder.put(AttributeKey.longKey(key), (Byte) value);
} else {
LOGGER.warning("Could not populate attribute with key '{}', type {} is not supported.", key, value.getClass().getName());
LOGGER.warning("Could not populate attribute with key '{}', type '{}' is not supported.", key, value.getClass().getName());
}
}
/**
Expand Down
3 changes: 3 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### Features Added

- Added `sessionIdleTimeout` method to configure session idle timeout on `ServiceBusSessionProcessorClientBuilder`. After this time has elapsed,
the processor will close the session and attempt to process another session. ([#34700](https://github.com/Azure/azure-sdk-for-java/issues/34700))

### Breaking Changes

### Bugs Fixed
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/azure-messaging-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
--add-exports com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-reads com.azure.messaging.servicebus=com.azure.http.netty
--add-reads com.azure.messaging.servicebus=com.azure.core.tracing.opentelemetry
</javaModulesSurefireArgLine>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,39 @@
*
* @see ServiceBusReceiverAsyncClient
*/
class ReceiverOptions {
final class ReceiverOptions {
private final ServiceBusReceiveMode receiveMode;
private final int prefetchCount;
private final boolean enableAutoComplete;
private final String sessionId;
private final Integer maxConcurrentSessions;
private final Duration maxLockRenewDuration;
private final Duration sessionIdleTimeout;

ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
static ReceiverOptions createNonSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete) {
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null);
return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null, null);
}

ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) {
static ReceiverOptions createNamedSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, String sessionId) {
return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, sessionId, null, null);
}

static ReceiverOptions createUnnamedSessionOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, Integer maxConcurrentSessions, Duration sessionIdleTimeout) {
return new ReceiverOptions(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions, sessionIdleTimeout);
}

private ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions, Duration sessionIdleTimeout) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
this.enableAutoComplete = enableAutoComplete;
this.sessionId = sessionId;
this.maxConcurrentSessions = maxConcurrentSessions;
this.maxLockRenewDuration = maxLockRenewDuration;
this.sessionIdleTimeout = sessionIdleTimeout;
}

/**
Expand All @@ -43,6 +55,7 @@ class ReceiverOptions {
Duration getMaxLockRenewDuration() {
return maxLockRenewDuration;
}

/**
* Gets the receive mode for the message.
*
Expand Down Expand Up @@ -98,6 +111,15 @@ public Integer getMaxConcurrentSessions() {
return maxConcurrentSessions;
}

/**
* Gets the {@code sessionIdleTimeout} to roll to another session if no messages wew be received.
*
* @return the session idle timeout.
*/
Duration getSessionIdleTimeout() {
return sessionIdleTimeout;
}

public boolean isEnableAutoComplete() {
return enableAutoComplete;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.regex.Pattern;

import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
import static com.azure.messaging.servicebus.ReceiverOptions.createNonSessionOptions;
import static com.azure.messaging.servicebus.ReceiverOptions.createUnnamedSessionOptions;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;

/**
Expand Down Expand Up @@ -208,7 +209,6 @@ public final class ServiceBusClientBuilder implements
private static final String UNKNOWN = "UNKNOWN";
private static final String LIBRARY_NAME;
private static final String LIBRARY_VERSION;
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");
private static final Duration MAX_LOCK_RENEW_DEFAULT_DURATION = Duration.ofMinutes(5);
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusClientBuilder.class);
private final Object connectionLock = new Object();
Expand Down Expand Up @@ -1041,6 +1041,21 @@ public ServiceBusSessionProcessorClientBuilder maxAutoLockRenewDuration(Duration
return this;
}

/**
* Sets the maximum amount of time to wait for a message to be received for the currently active session.
* After this time has elapsed, the processor will close the session and attempt to process another session.
* If not specified, the {@link AmqpRetryOptions#getTryTimeout()} will be used.
*
* @param sessionIdleTimeout Session idle timeout.
* @return The updated {@link ServiceBusSessionProcessorClientBuilder} object.
* @throws IllegalArgumentException If {code maxAutoLockRenewDuration} is negative.
*/
public ServiceBusSessionProcessorClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout) {
validateAndThrow(sessionIdleTimeout);
sessionReceiverClientBuilder.sessionIdleTimeout(sessionIdleTimeout);
return this;
}

/**
* Enables session processing roll-over by processing at most {@code maxConcurrentSessions}.
*
Expand Down Expand Up @@ -1235,6 +1250,7 @@ public final class ServiceBusSessionReceiverClientBuilder {
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
private Duration sessionIdleTimeout = null;
private SubQueue subQueue = SubQueue.NONE;

private ServiceBusSessionReceiverClientBuilder() {
Expand Down Expand Up @@ -1270,6 +1286,21 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration
return this;
}

/**
* Sets the maximum amount of time to wait for a message to be received for the currently active session.
* After this time has elapsed, the processor will close the session and attempt to process another session.
* If not specified, the {@link AmqpRetryOptions#getTryTimeout()} will be used.
*
* @param sessionIdleTimeout Session idle timeout.
* @return The updated {@link ServiceBusSessionReceiverClientBuilder} object.
* @throws IllegalArgumentException If {code maxAutoLockRenewDuration} is negative.
*/
ServiceBusSessionReceiverClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout) {
validateAndThrow(sessionIdleTimeout);
this.sessionIdleTimeout = sessionIdleTimeout;
return this;
}

/**
* Enables session processing roll-over by processing at most {@code maxConcurrentSessions}.
*
Expand Down Expand Up @@ -1403,9 +1434,9 @@ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
}

final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, null,
maxConcurrentSessions);

final ReceiverOptions receiverOptions = createUnnamedSessionOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, maxConcurrentSessions, sessionIdleTimeout);

final String clientIdentifier;
if (clientOptions instanceof AmqpClientOptions) {
Expand Down Expand Up @@ -1484,8 +1515,8 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
}

final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);
final ReceiverOptions receiverOptions = createUnnamedSessionOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, maxConcurrentSessions, sessionIdleTimeout);

final String clientIdentifier;
if (clientOptions instanceof AmqpClientOptions) {
Expand Down Expand Up @@ -1792,7 +1823,6 @@ public final class ServiceBusReceiverClientBuilder {
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;

private ServiceBusReceiverClientBuilder() {
}

Expand Down Expand Up @@ -1967,7 +1997,7 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, bo
}

final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
final ReceiverOptions receiverOptions = createNonSessionOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete);

final String clientIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ServiceBusSessionManager implements AutoCloseable {
private final List<Scheduler> schedulers;
private final Deque<Scheduler> availableSchedulers = new ConcurrentLinkedDeque<>();
private final Duration maxSessionLockRenewDuration;
private final Duration sessionIdleTimeout;

/**
* SessionId to receiver mapping.
Expand Down Expand Up @@ -107,6 +108,9 @@ class ServiceBusSessionManager implements AutoCloseable {
this.processor = EmitterProcessor.create(numberOfSchedulers, false);
this.sessionReceiveSink = processor.sink();
this.receiveLink = receiveLink;
this.sessionIdleTimeout = receiverOptions.getSessionIdleTimeout() != null
? receiverOptions.getSessionIdleTimeout()
: connectionProcessor.getRetryOptions().getTryTimeout();
}

ServiceBusSessionManager(String entityPath, MessagingEntityType entityType,
Expand Down Expand Up @@ -348,8 +352,8 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
}

return new ServiceBusSessionReceiver(link, messageSerializer, connectionProcessor.getRetryOptions(),
receiverOptions.getPrefetchCount(), disposeOnIdle, scheduler, this::renewSessionLock,
maxSessionLockRenewDuration);
receiverOptions.getPrefetchCount(), scheduler, this::renewSessionLock,
maxSessionLockRenewDuration, disposeOnIdle ? sessionIdleTimeout : null);
})))
.flatMapMany(sessionReceiver -> sessionReceiver.receive().doFinally(signalType -> {
LOGGER.atVerbose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {
* @param messageSerializer Serializes and deserializes messages from Service Bus.
* @param retryOptions Retry options for the receiver.
* @param prefetch Number of messages to prefetch from session.
* @param disposeOnIdle true to dispose the session receiver if there are no more messages and the receiver is
* idle.
* @param scheduler The scheduler to publish messages on.
* @param renewSessionLock Function to renew the session lock.
* @param maxSessionLockRenewDuration Maximum time to renew the session lock for. {@code null} or {@link
* Duration#ZERO} to disable session lock renewal.
* @param sessionIdleTimeout Timeout after which session receiver will be disposed if there are no more messages
* and the receiver is idle. Set it to {@code null} to not dispose receiver.
*/
ServiceBusSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer,
AmqpRetryOptions retryOptions, int prefetch, boolean disposeOnIdle, Scheduler scheduler,
Function<String, Mono<OffsetDateTime>> renewSessionLock, Duration maxSessionLockRenewDuration) {
AmqpRetryOptions retryOptions, int prefetch, Scheduler scheduler,
Function<String, Mono<OffsetDateTime>> renewSessionLock, Duration maxSessionLockRenewDuration, Duration sessionIdleTimeout) {

this.receiveLink = receiveLink;
this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT);
Expand Down Expand Up @@ -146,12 +146,12 @@ class ServiceBusSessionReceiver implements AsyncCloseable, AutoCloseable {

// Creates a subscription that disposes/closes the receiver when there are no more messages in the session and
// receiver is idle.
if (disposeOnIdle) {
if (sessionIdleTimeout != null) {
this.subscriptions.add(Flux.switchOnNext(messageReceivedEmitter
.map((String lockToken) -> Mono.delay(this.retryOptions.getTryTimeout())))
.map((String lockToken) -> Mono.delay(sessionIdleTimeout)))
.subscribe(item -> {
withReceiveLinkInformation(LOGGER.atInfo())
.addKeyValue("timeout", retryOptions.getTryTimeout())
.addKeyValue("timeout", sessionIdleTimeout)
.log("Did not a receive message within timeout.");
cancelReceiveProcessor.onComplete();
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;

import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.servicebus.ReceiverOptions.createNamedSessionOptions;

/**
* This <b>asynchronous</b> session receiver client is used to acquire session locks from a queue or topic and create
Expand Down Expand Up @@ -136,9 +137,9 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable
public Mono<ServiceBusReceiverAsyncClient> acceptNextSession() {
return tracer.traceMono("ServiceBus.acceptNextSession", unNamedSessionManager.getActiveLink().flatMap(receiveLink -> receiveLink.getSessionId()
.map(sessionId -> {
final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(),
final ReceiverOptions newReceiverOptions = createNamedSessionOptions(receiverOptions.getReceiveMode(),
receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(),
receiverOptions.isEnableAutoComplete(), sessionId, null);
receiverOptions.isEnableAutoComplete(), sessionId);
final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath,
entityType, connectionProcessor, messageSerializer, newReceiverOptions,
receiveLink, identifier);
Expand Down Expand Up @@ -172,9 +173,9 @@ public Mono<ServiceBusReceiverAsyncClient> acceptSession(String sessionId) {
return monoError(LOGGER, new IllegalArgumentException("'sessionId' cannot be empty"));
}

final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(),
final ReceiverOptions newReceiverOptions = createNamedSessionOptions(receiverOptions.getReceiveMode(),
receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(),
receiverOptions.isEnableAutoComplete(), sessionId, null);
receiverOptions.isEnableAutoComplete(), sessionId);
final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType,
connectionProcessor, messageSerializer, newReceiverOptions, identifier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private Throwable mapError(Throwable throwable) {
*/
@Override
public Mono<OffsetDateTime> renewMessageLock(String lockToken, String associatedLinkName) {
return isAuthorized(OPERATION_PEEK).then(createChannel.flatMap(channel -> {
return isAuthorized(ManagementConstants.OPERATION_RENEW_LOCK).then(createChannel.flatMap(channel -> {
final Message requestMessage = createManagementMessage(ManagementConstants.OPERATION_RENEW_LOCK,
associatedLinkName);
final Map<String, Object> requestBody = new HashMap<>();
Expand Down Expand Up @@ -428,7 +428,7 @@ public Mono<Void> updateDisposition(String lockToken, DispositionStatus disposit
final UUID[] lockTokens = new UUID[]{UUID.fromString(lockToken)};
return isAuthorized(OPERATION_UPDATE_DISPOSITION).then(createChannel.flatMap(channel -> {
logger.atVerbose()
.addKeyValue("lockTokens", Arrays.toString(lockTokens))
.addKeyValue("lockTokens", () -> Arrays.toString(lockTokens))
.addKeyValue(DISPOSITION_STATUS_KEY, dispositionStatus)
.addKeyValue(SESSION_ID_KEY, sessionId)
.log("Update disposition of deliveries.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic
private final Mono<OffsetDateTime> sessionLockedUntil;

public ServiceBusReactorReceiver(AmqpConnection connection, String entityPath, Receiver receiver,
ReceiveLinkHandler handler, TokenManager tokenManager, ReactorProvider provider, Duration timeout,
AmqpRetryPolicy retryPolicy) {
ReceiveLinkHandler handler, TokenManager tokenManager, ReactorProvider provider, AmqpRetryPolicy retryPolicy) {
super(connection, entityPath, receiver, handler, tokenManager, provider.getReactorDispatcher(),
retryPolicy.getRetryOptions());
this.receiver = receiver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Dura
protected ReactorReceiver createConsumer(String entityPath, Receiver receiver,
ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) {
return new ServiceBusReactorReceiver(amqpConnection, entityPath, receiver, receiveLinkHandler, tokenManager,
reactorProvider, retryOptions.getTryTimeout(), retryPolicy);
reactorProvider, retryPolicy);
}

private Mono<ServiceBusReceiveLink> createConsumer(String linkName, String entityPath,
Expand Down

0 comments on commit f079ad7

Please sign in to comment.