Skip to content

Commit

Permalink
#11 Subscription Query - Work In Progress
Browse files Browse the repository at this point in the history
  • Loading branch information
saratry committed Jun 11, 2018
1 parent 05a6ce0 commit b293eff
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 197 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@
<version>${project.parent.version}</version>
</dependency>

<!-- OPTIONAL dependency on reactor -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<optional>true</optional>
</dependency>

<!-- OPTIONAL dependency on Spring -->
<dependency>
<groupId>org.springframework</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2018. AxonIQ
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonhub.client.query.subscription;

import io.axoniq.axonhub.SubscriptionQueryResponse;
import io.grpc.stub.StreamObserver;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

import java.util.Optional;
import java.util.function.Consumer;

/**
*
* SubscriptionQueryResult that emits initial response and update when subscription query response message is received.
*
* @author Sara Pellegrini
*/
class AxonHubSubscriptionQueryResult<I, U> implements
SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>>,
StreamObserver<SubscriptionQueryResponse> {

private final Logger logger = LoggerFactory.getLogger(AxonHubSubscriptionQueryResult.class);

private final SubscriptionMessageSerializer serializer;

private final Mono<QueryResponseMessage<I>> initialResult;

private final Flux<SubscriptionQueryUpdateMessage<U>> updates;

private MonoSink<QueryResponseMessage<I>> initialResultSink;

private FluxSink<SubscriptionQueryUpdateMessage<U>> updateMessageFluxSink;

private Disposable disposable;

private Consumer<Integer> responseCounterConsumer = i -> {};

AxonHubSubscriptionQueryResult(
SubscriptionMessageSerializer serializer,
SubscriptionQueryBackpressure backpressure) {
this.serializer = serializer;
this.initialResult = Mono.create(sink -> initialResultSink = sink);
this.updates = Flux.create(sink -> {
updateMessageFluxSink = sink;
sink.onDispose(() -> Optional.ofNullable(disposable).ifPresent(Disposable::dispose));
}, backpressure.getOverflowStrategy());
}


@Override
public Mono<QueryResponseMessage<I>> initialResult() {
return initialResult;
}

@Override
public Flux<SubscriptionQueryUpdateMessage<U>> updates() {
return updates;
}

@Override
public void onNext(SubscriptionQueryResponse response) {
responseCounterConsumer.accept(1);
switch (response.getResponseCase()) {
case INITIAL_RESPONSE:
logger.debug("Initial response received: {}", response);
initialResultSink.success(serializer.deserialize(response.getInitialResponse()));
break;
case UPDATE:
logger.debug("Update received: {}", response);
updateMessageFluxSink.next(serializer.deserialize(response.getUpdate()));
break;
case COMPLETE:
updateMessageFluxSink.complete();
break;
case COMPLETE_EXCEPTIONALLY:
logger.debug("Received complete exceptionally subscription query: {}", response);
updateMessageFluxSink.error(serializer.deserialize(response.getCompleteExceptionally()));
break;
}
}

@Override
public void onError(Throwable t) {
initialResultSink.error(t);
updateMessageFluxSink.error(t);
}

@Override
public void onCompleted() {
initialResultSink.success();
updateMessageFluxSink.complete();
}

void onDispose(Disposable disposable) {
this.disposable = disposable;
}

void onResponse(Consumer<Integer> responseCounterConsumer){
this.responseCounterConsumer = responseCounterConsumer;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,23 @@
import io.axoniq.axonhub.client.Publisher;
import io.axoniq.axonhub.client.query.AxonHubQueryBus;
import io.axoniq.axonhub.client.query.QueryPriorityCalculator;
import io.axoniq.axonhub.client.util.FlowControllingStreamObserver;
import io.axoniq.axonhub.grpc.QueryProviderInbound;
import io.axoniq.axonhub.grpc.QueryProviderOutbound;
import io.grpc.stub.StreamObserver;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.UpdateHandler;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;

import java.lang.reflect.Type;
import java.util.Map;
Expand All @@ -55,13 +57,15 @@
*/
public class EnhancedAxonHubQueryBus implements QueryBus, QueryUpdateEmitter {

private final Logger logger = LoggerFactory.getLogger(AxonHubQueryBus.class);
private final Logger logger = LoggerFactory.getLogger(EnhancedAxonHubQueryBus.class);

private final AxonHubConfiguration configuration;

private final AxonHubQueryBus axonHubQueryBus;

private final SubscriptionMessageSerializer serializer;

private final Map<String, Registration> registrationMap = new ConcurrentHashMap<>();
private final Map<String, Disposable> subscriptions = new ConcurrentHashMap<>();

private final Publisher<QueryProviderOutbound> publisher;

Expand All @@ -80,6 +84,7 @@ public EnhancedAxonHubQueryBus(PlatformConnectionManager platformConnectionManag
messageSerializer,
genericSerializer,
priorityCalculator);
this.configuration = configuration;
serializer = new SubscriptionMessageSerializer(configuration,
messageSerializer,
genericSerializer);
Expand All @@ -106,6 +111,27 @@ public <Q, R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q, R> q
return axonHubQueryBus.scatterGather(query, timeout, unit);
}

@Override
public <Q, I, U> SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> subscriptionQuery(
SubscriptionQueryMessage<Q, I, U> query, SubscriptionQueryBackpressure backpressure) {
logger.debug("Subscription Query requested with subscriptionId " + query.getIdentifier());
AxonHubSubscriptionQueryResult<I,U> responseObserver = new AxonHubSubscriptionQueryResult<>(serializer, backpressure);
SubscriptionQuery subscriptionQuery = this.serializer.serialize(query);

FlowControllingStreamObserver<SubscriptionQuery> requestObserver = new FlowControllingStreamObserver<>(
this.axonHubQueryBus.queryServiceStub().subscription(responseObserver),
configuration, flowControl -> SubscriptionQuery.newBuilder(subscriptionQuery)
.setNumberOfPermits(flowControl.getPermits())
.build(),t-> false);
responseObserver.onDispose(() -> {
logger.debug("Subscription Query with subscriptionId " + query.getIdentifier()+ " disposed");
requestObserver.onCompleted();
});

responseObserver.onResponse(requestObserver::markConsumed);
requestObserver.onNext(subscriptionQuery);
return responseObserver;
}
@Override
public <U> void emit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter,
SubscriptionQueryUpdateMessage<U> update) {
Expand All @@ -122,24 +148,6 @@ public void completeExceptionally(Predicate<SubscriptionQueryMessage<?, ?, ?>> f
this.updateEmitter.completeExceptionally(filter, cause);
}

@Override
public <Q, I, U> Registration subscriptionQuery(SubscriptionQueryMessage<Q, I, U> query,
UpdateHandler<I, U> updateHandler) {
String subscriptionId = query.getIdentifier();
logger.debug("subscriptionQuery request with subscriptionId " + subscriptionId);

StreamObserver<SubscriptionQuery> requestObserver = this.axonHubQueryBus
.queryServiceStub()
.subscription(new SubscriptionResponseHandler<>(updateHandler, serializer));
requestObserver.onNext(this.serializer.serialize(query));

return () -> {
logger.debug("Unsubscribe request for subscriptionId " + subscriptionId);
requestObserver.onCompleted();
return true;
};
}

private void onSubscriptionQueryRequest(QueryProviderInbound inbound) {
SubscriptionQueryRequest subscriptionQuery = inbound.getSubscriptionQuery();
switch (subscriptionQuery.getResponseCase()) {
Expand All @@ -154,17 +162,28 @@ private void onSubscriptionQueryRequest(QueryProviderInbound inbound) {

private void subscribeLocally(SubscriptionQuery subscribe) {
String subscriptionId = subscribe.getQueryRequest().getMessageIdentifier();
UpdateHandler updateHandler = new AxonHubUpdateHandler(subscriptionId, publisher, serializer);
SubscriptionQueryMessage subscriptionQueryMessage = serializer.deserialize(subscribe);
Registration registration = this.localSegment.subscriptionQuery(subscriptionQueryMessage, updateHandler);
registrationMap.put(subscriptionId, registration);
logger.debug("subscribe locally subscriptionId " + subscriptionId);
SubscriptionQueryResult<QueryResponseMessage<Object>, SubscriptionQueryUpdateMessage<Object>> result =
this.localSegment.subscriptionQuery(serializer.deserialize(subscribe));

result.initialResult().subscribe(
i -> publisher.publish(serializer.serialize(i, subscriptionId)),
e -> logger.debug("Error in initial result for subscription id: {}", subscriptionId));
Disposable disposable = result.updates().subscribe(
u -> publisher.publish(serializer.serialize(u, subscriptionId)),
e -> publisher.publish(serializer.serializeCompleteExceptionally(subscriptionId, e)),
() -> publisher.publish(serializer.serializeComplete(subscriptionId)));

subscriptions.put(subscriptionId, disposable);
}

private void unsubscribeLocally(SubscriptionQuery unsubscribe) {
registrationMap.remove(unsubscribe.getQueryRequest().getMessageIdentifier()).cancel();
String subscriptionId = unsubscribe.getQueryRequest().getMessageIdentifier();
logger.debug("unsubscribe locally subscriptionId " + subscriptionId);
subscriptions.remove(subscriptionId).dispose();
}

private void onApplicationDisconnected() {
registrationMap.clear();
subscriptions.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@ SubscriptionQuery serialize(SubscriptionQueryMessage message) {
.putAllMetaData(metadataSerializer.apply(message.getMetaData())).build();

SubscriptionQuery.Builder builder = SubscriptionQuery.newBuilder()
.setNumberOfPermits(conf.getInitialNrOfPermits())
.setUpdateResponseType(responseTypeSerializer.apply(message.getUpdateResponseType()))
.setQueryRequest(queryRequest);
return builder.build();
}

SubscriptionQueryMessage deserialize(SubscriptionQuery query) {
return new GrpcBackedSubscriptionQueryMessage(query, messageSerializer, genericSerializer);
<Q, I, U> SubscriptionQueryMessage<Q, I, U> deserialize(SubscriptionQuery query) {
return new GrpcBackedSubscriptionQueryMessage<>(query, messageSerializer, genericSerializer);
}


Expand Down
Loading

0 comments on commit b293eff

Please sign in to comment.