Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

https://jira.jboss.org/browse/HORNETQ-407 improving journal shutdown …

…on compacting and avoid a rare test failure on JournalRestartStressTest
  • Loading branch information...
commit 71c1fa39fd8541619419d0fabfd748900c02800e 1 parent eb0f433
@clebertsuconic clebertsuconic authored
View
17 src/main/org/hornetq/core/journal/impl/JournalImpl.java
@@ -2277,6 +2277,11 @@ private void checkCompact() throws Exception
// compacting is disabled
return;
}
+
+ if (state != JournalImpl.STATE_LOADED)
+ {
+ return;
+ }
JournalFile[] dataFiles = getDataFiles();
@@ -2536,6 +2541,16 @@ public synchronized void stop() throws Exception
try
{
+
+ state = JournalImpl.STATE_STOPPED;
+
+ compactorExecutor.shutdown();
+
+ if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS))
+ {
+ JournalImpl.log.warn("Couldn't stop compactor executor after 120 seconds");
+ }
+
filesExecutor.shutdown();
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
@@ -2564,8 +2579,6 @@ public synchronized void stop() throws Exception
freeFiles.clear();
openedFiles.clear();
-
- state = JournalImpl.STATE_STOPPED;
}
finally
{
View
53 tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
@@ -23,6 +23,8 @@
import junit.framework.Assert;
import org.hornetq.api.core.Pair;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AbstractJournalUpdateTask;
@@ -181,6 +183,57 @@ public void testCompactWithConcurrentAppend() throws Exception
{
internalCompactTest(false, false, true, true, false, false, false, false, false, false, true, true, true);
}
+
+ public void testCompactFirstFileReclaimed() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ final byte recordType = (byte)0;
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
+
+ journal.start();
+
+ journal.loadInternalOnly();
+
+ journal.appendAddRecord(1, recordType, "test".getBytes(), true);
+
+ journal.forceMoveNextFile();
+
+
+ journal.appendUpdateRecord(1, recordType, "update".getBytes(), true);
+
+ journal.appendDeleteRecord(1, true);
+
+ journal.appendAddRecord(2, recordType, "finalRecord".getBytes(), true);
+
+
+ for (int i = 10 ; i < 100; i++)
+ {
+ journal.appendAddRecord(i, recordType, ("tst" + i).getBytes(), true);
+ journal.forceMoveNextFile();
+ journal.appendUpdateRecord(i, recordType, ("uptst" + i).getBytes(), true);
+ journal.appendDeleteRecord(i, true);
+ }
+
+ journal.compact();
+
+ journal.stop();
+
+ List<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedRecords = new ArrayList<PreparedTransactionInfo>();
+
+ journal.start();
+
+ journal.load(records, preparedRecords, null);
+
+ assertEquals(1, records.size());
+
+
+
+ }
private void internalCompactTest(final boolean preXA, // prepare before compact
final boolean postXA, // prepare after compact
View
49 tests/src/org/hornetq/tests/stress/journal/JournalRestartStressTest.java
@@ -57,18 +57,16 @@ public void testLoad() throws Throwable
server2.getConfiguration().setJournalCompactMinFiles(3);
server2.getConfiguration().setJournalCompactPercentage(50);
-
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0; i < 10; i++)
{
server2.start();
-
+
ClientSessionFactory sf = createFactory(false);
sf.setMinLargeMessageSize(1024 * 1024);
sf.setBlockOnDurableSend(false);
-
ClientSession session = sf.createSession(true, true);
-
+
try
{
session.createQueue("slow-queue", "slow-queue");
@@ -79,8 +77,7 @@ public void testLoad() throws Throwable
session.start();
ClientConsumer consumer = session.createConsumer("slow-queue");
-
-
+
while (true)
{
System.out.println("Received message from previous");
@@ -91,15 +88,16 @@ public void testLoad() throws Throwable
}
msg.acknowledge();
}
-
-
-
+
+ session.close();
+
produceMessages(sf, 30000);
-
+
server2.stop();
}
}
+
// Package protected ---------------------------------------------
/**
@@ -110,19 +108,17 @@ public void testLoad() throws Throwable
* @throws Throwable
*/
private void produceMessages(final ClientSessionFactory sf, final int NMSGS) throws HornetQException,
- InterruptedException,
- Throwable
+ InterruptedException,
+ Throwable
{
-
+
final int TIMEOUT = 5000;
-
- System.out.println("sending " + NMSGS + " messages");
+ System.out.println("sending " + NMSGS + " messages");
final ClientSession sessionSend = sf.createSession(true, true);
-
+
ClientProducer prod2 = sessionSend.createProducer("slow-queue");
-
try
{
@@ -139,6 +135,7 @@ private void produceMessages(final ClientSessionFactory sf, final int NMSGS) thr
Thread tReceive = new Thread()
{
+ @Override
public void run()
{
try
@@ -149,8 +146,8 @@ public void run()
{
if (i % 500 == 0)
{
- double percent = (double)i / (double) NMSGS;
- System.out.println("msgs " + i + " of " + NMSGS + ", " + (int)(percent * 100) + "%");
+ double percent = (double)i / (double)NMSGS;
+ System.out.println("msgs " + i + " of " + NMSGS + ", " + (int)(percent * 100) + "%");
Thread.sleep(100);
}
@@ -179,11 +176,14 @@ public void run()
for (int i = 0; i < NMSGS; i++)
{
ClientMessage msg = sessionSend.createMessage(true);
-
+
int size = RandomUtil.randomPositiveInt() % 10024;
- if (size == 0) size = 10 * 1024;
-
+ if (size == 0)
+ {
+ size = 10 * 1024;
+ }
+
byte[] buffer = new byte[size];
random.nextBytes(buffer);
@@ -191,7 +191,7 @@ public void run()
msg.getBodyBuffer().writeBytes(buffer);
prod.send(msg);
-
+
if (i % 5000 == 0)
{
prod2.send(msg);
@@ -203,6 +203,7 @@ public void run()
sessionReceive.close();
sessionSend.close();
+ sf.close();
for (Throwable e : errors)
{
Please sign in to comment.
Something went wrong with that request. Please try again.