Skip to content

Commit

Permalink
ARTEMIS-1269 replication won't finish synchronization
Browse files: browse the repository at this point in the history
(cherry picked from commit 00b0733)
  • Loading branch information
clebertsuconic committed Jul 1, 2017
1 parent c5e88c5 commit 6297f7a
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 30 deletions.
Expand Up @@ -369,7 +369,7 @@ public interface ActiveMQClientLogger extends BasicLogger {

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT)
void errorDecodingPacket(@Cause Exception e);
void errorDecodingPacket(@Cause Throwable e);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT)
Expand Down
Expand Up @@ -457,6 +457,10 @@ public void setCommandConfirmationHandler(final CommandConfirmationHandler handl

/**
 * Sets the handler for this channel, overwriting whatever handler was stored before.
 *
 * @param handler the handler to store on this channel
 */
@Override
public void setHandler(final ChannelHandler handler) {
// Trace which handler is being attached to which channel before the field is overwritten.
if (logger.isTraceEnabled()) {
logger.trace("Setting handler on " + this + " as " + handler);
}

this.handler = handler;
}

Expand Down Expand Up @@ -516,6 +520,9 @@ public void replayCommands(final int otherLastConfirmedCommandID) {

@Override
public void lock() {
if (logger.isTraceEnabled()) {
logger.trace("lock channel " + this);
}
lock.lock();

reconnectID.incrementAndGet();
Expand All @@ -527,6 +534,9 @@ public void lock() {

@Override
public void unlock() {
if (logger.isTraceEnabled()) {
logger.trace("unlock channel " + this);
}
lock.lock();

failingOver = false;
Expand Down
Expand Up @@ -362,7 +362,7 @@ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffe
doBufferReceived(packet);

super.bufferReceived(connectionID, buffer);
} catch (Exception e) {
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.errorDecodingPacket(e);
}
}
Expand Down
Expand Up @@ -38,8 +38,9 @@ public boolean isSynchronizationIsFinishedAcknowledgement() {
return synchronizationIsFinishedAcknowledgement;
}

public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
return this;
}

@Override
Expand Down
Expand Up @@ -209,7 +209,16 @@ public void handlePacket(final Packet packet) {
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
}
channel.send(response);

if (response != null) {
if (logger.isTraceEnabled()) {
logger.trace("Returning " + response);
}

channel.send(response);
} else {
logger.trace("Response is null, ignoring response");
}
}

/**
Expand Down Expand Up @@ -331,34 +340,68 @@ public void setChannel(final Channel channel) {

/**
 * Completes the backup side of the synchronization.
 * <p>
 * For every {@link JournalContent} value the matching journal is taken out of
 * {@code journalsHolder}, registered, restarted and loaded with
 * {@code JournalState.SYNCING_UP_TO_DATE}, all while holding that journal's
 * synchronization lock. Afterwards every large message that is a
 * {@code LargeServerMessageInSync} has its synced data joined, the quorum is told the
 * live ID and the activation is flagged as up to date.
 * <p>
 * Each step is traced so that a synchronization that never finishes can be located
 * in the log between the BACKUP-SYNC-START and BACKUP-SYNC-DONE markers.
 *
 * @param liveID the ID handed to {@code backupQuorum.liveIDSet} once the journals are loaded
 * @throws Exception declared by the method; propagated from the journal or large-message calls
 */
private void finishSynchronization(String liveID) throws Exception {
// NOTE(review): the two trace statements below look like the pre-change and post-change
// versions of the same line in the diff view -- confirm which one the file actually keeps.
if (logger.isTraceEnabled()) {
logger.trace("finishSynchronization::" + liveID);
logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID);
}
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
// NOTE(review): presumably remove() yields null when no journal is held for jc, which
// would make the next dereference fail -- confirm every content type is always present.
Journal journal = journalsHolder.remove(jc);
if (logger.isTraceEnabled()) {
logger.trace("getting lock on " + jc + ", journal = " + journal);
}
journal.synchronizationLock();
try {
if (logger.isTraceEnabled()) {
logger.trace("lock acquired on " + jc);
}
// files should be already in place.
filesReservedForSync.remove(jc);
registerJournal(jc.typeByte, journal);
if (logger.isTraceEnabled()) {
logger.trace("stopping journal for " + jc);
}
// Restart the journal before loading it in its up-to-date state.
journal.stop();
if (logger.isTraceEnabled()) {
logger.trace("starting journal for " + jc);
}
journal.start();
if (logger.isTraceEnabled()) {
logger.trace("loadAndSync " + jc);
}
journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE);
} finally {
// Always release the synchronization lock, even when stop/start/load throws.
if (logger.isTraceEnabled()) {
logger.trace("unlocking " + jc);
}
journal.synchronizationUnlock();
}
}

if (logger.isTraceEnabled()) {
logger.trace("Sync on large messages...");
}
// One 4 KiB buffer is allocated once and passed to every joinSyncedData call.
ByteBuffer buffer = ByteBuffer.allocate(4 * 1024);
for (Entry<Long, ReplicatedLargeMessage> entry : largeMessages.entrySet()) {
ReplicatedLargeMessage lm = entry.getValue();
// Only large messages of the in-sync type have synced data to join.
if (lm instanceof LargeServerMessageInSync) {
LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm;
if (logger.isTraceEnabled()) {
logger.trace("lmSync on " + lmSync.toString());
}
lmSync.joinSyncedData(buffer);
}
}

if (logger.isTraceEnabled()) {
logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID);
}

