Skip to content

Commit

Permalink
CAMEL-12148: Reworked FileIdempontentRepository so LRUCache is only a…
Browse files Browse the repository at this point in the history
…ct as quick lookup. And in case of 1st-level miss the file store is checked. File store also writes the entries in the same order they are added.
  • Loading branch information
davsclaus committed Jan 16, 2018
1 parent d05ee46 commit 1a9f3ec
Show file tree
Hide file tree
Showing 3 changed files with 375 additions and 45 deletions.
Expand Up @@ -19,6 +19,8 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -39,24 +41,27 @@
/**
* A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
* <p/>
* Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
* memory leak.
* <p/>
* The default cache used is {@link LRUCache} which keeps the most used entries in the cache.
* When this cache is being used and the state of the cache is stored to file via {@link #trunkStore()}
* then the entries stored are not guaranteed to be in the exact order the entries were added to the cache.
* If you need exact ordering, then you need to provide a custom {@link Map} implementation that does that
* This implementation provides a 1st-level in-memory {@link LRUCache} for fast check of the most
* frequently used keys. When {@link #add(String)} or {@link #contains(String)} methods are being used
* then in case of 1st-level cache miss, the underlying file is scanned which may cost additional performance.
* So try to find the right balance of the size of the 1st-level cache, the default size is 1000.
* The file store has a maximum capacity of 32mb by default. If the file store grows bigger, then
* the {@link #getDropOldestFileStore()} number of entries from the file store is dropped to reduce
* the file store and make room for newer entries.
*
* @version
*/
@ManagedResource(description = "File based idempotent repository")
public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
private static final String STORE_DELIMITER = "\n";

private final AtomicBoolean init = new AtomicBoolean();

private Map<String, Object> cache;
private File fileStore;
private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
private final AtomicBoolean init = new AtomicBoolean();
private long maxFileStoreSize = 32 * 1024 * 1000L; // 32mb store file
private long dropOldestFileStore = 1000;

