From 1a9f3ecb9e973deded01ab465baca26977a95db4 Mon Sep 17 00:00:00 2001 From: Claus Ibsen Date: Tue, 16 Jan 2018 15:18:04 +0100 Subject: [PATCH] CAMEL-12148: Reworked FileIdempotentRepository so the LRUCache only acts as a quick 1st-level lookup. In case of a 1st-level miss the file store is checked. The file store also writes the entries in the same order they are added. --- .../idempotent/FileIdempotentRepository.java | 254 ++++++++++++++---- .../FileIdempotentTrunkStoreTest.java | 15 ++ .../FileIdempotentStoreOrderingTest.java | 151 +++++++++++ 3 files changed, 375 insertions(+), 45 deletions(-) create mode 100644 camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java index 4343331dfe228..339838b6ccbe1 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java +++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java @@ -19,6 +19,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Scanner; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,13 +41,13 @@ /** * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}. *

- * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a - * memory leak. - *

- * The default cache used is {@link LRUCache} which keeps the most used entries in the cache. - * When this cache is being used and the state of the cache is stored to file via {@link #trunkStore()} - * then the entries stored are not guaranteed to be in the exact order the entries were added to the cache. - * If you need exact ordering, then you need to provide a custom {@link Map} implementation that does that + * This implementation provides a 1st-level in-memory {@link LRUCache} for fast lookup of the most + * frequently used keys. When the {@link #add(String)} or {@link #contains(String)} methods are used + * and the key is not found in the 1st-level cache, the underlying file is scanned, which may cost additional performance. + * So try to find the right balance for the size of the 1st-level cache; the default size is 1000. + * The file store has a maximum capacity of 32mb by default. If the file store grows bigger, then + * {@link #getDropOldestFileStore()} entries are dropped from the file store to reduce + * its size and make room for newer entries. * * @version */ @@ -53,10 +55,13 @@ public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository { private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class); private static final String STORE_DELIMITER = "\n"; + + private final AtomicBoolean init = new AtomicBoolean(); + private Map cache; private File fileStore; - private long maxFileStoreSize = 1024 * 1000L; // 1mb store file - private final AtomicBoolean init = new AtomicBoolean(); + private long maxFileStoreSize = 32 * 1024 * 1000L; // 32mb store file + private long dropOldestFileStore = 1000; public FileIdempotentRepository() { } @@ -123,12 +128,21 @@ public boolean add(String key) { if (cache.containsKey(key)) { return false; } else { + // always register the most used keys in the LRUCache cache.put(key, key); - if (fileStore.length() < maxFileStoreSize) { - // just append to store - appendToStore(key); - } else { - // trunk store and flush the cache + + // now check the file store + boolean containsInFile = containsStore(key); + if (containsInFile) { + return false; + } + + // it's a new key so append it to the file store + appendToStore(key); + + // check if we hit maximum capacity and report a warning about this + if (fileStore.length() > maxFileStoreSize) { + LOG.warn("Maximum capacity of file store: {} hit at {} bytes. Dropping {} oldest entries from the file store", fileStore, maxFileStoreSize, dropOldestFileStore); trunkStore(); } @@ -140,7 +154,8 @@ public boolean add(String key) { @ManagedOperation(description = "Does the store contain the given key") public boolean contains(String key) { synchronized (cache) { - return cache.containsKey(key); + // check the 1st-level cache first and then fall back to checking the actual file + return cache.containsKey(key) || containsStore(key); } } @@ -149,8 +164,8 @@ public boolean remove(String key) { boolean answer; synchronized (cache) { answer = cache.remove(key) != null; - // trunk store and flush the cache on remove - trunkStore(); + // remove from the file store also + removeFromStore(key); } return answer; } @@ -160,13 +175,15 @@ public boolean confirm(String key) { return true; } - @ManagedOperation(description = "Clear the store") + @ManagedOperation(description = "Clear the store (danger, this removes all entries)") public void clear() { synchronized (cache) { cache.clear(); if (cache instanceof LRUCache) { ((LRUCache) cache).cleanUp(); } + // clear file store + clearStore(); } } @@ -199,15 +216,30 @@ public long getMaxFileStoreSize() { /** * Sets the maximum file size for the file store in bytes. *

- * The default is 1mb. + * The default is 32mb. */ @ManagedAttribute(description = "The maximum file size for the file store in bytes") public void setMaxFileStoreSize(long maxFileStoreSize) { this.maxFileStoreSize = maxFileStoreSize; } + public long getDropOldestFileStore() { + return dropOldestFileStore; + } + + /** + * Sets the number of oldest entries to drop from the file store when the maximum capacity is hit, to reduce + * the file size and make room for new entries. + *

+ * The default is 1000. + */ + @ManagedAttribute(description = "Number of oldest elements to drop from file store if maximum file size reached") + public void setDropOldestFileStore(long dropOldestFileStore) { + this.dropOldestFileStore = dropOldestFileStore; + } + /** - * Sets the cache size. + * Sets the 1st-level cache size. * * Setting cache size is only possible when using the default {@link LRUCache} cache implementation. */ @@ -222,7 +254,7 @@ public void setCacheSize(int size) { cache = LRUCacheFactory.newLRUCache(size); } - @ManagedAttribute(description = "The current cache size") + @ManagedAttribute(description = "The current 1st-level cache size") public int getCacheSize() { if (cache != null) { return cache.size(); @@ -231,28 +263,58 @@ public int getCacheSize() { } /** - * Reset and clears the store to force it to reload from file + * Reset and clears the 1st-level cache to force it to reload from file */ @ManagedOperation(description = "Reset and reloads the file store") public synchronized void reset() throws IOException { synchronized (cache) { - // trunk and clear, before we reload the store - trunkStore(); - cache.clear(); + // run the cleanup task first if (cache instanceof LRUCache) { ((LRUCache) cache).cleanUp(); } + cache.clear(); loadStore(); } } /** - * Appends the given message id to the file store + * Checks the file store if the key exists * - * @param messageId the message id + * @param key the key + * @return true if exists in the file, false otherwise */ - protected void appendToStore(final String messageId) { - LOG.debug("Appending {} to idempotent filestore: {}", messageId, fileStore); + protected boolean containsStore(final String key) { + if (fileStore == null || !fileStore.exists()) { + return false; + } + + Scanner scanner = null; + try { + scanner = new Scanner(fileStore); + scanner.useDelimiter(STORE_DELIMITER); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + if (line.equals(key)) { + return true; + } + } + } catch (IOException e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + if (scanner != null) { + scanner.close(); + } + } + return false; + } + + /** + * Appends the given key to the file store + * + * @param key the key + */ + protected void appendToStore(final String key) { + LOG.debug("Appending: {} to idempotent filestore: {}", key, fileStore); FileOutputStream fos = null; try { // create store parent directory if missing @@ -260,9 +322,9 @@ protected void appendToStore(final String messageId) { if (storeParentDirectory != null && !storeParentDirectory.exists()) { LOG.info("Parent directory of file store {} doesn't exist. 
Creating.", fileStore); if (fileStore.getParentFile().mkdirs()) { - LOG.info("Parent directory of file store {} successfully created.", fileStore); + LOG.info("Parent directory of filestore: {} successfully created.", fileStore); } else { - LOG.warn("Parent directory of file store {} cannot be created.", fileStore); + LOG.warn("Parent directory of filestore: {} cannot be created.", fileStore); } } // create store if missing @@ -271,7 +333,7 @@ } // append to store fos = new FileOutputStream(fileStore, true); - fos.write(messageId.getBytes()); + fos.write(key.getBytes()); fos.write(STORE_DELIMITER.getBytes()); } catch (IOException e) { throw ObjectHelper.wrapRuntimeCamelException(e); @@ -280,23 +342,125 @@ } } + protected synchronized void removeFromStore(String key) { + LOG.debug("Removing: {} from idempotent filestore: {}", key, fileStore); + + // we need to re-load the entire file, remove the key, and then re-write the file + List lines = new ArrayList<>(); + + boolean found = false; + Scanner scanner = null; + try { + scanner = new Scanner(fileStore); + scanner.useDelimiter(STORE_DELIMITER); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + if (key.equals(line)) { + found = true; + } else { + lines.add(line); + } + } + } catch (IOException e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + if (scanner != null) { + scanner.close(); + } + } + + if (found) { + // rewrite file + LOG.debug("Rewriting idempotent filestore: {} due to key: {} removed", fileStore, key); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(fileStore); + for (String line : lines) { + fos.write(line.getBytes()); + fos.write(STORE_DELIMITER.getBytes()); + } + } catch (IOException e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + IOHelper.close(fos, "Rewriting file idempotent repository", LOG); + } + } + } + /** - * Trunks the file store when the max store size is hit by rewriting the 1st level cache - * to the file store. + * Clears the file store (danger, this deletes all entries) */ - protected void trunkStore() { - LOG.info("Trunking idempotent filestore: {}", fileStore); - FileOutputStream fos = null; + protected void clearStore() { + try { + FileUtil.deleteFile(fileStore); + FileUtil.createNewFile(fileStore); + } catch (IOException e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + /** + * Trunks the file store when the max store size is hit by dropping the oldest entries. 
+ */ + protected synchronized void trunkStore() { + if (fileStore == null || !fileStore.exists()) { + return; + } + + LOG.debug("Trunking: {} oldest entries from idempotent filestore: {}", dropOldestFileStore, fileStore); + + // we need to re-load the entire file, drop the oldest entries, and re-write the remaining ones + List lines = new ArrayList<>(); + + Scanner scanner = null; + int count = 0; try { - fos = new FileOutputStream(fileStore); - for (String key : cache.keySet()) { - fos.write(key.getBytes()); - fos.write(STORE_DELIMITER.getBytes()); + scanner = new Scanner(fileStore); + scanner.useDelimiter(STORE_DELIMITER); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + count++; + if (count > dropOldestFileStore) { + lines.add(line); + } } } catch (IOException e) { throw ObjectHelper.wrapRuntimeCamelException(e); } finally { - IOHelper.close(fos, "Trunking file idempotent repository", LOG); + if (scanner != null) { + scanner.close(); + } + } + + if (!lines.isEmpty()) { + // rewrite file + LOG.debug("Rewriting idempotent filestore: {} with {} entries:", fileStore, lines.size()); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(fileStore); + for (String line : lines) { + fos.write(line.getBytes()); + fos.write(STORE_DELIMITER.getBytes()); + } + } catch (IOException e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + IOHelper.close(fos, "Rewriting file idempotent repository", LOG); + } + } else { + // no entries remain after dropping the oldest, so just recreate the file + LOG.debug("Clearing idempotent filestore: {}", fileStore); + clearStore(); + } + } + + /** + * Cleans up the 1st-level cache. + */ + protected void cleanup() { + // run the LRUCache cleanup task + if (cache instanceof LRUCache) { + ((LRUCache) cache).cleanUp(); } } @@ -357,12 +521,12 @@ protected void doStart() throws Exception { @Override protected void doStop() throws Exception { - // reset will trunk and clear the cache - trunkStore(); - cache.clear(); + // run the cleanup task first if (cache instanceof LRUCache) { ((LRUCache) cache).cleanUp(); } + + cache.clear(); init.set(false); } diff --git a/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java b/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java index be3dc36deed99..0055d57d5256f 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/FileIdempotentTrunkStoreTest.java @@ -17,6 +17,10 @@ package org.apache.camel.processor; import java.io.File; +import java.nio.file.Files; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.camel.ContextTestSupport; import org.apache.camel.Endpoint; @@ -27,6 +31,8 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.idempotent.FileIdempotentRepository; import org.apache.camel.spi.IdempotentRepository; +import org.hamcrest.collection.IsIterableContainingInOrder; +import org.junit.Assert; /** * @version */ @@ -60,6 +66,15 @@ public void testTrunkFileStore() throws Exception { resultEndpoint.assertIsSatisfied(); assertTrue(repo.contains("XXXXXXXXXX")); + + // check that the file only has the last 2 entries as it was trunked + Stream fileContent = Files.lines(store.toPath()); + List fileEntries = fileContent.collect(Collectors.toList()); + fileContent.close(); + //expected order + Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains( 
"ZZZZZZZZZZ", + "XXXXXXXXXX")); } protected void sendMessage(final Object messageId, final Object body) { diff --git a/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java b/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java new file mode 100644 index 0000000000000..da7d5722336fc --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/idempotent/FileIdempotentStoreOrderingTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.idempotent; + +import java.io.File; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.hamcrest.collection.IsIterableContainingInOrder; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.camel.TestSupport.createDirectory; +import static org.apache.camel.TestSupport.deleteDirectory; + +public class FileIdempotentStoreOrderingTest { + + private FileIdempotentRepository fileIdempotentRepository; + private List files; + + @Before + public void setup() { + files = Arrays.asList( + "file1.txt.20171123", + "file2.txt.20171123", + "file1.txt.20171124", + "file3.txt.20171125", + "file2.txt.20171126", + "fixed.income.lamr.out.20171126", + "pricing.px.20171126", + "test.out.20171126", + "processing.source.lamr.out.20171126"); + this.fileIdempotentRepository = new FileIdempotentRepository(); + } + + @Test + public void testTrunkStoreNotMaxHit() throws Exception { + // ensure empty folder + deleteDirectory("target/mystore"); + createDirectory("target/mystore"); + + //given + File fileStore = new File("target/mystore/data.dat"); + fileIdempotentRepository.setFileStore(fileStore); + fileIdempotentRepository.setCacheSize(10); + fileIdempotentRepository.start(); + files.forEach(e -> fileIdempotentRepository.add(e)); + + //when (will rebalance) + fileIdempotentRepository.stop(); + + //then + Stream fileContent = Files.lines(fileStore.toPath()); + List fileEntries = fileContent.collect(Collectors.toList()); + fileContent.close(); + //expected order + Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains( + "file1.txt.20171123", + "file2.txt.20171123", + "file1.txt.20171124", + "file3.txt.20171125", + "file2.txt.20171126", + "fixed.income.lamr.out.20171126", + "pricing.px.20171126", + "test.out.20171126", + "processing.source.lamr.out.20171126")); + } + + @Test + public void testTrunkStoreFirstLevelMaxHit() throws Exception { + // ensure empty folder + deleteDirectory("target/mystore"); + createDirectory("target/mystore"); + + //given + File fileStore = new File("target/mystore/data.dat"); + 
fileIdempotentRepository.setFileStore(fileStore); + fileIdempotentRepository.setCacheSize(5); + fileIdempotentRepository.start(); + files.forEach(e -> fileIdempotentRepository.add(e)); + + //when (will rebalance) + fileIdempotentRepository.stop(); + + //then + Stream fileContent = Files.lines(fileStore.toPath()); + List fileEntries = fileContent.collect(Collectors.toList()); + fileContent.close(); + //expected order + Assert.assertThat(fileEntries, IsIterableContainingInOrder.contains( + "file1.txt.20171123", + "file2.txt.20171123", + "file1.txt.20171124", + "file3.txt.20171125", + "file2.txt.20171126", + "fixed.income.lamr.out.20171126", + "pricing.px.20171126", + "test.out.20171126", + "processing.source.lamr.out.20171126")); + } + + @Test + public void testTrunkStoreFileMaxHit() throws Exception { + // ensure empty folder + deleteDirectory("target/mystore"); + createDirectory("target/mystore"); + + //given + File fileStore = new File("target/mystore/data.dat"); + fileIdempotentRepository.setFileStore(fileStore); + fileIdempotentRepository.setCacheSize(5); + fileIdempotentRepository.setMaxFileStoreSize(128); + fileIdempotentRepository.setDropOldestFileStore(1000); + + fileIdempotentRepository.start(); + files.forEach(e -> fileIdempotentRepository.add(e)); + + // force cleanup and trunk + fileIdempotentRepository.cleanup(); + fileIdempotentRepository.trunkStore(); + + fileIdempotentRepository.stop(); + + //then + Stream fileContent = Files.lines(fileStore.toPath()); + List fileEntries = fileContent.collect(Collectors.toList()); + fileContent.close(); + + // all old entries is removed + Assert.assertEquals(0, fileEntries.size()); + } + +} \ No newline at end of file
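
For reference, a minimal usage sketch of the reworked repository, based only on the setters and lifecycle calls exercised by this patch and its tests. The file path, sizes, and keys are illustrative, not part of the change; tune the 1st-level cache size versus the file store limits for your own load.

import java.io.File;

import org.apache.camel.processor.idempotent.FileIdempotentRepository;

public class FileIdempotentRepositoryUsageSketch {

    public static void main(String[] args) throws Exception {
        FileIdempotentRepository repo = new FileIdempotentRepository();

        // illustrative configuration (hypothetical path and values)
        repo.setFileStore(new File("target/idempotent/data.dat"));
        repo.setCacheSize(1000);                     // 1st-level LRUCache size (default 1000)
        repo.setMaxFileStoreSize(32 * 1024 * 1000L); // file store limit before trunking (default 32mb)
        repo.setDropOldestFileStore(1000);           // oldest entries dropped when the limit is hit

        repo.start();
        try {
            System.out.println(repo.add("order-1"));      // true: new key, appended to the file store
            System.out.println(repo.add("order-1"));      // false: found in the 1st-level cache or the file
            System.out.println(repo.contains("order-1")); // true
        } finally {
            repo.stop();
        }
    }
}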