Permalink
Browse files

Merge pull request #673 from FranciscoBorges/simple

Fix race in test that could lead to hangs
  • Loading branch information...
2 parents ed18fa5 + 38e5c60 commit c5ec083ef04b2de8699f6b337e47a02d96a1e61b @andytaylor andytaylor committed Nov 15, 2012
View
@@ -49,6 +49,8 @@ hornetq-core/*.log
/target
/.metadata
+integration/hornetq-*-integration/.project
+
# /distribution/
/distribution/target
/distribution/.project
@@ -36,6 +36,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMemberImpl;
import org.hornetq.core.config.Configuration;
@@ -56,6 +57,7 @@
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.security.HornetQSecurityManager;
@@ -355,6 +357,50 @@ protected void waitForServer(HornetQServer server) throws InterruptedException
}
}
+ /**
+ * @param backup
+ */
+ public static final void waitForRemoteBackupSynchronization(final HornetQServer backup)
+ {
+ waitForRemoteBackup(null, 10, true, backup);
+ }
+
+ /**
+ * @param sessionFactory
+ * @param seconds
+ * @param waitForSync
+ * @param backup
+ */
+ public static final void waitForRemoteBackup(ClientSessionFactoryInternal sessionFactory, int seconds,
+ boolean waitForSync, final HornetQServer backup)
+ {
+ final HornetQServerImpl actualServer = (HornetQServerImpl)backup;
+ final long toWait = seconds * 1000;
+ final long time = System.currentTimeMillis();
+ while (true)
+ {
+ if ((sessionFactory == null || sessionFactory.getBackupConnector() != null) &&
+ (actualServer.isRemoteBackupUpToDate() || !waitForSync))
+ {
+ break;
+ }
+ if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("backup started? (" + actualServer.isStarted() + "). Finished synchronizing (" +
+ actualServer.isRemoteBackupUpToDate() + "). SessionFactory!=null ? " + (sessionFactory != null) +
+ " || sessionFactory.getBackupConnector()==" +
+ (sessionFactory != null ? sessionFactory.getBackupConnector() : "not-applicable"));
+ }
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+ }
protected final HornetQServer
createServer(final boolean realFiles,
@@ -823,69 +869,7 @@ protected final ClientMessage createMessage(ClientSession session, int counter,
for (JournalFile file : filesToRead)
{
- JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback()
- {
-
- AtomicInteger getType(byte key)
- {
- if (key == 0)
- {
- System.out.println("huh?");
- }
- Integer ikey = new Integer(key);
- AtomicInteger value = recordsType.get(ikey);
- if (value == null)
- {
- value = new AtomicInteger();
- recordsType.put(ikey, value);
- }
- return value;
- }
-
- public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- getType(recordInfo.getUserRecordType()).incrementAndGet();
- }
-
- public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
- {
- getType(recordInfo.getUserRecordType()).incrementAndGet();
- }
-
- public void onReadRollbackRecord(long transactionID) throws Exception
- {
- }
-
- public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
- {
- }
-
- public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- }
-
- public void onReadDeleteRecord(long recordID) throws Exception
- {
- }
-
- public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
- {
- }
-
- public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- getType(recordInfo.getUserRecordType()).incrementAndGet();
- }
-
- public void onReadAddRecord(RecordInfo recordInfo) throws Exception
- {
- getType(recordInfo.getUserRecordType()).incrementAndGet();
- }
-
- public void markAsDataFile(JournalFile file)
- {
- }
- });
+ JournalImpl.readJournalFile(messagesFF, file, new RecordTypeCounter(recordsType));
}
return recordsType;
}
@@ -935,6 +919,78 @@ public void markAsDataFile(JournalFile file)
return recordsType;
}
+ private static final class RecordTypeCounter implements JournalReaderCallback
+ {
+ private final HashMap<Integer, AtomicInteger> recordsType;
+
+ /**
+ * @param recordsType
+ */
+ public RecordTypeCounter(HashMap<Integer, AtomicInteger> recordsType)
+ {
+ this.recordsType = recordsType;
+ }
+
+ AtomicInteger getType(byte key)
+ {
+ if (key == 0)
+ {
+ System.out.println("huh?");
+ }
+ Integer ikey = new Integer(key);
+ AtomicInteger value = recordsType.get(ikey);
+ if (value == null)
+ {
+ value = new AtomicInteger();
+ recordsType.put(ikey, value);
+ }
+ return value;
+ }
+
+ public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ getType(recordInfo.getUserRecordType()).incrementAndGet();
+ }
+
+ public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ {
+ getType(recordInfo.getUserRecordType()).incrementAndGet();
+ }
+
+ public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ getType(recordInfo.getUserRecordType()).incrementAndGet();
+ }
+
+ public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ {
+ getType(recordInfo.getUserRecordType()).incrementAndGet();
+ }
+
+ public void onReadRollbackRecord(long transactionID) throws Exception
+ {
+ }
+
+ public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ {
+ }
+
+ public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ }
+
+ public void onReadDeleteRecord(long recordID) throws Exception
+ {
+ }
+
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ }
+
+ public void markAsDataFile(JournalFile file0)
+ {
+ }
+ }
/**
* Deleting a file on LargeDir is an asynchronous process. We need to keep looking for a while if
@@ -37,7 +37,6 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.server.impl.InVMNodeManager;
@@ -298,46 +297,9 @@ protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, int se
}
}
- /**
- * @param sessionFactory
- * @param seconds
- * @param waitForSync
- * @param backup
- */
- public static void waitForRemoteBackup(ClientSessionFactoryInternal sessionFactory, int seconds,
- boolean waitForSync, final HornetQServer backup)
- {
- final HornetQServerImpl actualServer = (HornetQServerImpl)backup;
- final long toWait = seconds * 1000;
- final long time = System.currentTimeMillis();
- while (true)
- {
- if ((sessionFactory == null || sessionFactory.getBackupConnector() != null) &&
- (actualServer.isRemoteBackupUpToDate() || !waitForSync))
- {
- break;
- }
- if (System.currentTimeMillis() > (time + toWait))
- {
- fail("backup started? (" + actualServer.isStarted() + "). Finished synchronizing (" +
- actualServer.isRemoteBackupUpToDate() + "). SessionFactory!=null ? " +
- (sessionFactory != null) + " || sessionFactory.getBackupConnector()==" +
- (sessionFactory != null ? sessionFactory.getBackupConnector() : "not-applicable"));
- }
- try
- {
- Thread.sleep(100);
- }
- catch (InterruptedException e)
- {
- fail(e.getMessage());
- }
- }
- }
-
protected abstract TransportConfiguration getAcceptorTransportConfiguration(boolean live);
- protected abstract TransportConfiguration getConnectorTransportConfiguration(final boolean live);
+ protected abstract TransportConfiguration getConnectorTransportConfiguration(final boolean live);
protected ServerLocatorInternal getServerLocator() throws Exception
{
@@ -21,7 +21,11 @@
*/
package org.hornetq.tests.integration.cluster.failover;
+import java.util.ArrayList;
+import java.util.List;
+
import junit.framework.Assert;
+
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
@@ -30,15 +34,12 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.tests.integration.cluster.util.TestableServer;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * 8/6/12
*/
public class ReplicatedMultipleServerFailoverExtraBackupsTest extends ReplicatedMultipleServerFailoverTest
{
+ @Override
public void testStartLiveFirst() throws Exception
{
backupServers.get(2).getServer().getConfiguration().setBackupGroupName(getNodeGroupName() + "-0");
@@ -50,12 +51,15 @@ public void testStartLiveFirst() throws Exception
for (TestableServer backupServer : backupServers)
{
backupServer.start();
+ waitForRemoteBackupSynchronization(backupServer.getServer());
}
+
sendCrashReceive();
waitForTopology(backupServers.get(0).getServer(), liveServers.size(), 2);
sendCrashBackupReceive();
}
+ @Override
public void testStartBackupFirst() throws Exception
{
backupServers.get(2).getServer().getConfiguration().setBackupGroupName(getNodeGroupName() + "-0");
@@ -69,6 +73,12 @@ public void testStartBackupFirst() throws Exception
{
liveServer.start();
}
+
+ for (TestableServer backupServer : backupServers)
+ {
+ waitForRemoteBackupSynchronization(backupServer.getServer());
+ }
+
waitForTopology(liveServers.get(0).getServer(), liveServers.size(), 2);
sendCrashReceive();
}

0 comments on commit c5ec083

Please sign in to comment.