Skip to content

Commit

Permalink
Simplified FifoParallelDataProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
overheadhunter committed Jan 1, 2016
1 parent c3652a2 commit 9665ca8
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.cryptomator.common.UncheckedInterruptedException;

Expand All @@ -30,35 +26,16 @@
*/
class FifoParallelDataProcessor<T> {

private final BlockingQueue<SequencedFutureResult> processedData = new PriorityBlockingQueue<>();
private final AtomicLong jobSequence = new AtomicLong();
private final BlockingQueue<Runnable> workQueue;
private final BlockingQueue<Future<T>> processedData;
private final ExecutorService executorService;

/**
* @param numThreads How many jobs can run in parallel.
* @param workQueueSize Maximum number of jobs accepted without blocking, when no results are polled from {@link #processedData()}.
* @param workAhead Maximum number of jobs accepted in {@link #submit(Callable)} without blocking until results are polled from {@link #processedData()}.
*/
public FifoParallelDataProcessor(int numThreads, int workQueueSize) {
this.workQueue = new ArrayBlockingQueue<>(workQueueSize);
this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 1, TimeUnit.SECONDS, workQueue, this::rejectedExecution);
}

/**
* Enqueues tasks into the blocking queue, if they can not be executed immediately.
*
* @see ThreadPoolExecutor#execute(Runnable)
* @see RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)
*/
private void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor has been shut down.");
}
try {
this.workQueue.put(r);
} catch (InterruptedException e) {
throw new UncheckedInterruptedException(e);
}
public FifoParallelDataProcessor(int numThreads, int workAhead) {
this.processedData = new ArrayBlockingQueue<>(workAhead);
this.executorService = Executors.newFixedThreadPool(numThreads);
}

/**
Expand All @@ -70,7 +47,7 @@ private void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
void submit(Callable<T> processingJob) throws InterruptedException {
try {
Future<T> future = executorService.submit(processingJob);
processedData.offer(new SequencedFutureResult(future, jobSequence.getAndIncrement()));
processedData.put(future);
} catch (UncheckedInterruptedException e) {
throw e.getCause();
}
Expand All @@ -94,36 +71,15 @@ void submitPreprocessed(T preprocessedData) throws InterruptedException {
* @throws InterruptedException If the calling thread was interrupted while waiting for the next result.
*/
T processedData() throws InterruptedException {
return processedData.take().get();
}

