Skip to content

Commit

Permalink
Make ClientActorRefs serializable so we can send this directly instea…
Browse files Browse the repository at this point in the history
…d of

sending a SourceRef

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Jan 19, 2022
1 parent 9fd8d01 commit c7e4090
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@
import akka.japi.pf.FSMStateFunctionBuilder;
import akka.pattern.Patterns;
import akka.stream.Materializer;
import akka.stream.SourceRef;
import akka.stream.javadsl.Sink;

/**
Expand Down Expand Up @@ -504,7 +503,6 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inAnyState()
sender().tell(HealthSignal.PONG, self());
return stay();
})
.event(SourceRef.class, this::refreshClientActorRefs)
.event(ClientActorRefs.class, this::refreshClientActorRefs)
.event(FatalPubSubException.class, this::failConnectionDueToPubSubException);
}
Expand Down Expand Up @@ -540,19 +538,6 @@ private FSM.State<BaseClientState, BaseClientData> otherClientActorTerminated(fi
return stay();
}

private FSM.State<BaseClientState, BaseClientData> refreshClientActorRefs(
final SourceRef clientActorRefsSourceRef,
final BaseClientData data) {
((SourceRef<ActorRef>) clientActorRefsSourceRef).getSource()
.runWith(Sink.seq(), Materializer.createMaterializer(getContext()))
.thenAccept(clientActorRefs -> {
final ClientActorRefs newClientActorRefs = ClientActorRefs.empty();
newClientActorRefs.add(clientActorRefs);
self().tell(newClientActorRefs, ActorRef.noSender());
});
return stay();
}

private FSM.State<BaseClientState, BaseClientData> refreshClientActorRefs(final ClientActorRefs clientActorRefs,
final BaseClientData data) {
this.clientActorRefs.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.internal.utils.cluster.AkkaJacksonCborSerializable;
import org.eclipse.ditto.internal.utils.pubsub.PubSubFactory;

import akka.actor.ActorPath;
Expand All @@ -29,7 +30,7 @@
* Collection of all client actor refs of a connection actor.
*/
@NotThreadSafe
public final class ClientActorRefs {
public final class ClientActorRefs implements AkkaJacksonCborSerializable {

private final Map<ActorPath, ActorRef> refsByPath = new HashMap<>();
private List<ActorRef> sortedRefs = List.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ final class ClientActorRefsAggregationActor extends AbstractFSM<AggregationState

/**
* @param clientCount the expected number of clients
* @param receiver the receiver of the aggregated {@link ClientActorRefs}. Make sure that this actor ref is not remote as {@link ClientActorRefs} cannot be serialized.
* @param receiver the receiver of the aggregated {@link ClientActorRefs}.
* @param clientActorRouter the client actor router used to send broadcasts to all clients.
* @param aggregationInterval the interval for aggregation of client actor refs.
* @param aggregationTimeout the maximum time the aggregation actor should wait for a response from all client actors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@
import akka.routing.ConsistentHashingPool;
import akka.routing.ConsistentHashingRouter;
import akka.routing.Pool;
import akka.stream.Materializer;
import akka.stream.SourceRef;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamRefs;

/**
* Handles {@code *Connection} commands and manages the persistence of connection. The actual connection handling to the
Expand Down Expand Up @@ -1092,10 +1088,8 @@ private void addClientActor(final ActorRef newClientActor) {
}

private void syncClientActorRefs(final ClientActorRefs clientActorRefs) {
final SourceRef<ActorRef> clientActorRefsSourceRef = Source.from(clientActorRefs.getSortedRefs())
.runWith(StreamRefs.sourceRef(), Materializer.createMaterializer(getContext()));
if (clientActorRouter != null) {
clientActorRouter.tell(new Broadcast(clientActorRefsSourceRef), ActorRef.noSender());
clientActorRouter.tell(new Broadcast(clientActorRefs), ActorRef.noSender());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ditto {
akka {

management.http.port = 25610
management.http.port = ${?AKKA_MANAGEMENT_PORT}

remote {
# for debugging purpose
Expand Down

0 comments on commit c7e4090

Please sign in to comment.