Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix isolated member auto restart
  • Loading branch information
songbo01 committed Jun 26, 2019
1 parent 137443b commit 4468d5b
Showing 1 changed file with 46 additions and 14 deletions.
Expand Up @@ -8,22 +8,26 @@

package org.opendaylight.controller.cluster.common.actor;

import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedAbstractActor;
import akka.japi.Effect;
import akka.remote.ThisActorSystemQuarantinedEvent;
import akka.remote.AssociationErrorEvent;
import akka.remote.RemotingLifecycleEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;

/**
* This class listens to Akka RemotingLifecycleEvent events to detect when this node has been
* quarantined by another. Once this node gets quarantined, restart the ActorSystem to allow this
* node to rejoin the cluster.
*
* @author Gary Wu gary.wu1@huawei.com
*
*/
public class QuarantinedMonitorActor extends UntypedActor {
public class QuarantinedMonitorActor extends UntypedAbstractActor {

private static final Logger LOG = LoggerFactory.getLogger(QuarantinedMonitorActor.class);

Expand All @@ -32,36 +36,64 @@ public class QuarantinedMonitorActor extends UntypedActor {
private final Effect callback;
private boolean quarantined;

private Set<Address> addressSet = new HashSet<>();
private Integer count = 0;

protected QuarantinedMonitorActor(final Effect callback) {
this.callback = callback;

LOG.debug("Created QuarantinedMonitorActor");
LOG.info("Created QuarantinedMonitorActor");

getContext().system().eventStream().subscribe(getSelf(), ThisActorSystemQuarantinedEvent.class);
getContext().system().eventStream().subscribe(getSelf(), RemotingLifecycleEvent.class);
}

@Override
public void postStop() {
LOG.debug("Stopping QuarantinedMonitorActor");
LOG.info("Stopping QuarantinedMonitorActor");
}


@Override
public void onReceive(final Object message) throws Exception {
final String messageType = message.getClass().getSimpleName();
LOG.trace("onReceive {} {}", messageType, message);
LOG.info("onReceive {} {}", messageType, message);

// check to see if we got quarantined by another node
if (quarantined) {
return;
}

if (message instanceof ThisActorSystemQuarantinedEvent) {
final ThisActorSystemQuarantinedEvent event = (ThisActorSystemQuarantinedEvent) message;
LOG.warn("Got quarantined by {}", event.remoteAddress());
quarantined = true;
// if (message instanceof ThisActorSystemQuarantinedEvent) {
// final ThisActorSystemQuarantinedEvent event = (ThisActorSystemQuarantinedEvent) message;
// LOG.warn("Got quarantined by {}", event.remoteAddress());
// quarantined = true;
//
// // execute the callback
// callback.apply();
// }
if (message instanceof AssociationErrorEvent) {
String errorMessage = message.toString();
LOG.info("QuarantinedMonitorActor errorMessage:{}", errorMessage);
if (errorMessage.contains("The remote system has a UID that has been quarantined")) {
Address address = ((AssociationErrorEvent) message).getRemoteAddress();
addressSet.add(address);
count++;
LOG.info("QuarantinedMonitorActor address:{}", address);
LOG.info("QuarantinedMonitorActor addressSet: {} count:{}", addressSet, count);
if (count >= 10 && addressSet.size() > 1) {
count = 0;
addressSet.clear();
final AssociationErrorEvent event = (AssociationErrorEvent) message;
LOG.warn("Got quarantined by {}", event.remoteAddress());
quarantined = true;

// execute the callback
callback.apply();
// execute the callback
callback.apply();
}
} else if (errorMessage.contains("The remote system explicitly disassociated")) {
count = 0;
addressSet.clear();
}
}
}

Expand Down

0 comments on commit 4468d5b

Please sign in to comment.