Skip to content

Commit

Permalink
Removed unnecessary classes needed for previous version of Axon Frame…
Browse files Browse the repository at this point in the history
…work.
  • Loading branch information
saratry committed Sep 17, 2018
1 parent 93696d4 commit b341004
Show file tree
Hide file tree
Showing 19 changed files with 155 additions and 371 deletions.
Expand Up @@ -15,6 +15,10 @@

package io.axoniq.axonserver.connector.query;

import io.axoniq.axonserver.connector.query.subscription.AxonServerSubscriptionQueryResult;
import io.axoniq.axonserver.connector.query.subscription.DeserializedResult;
import io.axoniq.axonserver.connector.query.subscription.SubscriptionMessageSerializer;
import io.axoniq.axonserver.connector.query.subscription.SubscriptionQueryRequestTarget;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
Expand Down Expand Up @@ -44,6 +48,11 @@
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -66,44 +75,53 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static io.axoniq.axonserver.connector.util.ProcessingInstructionHelper.numberOfResults;
import static io.axoniq.axonserver.connector.util.ProcessingInstructionHelper.priority;
import static io.axoniq.axonserver.grpc.query.QueryProviderInbound.RequestCase.SUBSCRIPTION_QUERY_REQUEST;

/**
* AxonHub implementation for the QueryBus. Delegates incoming queries to the specified localSegment.
*
* @author Marc Gathier
*/
public class AxonServerQueryBus implements QueryBus {
public class AxonServerQueryBus implements QueryBus, QueryUpdateEmitter {
private final Logger logger = LoggerFactory.getLogger(AxonServerQueryBus.class);
private final AxonServerConfiguration configuration;
private final QueryUpdateEmitter updateEmitter;
private final QueryBus localSegment;
private final QuerySerializer serializer;
private final SubscriptionMessageSerializer subscriptionSerializer;
private final QueryPriorityCalculator priorityCalculator;
private final QueryProvider queryProvider;
private final PlatformConnectionManager platformConnectionManager;
private final ClientInterceptor[] interceptors;
private final Collection<String> subscriptions = new CopyOnWriteArraySet<>();
private final DispatchInterceptors<QueryMessage<?, ?>> dispatchInterceptors = new DispatchInterceptors<>();
private final Map<RequestCase, Collection<Consumer<QueryProviderInbound>>> queryHandlers = new EnumMap<>(RequestCase.class);


/**
* Creates an instance of the AxonHubQueryBus
*
* @param platformConnectionManager creates connection to AxonHub platform
* @param configuration contains client and component names used to identify the application in AxonHub
* @param updateEmitter
* @param localSegment handles incoming query requests
* @param messageSerializer serializer/deserializer for payload and metadata of query requests and responses
* @param genericSerializer serializer for communication of other objects than payload and metadata
* @param priorityCalculator calculates the request priority based on the content and adds it to the request
*/
public AxonServerQueryBus(PlatformConnectionManager platformConnectionManager, AxonServerConfiguration configuration, QueryBus localSegment,
Serializer messageSerializer, Serializer genericSerializer, QueryPriorityCalculator priorityCalculator) {
public AxonServerQueryBus(PlatformConnectionManager platformConnectionManager,
AxonServerConfiguration configuration,
QueryUpdateEmitter updateEmitter, QueryBus localSegment,
Serializer messageSerializer, Serializer genericSerializer,
QueryPriorityCalculator priorityCalculator) {
this.configuration = configuration;
this.updateEmitter = updateEmitter;
this.localSegment = localSegment;
this.serializer = new QuerySerializer(messageSerializer, genericSerializer, configuration);
this.priorityCalculator = priorityCalculator;
Expand All @@ -113,7 +131,13 @@ public AxonServerQueryBus(PlatformConnectionManager platformConnectionManager, A
this.platformConnectionManager.addDisconnectListener(queryProvider::unsubscribeAll);
interceptors = new ClientInterceptor[]{new TokenAddingInterceptor(configuration.getToken()),
new ContextAddingInterceptor(configuration.getContext())};

this.subscriptionSerializer = new SubscriptionMessageSerializer(configuration, messageSerializer, genericSerializer);
platformConnectionManager.addDisconnectListener(this::onApplicationDisconnected);
platformConnectionManager.addReconnectInterceptor(this::interceptReconnectRequest);
SubscriptionQueryRequestTarget target =
new SubscriptionQueryRequestTarget(localSegment, this::publish, subscriptionSerializer);
this.on(SUBSCRIPTION_QUERY_REQUEST, target::onSubscriptionQueryRequest);
platformConnectionManager.addDisconnectListener(target::onApplicationDisconnected);

}

Expand Down Expand Up @@ -217,6 +241,22 @@ public void disconnect() {
queryProvider.disconnect();
}

@Override
public <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter,
SubscriptionQueryUpdateMessage<U> update) {
this.updateEmitter.emit(filter, update);
}

@Override
public void complete(Predicate<SubscriptionQueryMessage<?, ?, ?>> filter) {
this.updateEmitter.complete(filter);
}

@Override
public void completeExceptionally(Predicate<SubscriptionQueryMessage<?, ?, ?>> filter, Throwable cause) {
this.updateEmitter.completeExceptionally(filter, cause);
}

class QueryProvider {
private final ConcurrentMap<QueryDefinition, Set<MessageHandler<? super QueryMessage<?, ?>>>> subscribedQueries = new ConcurrentHashMap<>();
private final PriorityBlockingQueue<QueryRequest> queryQueue;
Expand Down Expand Up @@ -437,4 +477,40 @@ public void on(RequestCase requestCase, Consumer<QueryProviderInbound> consumer)
consumers.add(consumer);
}

@Override
public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(
SubscriptionQueryMessage<Q, I, U> query, SubscriptionQueryBackpressure backPressure, int updateBufferSize) {
String subscriptionId = query.getIdentifier();

if (this.subscriptions.contains(subscriptionId)) {
String errorMessage = "Already exists a subscription query with the same subscriptionId: " + subscriptionId;
logger.warn(errorMessage);
throw new IllegalArgumentException(errorMessage);
}
logger.debug("Subscription Query requested with subscriptionId {}", subscriptionId);
subscriptions.add(subscriptionId);
QueryServiceGrpc.QueryServiceStub queryService = this.queryServiceStub();

AxonServerSubscriptionQueryResult result = new AxonServerSubscriptionQueryResult(
subscriptionSerializer.serialize(query),
queryService::subscription,
configuration,
backPressure,
updateBufferSize,
() -> subscriptions.remove(subscriptionId));

return new DeserializedResult<>(result.get(), subscriptionSerializer);
}

private Runnable interceptReconnectRequest(Runnable reconnect) {
if (subscriptions.isEmpty()) {
return reconnect;
}
return () -> logger.info("Reconnect refused because there are active subscription queries.");
}

private void onApplicationDisconnected() {
subscriptions.clear();
}

}
Expand Up @@ -49,7 +49,7 @@
*
* @author Sara Pellegrini
*/
class AxonServerSubscriptionQueryResult implements
public class AxonServerSubscriptionQueryResult implements
Supplier<SubscriptionQueryResult<QueryResponse, QueryUpdate>>,
StreamObserver<SubscriptionQueryResponse> {

Expand All @@ -67,7 +67,7 @@ class AxonServerSubscriptionQueryResult implements

private MonoSink<QueryResponse> initialResultSink;

AxonServerSubscriptionQueryResult(
public AxonServerSubscriptionQueryResult(
SubscriptionQuery query,
Function<StreamObserver<SubscriptionQueryResponse>, StreamObserver<SubscriptionQueryRequest>> openStreamFn,
AxonServerConfiguration configuration,
Expand Down

This file was deleted.

Expand Up @@ -119,7 +119,7 @@ QueryProviderOutbound serializeCompleteExceptionally(String subscriptionId, Thro
}


SubscriptionQuery serialize(SubscriptionQueryMessage message) {
public SubscriptionQuery serialize(SubscriptionQueryMessage message) {
QueryRequest queryRequest = QueryRequest.newBuilder().setTimestamp(System.currentTimeMillis())
.setMessageIdentifier(message.getIdentifier())
.setQuery(message.getQueryName())
Expand Down
Expand Up @@ -37,7 +37,7 @@
*
* @author Sara Pellegrini
*/
class SubscriptionQueryRequestTarget {
public class SubscriptionQueryRequestTarget {

private final Logger logger = LoggerFactory.getLogger(SubscriptionQueryRequestTarget.class);

Expand All @@ -49,7 +49,7 @@ class SubscriptionQueryRequestTarget {

private final Map<String, SubscriptionQueryResult<QueryResponseMessage<Object>, SubscriptionQueryUpdateMessage<Object>>> subscriptions = new ConcurrentHashMap<>();

SubscriptionQueryRequestTarget(QueryBus localSegment,
public SubscriptionQueryRequestTarget(QueryBus localSegment,
Publisher<QueryProviderOutbound> publisher,
SubscriptionMessageSerializer serializer) {
this.localSegment = localSegment;
Expand All @@ -58,7 +58,7 @@ class SubscriptionQueryRequestTarget {
}


void onSubscriptionQueryRequest(QueryProviderInbound inbound) {
public void onSubscriptionQueryRequest(QueryProviderInbound inbound) {
SubscriptionQueryRequest subscriptionQuery = inbound.getSubscriptionQueryRequest();
try {
switch (subscriptionQuery.getRequestCase()) {
Expand Down Expand Up @@ -103,11 +103,11 @@ private void getInitialResult(SubscriptionQuery query) {

private void unsubscribe(SubscriptionQuery unsubscribe) {
String subscriptionId = unsubscribe.getSubscriptionIdentifier();
logger.debug("unsubscribe locally subscriptionId " + subscriptionId);
logger.debug("unsubscribe locally subscriptionId {}", subscriptionId);
subscriptions.remove(subscriptionId).cancel();
}

void onApplicationDisconnected() {
public void onApplicationDisconnected() {
subscriptions.values().forEach(Registration::cancel);
subscriptions.clear();
}
Expand Down

0 comments on commit b341004

Please sign in to comment.