Skip to content

Commit

Permalink
simplified code
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Stenzel committed Aug 2, 2015
1 parent ed7dc60 commit ecb178d
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 124 deletions.
Expand Up @@ -21,21 +21,9 @@
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


import javax.crypto.BadPaddingException; import javax.crypto.BadPaddingException;
import javax.crypto.Cipher; import javax.crypto.Cipher;
Expand Down Expand Up @@ -428,22 +416,11 @@ public Long decryptFile(SeekableByteChannel encryptedFile, OutputStream plaintex
final Long fileSize = sensitiveHeaderContentBuf.getLong(); final Long fileSize = sensitiveHeaderContentBuf.getLong();
sensitiveHeaderContentBuf.get(fileKeyBytes); sensitiveHeaderContentBuf.get(fileKeyBytes);


// content decryption: // prepare content decryption:
encryptedFile.position(104l);
final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM); final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM);

// prepare some crypto workers:
final int numWorkers = Runtime.getRuntime().availableProcessors();
final Lock lock = new ReentrantLock();
final Condition blockDone = lock.newCondition();
final AtomicLong currentBlock = new AtomicLong();
final BlockingQueue<BlocksData> inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
final LengthLimitingOutputStream paddingRemovingOutputStream = new LengthLimitingOutputStream(plaintextFile, fileSize); final LengthLimitingOutputStream paddingRemovingOutputStream = new LengthLimitingOutputStream(plaintextFile, fileSize);
final List<DecryptWorker> workers = new ArrayList<>(); final CryptoWorkerExecutor executor = new CryptoWorkerExecutor(Runtime.getRuntime().availableProcessors(), (lock, blockDone, currentBlock, inputQueue) -> {
final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers); return new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, Channels.newChannel(paddingRemovingOutputStream)) {
final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
for (int i = 0; i < numWorkers; i++) {
final DecryptWorker worker = new DecryptWorker(lock, blockDone, currentBlock, inputQueue, authenticate, Channels.newChannel(paddingRemovingOutputStream)) {


@Override @Override
protected Cipher initCipher(long startBlockNum) { protected Cipher initCipher(long startBlockNum) {
Expand Down Expand Up @@ -483,48 +460,33 @@ protected void decrypt(Cipher cipher, ByteBuffer ciphertextBuf, ByteBuffer plain
} }


}; };
workers.add(worker); });
completionService.submit(worker);
}


