Permalink
Browse files

A few tweaks on the journal and OperationContext

  • Loading branch information...
clebertsuconic committed Dec 4, 2009
1 parent 779e501 commit 7a329d5f63cb735efe9bfc5ff742b26e051a1204
@@ -187,7 +187,9 @@ public void flush() throws Exception
if (writingChannel != null)
{
sequentialFile.position(0);
- sequentialFile.writeDirect(writingChannel.toByteBuffer(), true);
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+ sequentialFile.writeDirect(writingChannel.toByteBuffer(), true, completion);
+ completion.waitCompletion();
sequentialFile.close();
newDataFiles.add(currentFile);
}
@@ -224,7 +226,7 @@ protected void openFile() throws Exception
writingChannel.writeInt(fileID);
}
- protected void addToRecordsSnaptsho(long id)
+ protected void addToRecordsSnaptshot(long id)
{
recordsSnapshot.add(id);
}
@@ -141,7 +141,8 @@ public void deactivateBuffer()
{
if (timedBuffer != null)
{
- timedBuffer.flush();
+ // When moving to a new file, we need to make sure any pending buffer will be transfered to the buffer
+ timedBuffer.flush(true);
timedBuffer.setObserver(null);
}
}
@@ -176,15 +176,15 @@ public void addCommandCommit(final JournalTransaction liveTransaction, final Jou
{
for (long id : ids)
{
- addToRecordsSnaptsho(id);
+ addToRecordsSnaptshot(id);
}
}
if (ids2 != null)
{
for (long id : ids2)
{
- addToRecordsSnaptsho(id);
+ addToRecordsSnaptshot(id);
}
}
}
@@ -271,10 +271,19 @@ public synchronized void addBytes(final EncodingSupport bytes, final boolean syn
}
public void flush()
+ {
+ flush(false);
+ }
+
+ /**
+ * force means the Journal is moving to a new file. Any pending write need to be done immediately
+ * or data could be lost
+ * */
+ public void flush(final boolean force)
{
synchronized (this)
{
- if (!delayFlush && buffer.writerIndex() > 0)
+ if ((force || !delayFlush) && buffer.writerIndex() > 0)
{
int pos = buffer.writerIndex();
@@ -223,9 +223,6 @@ public void run()
*/
public void complete()
{
- // We hold errors until the complete is set, or the callbacks will never get informed
- errorCode = -1;
- errorMessage = null;
}
/* (non-Javadoc)
@@ -19,6 +19,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.tests.util.UnitTestCase;
@@ -42,7 +43,7 @@
// Public --------------------------------------------------------
- public void testCaptureException() throws Exception
+ public void testCaptureExceptionOnExecutor() throws Exception
{
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.shutdown();
@@ -92,6 +93,80 @@ public void run()
assertEquals(1, numberOfFailures.get());
}
+ public void testCaptureExceptionOnFailure() throws Exception
+ {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final OperationContextImpl context = new OperationContextImpl(executor)
+ {
+ public void complete()
+ {
+ super.complete();
+ latch.countDown();
+ }
+
+ };
+
+ context.storeLineUp();
+
+ final AtomicInteger failures = new AtomicInteger(0);
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ context.waitCompletion(5000);
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ failures.incrementAndGet();
+ }
+ }
+ };
+
+ t.start();
+
+ // Need to wait complete to be called first or the test would be invalid.
+ // We use a latch instead of forcing a sleep here
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ context.onError(1, "Poop happens!");
+
+ t.join();
+
+ assertEquals(1, failures.get());
+
+
+ failures.set(0);
+
+ final AtomicInteger operations = new AtomicInteger(0);
+
+ // We should be up to date with lineUps and executions. this should now just finish processing
+ context.executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void done()
+ {
+ operations.incrementAndGet();
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ failures.incrementAndGet();
+ }
+
+ });
+
+
+ assertEquals(1, failures.get());
+ assertEquals(0, operations.get());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

0 comments on commit 7a329d5

Please sign in to comment.