Skip to content

Commit

Permalink
minor formatting changes;
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Aug 26, 2022
1 parent ff52956 commit 0a4e6ec
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,5 @@ protected SubscriptionAbortedException doBuild(final DittoHeaders dittoHeaders,
return new SubscriptionAbortedException(dittoHeaders, message, description, cause, href);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ private SearchActor(final QueryParser queryParser, final ThingsSearchPersistence
* Creates Akka configuration object Props for this SearchActor.
*
* @param queryFactory factory of query objects.
* @param searchPersistence the {@link org.eclipse.ditto.thingsearch.service.persistence.read.ThingsSearchPersistence} to use in order to execute queries.
* @param searchPersistence the {@link org.eclipse.ditto.thingsearch.service.persistence.read.ThingsSearchPersistence}
* to use in order to execute queries.
* @param pubSubMediator the Akka pub-sub mediator.
* @return the Akka configuration Props object.
*/
Expand All @@ -181,7 +182,8 @@ static Props props(final QueryParser queryFactory, final ThingsSearchPersistence

@Override
public void preStart() {
final var subscribe = DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, ACTOR_NAME, getSelf());
final var subscribe =
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, ACTOR_NAME, getSelf());
pubSubMediator.tell(subscribe, getSelf());

final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());
Expand Down Expand Up @@ -322,7 +324,8 @@ private CompletionStage<Object> performStream(final StreamThings streamThings, f
final ThreadSafeDittoLoggingAdapter l) {

final var queryType = "query"; // same as queryThings
final var searchTimer = startNewTimer(streamThings.getImplementedSchemaVersion(), queryType, streamThings);
final var searchTimer =
startNewTimer(streamThings.getImplementedSchemaVersion(), queryType, streamThings);
final var queryParsingTimer = searchTimer.startNewSegment(QUERY_PARSING_SEGMENT_NAME);
final var namespaces = streamThings.getNamespaces().orElse(null);

Expand Down Expand Up @@ -375,7 +378,8 @@ private CompletionStage<Object> performQuery(final QueryThings queryThings, fina
l.debug("Starting to process QueryThings command: {}", queryThings);

final var queryType = "query";
final var searchTimer = startNewTimer(queryThings.getImplementedSchemaVersion(), queryType, queryThings);
final var searchTimer =
startNewTimer(queryThings.getImplementedSchemaVersion(), queryType, queryThings);
final var queryParsingTimer = searchTimer.startNewSegment(QUERY_PARSING_SEGMENT_NAME);
final var namespaces = queryThings.getNamespaces().orElse(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ public static Props props(final ActorRef pubSubMediator,
public void preStart() {
CoordinatedShutdown.get(getContext().getSystem())
.addTask(CoordinatedShutdown.PhaseServiceUnbind(), "service-unbind-" + ACTOR_NAME, () -> {
final var unsub = DistPubSubAccess.unsubscribeViaGroup(PolicyTag.PUB_SUB_TOPIC_MODIFIED,
ACTOR_NAME, getSelf());
final var unsub =
DistPubSubAccess.unsubscribeViaGroup(PolicyTag.PUB_SUB_TOPIC_MODIFIED, ACTOR_NAME, getSelf());
final var shutdownAskTimeout = Duration.ofMinutes(1); // does not matter as phase will timeout

return Patterns.ask(pubSubMediator, unsub, shutdownAskTimeout).thenApply(reply -> Done.done());
});
}
Expand Down Expand Up @@ -242,4 +243,5 @@ private enum Control {
}

private record LocalWrappedPolicyTag(PolicyTag delegate) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@
import akka.stream.javadsl.Source;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
import scala.collection.JavaConverters;
import scala.jdk.javaapi.FutureConverters;

/**
* Tests the graceful shutdown behavior of {@code SearchActor}.
Expand All @@ -87,8 +85,8 @@ public void unbindAndStopWithoutQuery() {
final var props = SearchActor.props(queryParser, persistence, getRef());
final var underTest = childActorOf(props, SearchActor.ACTOR_NAME);

final var expectedSubscribe = DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX,
SearchActor.ACTOR_NAME, underTest);
final var expectedSubscribe =
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, SearchActor.ACTOR_NAME, underTest);
expectMsg(expectedSubscribe);
reply(new DistributedPubSubMediator.SubscribeAck(expectedSubscribe));

Expand All @@ -113,8 +111,8 @@ public void waitForQueries() {
final var props = SearchActor.props(queryParser, persistence, getRef());
final var underTest = childActorOf(props, SearchActor.ACTOR_NAME);

final var expectedSubscribe = DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX,
SearchActor.ACTOR_NAME, underTest);
final var expectedSubscribe =
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, SearchActor.ACTOR_NAME, underTest);
expectMsg(expectedSubscribe);
reply(new DistributedPubSubMediator.SubscribeAck(expectedSubscribe));

Expand Down Expand Up @@ -168,15 +166,16 @@ public void terminateStreams() {
final var props = SearchActor.props(queryParser, persistence, getRef());
final var underTest = childActorOf(props, SearchActor.ACTOR_NAME);

final var expectedSubscribe = DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX,
SearchActor.ACTOR_NAME, underTest);
final var expectedSubscribe =
DistPubSubAccess.subscribeViaGroup(ThingSearchCommand.TYPE_PREFIX, SearchActor.ACTOR_NAME, underTest);
expectMsg(expectedSubscribe);
reply(new DistributedPubSubMediator.SubscribeAck(expectedSubscribe));

final var serviceRequestsDone = SearchActor.Control.SERVICE_REQUESTS_DONE;
use(p -> p.findAllUnlimited(any(), any(), any()));

final var stream = StreamThings.of(null, null, null, null, DittoHeaders.empty());
final var stream =
StreamThings.of(null, null, null, null, DittoHeaders.empty());

underTest.tell(stream, getRef());
final SourceRef<?> sourceRef = expectMsgClass(SourceRef.class);
Expand Down Expand Up @@ -206,4 +205,5 @@ private ActorRef use(final Consumer<ThingsSearchPersistence> stubberConsumer) {
stubberConsumer.accept(Mockito.doAnswer(inv -> mat.second()).when(persistence));
return mat.first();
}

}

0 comments on commit 0a4e6ec

Please sign in to comment.