Skip to content

Commit

Permalink
r10373:10374 from Branch_2_2_eap
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Mar 26, 2011
1 parent 5947305 commit ceaf0b3
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 43 deletions.
2 changes: 1 addition & 1 deletion src/main/org/hornetq/core/journal/TestableJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface TestableJournal extends Journal

boolean isAutoReclaim();

void compact() throws Exception;
void testCompact() throws Exception;

JournalFile getCurrentFile();

Expand Down
19 changes: 12 additions & 7 deletions src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ private static final void trace(final String message)

private final int userVersion;

private Executor filesExecutor;
private Executor openFilesExecutor;

private Executor closeFilesExecutor;

// Static --------------------------------------------------------

Expand All @@ -104,9 +106,10 @@ public JournalFilesRepository(final SequentialFileFactory fileFactory,

// Public --------------------------------------------------------

public void setExecutor(final Executor executor)
public void setExecutor(final Executor fileExecutor, final Executor closeExecutor)
{
filesExecutor = executor;
this.openFilesExecutor = fileExecutor;
this.closeFilesExecutor = closeExecutor;
}

public void clear()
Expand Down Expand Up @@ -358,13 +361,13 @@ public void run()
}
};

if (filesExecutor == null)
if (openFilesExecutor == null)
{
run.run();
}
else
{
filesExecutor.execute(run);
openFilesExecutor.execute(run);
}

JournalFile nextFile = null;
Expand Down Expand Up @@ -417,13 +420,15 @@ public void run()
}
};

if (filesExecutor == null)
// We can't close files while the compactor is running
// as we may be closing files that are being read by the compactor
if (closeFilesExecutor == null)
{
run.run();
}
else
{
filesExecutor.execute(run);
closeFilesExecutor.execute(run);
}

}
Expand Down
65 changes: 60 additions & 5 deletions src/main/org/hornetq/core/journal/impl/JournalImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -1551,13 +1552,67 @@ public void failedTransaction(final long transactionID,

return info;
}


public void testCompact() throws Exception
{
final AtomicInteger errors = new AtomicInteger(0);

final CountDownLatch latch = new CountDownLatch(1);

compactorRunning.set(true);

// We can't use the executor for the compacting... or we would dead lock because of file open and creation
// operations (that will use the executor)
compactorExecutor.execute(new Runnable()
{
public void run()
{

try
{
JournalImpl.this.compact();
}
catch (Throwable e)
{
errors.incrementAndGet();
JournalImpl.log.error(e.getMessage(), e);
e.printStackTrace();
}
finally
{
latch.countDown();
}
}
});

try
{
if (!latch.await(60, TimeUnit.SECONDS))
{
throw new RuntimeException("Didn't finish compact timely");
}

if (errors.get() > 0)
{
throw new RuntimeException("Error during testCompact, look at the logs");
}
}
finally
{
compactorRunning.set(false);
}
}

/**
*
* Note: This method can't be called from the main executor, as it will invoke other methods depending on it.
*
* Note: only synchronized methods on journal are methods responsible for the life-cycle such as stop, start
* records will still come as this is being executed
*
*/
public synchronized void compact() throws Exception
protected synchronized void compact() throws Exception
{

if (compactor != null)
Expand Down Expand Up @@ -2489,7 +2544,7 @@ public Thread newThread(final Runnable r)
}
});

filesRepository.setExecutor(filesExecutor);
filesRepository.setExecutor(filesExecutor, compactorExecutor);

fileFactory.start();

Expand Down Expand Up @@ -2521,7 +2576,7 @@ public synchronized void stop() throws Exception

filesExecutor.shutdown();

filesRepository.setExecutor(null);
filesRepository.setExecutor(null, null);

if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
{
Expand Down Expand Up @@ -2892,7 +2947,7 @@ private JournalFile appendRecord(final JournalInternalRecord encoder,
callback = null;
}

if (sync)
if (sync && !compactorRunning.get())
{
// In an edge case the transaction could still have pending data from previous files.
// This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
Expand Down Expand Up @@ -2956,7 +3011,7 @@ private void scheduleReclaim()

if (autoReclaim && !compactorRunning.get())
{
filesExecutor.execute(new Runnable()
compactorExecutor.execute(new Runnable()
{
public void run()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,14 @@ public int read(final ByteBuffer bytes) throws Exception
return read(bytes, null);
}

public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
public synchronized int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
try
{
if (channel == null)
{
throw new Exception("File " + this.getFileName() + " has a null channel");
}
int bytesRead = channel.read(bytes);

if (callback != null)
Expand Down
Loading

0 comments on commit ceaf0b3

Please sign in to comment.