Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Oct 6, 2022
1 parent d9d784c commit 95a8912
Show file tree
Hide file tree
Showing 15 changed files with 817 additions and 276 deletions.
Expand Up @@ -31,20 +31,16 @@ public final class ServiceBusMessageBatch {
private final byte[] eventBytes;
private int sizeInBytes;
private final ServiceBusTracer tracer;
private final String entityPath;
private final String hostname;

ServiceBusMessageBatch(int maxMessageSize, ErrorContextProvider contextProvider, ServiceBusTracer tracer,
MessageSerializer serializer, String entityPath, String hostname) {
MessageSerializer serializer) {
this.maxMessageSize = maxMessageSize;
this.contextProvider = contextProvider;
this.serializer = serializer;
this.serviceBusMessageList = new ArrayList<>();
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.eventBytes = new byte[maxMessageSize];
this.tracer = tracer;
this.entityPath = entityPath;
this.hostname = hostname;
}

/**
Expand Down
Expand Up @@ -32,10 +32,8 @@
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -606,7 +604,7 @@ Mono<ServiceBusReceivedMessage> peekMessage(String sessionId) {
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peek")));
}

Mono<ServiceBusReceivedMessage> messages = connectionProcessor
Mono<ServiceBusReceivedMessage> result = connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(channel -> {
final long sequence = lastPeekedSequenceNumber.get() + 1;
Expand All @@ -629,7 +627,7 @@ Mono<ServiceBusReceivedMessage> peekMessage(String sessionId) {
sink.next(message);
});

return instrumentation.instrumentReceive("ServiceBus.peekMessage", false, messages);
return tracer.traceManagementReceive("ServiceBus.peekMessage", result, ServiceBusReceivedMessage::getContext);
}

/**
Expand Down Expand Up @@ -666,11 +664,12 @@ Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber, String sessionI
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt")));
}

return instrumentation.instrumentReceive("ServiceBus.peekMessage", false,
return tracer.traceManagementReceive("ServiceBus.peekMessage",
connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(node -> node.peek(sequenceNumber, sessionId, getLinkName(sessionId)))
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE)));
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE)),
ServiceBusReceivedMessage::getContext);
}

/**
Expand All @@ -686,7 +685,7 @@ Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber, String sessionI
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-browsing">Message browsing</a>
*/
public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages) {
return instrumentation.instrumentReceive("ServiceBus.peekMessages", false,
return tracer.traceSyncReceive("ServiceBus.peekMessages",
peekMessages(maxMessages, receiverOptions.getSessionId()));
}

Expand Down Expand Up @@ -786,7 +785,7 @@ Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumbe
return fluxError(LOGGER, new IllegalArgumentException("'maxMessages' is not positive."));
}

return instrumentation.instrumentReceive("ServiceBus.peekMessages", false,
return tracer.traceSyncReceive("ServiceBus.peekMessages",
connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMapMany(node -> node.peek(sequenceNumber, sessionId, getLinkName(sessionId), maxMessages))
Expand Down Expand Up @@ -920,7 +919,7 @@ Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber, Stri
}

return
instrumentation.instrumentReceive("ServiceBus.receiveDeferredMessage", true,
tracer.traceManagementReceive("ServiceBus.receiveDeferredMessage",
connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(node -> node.receiveDeferredMessages(receiverOptions.getReceiveMode(),
Expand All @@ -937,7 +936,8 @@ sessionId, getLinkName(sessionId), Collections.singleton(sequenceNumber)).last()

return receivedMessage;
})
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE)));
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE)),
ServiceBusReceivedMessage::getContext);
}

/**
Expand All @@ -953,7 +953,7 @@ sessionId, getLinkName(sessionId), Collections.singleton(sequenceNumber)).last()
* @throws ServiceBusException if deferred messages cannot be received.
*/
public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers) {
return instrumentation.instrumentReceive("ServiceBus.receiveDeferredMessages", true,
return tracer.traceSyncReceive("ServiceBus.receiveDeferredMessages",
receiveDeferredMessages(sequenceNumbers, receiverOptions.getSessionId()));
}

Expand Down Expand Up @@ -1038,7 +1038,7 @@ public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)
return monoError(LOGGER, new IllegalStateException(errorMessage));
}

