Skip to content

Commit

Permalink
don't remove all other addresses in ClusterMemberRemovedAware when ow…
Browse files Browse the repository at this point in the history
…n instance was terminated

* don't log error in Subscriber watchign Termination when own instance was terminated

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 15, 2021
1 parent 8d3e888 commit 4568b50
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.Address;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ddata.Replicator;
Expand Down Expand Up @@ -67,15 +66,20 @@ default void subscribeForClusterMemberRemovedAware() {
* @param memberRemoved the member-removed event.
*/
default void memberRemoved(final ClusterEvent.MemberRemoved memberRemoved) {
// acksUpdater detected unreachable remote. remove it from local ORMultiMap.
final Address address = memberRemoved.member().address();
log().info("Removing declared acks on removed member <{}>", address);
getDDataWriter().removeAddress(address, writeLocal())
.whenComplete((unused, error) -> {
if (error != null) {
log().error(error, "Failed to remove declared acks on removed cluster member <{}>", address);
}
});
final var address = memberRemoved.member().address();
if (Cluster.get(context().system()).isTerminated()) {
log().info("This instance was terminated from cluster, NOT removing declared acks on removed member <{}>",
address);
} else {
// acksUpdater detected unreachable remote. remove it from local ORMultiMap.
log().info("Removing declared acks on removed member <{}>", address);
getDDataWriter().removeAddress(address, writeLocal())
.whenComplete((unused, error) -> {
if (error != null) {
log().error(error, "Failed to remove declared acks on removed cluster member <{}>", address);
}
});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,26 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;
import org.eclipse.ditto.internal.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.internal.utils.pubsub.api.LocalAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.PublishSignal;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.SubscriptionsReader;
import org.eclipse.ditto.internal.utils.pubsub.ddata.ack.GroupedSnapshot;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;

Expand Down Expand Up @@ -119,7 +120,9 @@ private void receiveLocalDeclaredAcks(final Control receiveLocalDeclaredAcks) {
}

private void terminated(final Terminated terminated) {
if (terminated.getActor().equals(ackUpdater)) {
if (Cluster.get(getContext().getSystem()).isTerminated()) {
logger.info("This cluster instance was terminated - no action required ..");
} else if (terminated.getActor().equals(ackUpdater)) {
logger.error("Notifying SubUpdater <{}> of AckUpdater termination: <{}>", subUpdater, terminated);
if (subUpdater != null) {
subUpdater.tell(ActorEvent.PUBSUB_TERMINATED, getSelf());
Expand Down

0 comments on commit 4568b50

Please sign in to comment.