public FileIdempotentRepository() {
}
Expand Down Expand Up @@ -123,12 +128,21 @@ public boolean add(String key) {
if (cache.containsKey(key)) {
return false;
} else {
// always register the most used keys in the LRUCache
cache.put(key, key);
if (fileStore.length() < maxFileStoreSize) {
// just append to store
appendToStore(key);
} else {
// trunk store and flush the cache

// now check the file store
boolean containsInFile = containsStore(key);
if (containsInFile) {
return false;
}

// its a new key so append to file store
appendToStore(key);

// check if we hit maximum capacity and report a warning about this
if (fileStore.length() > maxFileStoreSize) {
LOG.warn("Maximum capacity of file store: {} hit at {} bytes. Dropping {} oldest entries from the file store", fileStore, maxFileStoreSize, dropOldestFileStore);
trunkStore();
}

Expand All @@ -140,7 +154,8 @@ public boolean add(String key) {
@ManagedOperation(description = "Does the store contain the given key")
public boolean contains(String key) {
synchronized (cache) {
return cache.containsKey(key);
// check 1st-level first and then fallback to check the actual file
return cache.containsKey(key) || containsStore(key);
}
}

Expand All @@ -149,8 +164,8 @@ public boolean remove(String key) {
boolean answer;
synchronized (cache) {
answer = cache.remove(key) != null;
// trunk store and flush the cache on remove
trunkStore();
// remove from file cache also
removeFromStore(key);
}
return answer;
}
Expand All @@ -160,13 +175,15 @@ public boolean confirm(String key) {
return true;
}

@ManagedOperation(description = "Clear the store")
@ManagedOperation(description = "Clear the store (danger this removes all entries)")
public void clear() {
synchronized (cache) {
cache.clear();
if (cache instanceof LRUCache) {
((LRUCache) cache).cleanUp();
}
// clear file store
clearStore();
}
}

Expand Down Expand Up @@ -199,15 +216,30 @@ public long getMaxFileStoreSize() {
/**
* Sets the maximum file size for the file store in bytes.
* <p/>
* The default is 1mb.
* The default is 32mb.
*/
@ManagedAttribute(description = "The maximum file size for the file store in bytes")
public void setMaxFileStoreSize(long maxFileStoreSize) {
this.maxFileStoreSize = maxFileStoreSize;
}

public long getDropOldestFileStore() {
return dropOldestFileStore;
}

/**
* Sets the number of oldest entries to drop from the file store when the maximum capacity is hit to reduce
* disk space to allow room for new entries.
* <p/>
* The default is 1000.
*/
@ManagedAttribute(description = "Number of oldest elements to drop from file store if maximum file size reached")
public void setDropOldestFileStore(long dropOldestFileStore) {
this.dropOldestFileStore = dropOldestFileStore;
}

/**
* Sets the cache size.
* Sets the 1st-level cache size.
*
* Setting cache size is only possible when using the default {@link LRUCache} cache implementation.
*/
Expand All @@ -222,7 +254,7 @@ public void setCacheSize(int size) {
cache = LRUCacheFactory.newLRUCache(size);
}

@ManagedAttribute(description = "The current cache size")
@ManagedAttribute(description = "The current 1st-level cache size")
public int getCacheSize() {
if (cache != null) {
return cache.size();
Expand All @@ -231,38 +263,68 @@ public int getCacheSize() {
}

/**
* Reset and clears the store to force it to reload from file
* Reset and clears the 1st-level cache to force it to reload from file
*/
@ManagedOperation(description = "Reset and reloads the file store")
public synchronized void reset() throws IOException {
synchronized (cache) {
// trunk and clear, before we reload the store
trunkStore();
cache.clear();
// run the cleanup task first
if (cache instanceof LRUCache) {
((LRUCache) cache).cleanUp();
}
cache.clear();
loadStore();
}
}

/**
* Appends the given message id to the file store
* Checks the file store if the key exists
*
* @param messageId the message id
* @param key the key
* @return <tt>true</tt> if exists in the file, <tt>false</tt> otherwise
*/
protected void appendToStore(final String messageId) {
LOG.debug("Appending {} to idempotent filestore: {}", messageId, fileStore);
protected boolean containsStore(final String key) {
if (fileStore == null || !fileStore.exists()) {
return false;
}

Scanner scanner = null;
try {
scanner = new Scanner(fileStore);
scanner.useDelimiter(STORE_DELIMITER);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (line.equals(key)) {
return true;
}
}
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} finally {
if (scanner != null) {
scanner.close();
}
}
return false;
}

/**
* Appends the given key to the file store
*
* @param key the key
*/
protected void appendToStore(final String key) {
LOG.debug("Appending: {} to idempotent filestore: {}", key, fileStore);
FileOutputStream fos = null;
try {
// create store parent directory if missing
File storeParentDirectory = fileStore.getParentFile();
if (storeParentDirectory != null && !storeParentDirectory.exists()) {
LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
if (fileStore.getParentFile().mkdirs()) {
LOG.info("Parent directory of file store {} successfully created.", fileStore);
LOG.info("Parent directory of filestore: {} successfully created.", fileStore);
} else {
LOG.warn("Parent directory of file store {} cannot be created.", fileStore);
LOG.warn("Parent directory of filestore: {} cannot be created.", fileStore);
}
}
// create store if missing
Expand All @@ -271,7 +333,7 @@ protected void appendToStore(final String messageId) {
}
// append to store
fos = new FileOutputStream(fileStore, true);
fos.write(messageId.getBytes());
fos.write(key.getBytes());
fos.write(STORE_DELIMITER.getBytes());
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
Expand All @@ -280,23 +342,125 @@ protected void appendToStore(final String messageId) {
}
}

protected synchronized void removeFromStore(String key) {
LOG.debug("Removing: {} from idempotent filestore: {}", key, fileStore);

// we need to re-load the entire file and remove the key and then re-write the file
List<String> lines = new ArrayList<>();

boolean found = false;
Scanner scanner = null;
try {
scanner = new Scanner(fileStore);
scanner.useDelimiter(STORE_DELIMITER);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (key.equals(line)) {
found = true;
} else {
lines.add(line);
}
}
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} finally {
if (scanner != null) {
scanner.close();
}
}

if (found) {
// rewrite file
LOG.debug("Rewriting idempotent filestore: {} due to key: {} removed", fileStore, key);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(fileStore);
for (String line : lines) {
fos.write(line.getBytes());
fos.write(STORE_DELIMITER.getBytes());
}
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} finally {
IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
}
}
}

/**
* Trunks the file store when the max store size is hit by rewriting the 1st level cache
* to the file store.
* Clears the file-store (danger this deletes all entries)
*/
protected void trunkStore() {
LOG.info("Trunking idempotent filestore: {}", fileStore);
FileOutputStream fos = null;
protected void clearStore() {
try {
FileUtil.deleteFile(fileStore);
FileUtil.createNewFile(fileStore);
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
}

/**
* Trunks the file store when the max store size is hit by dropping the most oldest entries.
*/
protected synchronized void trunkStore() {
if (fileStore == null || !fileStore.exists()) {
return;
}

LOG.debug("Trunking: {} oldest entries from idempotent filestore: {}", dropOldestFileStore, fileStore);

// we need to re-load the entire file and remove the key and then re-write the file
List<String> lines = new ArrayList<>();

Scanner scanner = null;
int count = 0;
try {
fos = new FileOutputStream(fileStore);
for (String key : cache.keySet()) {
fos.write(key.getBytes());
fos.write(STORE_DELIMITER.getBytes());
scanner = new Scanner(fileStore);
scanner.useDelimiter(STORE_DELIMITER);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
count++;
if (count > dropOldestFileStore) {
lines.add(line);
}
}
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} finally {
IOHelper.close(fos, "Trunking file idempotent repository", LOG);
if (scanner != null) {
scanner.close();
}
}

if (!lines.isEmpty()) {
// rewrite file
LOG.debug("Rewriting idempotent filestore: {} with {} entries:", fileStore, lines.size());
FileOutputStream fos = null;
try {
fos = new FileOutputStream(fileStore);
for (String line : lines) {
fos.write(line.getBytes());
fos.write(STORE_DELIMITER.getBytes());
}
} catch (IOException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} finally {
IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
}
} else {
// its a small file so recreate the file
LOG.debug("Clearing idempotent filestore: {}", fileStore);
clearStore();
}
}

/**
* Cleanup the 1st-level cache.
*/
protected void cleanup() {
// run the cleanup task first
if (cache instanceof LRUCache) {
((LRUCache) cache).cleanUp();
}
}

Expand Down Expand Up @@ -357,12 +521,12 @@ protected void doStart() throws Exception {

@Override
protected void doStop() throws Exception {
// reset will trunk and clear the cache
trunkStore();
cache.clear();
// run the cleanup task first
if (cache instanceof LRUCache) {
((LRUCache) cache).cleanUp();
}

cache.clear();
init.set(false);
}

Expand Down

0 comments on commit 1a9f3ec

Please sign in to comment.