// reading ciphered input and MACs interleaved: // read as many blocks from file as possible, but wait if queue is full:
encryptedFile.position(104l);
final int maxNumBlocks = 64;
int numBlocks = 1;
int bytesRead = 0; int bytesRead = 0;
long blockNumber = 0; long blockNumber = 0;
try { do {
// read as many blocks from file as possible, but wait if queue is full: if (numBlocks < maxNumBlocks) {
final int maxNumBlocks = 128; numBlocks++;
int numBlocks = 0;
do {
if (numBlocks < maxNumBlocks) {
numBlocks++;
}
final int inBufSize = numBlocks * (CONTENT_MAC_BLOCK + 32);
final ByteBuffer buf = ByteBuffer.allocate(inBufSize);
bytesRead = encryptedFile.read(buf);
buf.flip();
final int blocksRead = (int) Math.ceil(bytesRead / (double) (CONTENT_MAC_BLOCK + 32));
final boolean consumedInTime = inputQueue.offer(new BlocksData(buf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
if (!consumedInTime) {
// interrupt read loop and make room for some poisons:
inputQueue.clear();
break;
}
blockNumber += numBlocks;
} while (bytesRead == numBlocks * (CONTENT_MAC_BLOCK + 32));

// each worker has to swallow some poison:
for (int i = 0; i < numWorkers; i++) {
inputQueue.put(CryptoWorker.POISON);
} }
} catch (InterruptedException e) { final int inBufSize = numBlocks * (CONTENT_MAC_BLOCK + 32);
LOG.error("Thread interrupted", e); final ByteBuffer buf = ByteBuffer.allocate(inBufSize);
} bytesRead = encryptedFile.read(buf);
buf.flip();
final int blocksRead = (int) Math.ceil(bytesRead / (double) (CONTENT_MAC_BLOCK + 32));
final boolean consumedInTime = executor.offer(new BlocksData(buf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
if (!consumedInTime) {
break;
}
blockNumber += numBlocks;
} while (bytesRead == numBlocks * (CONTENT_MAC_BLOCK + 32));


// wait for decryption workers to finish: // wait for decryption workers to finish:
try { try {
for (int i = 0; i < numWorkers; i++) { executor.waitUntilDone();
completionService.take().get();
}
} catch (ExecutionException e) { } catch (ExecutionException e) {
final Throwable cause = e.getCause(); final Throwable cause = e.getCause();
if (cause instanceof IOException) { if (cause instanceof IOException) {
Expand All @@ -534,14 +496,10 @@ protected void decrypt(Cipher cipher, ByteBuffer ciphertextBuf, ByteBuffer plain
} else { } else {
LOG.error("Unexpected exception", e); LOG.error("Unexpected exception", e);
} }
} catch (InterruptedException e) {
LOG.error("Thread interrupted", e);
} finally { } finally {
// shutdown either after normal decryption or if ANY worker threw an exception: destroyQuietly(fileKey);
executorService.shutdownNow();
} }


destroyQuietly(fileKey);
return paddingRemovingOutputStream.getBytesWritten(); return paddingRemovingOutputStream.getBytesWritten();
} }


Expand Down Expand Up @@ -674,24 +632,10 @@ public Long encryptFile(InputStream plaintextFile, SeekableByteChannel encrypted
headerBuf.limit(104); headerBuf.limit(104);
encryptedFile.write(headerBuf); encryptedFile.write(headerBuf);


// add random length padding to obfuscate file length: // prepare content encryption:
final byte[] randomPadding = this.randomData(AES_BLOCK_LENGTH);
final LengthObfuscatingInputStream in = new LengthObfuscatingInputStream(plaintextFile, randomPadding);

// content encryption:
final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM); final SecretKey fileKey = new SecretKeySpec(fileKeyBytes, AES_KEY_ALGORITHM);

final CryptoWorkerExecutor executor = new CryptoWorkerExecutor(Runtime.getRuntime().availableProcessors(), (lock, blockDone, currentBlock, inputQueue) -> {
// prepare some crypto workers: return new EncryptWorker(lock, blockDone, currentBlock, inputQueue, encryptedFile) {
final int numWorkers = Runtime.getRuntime().availableProcessors();
final Lock lock = new ReentrantLock();
final Condition blockDone = lock.newCondition();
final AtomicLong currentBlock = new AtomicLong();
final BlockingQueue<BlocksData> inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
final List<EncryptWorker> workers = new ArrayList<>();
final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers);
final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
for (int i = 0; i < numWorkers; i++) {
final EncryptWorker worker = new EncryptWorker(lock, blockDone, currentBlock, inputQueue, encryptedFile) {


@Override @Override
protected Cipher initCipher(long startBlockNum) { protected Cipher initCipher(long startBlockNum) {
Expand Down Expand Up @@ -725,49 +669,35 @@ protected void encrypt(Cipher cipher, ByteBuffer plaintextBuf, ByteBuffer cipher
} }
} }
}; };
workers.add(worker); });
completionService.submit(worker);
}