return tracer.traceMonoWithLink("ServiceBus.renewMessageLock", renewMessageLock(message.getLockToken()), message)
return tracer.traceMonoWithLink("ServiceBus.renewMessageLock", renewMessageLock(message.getLockToken()), message, message.getContext())
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

Expand Down Expand Up @@ -1103,7 +1103,7 @@ public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration m
renewalContainer.addOrUpdate(message.getLockToken(), OffsetDateTime.now().plus(maxLockRenewalDuration),
operation);

return tracer.traceMonoWithLink("ServiceBus.renewMessageLock", operation.getCompletionOperation(), message)
return tracer.traceMonoWithLink("ServiceBus.renewMessageLock", operation.getCompletionOperation(), message, message.getContext())
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

Expand Down Expand Up @@ -1185,7 +1185,7 @@ public Mono<ServiceBusTransactionContext> createTransaction() {
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "createTransaction")));
}

return tracer.traceMonoWithLink("ServiceBus.commitTransaction", connectionProcessor
return tracer.traceMono("ServiceBus.commitTransaction", connectionProcessor
.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
.flatMap(transactionSession -> transactionSession.createTransaction())
.map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId())))
Expand Down Expand Up @@ -1238,7 +1238,7 @@ public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionCont
return monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
}

return tracer.traceMonoWithLink("ServiceBus.commitTransaction", connectionProcessor
return tracer.traceMono("ServiceBus.commitTransaction", connectionProcessor
.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
.flatMap(transactionSession -> transactionSession.commitTransaction(new AmqpTransaction(
transactionContext.getTransactionId()))))
Expand Down Expand Up @@ -1290,7 +1290,7 @@ public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionCo
return monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
}

return tracer.traceMonoWithLink("ServiceBus.rollbackTransaction", connectionProcessor
return tracer.traceMono("ServiceBus.rollbackTransaction", connectionProcessor
.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
.flatMap(transactionSession -> transactionSession.rollbackTransaction(new AmqpTransaction(
transactionContext.getTransactionId()))))
Expand Down Expand Up @@ -1462,7 +1462,7 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
}
}

