Skip to content

Commit

Permalink
This closes #1264
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed May 11, 2017
2 parents ec49c43 + f328c24 commit a98dccb
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public boolean exists() {
}

@Override
public synchronized void open() throws Exception {
public void open() throws Exception {
try {
if (!isOpen) {
synchronized (writeLock) {
Expand Down Expand Up @@ -151,12 +151,14 @@ public void delete() throws IOException, InterruptedException, ActiveMQException
}
}

private synchronized int internalWrite(byte[] data, IOCallback callback) throws Exception {
private synchronized int internalWrite(byte[] data, IOCallback callback) {
try {
synchronized (writeLock) {
int noBytes = dbDriver.writeToFile(this, data);
seek(noBytes);
System.out.println("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
if (logger.isTraceEnabled()) {
logger.trace("Write: ID: " + this.getId() + " FileName: " + this.getFileName() + size());
}
if (callback != null)
callback.done();
return noBytes;
Expand All @@ -169,42 +171,25 @@ private synchronized int internalWrite(byte[] data, IOCallback callback) throws
return 0;
}

public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) throws Exception {
public synchronized int internalWrite(ActiveMQBuffer buffer, IOCallback callback) {
byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
return internalWrite(data, callback);
}

private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) throws Exception {
private synchronized int internalWrite(ByteBuffer buffer, IOCallback callback) {
return internalWrite(buffer.array(), callback);
}

private void scheduleWrite(final ActiveMQBuffer bytes, final IOCallback callback) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
internalWrite(bytes, callback);
} catch (Exception e) {
logger.error(e);
// internalWrite will notify the CriticalIOErrorListener
}
}
executor.execute(() -> {
internalWrite(bytes, callback);
});
}

private void scheduleWrite(final ByteBuffer bytes, final IOCallback callback) {
final SequentialFile file = this;
executor.execute(new Runnable() {
@Override
public void run() {
try {
internalWrite(bytes, callback);
} catch (Exception e) {
logger.error(e);
fileFactory.onIOError(e, "Error on JDBC file sync", file);
}
}
executor.execute(() -> {
internalWrite(bytes, callback);
});
}

Expand Down Expand Up @@ -292,19 +277,16 @@ public long position() {
}

@Override
public synchronized void close() throws Exception {
public void close() throws Exception {
isOpen = false;
sync();
fileFactory.sequentialFileClosed(this);
}

@Override
public void sync() throws IOException {
final SimpleWaitIOCallback callback = new SimpleWaitIOCallback();
executor.execute(new Runnable() {
@Override
public void run() {
callback.done();
}
});
executor.execute(callback::done);

try {
callback.waitCompletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
Expand All @@ -34,6 +34,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.jboss.logging.Logger;

public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveMQComponent {
Expand All @@ -42,7 +43,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM

private boolean started;

private final List<JDBCSequentialFile> files = new ArrayList<>();
private final Set<JDBCSequentialFile> files = new ConcurrentHashSet<>();

private final Executor executor;

Expand Down Expand Up @@ -155,6 +156,14 @@ public SequentialFile createSequentialFile(String fileName) {
return null;
}

public void sequentialFileClosed(SequentialFile file) {
files.remove(file);
}

public int getNumberOfOpenFiles() {
return files.size();
}

@Override
public int getMaxIO() {
return 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public void openFile(JDBCSequentialFile file) throws SQLException {
}
}

void removeFile(JDBCSequentialFile file) {

}

/**
* Checks to see if a file with filename and extension exists. If so returns the ID of the file or returns -1.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Expand All @@ -41,6 +42,7 @@
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.apache.derby.jdbc.EmbeddedDriver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -59,9 +61,11 @@ public class JDBCSequentialFileFactoryTest {

private JDBCSequentialFileFactory factory;

private ExecutorService executor;

@Before
public void setup() throws Exception {
Executor executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());

String connectionUrl = "jdbc:derby:target/data;create=true";
String tableName = "FILES";
Expand All @@ -75,6 +79,7 @@ public void onIOException(Throwable code, String message, SequentialFile file) {

@After
public void tearDown() throws Exception {
executor.shutdown();
factory.destroy();
}

Expand All @@ -94,17 +99,26 @@ public void testJDBCFileFactoryStarted() throws Exception {
@Test
public void testCreateFiles() throws Exception {
int noFiles = 100;
List<SequentialFile> files = new LinkedList<>();

Set<String> fileNames = new HashSet<>();
for (int i = 0; i < noFiles; i++) {
String fileName = UUID.randomUUID().toString() + ".txt";
fileNames.add(fileName);
SequentialFile file = factory.createSequentialFile(fileName);
// We create files on Open
file.open();
files.add(file);
}

List<String> queryFileNames = factory.listFiles("txt");
assertTrue(queryFileNames.containsAll(fileNames));

for (SequentialFile file: files) {
file.close();
}

Assert.assertEquals(0, factory.getNumberOfOpenFiles());
}

@Test
Expand Down

0 comments on commit a98dccb

Please sign in to comment.