Skip to content

Commit

Permalink
Fix concurrency problem in ConnectionTester.
Browse files Browse the repository at this point in the history
- ChildActorNanny cannot be thread-safe; label it as such.
- Add thread-safe wrapper AsyncChildActorNanny.
- Add health check to MqttPublisherActor and MqttConsumerActor.
- Move CompletableFutureUtils to ditto.base.service for reuse.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 15, 2022
1 parent efd58c3 commit 635cec6
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.gateway.service.health;
package org.eclipse.ditto.base.service;

import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -20,7 +20,7 @@
/**
* Utility methods for working with {@link java.util.concurrent.CompletableFuture}.
*/
final class CompletableFutureUtils {
public final class CompletableFutureUtils {

private CompletableFutureUtils() {
throw new AssertionError();
Expand All @@ -35,14 +35,24 @@ private CompletableFutureUtils() {
* @param <T> the type of the results.
* @return the resulting single completable future.
*/
static <T> CompletableFuture<List<T>> collectAsList(final List<CompletableFuture<T>> futures) {
public static <T> CompletableFuture<List<T>> collectAsList(final List<CompletableFuture<T>> futures) {
return collect(futures, Collectors.toList());
}

private static <T, A, R> CompletableFuture<R> collect(final List<CompletableFuture<T>> futures,
/**
* Collect the results of futures with a collector.
*
* @param futures the completable futures.
* @param collector the collector.
* @param <T> the type of results of each future.
* @param <A> the type of accumulators of the collector.
* @param <R> the type of results of the collector.
* @return the resulting single completable future.
*/
public static <T, A, R> CompletableFuture<R> collect(final List<CompletableFuture<T>> futures,
final Collector<T, A, R> collector) {

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(collector));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Duration;
import java.util.concurrent.CompletionStage;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;

/**
* Provides asynchronous {@link ChildActorNanny} methods for concurrent access.
* Wraps a {@link ChildActorNannyActor}.
*/
public final class AsyncChildActorNanny {

/**
* Timeout waiting for a child actor to start.
*/
public static final Duration TIMEOUT = Duration.ofSeconds(10);

private final ActorRef childActorNannyActor;

private AsyncChildActorNanny(final ActorRef childActorNannyActor) {
this.childActorNannyActor = childActorNannyActor;
}

/**
* Returns a new instance of {@code AsyncChildActorNanny}.
*
* @param childActorNanny the nanny with which the {@code ChildActorNannyActor} is created.
* @return the new instance.
* @throws NullPointerException if any argument is {@code null}.
*/
public static AsyncChildActorNanny newInstance(final ChildActorNanny childActorNanny) {
checkNotNull(childActorNanny, "childActorNanny");
final ActorRef childActorNannyActor =
childActorNanny.startChildActorConflictFree("childActorNanny", ChildActorNannyActor.props());
return new AsyncChildActorNanny(childActorNannyActor);
}

/**
* Returns a new instance of {@code AsyncChildActorNanny} for test.
*
* @param actorSystem Actor system in which to create the {@code ChildActorNannyActor}.
* @param nannyToTest The {@code ChildActorNanny} to use to create actors.
* @return The new instance.
* @throws java.lang.NullPointerException if any argument is {@code null}.
*/
public static AsyncChildActorNanny newInstanceForTest(final ActorSystem actorSystem,
final ChildActorNanny nannyToTest) {
checkNotNull(actorSystem, "actorSystem");
checkNotNull(nannyToTest, "childActorNanny");
final ActorRef childActorNannyActor = actorSystem.actorOf(ChildActorNannyActor.propsForTest(nannyToTest));
return new AsyncChildActorNanny(childActorNannyActor);
}

/**
* Creates a child actor in the context of the {@code ChildActorNannyActor}.
* The specified name gets concatenated by an increasing count number for that name.
* <p>
* For example, if this method was called two times with the base actor name {@code "myActor"}, the actual child
* actor would be
* <ol>
* <li>{@code "myActor1"},</li>
* <li>{@code "myActor2"}.</li>
* </ol>
* The count numbers are maintained per base actor name.
*
* @param baseActorName the base name of the child actor to create. This name must not be {@code null}, empty or
* start with {@code "$"}. The actual actor name differs in the way that it is concatenated with a count number.
* @param actorProps the {@code Props} of the child actor to create.
* @return the {@code ActorRef} of the created child actor.
*/
public CompletionStage<ActorRef> startChildActorConflictFree(final CharSequence baseActorName,
final Props actorProps) {
final var message = new ChildActorNannyActor.StartChildActorConflictFree(baseActorName, actorProps);
return Patterns.ask(childActorNannyActor, message, TIMEOUT).thenApply(ActorRef.class::cast);
}

/**
* Asynchronously stops the actor wrapped by this object.
* All subsequent calls to {@code startChildActorConflictFree} will fail.
*/
public void stop() {
childActorNannyActor.tell(new ChildActorNannyActor.StopThisActor(), ActorRef.noSender());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.concurrent.ThreadSafe;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
Expand All @@ -34,12 +31,11 @@
* (see {@link #startChildActorConflictFree(CharSequence, Props)}).
* In this case the names of the actors to be started are concatenated with a count to make the names unique.
*/
@ThreadSafe
public final class ChildActorNanny {

private final ActorRefFactory actorRefFactory;
private final LoggingAdapter logger;
private final Map<String, AtomicInteger> childActorCounts;
private final Map<String, Integer> childActorCounts;

private ChildActorNanny(final ActorRefFactory actorRefFactory, final LoggingAdapter logger) {
this.actorRefFactory = actorRefFactory;
Expand Down Expand Up @@ -121,8 +117,14 @@ public ActorRef startChildActorConflictFree(final CharSequence baseActorName, fi
}

private String getNextChildActorName(final String baseActorName) {
final var childActorCount = childActorCounts.computeIfAbsent(baseActorName, unused -> new AtomicInteger(0));
return baseActorName + childActorCount.incrementAndGet();
final var childActorCount = childActorCounts.compute(baseActorName, (name, previousCount) -> {
if (previousCount == null) {
return 1;
} else {
return ++previousCount;
}
});
return baseActorName + childActorCount;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;

/**
* An actor that create child actors on command. Used for connection testing.
*/
public final class ChildActorNannyActor extends AbstractActor {

record StartChildActorConflictFree(CharSequence baseActorName, Props props) {}

record StopThisActor() {}

private final ChildActorNanny childActorNanny;

@SuppressWarnings("unused") // called by reflection
private ChildActorNannyActor() {
childActorNanny =
ChildActorNanny.newInstance(getContext(), DittoLoggerFactory.getDiagnosticLoggingAdapter(this));
}

@SuppressWarnings("unused") // called by reflection
private ChildActorNannyActor(final ChildActorNanny childActorNanny) {
this.childActorNanny = childActorNanny;
}

static Props props() {
return Props.create(ChildActorNannyActor.class);
}

static Props propsForTest(final ChildActorNanny nannyToTest) {
return Props.create(ChildActorNannyActor.class, nannyToTest);
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(StartChildActorConflictFree.class, this::startChildActorConflictFree)
.match(StopThisActor.class, this::stopThisActor)
.build();
}

private void startChildActorConflictFree(final StartChildActorConflictFree cmd) {
getSender().tell(childActorNanny.startChildActorConflictFree(cmd.baseActorName(), cmd.props()), getSelf());
}

private void stopThisActor(final StopThisActor trigger) {
getContext().stop(getSelf());
}
}
Loading

0 comments on commit 635cec6

Please sign in to comment.