// The holder is dropped once every journal has been handed over to registerJournal.
journalsHolder = null;
backupQuorum.liveIDSet(liveID);
activation.setRemoteBackupUpToDate();

if (logger.isTraceEnabled()) {
logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
}

ActiveMQServerLogger.LOGGER.backupServerSynched(server);
return;
}
Expand Down Expand Up @@ -427,13 +470,28 @@ private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final
if (logger.isTraceEnabled()) {
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
}
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (!started)
return replicationResponseMessage;

if (packet.isSynchronizationFinished()) {
finishSynchronization(packet.getNodeID());
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
executor.execute(() -> {
try {
// this is a long running process, we cannot block the reading thread from netty
finishSynchronization(packet.getNodeID());
if (logger.isTraceEnabled()) {
logger.trace("returning completion on synchronization catchup");
}
channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
} catch (Exception e) {
logger.warn(e.getMessage());
channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
}

});
// the write will happen through an executor
return null;
}

ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (!started) {
return replicationResponseMessage;
}

Expand Down
Expand Up @@ -350,15 +350,16 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp
}

if (enabled) {
pendingTokens.add(repliToken);
if (useExecutor) {
replicationStream.execute(() -> {
if (enabled) {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
});
} else {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
Expand Down Expand Up @@ -405,9 +406,9 @@ private void replicated() {
OperationContext ctx = pendingTokens.poll();

if (ctx == null) {
throw new IllegalStateException("Missing replication token on the queue.");
logger.warn("Missing replication token on queue");
return;
}

ctx.replicationDone();
}

Expand Down
Expand Up @@ -22,10 +22,10 @@
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.jboss.logging.Logger;

/**
* Stops the backup in case of an error at the start of Replication.
Expand All @@ -36,18 +36,22 @@
*/
final class ReplicationError implements Interceptor {

private final ActiveMQServer server;
private static final Logger logger = Logger.getLogger(ReplicationError.class);

private LiveNodeLocator nodeLocator;

ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) {
this.server = server;
ReplicationError(LiveNodeLocator nodeLocator) {
this.nodeLocator = nodeLocator;
}

@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
return true;

if (logger.isTraceEnabled()) {
logger.trace("Received ReplicationError::" + packet);
}
BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet;
switch (message.getRegistrationProblem()) {
case ALREADY_REPLICATING:
Expand Down
Expand Up @@ -101,6 +101,8 @@ public void init() throws Exception {
@Override
public void run() {
try {

logger.trace("SharedNothingBackupActivation..start");
synchronized (activeMQServer) {
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
}
Expand All @@ -109,16 +111,24 @@ public void run() {
activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
activeMQServer.getNodeManager().start();
synchronized (this) {
if (closed)
if (closed) {
logger.trace("SharedNothingBackupActivation is closed, ignoring activation!");
return;
}
}

boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();

if (!activeMQServer.initialisePart1(scalingDown))
if (!activeMQServer.initialisePart1(scalingDown)) {
if (logger.isTraceEnabled()) {
logger.trace("could not initialize part1 " + scalingDown);
}
return;
}

logger.trace("Waiting for a synchronize now...");
synchronized (this) {
logger.trace("Entered a synchronized");
if (closed)
return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck);
Expand All @@ -136,16 +146,12 @@ public void run() {
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
clusterController.addClusterTopologyListenerForReplication(nodeLocator);

if (logger.isTraceEnabled()) {
logger.trace("Waiting on cluster connection");
}
//todo do we actually need to wait?
logger.trace("Waiting on cluster connection");
clusterController.awaitConnectionToReplicationCluster();

if (logger.isTraceEnabled()) {
logger.trace("Cluster Connected");
}
clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator));
logger.trace("Cluster Connected");

clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));

// nodeManager.startBackup();
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -319,13 +325,19 @@ public void run() {
return;
}
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
logger.trace("stop backup");
activeMQServer.getNodeManager().stopBackup();
logger.trace("start store manager");
activeMQServer.getStorageManager().start();
logger.trace("activated");
activeMQServer.getBackupManager().activated();
if (scalingDown) {
logger.trace("Scalling down...");
activeMQServer.initialisePart2(true);
} else {
logger.trace("Setting up new activation");
activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy()));
logger.trace("initialize part 2");
activeMQServer.initialisePart2(false);

if (activeMQServer.getIdentity() != null) {
Expand All @@ -336,6 +348,8 @@ public void run() {

}

logger.trace("completeActivation at the end");

activeMQServer.completeActivation();
}
} catch (Exception e) {
Expand Down
Expand Up @@ -531,7 +531,11 @@ protected static final ClusterConnectionConfiguration basicClusterConnectionConf
for (String c : connectors) {
connectors0.add(c);
}
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0);
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
setName("cluster1").setAddress("jms").setConnectorName(connectorName).
setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).
setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
setStaticConnectors(connectors0);

return clusterConnectionConfiguration;
}
Expand Down
Expand Up @@ -135,7 +135,7 @@ public int getBackupServerCount() {

@Override
public boolean isNetty() {
return false;
return true;
}

@Override
Expand Down
Expand Up @@ -102,7 +102,7 @@ private static TransportConfiguration transportConfiguration(String classname,
String name) {
if (classname.contains("netty")) {
Map<String, Object> serverParams = new HashMap<>();
Integer port = live ? 61616 : 5545;
Integer port = live ? 61616 + server : 5545 + server;
serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
return new TransportConfiguration(classname, serverParams, name);
}
Expand Down

0 comments on commit 6297f7a

Please sign in to comment.