From 2cfc54c8e77b840acc0d176cea8eabda4d1c4b03 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Mon, 14 Aug 2017 10:46:08 -0400 Subject: [PATCH 1/5] KAFKA-5733: ensure clean RocksDB directory before setting prepareForBulkload settings --- .../streams/state/internals/RocksDBStore.java | 21 +++++++++++++++---- .../state/internals/RocksDBStoreTest.java | 19 +++++++++++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index f8e9002f7653..65fe90ee21c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -47,6 +47,7 @@ import org.rocksdb.WriteOptions; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.nio.file.Files; import java.util.Collection; @@ -143,10 +144,6 @@ public void openDB(ProcessorContext context) { // (this could be a bug in the RocksDB code and their devs have been contacted). options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2)); - if (prepareForBulkload) { - options.prepareForBulkLoad(); - } - wOptions = new WriteOptions(); wOptions.setDisableWAL(true); @@ -172,6 +169,11 @@ public void openDB(ProcessorContext context) { valueSerde == null ? (Serde) context.valueSerde() : valueSerde); this.dbDir = new File(new File(context.stateDir(), parentDir), this.name); + + if (!hasSstFiles(dbDir) && prepareForBulkload) { + options.prepareForBulkLoad(); + } + try { this.db = openDB(this.dbDir, this.options, TTL_SECONDS); } catch (IOException e) { @@ -530,6 +532,17 @@ public synchronized boolean hasNext() { } } + private boolean hasSstFiles(File dbDir) { + String[] sstFileNames = dbDir.list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.matches(".*\\.sst"); + } + }); + + return sstFileNames != null && sstFileNames.length > 0; + } + private static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback { private final RocksDBStore rocksDBStore; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 553a134197dc..3afb79926c61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -75,6 +75,25 @@ public void tearDown() throws Exception { subject.close(); } + @Test + public void shouldNotThrowExceptionOnRestoreWithBulkLoad() throws Exception { + String message = "how can a 4 ounce bird carry a 2lb coconut"; + subject.init(context, subject); + int intKey = 1; + for (int i = 0; i < 2000000; i++) { + subject.put("theKeyIs" + intKey++, message); + } + + List> restoreBytes = new ArrayList<>(); + byte[] restoredKey = "restoredKey".getBytes("UTF-8"); + byte[] restoredValue = "restoredValue".getBytes("UTF-8"); + restoreBytes.add(KeyValue.pair(restoredKey, restoredValue)); + + context.restore("test", restoreBytes); + + assertThat(subject.get("restoredKey"), equalTo("restoredValue")); + } + @Test public void canSpecifyConfigSetterAsClass() throws Exception { final Map configs = new HashMap<>(); From f468147455c9f8eaf640f2821b7849b7b370968d Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Mon, 14 Aug 2017 12:57:29 -0400 Subject: [PATCH 2/5] KAFKA-5733: address comments --- .../kafka/streams/state/internals/RocksDBStore.java | 4 ++-- .../streams/state/internals/RocksDBStoreTest.java | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 65fe90ee21c5..2f6f7421ee6d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -532,8 +532,8 @@ public synchronized boolean hasNext() { } } - private boolean hasSstFiles(File dbDir) { - String[] sstFileNames = dbDir.list(new FilenameFilter() { + private boolean hasSstFiles(final File dbDir) { + final String[] sstFileNames = dbDir.list(new FilenameFilter() { @Override public boolean accept(File dir, String name) { return name.matches(".*\\.sst"); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 3afb79926c61..c08858701264 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -76,17 +76,19 @@ public void tearDown() throws Exception { } @Test - public void shouldNotThrowExceptionOnRestoreWithBulkLoad() throws Exception { - String message = "how can a 4 ounce bird carry a 2lb coconut"; + public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() throws Exception { subject.init(context, subject); + + final String message = "how can a 4 ounce bird carry a 2lb coconut"; int intKey = 1; for (int i = 0; i < 2000000; i++) { subject.put("theKeyIs" + intKey++, message); } - List> restoreBytes = new ArrayList<>(); - byte[] restoredKey = "restoredKey".getBytes("UTF-8"); - byte[] restoredValue = "restoredValue".getBytes("UTF-8"); + final List> restoreBytes = new ArrayList<>(); + + final byte[] restoredKey = "restoredKey".getBytes("UTF-8"); + final byte[] restoredValue = "restoredValue".getBytes("UTF-8"); restoreBytes.add(KeyValue.pair(restoredKey, restoredValue)); context.restore("test", restoreBytes); From 149634b405cd5d73ec0e27c9a55b16f9971abf1f Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Mon, 14 Aug 2017 14:20:05 -0400 Subject: [PATCH 3/5] KAFKA-5733: address additional comments, enforce not opening and closing if not a clean RocksDB dir on restore --- .../streams/state/internals/RocksDBStore.java | 34 +++++++++++-- .../state/internals/RocksDBStoreTest.java | 51 +++++++++++++------ 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 2f6f7421ee6d..0321a67d2038 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -102,6 +102,7 @@ public class RocksDBStore implements KeyValueStore { private FlushOptions fOptions; private volatile boolean prepareForBulkload = false; + private volatile boolean hasPreExistingSstFiles = false; private ProcessorContext internalProcessorContext; // visible for testing volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null; @@ -170,7 +171,7 @@ public void openDB(ProcessorContext context) { this.dbDir = new File(new File(context.stateDir(), parentDir), this.name); - if (!hasSstFiles(dbDir) && prepareForBulkload) { + if (!hasPreExistingSstFiles && prepareForBulkload) { options.prepareForBulkLoad(); } @@ -532,6 +533,11 @@ public synchronized boolean hasNext() { } } + boolean hasSstFiles() { + hasPreExistingSstFiles = hasSstFiles(dbDir); + return hasPreExistingSstFiles; + } + private boolean hasSstFiles(final File dbDir) { final String[] sstFileNames = dbDir.list(new FilenameFilter() { @Override @@ -543,9 +549,14 @@ public boolean accept(File dir, String name) { return sstFileNames != null && sstFileNames.length > 0; } - private static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback { + // not private for testing + static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback { private final RocksDBStore rocksDBStore; + private boolean needsReopenAtEnd = false; + + // for testing + private boolean wasReOpenedAfterRestore = false; RocksDBBatchingRestoreCallback(final RocksDBStore rocksDBStore) { this.rocksDBStore = rocksDBStore; @@ -561,14 +572,29 @@ public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) { - rocksDBStore.toggleDbForBulkLoading(true); + + wasReOpenedAfterRestore = false; + + if (!rocksDBStore.hasSstFiles()) { + rocksDBStore.toggleDbForBulkLoading(true); + needsReopenAtEnd = true; + } } @Override public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) { - rocksDBStore.toggleDbForBulkLoading(false); + if (needsReopenAtEnd) { + rocksDBStore.toggleDbForBulkLoading(false); + needsReopenAtEnd = false; + wasReOpenedAfterRestore = true; + } + } + + // testing and findbugs + boolean isWasReOpenedAfterRestore() { + return wasReOpenedAfterRestore; } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index c08858701264..41f1ff1f1799 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.KeyValueIterator; @@ -38,6 +37,7 @@ import java.io.File; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -150,21 +150,42 @@ public void shouldPutAll() { @Test public void shouldTogglePrepareForBulkloadSetting() { subject.init(context, subject); - StateRestoreListener restoreListener = (StateRestoreListener) subject.batchingStateRestoreCallback; + RocksDBStore.RocksDBBatchingRestoreCallback restoreListener = + (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback; restoreListener.onRestoreStart(null, null, 0, 0); assertTrue("Should have set bulk loading to true", subject.isPrepareForBulkload()); restoreListener.onRestoreEnd(null, null, 0); assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload()); + + assertTrue("Should have been re-opened", restoreListener.isWasReOpenedAfterRestore()); + } + + @Test + public void shouldNotTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception { + final List> entries = getKeyValueEntries(); + + subject.init(context, subject); + context.restore(subject.name(), entries); + + RocksDBStore.RocksDBBatchingRestoreCallback restoreListener = + (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback; + + restoreListener.onRestoreStart(null, null, 0, 0); + + // since pre-existing sst files, should not have called the toggleForBulkload method, hence, + // prepareForBulkLoad is never set to true; + assertFalse("Should have not set bulk loading to true", subject.isPrepareForBulkload()); + + restoreListener.onRestoreEnd(null, null, 0); + + assertFalse("Should not have have been re-opened", restoreListener.isWasReOpenedAfterRestore()); } @Test public void shouldRestoreAll() throws Exception { - final List> entries = new ArrayList<>(); - entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8"))); - entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8"))); - entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8"))); + final List> entries = getKeyValueEntries(); subject.init(context, subject); context.restore(subject.name(), entries); @@ -177,10 +198,7 @@ public void shouldRestoreAll() throws Exception { @Test public void shouldHandleDeletesOnRestoreAll() throws Exception { - final List> entries = new ArrayList<>(); - entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8"))); - entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8"))); - entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8"))); + final List> entries = getKeyValueEntries(); entries.add(new KeyValue<>("1".getBytes("UTF-8"), (byte[]) null)); subject.init(context, subject); @@ -226,11 +244,7 @@ public void shouldHandleDeletesAndPutbackOnRestoreAll() throws Exception { @Test public void shouldRestoreThenDeleteOnRestoreAll() throws Exception { - - final List> entries = new ArrayList<>(); - entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8"))); - entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8"))); - entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8"))); + final List> entries = getKeyValueEntries(); subject.init(context, subject); @@ -322,6 +336,13 @@ public void setConfig(final String storeName, final Options options, final Map> getKeyValueEntries() throws UnsupportedEncodingException { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>("1".getBytes("UTF-8"), "a".getBytes("UTF-8"))); + entries.add(new KeyValue<>("2".getBytes("UTF-8"), "b".getBytes("UTF-8"))); + entries.add(new KeyValue<>("3".getBytes("UTF-8"), "c".getBytes("UTF-8"))); + return entries; + } private static class ConfigurableProcessorContext extends MockProcessorContext { final Map configs; From d0ada11eb8e97fbd5efdfb4c66160a81970575e4 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 16 Aug 2017 15:45:16 -0700 Subject: [PATCH 4/5] use compactRange to still enable bulk loading --- .../streams/state/internals/RocksDBStore.java | 76 ++++++++----------- .../state/internals/RocksDBStoreTest.java | 12 +-- 2 files changed, 36 insertions(+), 52 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 0321a67d2038..0f3006314ad8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -102,7 +102,6 @@ public class RocksDBStore implements KeyValueStore { private FlushOptions fOptions; private volatile boolean prepareForBulkload = false; - private volatile boolean hasPreExistingSstFiles = false; private ProcessorContext internalProcessorContext; // visible for testing volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null; @@ -145,6 +144,10 @@ public void openDB(ProcessorContext context) { // (this could be a bug in the RocksDB code and their devs have been contacted). options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2)); + if (prepareForBulkload) { + options.prepareForBulkLoad(); + } + wOptions = new WriteOptions(); wOptions.setDisableWAL(true); @@ -171,10 +174,6 @@ public void openDB(ProcessorContext context) { this.dbDir = new File(new File(context.stateDir(), parentDir), this.name); - if (!hasPreExistingSstFiles && prepareForBulkload) { - options.prepareForBulkLoad(); - } - try { this.db = openDB(this.dbDir, this.options, TTL_SECONDS); } catch (IOException e) { @@ -257,6 +256,32 @@ private byte[] getInternal(byte[] rawKey) { } private void toggleDbForBulkLoading(boolean prepareForBulkload) { + + if (prepareForBulkload) { + // if the store is not empty, we need to compact to get around the num.levels check + // for bulk loading + final String[] sstFileNames = dbDir.list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.matches(".*\\.sst"); + } + }); + + if (sstFileNames != null && sstFileNames.length > 0) { + try { + this.db.compactRange(true, 1, 0); + + } catch (RocksDBException e) { + throw new ProcessorStateException("Error while range compacting during restoring store " + this.name, e); + } + + // we need to re-open with the old num.levels again, this is a workaround + // until https://github.com/facebook/rocksdb/pull/2740 is merged in rocksdb + close(); + openDB(internalProcessorContext); + } + } + close(); this.prepareForBulkload = prepareForBulkload; openDB(internalProcessorContext); @@ -438,7 +463,7 @@ public synchronized void close() { } private void closeOpenIterators() { - HashSet iterators = null; + HashSet iterators; synchronized (openIterators) { iterators = new HashSet<>(openIterators); } @@ -533,30 +558,10 @@ public synchronized boolean hasNext() { } } - boolean hasSstFiles() { - hasPreExistingSstFiles = hasSstFiles(dbDir); - return hasPreExistingSstFiles; - } - - private boolean hasSstFiles(final File dbDir) { - final String[] sstFileNames = dbDir.list(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.matches(".*\\.sst"); - } - }); - - return sstFileNames != null && sstFileNames.length > 0; - } - // not private for testing static class RocksDBBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback { private final RocksDBStore rocksDBStore; - private boolean needsReopenAtEnd = false; - - // for testing - private boolean wasReOpenedAfterRestore = false; RocksDBBatchingRestoreCallback(final RocksDBStore rocksDBStore) { this.rocksDBStore = rocksDBStore; @@ -572,29 +577,14 @@ public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) { - - wasReOpenedAfterRestore = false; - - if (!rocksDBStore.hasSstFiles()) { - rocksDBStore.toggleDbForBulkLoading(true); - needsReopenAtEnd = true; - } + rocksDBStore.toggleDbForBulkLoading(true); } @Override public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) { - if (needsReopenAtEnd) { - rocksDBStore.toggleDbForBulkLoading(false); - needsReopenAtEnd = false; - wasReOpenedAfterRestore = true; - } - } - - // testing and findbugs - boolean isWasReOpenedAfterRestore() { - return wasReOpenedAfterRestore; + rocksDBStore.toggleDbForBulkLoading(false); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 41f1ff1f1799..148762d249fe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -158,12 +158,10 @@ public void shouldTogglePrepareForBulkloadSetting() { restoreListener.onRestoreEnd(null, null, 0); assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload()); - - assertTrue("Should have been re-opened", restoreListener.isWasReOpenedAfterRestore()); } @Test - public void shouldNotTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception { + public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() throws Exception { final List> entries = getKeyValueEntries(); subject.init(context, subject); @@ -173,14 +171,10 @@ public void shouldNotTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() thr (RocksDBStore.RocksDBBatchingRestoreCallback) subject.batchingStateRestoreCallback; restoreListener.onRestoreStart(null, null, 0, 0); - - // since pre-existing sst files, should not have called the toggleForBulkload method, hence, - // prepareForBulkLoad is never set to true; - assertFalse("Should have not set bulk loading to true", subject.isPrepareForBulkload()); + assertTrue("Should have not set bulk loading to true", subject.isPrepareForBulkload()); restoreListener.onRestoreEnd(null, null, 0); - - assertFalse("Should not have have been re-opened", restoreListener.isWasReOpenedAfterRestore()); + assertFalse("Should have set bulk loading to false", subject.isPrepareForBulkload()); } @Test From 08da1f16c8aeb587dd8b1ac7b5192f1ab3395cd7 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 16 Aug 2017 16:53:02 -0700 Subject: [PATCH 5/5] minor fixes --- .../apache/kafka/streams/state/internals/RocksDBStore.java | 6 ++---- .../org/apache/kafka/streams/state/internals/Segment.java | 2 -- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 0f3006314ad8..f142431d286e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -179,6 +179,8 @@ public void openDB(ProcessorContext context) { } catch (IOException e) { throw new ProcessorStateException(e); } + + open = true; } public void init(ProcessorContext context, StateStore root) { @@ -190,8 +192,6 @@ public void init(ProcessorContext context, StateStore root) { // value getter should always read directly from rocksDB // since it is only for values that are already flushed context.register(root, false, this.batchingStateRestoreCallback); - - open = true; } private RocksDB openDB(File dir, Options options, int ttl) throws IOException { @@ -270,7 +270,6 @@ public boolean accept(File dir, String name) { if (sstFileNames != null && sstFileNames.length > 0) { try { this.db.compactRange(true, 1, 0); - } catch (RocksDBException e) { throw new ProcessorStateException("Error while range compacting during restoring store " + this.name, e); } @@ -285,7 +284,6 @@ public boolean accept(File dir, String name) { close(); this.prepareForBulkload = prepareForBulkload; openDB(internalProcessorContext); - open = true; } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java index 1311a2781dd2..b9cc8b7bad28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java @@ -40,8 +40,6 @@ public void openDB(final ProcessorContext context) { super.openDB(context); // skip the registering step - - open = true; } @Override