Skip to content

Commit

Permalink
fixed bug that lead ConnectionActor to stop itself for brandnew conne…
Browse files Browse the repository at this point in the history
…ctions

* and added ddata "remember-entities" in order to failover more gracefully in case of rolling update or failover

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Feb 13, 2019
1 parent b087bd7 commit 059a529
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -15,3 +15,4 @@ dependencies.txt
/deployment/docker/sandbox/postgres
/deployment/docker/sandbox/graphite/storage
ajcore.*.txt
*.mdb
Expand Up @@ -96,6 +96,7 @@
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.cluster.routing.ClusterRouterPool;
import akka.cluster.routing.ClusterRouterPoolSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
Expand Down Expand Up @@ -123,9 +124,6 @@ public final class ConnectionActor extends AbstractPersistentActor {
private static final String JOURNAL_PLUGIN_ID = "akka-contrib-mongodb-persistence-connection-journal";
private static final String SNAPSHOT_PLUGIN_ID = "akka-contrib-mongodb-persistence-connection-snapshots";

private static final String UNRESOLVED_PLACEHOLDERS_MESSAGE =
"Failed to substitute all placeholders in '{}', target is dropped.";

private static final String PUB_SUB_GROUP_PREFIX = "connection:";

/**
Expand Down Expand Up @@ -281,8 +279,10 @@ public Receive createReceiveRecover() {
subscribeForEvents();
}
getContext().become(connectionCreatedBehaviour);
} else {
} else if (lastSequenceNr() > 0) {
// if the last sequence number is already > 0 we can assume that the connection was deleted:
stopSelfIfDeletedAfterDelay();
// otherwise not - as the connection may just be created!
}

getContext().getParent().tell(ConnectionSupervisorActor.ManualReset.getInstance(), getSelf());
Expand Down Expand Up @@ -911,9 +911,9 @@ private void stopChildActor(final ActorRef actor) {
}

private void stopSelf() {
log.debug("Shutting down");
// stop the supervisor (otherwise it'd restart this actor) which causes this actor to stop, too.
getContext().getParent().tell(PoisonPill.getInstance(), getSelf());
log.info("Passivating / shutting down");
final ShardRegion.Passivate passivateMessage = new ShardRegion.Passivate(PoisonPill.getInstance());
getContext().getParent().tell(passivateMessage, getSelf());
}

private void stopSelfIfDeletedAfterDelay() {
Expand Down
Expand Up @@ -34,6 +34,7 @@
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.cluster.sharding.ShardRegion;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.DeciderBuilder;
Expand Down Expand Up @@ -157,6 +158,7 @@ public Receive createReceive() {
getContext().dispatcher(), null);
restartCount += 1;
})
.match(ShardRegion.Passivate.class, passivate -> getContext().getParent().tell(passivate, getSelf()))
.matchAny(message -> {
LogUtil.enhanceLogWithCustomField(log, BaseClientData.MDC_CONNECTION_ID, connectionId);
if (child != null) {
Expand Down
Expand Up @@ -115,6 +115,11 @@ akka {
cluster {
sharding {
role = "connectivity"

# When this is set to 'on' the active entity actors will automatically be restarted
# upon Shard restart. i.e. if the Shard is started on a different ShardRegion
# due to rebalance or crash.
remember-entities = on
}

roles = [
Expand Down

0 comments on commit 059a529

Please sign in to comment.