From e24e7b9dc0a984ff5247b6a4131e6c353669f2bf Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Thu, 15 Mar 2018 23:57:11 +0800 Subject: [PATCH 1/4] pull the creation of readOptions out of loop to avoid native resource leak. --- .../streaming/state/RocksDBKeyedStateBackend.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 6a231818ff574..cdeb6087d0360 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1952,15 +1952,16 @@ private void writeKVStateMetaData() throws IOException { this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size()); int kvStateId = 0; + + //retrieve iterator for this k/v states + readOptions = new ReadOptions(); + readOptions.setSnapshot(snapshot); + for (Tuple2> column : kvStateInformationCopy) { metaInfoSnapshots.add(column.f1.snapshot()); - //retrieve iterator for this k/v states - readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); - kvStateIterators.add( new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId)); From 5229f102dd7b7fd0601069f6cd653211802bf6d8 Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Fri, 16 Mar 2018 23:07:54 +0800 Subject: [PATCH 2/4] fix concurrency problem in full checkpoint. 
--- .../state/RocksDBKeyedStateBackend.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index cdeb6087d0360..31b9d99c1f2da 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1836,7 +1836,13 @@ static class RocksDBFullSnapshotOperation private Snapshot snapshot; private ReadOptions readOptions; - private List>> kvStateInformationCopy; + + /** The state meta data. */ + private List> stateMetaInfoSnapshots; + + /** The copied column handle. */ + private List copiedColumnFamilyHandles; + private List> kvStateIterators; private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider; @@ -1860,7 +1866,19 @@ static class RocksDBFullSnapshotOperation */ public void takeDBSnapShot() { Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); - this.kvStateInformationCopy = new ArrayList<>(stateBackend.kvStateInformation.values()); + + this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size()); + + this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size()); + + for (Tuple2> tuple2 : + stateBackend.kvStateInformation.values()) { + // snapshot meta info + this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot()); + + // copy column family handle + this.copiedColumnFamilyHandles.add(tuple2.f0); + } this.snapshot = stateBackend.db.getSnapshot(); } @@ -1946,10 +1964,7 @@ public void releaseSnapshotResources() { private void writeKVStateMetaData() 
throws IOException { - List> metaInfoSnapshots = - new ArrayList<>(kvStateInformationCopy.size()); - - this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size()); + this.kvStateIterators = new ArrayList<>(copiedColumnFamilyHandles.size()); int kvStateId = 0; @@ -1957,13 +1972,10 @@ private void writeKVStateMetaData() throws IOException { readOptions = new ReadOptions(); readOptions.setSnapshot(snapshot); - for (Tuple2> column : - kvStateInformationCopy) { - - metaInfoSnapshots.add(column.f1.snapshot()); + for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) { kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId)); + new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId)); ++kvStateId; } @@ -1971,7 +1983,7 @@ private void writeKVStateMetaData() throws IOException { KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy<>( stateBackend.getKeySerializer(), - metaInfoSnapshots, + stateMetaInfoSnapshots, !Objects.equals( UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator)); From 793de05881d1edf826579dab17cf0f0545bdf675 Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Sat, 17 Mar 2018 23:45:02 +0800 Subject: [PATCH 3/4] fix some minor bugs in tests. 
--- .../streaming/state/RocksDBStateBackendConfigTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 2dd67f5c2d9b9..65d5b2e6f9780 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -299,7 +299,9 @@ public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOption }); assertNotNull(rocksDbBackend.getOptions()); - assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle()); + try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) { + assertEquals(CompactionStyle.FIFO, colCreated.compactionStyle()); + } } @Test @@ -324,7 +326,9 @@ public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOption assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); assertNotNull(rocksDbBackend.getOptions()); - assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle()); + try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) { + assertEquals(CompactionStyle.UNIVERSAL, colCreated.compactionStyle()); + } } @Test From 2b83575981b69824356a63f826d1f69c9588a772 Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Wed, 28 Mar 2018 00:43:55 +0800 Subject: [PATCH 4/4] use try-with-resources to ensure RocksIterator can be closed in RocksDBMapState. 
--- .../state/RocksDBKeyedStateBackend.java | 16 +++-- .../streaming/state/RocksDBMapState.java | 71 ++++++++++--------- 2 files changed, 47 insertions(+), 40 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 31b9d99c1f2da..3000667c48cae 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1313,10 +1313,10 @@ static final class RocksDBMergeIterator implements AutoCloseable { private static final List> COMPARATORS; static { - int maxBytes = 4; + int maxBytes = 2; COMPARATORS = new ArrayList<>(maxBytes); for (int i = 0; i < maxBytes; ++i) { - final int currentBytes = i; + final int currentBytes = i + 1; COMPARATORS.add(new Comparator() { @Override public int compare(MergeIterator o1, MergeIterator o2) { @@ -1330,9 +1330,11 @@ public int compare(MergeIterator o1, MergeIterator o2) { RocksDBMergeIterator(List> kvStateIterators, final int keyGroupPrefixByteCount) { Preconditions.checkNotNull(kvStateIterators); + Preconditions.checkArgument(keyGroupPrefixByteCount >= 1); + this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; - Comparator iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount); + Comparator iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1); if (kvStateIterators.size() > 0) { PriorityQueue iteratorPriorityQueue = @@ -1837,10 +1839,14 @@ static class RocksDBFullSnapshotOperation private Snapshot snapshot; private ReadOptions readOptions; - /** The state meta data. */ + /** + * The state meta data. 
+ */ private List> stateMetaInfoSnapshots; - /** The copied column handle. */ + /** + * The copied column handle. + */ private List copiedColumnFamilyHandles; private List> kvStateIterators; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index c75a2edf13229..baa90fac5d09c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -540,52 +540,53 @@ private void loadCache() { return; } - RocksIterator iterator = db.newIterator(columnFamily); - - /* - * The iteration starts from the prefix bytes at the first loading. The cache then is - * reloaded when the next entry to return is the last one in the cache. At that time, - * we will start the iterating from the last returned entry. - */ - RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1); - byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes); - - cacheEntries.clear(); - cacheIndex = 0; - - iterator.seek(startBytes); - - /* - * If the last returned entry is not deleted, it will be the first entry in the - * iterating. Skip it to avoid redundant access in such cases. - */ - if (lastEntry != null && !lastEntry.deleted) { - iterator.next(); - } - - while (true) { - if (!iterator.isValid() || !underSameKey(iterator.key())) { - expired = true; - break; + // use try-with-resources to ensure the RocksIterator is released even if a runtime exception + // occurs in the code block below. + try (RocksIterator iterator = db.newIterator(columnFamily)) { + + /* + * The iteration starts from the prefix bytes at the first loading. 
The cache then is + * reloaded when the next entry to return is the last one in the cache. At that time, + * we will start the iterating from the last returned entry. + */ + RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1); + byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes); + + cacheEntries.clear(); + cacheIndex = 0; + + iterator.seek(startBytes); + + /* + * If the last returned entry is not deleted, it will be the first entry in the + * iterating. Skip it to avoid redundant access in such cases. + */ + if (lastEntry != null && !lastEntry.deleted) { + iterator.next(); } - if (cacheEntries.size() >= CACHE_SIZE_LIMIT) { - break; - } + while (true) { + if (!iterator.isValid() || !underSameKey(iterator.key())) { + expired = true; + break; + } - RocksDBMapEntry entry = new RocksDBMapEntry( + if (cacheEntries.size() >= CACHE_SIZE_LIMIT) { + break; + } + + RocksDBMapEntry entry = new RocksDBMapEntry( db, iterator.key(), iterator.value(), keySerializer, valueSerializer); - cacheEntries.add(entry); + cacheEntries.add(entry); - iterator.next(); + iterator.next(); + } } - - iterator.close(); } private boolean underSameKey(byte[] rawKeyBytes) {