// writing ciphered output and MACs interleaved: // read as many blocks from file as possible, but wait if queue is full:
final byte[] randomPadding = this.randomData(AES_BLOCK_LENGTH);
final LengthObfuscatingInputStream in = new LengthObfuscatingInputStream(plaintextFile, randomPadding);
final ReadableByteChannel channel = Channels.newChannel(in);
int bytesRead = 0; int bytesRead = 0;
long blockNumber = 0; long blockNumber = 0;
try { final int maxNumBlocks = 64;
final ReadableByteChannel channel = Channels.newChannel(in); int numBlocks = 0;
// read as many blocks from file as possible, but wait if queue is full: do {
final int maxNumBlocks = 128; if (numBlocks < maxNumBlocks) {
int numBlocks = 0; numBlocks++;
do {
if (numBlocks < maxNumBlocks) {
numBlocks++;
}
final int inBufSize = numBlocks * CONTENT_MAC_BLOCK;
final ByteBuffer inBuf = ByteBuffer.allocate(inBufSize);
bytesRead = channel.read(inBuf);
inBuf.flip();
final int blocksRead = (int) Math.ceil(bytesRead / (double) CONTENT_MAC_BLOCK);
final boolean consumedInTime = inputQueue.offer(new BlocksData(inBuf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
if (!consumedInTime) {
// interrupt read loop and make room for some poisons:
inputQueue.clear();
break;
}
blockNumber += numBlocks;
} while (bytesRead == numBlocks * CONTENT_MAC_BLOCK);

// each worker has to swallow some poison:
for (int i = 0; i < numWorkers; i++) {
inputQueue.put(CryptoWorker.POISON);
} }
} catch (InterruptedException e) { final int inBufSize = numBlocks * CONTENT_MAC_BLOCK;
LOG.error("Thread interrupted", e); final ByteBuffer inBuf = ByteBuffer.allocate(inBufSize);
} bytesRead = channel.read(inBuf);
inBuf.flip();
final int blocksRead = (int) Math.ceil(bytesRead / (double) CONTENT_MAC_BLOCK);
final boolean consumedInTime = executor.offer(new BlocksData(inBuf.asReadOnlyBuffer(), blockNumber, blocksRead), 1, TimeUnit.SECONDS);
if (!consumedInTime) {
break;
}
blockNumber += numBlocks;
} while (bytesRead == numBlocks * CONTENT_MAC_BLOCK);


// wait for encryption workers to finish: // wait for encryption workers to finish:
try { try {
for (int i = 0; i < numWorkers; i++) { executor.waitUntilDone();
completionService.take().get();
}
} catch (ExecutionException e) { } catch (ExecutionException e) {
final Throwable cause = e.getCause(); final Throwable cause = e.getCause();
if (cause instanceof IOException) { if (cause instanceof IOException) {
Expand All @@ -777,13 +707,9 @@ protected void encrypt(Cipher cipher, ByteBuffer plaintextBuf, ByteBuffer cipher
} else { } else {
LOG.error("Unexpected exception", e); LOG.error("Unexpected exception", e);
} }
} catch (InterruptedException e) {
LOG.error("Thread interrupted", e);
} finally { } finally {
// shutdown either after normal encryption or if ANY worker threw an exception: destroyQuietly(fileKey);
executorService.shutdownNow();
} }
destroyQuietly(fileKey);


// create and write header: // create and write header:
final long plaintextSize = in.getRealInputLength(); final long plaintextSize = in.getRealInputLength();
Expand Down
@@ -0,0 +1,112 @@
package org.cryptomator.crypto.aes256;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CryptoWorkerExecutor {

private static final Logger LOG = LoggerFactory.getLogger(CryptoWorkerExecutor.class);

private final int numWorkers;
private final Lock lock;
private final Condition blockDone;
private final AtomicLong currentBlock;
private final BlockingQueue<BlocksData> inputQueue;
private final ExecutorService executorService;
private final CompletionService<Void> completionService;
private boolean acceptWork;

/**
* Starts as many {@link CryptoWorker} as specified in the constructor, that start working immediately on the items submitted via {@link #offer(BlocksData, long, TimeUnit)}.
*/
public CryptoWorkerExecutor(int numWorkers, WorkerFactory workerFactory) {
this.numWorkers = numWorkers;
this.lock = new ReentrantLock();
this.blockDone = lock.newCondition();
this.currentBlock = new AtomicLong();
this.inputQueue = new LinkedBlockingQueue<>(numWorkers * 2); // one cycle read-ahead
this.executorService = Executors.newFixedThreadPool(numWorkers);
this.completionService = new ExecutorCompletionService<>(executorService);
this.acceptWork = true;

// start workers:
for (int i = 0; i < numWorkers; i++) {
final CryptoWorker worker = workerFactory.createWorker(lock, blockDone, currentBlock, inputQueue);
completionService.submit(worker);
}
}

/**
* Adds work to the work queue. On timeout all workers will be shut down.
*
* @see BlockingQueue#offer(Object, long, TimeUnit)
* @return <code>true</code> if the work has been added in time. <code>false</code> in any other case.
*/
public boolean offer(BlocksData data, long timeout, TimeUnit unit) {
if (!acceptWork) {
return false;
}
try {
final boolean success = inputQueue.offer(data, timeout, unit);
if (!success) {
this.acceptWork = false;
inputQueue.clear();
poisonWorkers();
}
return success;
} catch (InterruptedException e) {
LOG.error("Interrupted thread.", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
return false;
}

/**
* Graceful shutdown of this executor, waiting for all jobs to finish (normally or by throwing exceptions).
*
* @throws ExecutionException If any of the workers failed.
*/
public void waitUntilDone() throws ExecutionException {
this.acceptWork = false;
try {
poisonWorkers();
// now workers will one after another finish their work, potentially throwing an ExecutionException:
for (int i = 0; i < numWorkers; i++) {
completionService.take().get();
}
} catch (InterruptedException e) {
LOG.error("Interrupted thread.", e);
Thread.currentThread().interrupt();
} finally {
// shutdown either after normal decryption or if ANY worker threw an exception:
executorService.shutdownNow();
}
}

private void poisonWorkers() throws InterruptedException {
// add enough poison for each worker:
for (int i = 0; i < numWorkers; i++) {
inputQueue.put(CryptoWorker.POISON);
}
}

@FunctionalInterface
interface WorkerFactory {
CryptoWorker createWorker(Lock lock, Condition blockDone, AtomicLong currentBlock, BlockingQueue<BlocksData> inputQueue);
}

}

0 comments on commit ecb178d

Please sign in to comment.