Skip to content

Commit

Permalink
ARTEMIS-1269 replication won't finish synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jul 1, 2017
1 parent c7af954 commit 6b2798a
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ public interface ActiveMQClientLogger extends BasicLogger {

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT)
void errorDecodingPacket(@Cause Exception e);
void errorDecodingPacket(@Cause Throwable e);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,10 @@ public void setCommandConfirmationHandler(final CommandConfirmationHandler handl

/**
 * Stores the given {@link ChannelHandler} on this channel, replacing whatever handler
 * was set before.
 *
 * @param handler the handler to assign to this channel
 */
@Override
public void setHandler(final ChannelHandler handler) {
// Diagnostic only: the guard skips building the concatenated message unless trace is enabled.
if (logger.isTraceEnabled()) {
logger.trace("Setting handler on " + this + " as " + handler);
}

this.handler = handler;
}

Expand Down Expand Up @@ -521,6 +525,9 @@ public void replayCommands(final int otherLastConfirmedCommandID) {

@Override
public void lock() {
if (logger.isTraceEnabled()) {
logger.trace("lock channel " + this);
}
lock.lock();

reconnectID.incrementAndGet();
Expand All @@ -532,6 +539,9 @@ public void lock() {

@Override
public void unlock() {
if (logger.isTraceEnabled()) {
logger.trace("unlock channel " + this);
}
lock.lock();

failingOver = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffe
doBufferReceived(packet);

super.bufferReceived(connectionID, buffer);
} catch (Exception e) {
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.errorDecodingPacket(e);
throw new IllegalStateException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ public boolean isSynchronizationIsFinishedAcknowledgement() {
return synchronizationIsFinishedAcknowledgement;
}

public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,16 @@ public void handlePacket(final Packet packet) {
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
}
channel.send(response);

if (response != null) {
if (logger.isTraceEnabled()) {
logger.trace("Returning " + response);
}

channel.send(response);
} else {
logger.trace("Response is null, ignoring response");
}
}

/**
Expand Down Expand Up @@ -332,34 +341,68 @@ public void setChannel(final Channel channel) {

private void finishSynchronization(String liveID) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("finishSynchronization::" + liveID);
logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID);
}
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
Journal journal = journalsHolder.remove(jc);
if (logger.isTraceEnabled()) {
logger.trace("getting lock on " + jc + ", journal = " + journal);
}
registerJournal(jc.typeByte, journal);
journal.synchronizationLock();
try {
if (logger.isTraceEnabled()) {
logger.trace("lock acquired on " + jc);
}
// files should be already in place.
filesReservedForSync.remove(jc);
registerJournal(jc.typeByte, journal);
if (logger.isTraceEnabled()) {
logger.trace("stopping journal for " + jc);
}
journal.stop();
if (logger.isTraceEnabled()) {
logger.trace("starting journal for " + jc);
}
journal.start();
if (logger.isTraceEnabled()) {
logger.trace("loadAndSync " + jc);
}
journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE);
} finally {
if (logger.isTraceEnabled()) {
logger.trace("unlocking " + jc);
}
journal.synchronizationUnlock();
}
}

if (logger.isTraceEnabled()) {
logger.trace("Sync on large messages...");
}
ByteBuffer buffer = ByteBuffer.allocate(4 * 1024);
for (Entry<Long, ReplicatedLargeMessage> entry : largeMessages.entrySet()) {
ReplicatedLargeMessage lm = entry.getValue();
if (lm instanceof LargeServerMessageInSync) {
LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm;
if (logger.isTraceEnabled()) {
logger.trace("lmSync on " + lmSync.toString());
}
lmSync.joinSyncedData(buffer);
}
}

if (logger.isTraceEnabled()) {
logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID);
}

journalsHolder = null;
backupQuorum.liveIDSet(liveID);
activation.setRemoteBackupUpToDate();

if (logger.isTraceEnabled()) {
logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
}

ActiveMQServerLogger.LOGGER.backupServerSynched(server);
return;
}
Expand Down Expand Up @@ -428,13 +471,28 @@ private ReplicationResponseMessageV2 handleStartReplicationSynchronization(final
if (logger.isTraceEnabled()) {
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
}
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (!started)
return replicationResponseMessage;

if (packet.isSynchronizationFinished()) {
finishSynchronization(packet.getNodeID());
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
executor.execute(() -> {
try {
// this is a long running process, we cannot block the reading thread from netty
finishSynchronization(packet.getNodeID());
if (logger.isTraceEnabled()) {
logger.trace("returning completion on synchronization catchup");
}
channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
} catch (Exception e) {
logger.warn(e.getMessage());
channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
}

});
// the write will happen through an executor
return null;
}

ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (!started) {
return replicationResponseMessage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,15 +356,16 @@ private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp
}

if (enabled) {
pendingTokens.add(repliToken);
if (useExecutor) {
replicationStream.execute(() -> {
if (enabled) {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
});
} else {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
Expand Down Expand Up @@ -411,9 +412,9 @@ private void replicated() {
OperationContext ctx = pendingTokens.poll();

if (ctx == null) {
throw new IllegalStateException("Missing replication token on the queue.");
logger.warn("Missing replication token on queue");
return;
}

ctx.replicationDone();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.jboss.logging.Logger;

/**
* Stops the backup in case of an error at the start of Replication.
Expand All @@ -36,18 +36,22 @@
*/
final class ReplicationError implements Interceptor {

private final ActiveMQServer server;
private static final Logger logger = Logger.getLogger(ReplicationError.class);

private LiveNodeLocator nodeLocator;

ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) {
this.server = server;
ReplicationError(LiveNodeLocator nodeLocator) {
this.nodeLocator = nodeLocator;
}

@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
return true;

if (logger.isTraceEnabled()) {
logger.trace("Received ReplicationError::" + packet);
}
BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet;
switch (message.getRegistrationProblem()) {
case ALREADY_REPLICATING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public void init() throws Exception {
@Override
public void run() {
try {

logger.trace("SharedNothingBackupActivation..start");
synchronized (activeMQServer) {
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
}
Expand All @@ -109,16 +111,24 @@ public void run() {
activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
activeMQServer.getNodeManager().start();
synchronized (this) {
if (closed)
if (closed) {
logger.trace("SharedNothingBackupActivation is closed, ignoring activation!");
return;
}
}

boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();

if (!activeMQServer.initialisePart1(scalingDown))
if (!activeMQServer.initialisePart1(scalingDown)) {
if (logger.isTraceEnabled()) {
logger.trace("could not initialize part1 " + scalingDown);
}
return;
}

logger.trace("Waiting for a synchronize now...");
synchronized (this) {
logger.trace("Entered a synchronized");
if (closed)
return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
Expand All @@ -136,16 +146,12 @@ public void run() {
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
clusterController.addClusterTopologyListenerForReplication(nodeLocator);

if (logger.isTraceEnabled()) {
logger.trace("Waiting on cluster connection");
}
//todo do we actually need to wait?
logger.trace("Waiting on cluster connection");
clusterController.awaitConnectionToReplicationCluster();

if (logger.isTraceEnabled()) {
logger.trace("Cluster Connected");
}
clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator));
logger.trace("Cluster Connected");

clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));

// nodeManager.startBackup();
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -320,13 +326,19 @@ public void run() {
return;
}
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
logger.trace("stop backup");
activeMQServer.getNodeManager().stopBackup();
logger.trace("start store manager");
activeMQServer.getStorageManager().start();
logger.trace("activated");
activeMQServer.getBackupManager().activated();
if (scalingDown) {
logger.trace("Scalling down...");
activeMQServer.initialisePart2(true);
} else {
logger.trace("Setting up new activation");
activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy()));
logger.trace("initialize part 2");
activeMQServer.initialisePart2(false);

if (activeMQServer.getIdentity() != null) {
Expand All @@ -337,6 +349,8 @@ public void run() {

}

logger.trace("completeActivation at the end");

activeMQServer.completeActivation();
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,11 @@ protected static final ClusterConnectionConfiguration basicClusterConnectionConf
for (String c : connectors) {
connectors0.add(c);
}
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0);
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
setName("cluster1").setAddress("jms").setConnectorName(connectorName).
setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).
setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
setStaticConnectors(connectors0);

return clusterConnectionConfiguration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@

import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.junit.Ignore;
import org.junit.Test;

public class LargeMessageFailoverTest extends FailoverTest {

@Override
@Test
@Ignore
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception {
// skip test because it triggers OutOfMemoryError.
Thread.sleep(1000);
}

@Override
@Test
@Ignore
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception {
// skip test because it triggers OutOfMemoryError.
Thread.sleep(1000);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public int getBackupServerCount() {

@Override
public boolean isNetty() {
return false;
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private static TransportConfiguration transportConfiguration(String classname,
String name) {
if (classname.contains("netty")) {
Map<String, Object> serverParams = new HashMap<>();
Integer port = live ? 61616 : 5545;
Integer port = live ? 61616 + server : 5545 + server;
serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
return new TransportConfiguration(classname, serverParams, name);
}
Expand Down

0 comments on commit 6b2798a

Please sign in to comment.