private class SequencedFutureResult implements Comparable<SequencedFutureResult> {

private final Future<T> result;
private final long sequenceNumber;

public SequencedFutureResult(Future<T> result, long sequenceNumber) {
this.result = result;
this.sequenceNumber = sequenceNumber;
}

public T get() throws InterruptedException {
try {
return result.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException(e);
}
try {
return processedData.take().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException(e);
}
}

@Override
public int compareTo(SequencedFutureResult other) {
return Long.compare(this.sequenceNumber, other.sequenceNumber);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,36 +63,33 @@ public void testStrictFifoOrder() throws InterruptedException {
@Test
public void testBlockingBehaviour() throws InterruptedException {
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
processor.submit(new IntegerJob(100, 1)); // runs immediatley
processor.submit(new IntegerJob(100, 2)); // #1 in queue
processor.submitPreprocessed(1); // #1 in queue

Thread t1 = new Thread(() -> {
try {
processor.submit(new IntegerJob(10, 3)); // #2 in queue
processor.submitPreprocessed(2); // #2 in queue
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
t1.start();
t1.join(10);
// job 3 should not have been submitted by now, thus t1 is still alive
// job 2 should not have been submitted by now, thus t1 is still alive
Assert.assertTrue(t1.isAlive());
Assert.assertEquals(1, (int) processor.processedData());
Assert.assertEquals(2, (int) processor.processedData());
Assert.assertEquals(3, (int) processor.processedData());
Assert.assertFalse(t1.isAlive());
t1.join();
}

@Test
public void testInterruptionDuringSubmission() throws InterruptedException {
FifoParallelDataProcessor<Integer> processor = new FifoParallelDataProcessor<>(1, 1);
processor.submit(new IntegerJob(100, 1)); // runs immediatley
processor.submit(new IntegerJob(100, 2)); // #1 in queue
processor.submitPreprocessed(1); // #1 in queue

final AtomicBoolean interruptedExceptionThrown = new AtomicBoolean(false);
Thread t1 = new Thread(() -> {
try {
processor.submit(new IntegerJob(10, 3)); // #2 in queue
processor.submitPreprocessed(2); // #2 in queue
} catch (InterruptedException e) {
interruptedExceptionThrown.set(true);
Thread.currentThread().interrupt();
Expand All @@ -102,11 +99,10 @@ public void testInterruptionDuringSubmission() throws InterruptedException {
t1.join(10);
t1.interrupt();
t1.join(10);
// job 3 should not have been submitted by now, thus t1 is still alive
// job 2 should not have been submitted by now, thus t1 is still alive
Assert.assertFalse(t1.isAlive());
Assert.assertTrue(interruptedExceptionThrown.get());
Assert.assertEquals(1, (int) processor.processedData());
Assert.assertEquals(2, (int) processor.processedData());
}

private static class IntegerJob implements Callable<Integer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,57 +104,67 @@ public void testEncryptionAndDecryption() throws InterruptedException {
Assert.assertArrayEquals("cleartext message".getBytes(), result);
}

@Test
@Test(timeout = 20000) // assuming a minimum speed of 10mb/s during encryption and decryption 20s should be enough
public void testEncryptionAndDecryptionSpeed() throws InterruptedException, IOException {
final byte[] keyBytes = new byte[32];
final SecretKey encryptionKey = new SecretKeySpec(keyBytes, "AES");
final SecretKey macKey = new SecretKeySpec(keyBytes, "HmacSHA256");
final FileContentCryptor cryptor = new FileContentCryptorImpl(encryptionKey, macKey, RANDOM_MOCK);
final Path tmpFile = Files.createTempFile("encrypted", ".tmp");

final Thread fileWriter;
final ByteBuffer header;
final long encStart = System.nanoTime();
try (FileContentEncryptor encryptor = cryptor.createFileContentEncryptor(Optional.empty()); FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.WRITE)) {
final ByteBuffer cleartext = ByteBuffer.allocate(32768); // 32k
ByteBuffer ciphertext;
for (int i = 0; i < 4096; i++) { // 128M total
try (FileContentEncryptor encryptor = cryptor.createFileContentEncryptor(Optional.empty())) {
fileWriter = new Thread(() -> {
try (FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.WRITE)) {
ByteBuffer ciphertext;
while ((ciphertext = encryptor.ciphertext()) != FileContentCryptor.EOF) {
fc.write(ciphertext);
}
} catch (Exception e) {
e.printStackTrace();
}
});
fileWriter.start();

final ByteBuffer cleartext = ByteBuffer.allocate(100000); // 100k
for (int i = 0; i < 1000; i++) { // 100M total
cleartext.rewind();
encryptor.append(cleartext);
if (i > Runtime.getRuntime().availableProcessors()) {
ciphertext = encryptor.ciphertext();
Assert.assertEquals(32 * 1024 + 32, ciphertext.remaining());
fc.write(ciphertext);
}
}
encryptor.append(FileContentCryptor.EOF);
while ((ciphertext = encryptor.ciphertext()) != FileContentCryptor.EOF) {
fc.write(ciphertext);
}
header = encryptor.getHeader();
}
fileWriter.join();
final long encEnd = System.nanoTime();
LOG.debug("Encryption of 128M took {}ms", (encEnd - encStart) / 1000 / 1000);
LOG.debug("Encryption of 100M took {}ms", (encEnd - encStart) / 1000 / 1000);

final Thread fileReader;
final long decStart = System.nanoTime();
try (FileContentDecryptor decryptor = cryptor.createFileContentDecryptor(header); FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.READ)) {
final ByteBuffer ciphertext = ByteBuffer.allocate(654321);
ByteBuffer cleartext;
for (int i = 0; fc.read(ciphertext) != -1; i++) {
ciphertext.flip();
decryptor.append(ciphertext);
ciphertext.clear();
if (i > Runtime.getRuntime().availableProcessors()) {
cleartext = decryptor.cleartext();
Assert.assertTrue(cleartext.hasRemaining());
try (FileContentDecryptor decryptor = cryptor.createFileContentDecryptor(header)) {
fileReader = new Thread(() -> {
try (FileChannel fc = FileChannel.open(tmpFile, StandardOpenOption.READ)) {
ByteBuffer ciphertext = ByteBuffer.allocate(654321);
while (fc.read(ciphertext) != -1) {
ciphertext.flip();
decryptor.append(ciphertext);
ciphertext.clear();
}
decryptor.append(FileContentCryptor.EOF);
} catch (Exception e) {
e.printStackTrace();
}
}
decryptor.append(FileContentCryptor.EOF);
});
fileReader.start();

while (decryptor.cleartext() != FileContentCryptor.EOF) {
// no-op
}
}
fileReader.join();
final long decEnd = System.nanoTime();
LOG.debug("Decryption of 128M took {}ms", (decEnd - decStart) / 1000 / 1000);
LOG.debug("Decryption of 100M took {}ms", (decEnd - decStart) / 1000 / 1000);
Files.delete(tmpFile);
}
}

0 comments on commit 9665ca8

Please sign in to comment.