Skip to content

Commit

Permalink
Merge 4.4.6 into 4.5
Browse files Browse the repository at this point in the history
Merge 4.4.6 into 4.5, fixing merge issues like version and the switch
between FluxSinkWrapper and SinkManyWrapper
  • Loading branch information
smcvb committed Jan 20, 2021
2 parents 7a8810c + 377a079 commit 3cb03c4
Show file tree
Hide file tree
Showing 31 changed files with 1,213 additions and 522 deletions.
Expand Up @@ -73,7 +73,10 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Spliterator.*;
import static java.util.Spliterator.CONCURRENT;
import static java.util.Spliterator.DISTINCT;
import static java.util.Spliterator.NONNULL;
import static java.util.Spliterator.ORDERED;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.ObjectUtils.getOrDefault;

Expand Down Expand Up @@ -460,7 +463,8 @@ protected void storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializ
logger.warn("Error occurred while creating a snapshot", e);
} else if (c != null) {
if (c.getSuccess()) {
logger.info("Snapshot created");
logger.debug("Snapshot created for aggregate type {}, identifier {}",
snapshot.getType(), snapshot.getAggregateIdentifier());
} else {
logger.warn("Snapshot creation failed for unknown reason. "
+ "Check server logs for details.");
Expand Down
Expand Up @@ -118,6 +118,7 @@ public class AxonServerQueryBus implements QueryBus, Distributed<QueryBus> {
private final TargetContextResolver<? super QueryMessage<?, ?>> targetContextResolver;
private final ShutdownLatch shutdownLatch = new ShutdownLatch();
private final ExecutorService queryExecutor;
private final QueryHandler localSegmentAdapter;
private final String context;

/**
Expand Down Expand Up @@ -165,6 +166,7 @@ public AxonServerQueryBus(Builder builder) {
).reversed()
);
queryExecutor = builder.executorServiceBuilder.apply(configuration, queryProcessQueue);
localSegmentAdapter = new LocalSegmentAdapter();
}

/**
Expand All @@ -180,61 +182,11 @@ public <R> Registration subscribe(String queryName,
Type responseType,
MessageHandler<? super QueryMessage<?, R>> handler) {
Registration localRegistration = localSegment.subscribe(queryName, responseType, handler);
QueryDefinition queryDefinition = new QueryDefinition(queryName, responseType);
io.axoniq.axonserver.connector.Registration serverRegistration =
axonServerConnectionManager.getConnection(context)
.queryChannel()
.registerQueryHandler(
new QueryHandler() {
@Override
public void handle(QueryRequest query,
ReplyChannel<QueryResponse> responseHandler) {
queryExecutor.submit(new QueryProcessingTask(
localSegment, query, responseHandler, serializer,
configuration.getClientId()
));
}

@Override
public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(
SubscriptionQuery query,
UpdateHandler sendUpdate
) {

UpdateHandlerRegistration<Object> updateHandler =
updateEmitter.registerUpdateHandler(
subscriptionSerializer.deserialize(query),
1024
);

updateHandler.getUpdates()
.doOnError(e -> {
ErrorMessage error =
ExceptionSerializer.serialize(
configuration.getClientId(),
e
);
String errorCode =
ErrorCode.QUERY_EXECUTION_ERROR
.errorCode();
QueryUpdate queryUpdate =
QueryUpdate.newBuilder()
.setErrorMessage(error)
.setErrorCode(errorCode)
.build();
sendUpdate.sendUpdate(queryUpdate);
sendUpdate.complete();
})
.doOnComplete(sendUpdate::complete)
.map(subscriptionSerializer::serialize)
.subscribe(sendUpdate::sendUpdate);
return () -> {
updateHandler.getRegistration().close();
return CompletableFuture.completedFuture(null);
};
}
},
new QueryDefinition(queryName, responseType)
);
.registerQueryHandler(localSegmentAdapter, queryDefinition);

return new AxonServerRegistration(localRegistration, serverRegistration::cancel);
}
Expand Down Expand Up @@ -392,6 +344,48 @@ public CompletableFuture<Void> shutdownDispatching() {
return shutdownLatch.initiateShutdown();
}

/**
* A {@link QueryHandler} implementation serving as a wrapper around the local {@link QueryBus} to push through the
* message handling and subscription query registration.
*/
private class LocalSegmentAdapter implements QueryHandler {

@Override
public void handle(QueryRequest query, ReplyChannel<QueryResponse> responseHandler) {
QueryProcessingTask processingTask = new QueryProcessingTask(
localSegment, query, responseHandler, serializer, configuration.getClientId()
);
queryExecutor.submit(processingTask);
}

@Override
public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(SubscriptionQuery query,
UpdateHandler sendUpdate) {
UpdateHandlerRegistration<Object> updateHandler =
updateEmitter.registerUpdateHandler(subscriptionSerializer.deserialize(query), 1024);

updateHandler.getUpdates()
.doOnError(e -> {
ErrorMessage error = ExceptionSerializer.serialize(configuration.getClientId(), e);
String errorCode = ErrorCode.QUERY_EXECUTION_ERROR.errorCode();
QueryUpdate queryUpdate = QueryUpdate.newBuilder()
.setErrorMessage(error)
.setErrorCode(errorCode)
.build();
sendUpdate.sendUpdate(queryUpdate);
sendUpdate.complete();
})
.doOnComplete(sendUpdate::complete)
.map(subscriptionSerializer::serialize)
.subscribe(sendUpdate::sendUpdate);

return () -> {
updateHandler.getRegistration().close();
return CompletableFuture.completedFuture(null);
};
}
}

/**
* A {@link Runnable} implementation which is given to a {@link PriorityBlockingQueue} to be consumed by the query
* {@link ExecutorService}, in order. The {@code priority} is retrieved from the provided {@link QueryRequest} and
Expand Down
Expand Up @@ -21,6 +21,7 @@
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.query.QueryChannel;
import io.axoniq.axonserver.connector.query.QueryDefinition;
import io.axoniq.axonserver.connector.query.QueryHandler;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.SerializedObject;
import io.axoniq.axonserver.grpc.query.QueryResponse;
Expand All @@ -44,9 +45,8 @@
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.mockito.*;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -62,20 +62,9 @@
import static org.axonframework.axonserver.connector.utils.AssertUtils.assertWithin;
import static org.axonframework.messaging.responsetypes.ResponseTypes.instanceOf;
import static org.axonframework.messaging.responsetypes.ResponseTypes.optionalInstanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

/**
* Unit test suite to verify the {@link AxonServerQueryBus}.
Expand All @@ -85,19 +74,19 @@
class AxonServerQueryBusTest {

private static final String TEST_QUERY = "testQuery";
public static final String CONTEXT = "default-test";
private static final String CONTEXT = "default-test";

private AxonServerConnectionManager axonServerConnectionManager;
private final QueryBus localSegment = mock(QueryBus.class);
private final Serializer serializer = XStreamSerializer.defaultSerializer();
private final TargetContextResolver<QueryMessage<?, ?>> targetContextResolver = spy(new TestTargetContextResolver<>());

private AxonServerQueryBus testSubject;
private AxonServerConnection mockConnection;
private AxonServerConnectionManager axonServerConnectionManager;
private QueryChannel mockQueryChannel;

private AxonServerQueryBus testSubject;

@BeforeEach
void setup() throws Exception {
void setup() {
AxonServerConfiguration configuration = new AxonServerConfiguration();
configuration.setContext(CONTEXT);
axonServerConnectionManager = mock(AxonServerConnectionManager.class);
Expand All @@ -112,17 +101,17 @@ void setup() throws Exception {
.targetContextResolver(targetContextResolver)
.build();

mockConnection = mock(AxonServerConnection.class);
AxonServerConnection mockConnection = mock(AxonServerConnection.class);
mockQueryChannel = mock(QueryChannel.class);

when(axonServerConnectionManager.getConnection(anyString())).thenReturn(mockConnection);
when(axonServerConnectionManager.getConnection()).thenReturn(mockConnection);

when(mockConnection.queryChannel()).thenReturn(mockQueryChannel);
when(mockQueryChannel.registerQueryHandler(any(), any())).thenReturn(() -> CompletableFuture.completedFuture(null));
when(mockQueryChannel.registerQueryHandler(any(), any()))
.thenReturn(() -> CompletableFuture.completedFuture(null));

when(localSegment.subscribe(any(), any(), any())).thenReturn(() -> true);

}

@AfterEach
Expand All @@ -140,6 +129,26 @@ void subscribe() {
verify(mockQueryChannel).registerQueryHandler(any(), eq(new QueryDefinition(TEST_QUERY, String.class)));
}

@Test
void testSeveralSubscribeInvocationsUseSameQueryHandlerInstance() {
QueryDefinition firstExpectedQueryDefinition = new QueryDefinition(TEST_QUERY, String.class);
QueryDefinition secondExpectedQueryDefinition = new QueryDefinition("testIntegerQuery", Integer.class);

ArgumentCaptor<QueryHandler> queryHandlerCaptor = ArgumentCaptor.forClass(QueryHandler.class);

Registration resultOne = testSubject.subscribe(TEST_QUERY, String.class, q -> "test");
assertNotNull(resultOne);
verify(mockQueryChannel).registerQueryHandler(queryHandlerCaptor.capture(), eq(firstExpectedQueryDefinition));

Registration resultTwo = testSubject.subscribe("testIntegerQuery", Integer.class, q -> 1337);
assertNotNull(resultTwo);
verify(mockQueryChannel).registerQueryHandler(queryHandlerCaptor.capture(), eq(secondExpectedQueryDefinition));

List<QueryHandler> resultQueryHandlers = queryHandlerCaptor.getAllValues();
assertEquals(2, resultQueryHandlers.size());
assertEquals(resultQueryHandlers.get(0), resultQueryHandlers.get(1));
}

@Test
void query() throws Exception {
when(mockQueryChannel.query(any())).thenReturn(new StubResultStream(stubResponse("<string>test</string>")));
Expand Down Expand Up @@ -170,8 +179,9 @@ void queryReportsDispatchException() throws Exception {

@Test
void testQueryReportsCorrectException() throws ExecutionException, InterruptedException {
when(mockQueryChannel.query(any())).thenReturn(new StubResultStream(stubErrorResponse(ErrorCode.QUERY_EXECUTION_ERROR.errorCode(),
"Faking exception result")));
when(mockQueryChannel.query(any())).thenReturn(new StubResultStream(
stubErrorResponse(ErrorCode.QUERY_EXECUTION_ERROR.errorCode(), "Faking exception result")
));
QueryMessage<String, String> testQuery = new GenericQueryMessage<>("Hello, World", instanceOf(String.class));

CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);
Expand All @@ -191,8 +201,8 @@ void testQueryReportsCorrectException() throws ExecutionException, InterruptedEx

@Test
void subscribeHandler() {

when(mockQueryChannel.registerQueryHandler(any(), any())).thenReturn(() -> CompletableFuture.completedFuture(null));
when(mockQueryChannel.registerQueryHandler(any(), any()))
.thenReturn(() -> CompletableFuture.completedFuture(null));

Registration result = testSubject.subscribe(TEST_QUERY, String.class, q -> "test: " + q.getPayloadType());

Expand Down Expand Up @@ -237,7 +247,9 @@ void queryForOptionalWillRequestInstanceOfFromRemoteDestination() {
testSubject.scatterGather(testQuery, 12, TimeUnit.SECONDS);

verify(targetContextResolver).resolveContext(testQuery);
verify(mockQueryChannel).query(argThat(r -> r.getResponseType().getType().equals(InstanceResponseType.class.getName())));
verify(mockQueryChannel).query(argThat(
r -> r.getResponseType().getType().equals(InstanceResponseType.class.getName())
));
}

@Test
Expand All @@ -255,8 +267,9 @@ void dispatchInterceptor() {

@Test
void handlerInterceptorRegisteredWithLocalSegment() {
MessageHandlerInterceptor<QueryMessage<?, ?>> interceptor =
(unitOfWork, interceptorChain) -> interceptorChain.proceed();

MessageHandlerInterceptor<QueryMessage<?, ?>> interceptor = (unitOfWork, interceptorChain) -> interceptorChain.proceed();
testSubject.registerHandlerInterceptor(interceptor);

verify(localSegment).registerHandlerInterceptor(interceptor);
Expand Down Expand Up @@ -315,7 +328,7 @@ private QueryResponse stubResponse(String payload) {
.build();
}

private QueryResponse stubErrorResponse(String errorCode, String message) {
private QueryResponse stubErrorResponse(String errorCode, @SuppressWarnings("SameParameterValue") String message) {
return QueryResponse.newBuilder()
.setRequestIdentifier("request")
.setMessageIdentifier(UUID.randomUUID().toString())
Expand Down
Expand Up @@ -693,7 +693,7 @@ protected void invokeStartHandlers() {
"One of the start handlers in phase [%d] failed with the following exception:",
currentLifecyclePhase
),
e.getCause()
e
);
}
);
Expand All @@ -712,8 +712,8 @@ protected void invokeShutdownHandlers() {
invokeLifecycleHandlers(
shutdownHandlers,
e -> logger.warn(
"One of the shutdown handlers in phase [{}] failed with the following exception: {}",
currentLifecyclePhase, e.getCause()
"One of the shutdown handlers in phase [{}] failed with the following exception: ",
currentLifecyclePhase, e
)
);

Expand Down

0 comments on commit 3cb03c4

Please sign in to comment.