return instrumentation.instrumentSettlement(updateDispositionOperation, message, dispositionStatus)
return instrumentation.instrumentSettlement(updateDispositionOperation, message, message.getContext(), dispositionStatus)
.onErrorMap(throwable -> {
if (throwable instanceof ServiceBusException) {
return throwable;
Expand Down Expand Up @@ -1584,7 +1584,7 @@ Mono<OffsetDateTime> renewSessionLock(String sessionId) {
? sessionManager.getLinkName(sessionId)
: null;

return tracer.traceMonoWithLink("ServiceBus.renewSessionLock", connectionProcessor
return tracer.traceMono("ServiceBus.renewSessionLock", connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(channel -> channel.renewSessionLock(sessionId, linkName)))
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
Expand All @@ -1611,7 +1611,7 @@ Mono<Void> renewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
maxLockRenewalDuration, true, this::renewSessionLock);

renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
return tracer.traceMonoWithLink("ServiceBus.renewSessionLock", operation.getCompletionOperation())
return tracer.traceMono("ServiceBus.renewSessionLock", operation.getCompletionOperation())
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

Expand All @@ -1626,7 +1626,7 @@ Mono<Void> setSessionState(String sessionId, byte[] sessionState) {
? sessionManager.getLinkName(sessionId)
: null;

return tracer.traceMonoWithLink("ServiceBus.setSessionState", connectionProcessor
return tracer.traceMono("ServiceBus.setSessionState", connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName)))
.onErrorMap((err) -> mapError(err, ServiceBusErrorSource.RECEIVE));
Expand All @@ -1650,7 +1650,7 @@ Mono<byte[]> getSessionState(String sessionId) {
.flatMap(channel -> channel.getSessionState(sessionId, getLinkName(sessionId)));
}

return tracer.traceMonoWithLink("ServiceBus.setSessionState", result)
return tracer.traceMono("ServiceBus.setSessionState", result)
.onErrorMap((err) -> mapError(err, ServiceBusErrorSource.RECEIVE));
}

Expand Down
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
Expand Down Expand Up @@ -65,7 +66,7 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
/* To ensure synchronousMessageSubscriber is subscribed only once. */
private final AtomicBoolean syncSubscribed = new AtomicBoolean(false);

private final ServiceBusReceiverInstrumentation instrumentation;
private final ServiceBusTracer tracer;
/**
* Creates a synchronous receiver given its asynchronous counterpart.
*
Expand All @@ -79,7 +80,7 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
this.asyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null.");
this.operationTimeout = Objects.requireNonNull(operationTimeout, "'operationTimeout' cannot be null.");
this.isPrefetchDisabled = isPrefetchDisabled;
this.instrumentation = asyncClient.getInstrumentation();
this.tracer = asyncClient.getInstrumentation().getTracer();
}

/**
Expand Down Expand Up @@ -398,7 +399,7 @@ IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, String s
final Flux<ServiceBusReceivedMessage> messages = asyncClient.peekMessages(maxMessages, sessionId)
.timeout(operationTimeout);

final Flux<ServiceBusReceivedMessage> tracedMessages = instrumentation.instrumentReceive("ServiceBus.peekMessages", false, messages);
final Flux<ServiceBusReceivedMessage> tracedMessages = tracer.traceSyncReceive("ServiceBus.peekMessages", messages);
// Subscribe to message flux so we can kick off this operation, but not to tracing - caller subscriber will take care of it.
messages.subscribe();

Expand Down Expand Up @@ -449,7 +450,7 @@ IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long seq
final Flux<ServiceBusReceivedMessage> messages = asyncClient.peekMessages(maxMessages, sequenceNumber,
sessionId).timeout(operationTimeout);

final Flux<ServiceBusReceivedMessage> tracedMessages = instrumentation.instrumentReceive("ServiceBus.peekMessages", false, messages);
final Flux<ServiceBusReceivedMessage> tracedMessages = tracer.traceSyncReceive("ServiceBus.peekMessages", messages);
// Subscribe to message flux so we can kick off this operation, but not to tracing - caller subscriber will take care of it.
messages.subscribe();

Expand Down Expand Up @@ -511,7 +512,7 @@ public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages
queueWork(maxMessages, maxWaitTime, emitter);

final Flux<ServiceBusReceivedMessage> messagesFlux = emitter.asFlux();
final Flux<ServiceBusReceivedMessage> tracedMessages = instrumentation.instrumentReceive("ServiceBus.receiveMessages", true, messagesFlux);
final Flux<ServiceBusReceivedMessage> tracedMessages = tracer.traceSyncReceive("ServiceBus.receiveMessages", messagesFlux);

// Subscribe to message flux so we can kick off this operation, but not to tracing - caller subscriber will take care of it.
messagesFlux.subscribe();
Expand Down Expand Up @@ -587,7 +588,7 @@ IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<L
final Flux<ServiceBusReceivedMessage> messages = asyncClient.receiveDeferredMessages(sequenceNumbers,
sessionId).timeout(operationTimeout);

final Flux<ServiceBusReceivedMessage> tracedMessages = instrumentation.instrumentReceive("ServiceBus.receiveDeferredMessageBatch", true, messages);
final Flux<ServiceBusReceivedMessage> tracedMessages = tracer.traceSyncReceive("ServiceBus.receiveDeferredMessageBatch", messages);
// Subscribe so we can kick off this operation.
messages.subscribe();

Expand Down

0 comments on commit 95a8912

Please sign in to comment.