From de191cdd8ab6073cd6c06c3c3f0174f15c08369a Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Mon, 19 Feb 2018 20:13:58 +0800 Subject: [PATCH] fix in concurrency problem for full Checkpoint. --- .../state/RocksDBKeyedStateBackend.java | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 55073393e70d9..142589d122392 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -510,6 +510,12 @@ static final class RocksDBFullSnapshotOperation { private CheckpointStreamFactory.CheckpointStateOutputStream outStream; private DataOutputView outputView; + /** The state meta data. */ + private final List> stateMetaInfoSnapshots = new ArrayList<>(); + + /** The copied column handle. */ + private final List copiedColumnFamilyHandles = new ArrayList<>(); + RocksDBFullSnapshotOperation( RocksDBKeyedStateBackend stateBackend, CheckpointStreamFactory checkpointStreamFactory, @@ -534,6 +540,16 @@ public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) { this.checkpointId = checkpointId; this.checkpointTimeStamp = checkpointTimeStamp; this.snapshot = stateBackend.db.getSnapshot(); + + // save meta data + for (Map.Entry>> stateMetaInfoEntry + : stateBackend.kvStateInformation.entrySet()) { + // copy column family handle + copiedColumnFamilyHandles.add(stateMetaInfoEntry.getValue().f0); + + // snapshot meta info + stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot()); + } } /** @@ -615,21 +631,15 @@ public void releaseSnapshotResources() { private void writeKVStateMetaData() throws IOException { - List> metaInfoSnapshots = - new ArrayList<>(stateBackend.kvStateInformation.size()); - int kvStateId = 0; - for (Map.Entry>> column : - stateBackend.kvStateInformation.entrySet()) { - - metaInfoSnapshots.add(column.getValue().f1.snapshot()); + for (ColumnFamilyHandle column : copiedColumnFamilyHandles) { //retrieve iterator for this k/v states readOptions = new ReadOptions(); readOptions.setSnapshot(snapshot); kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); + new Tuple2<>(stateBackend.db.newIterator(column, readOptions), kvStateId)); ++kvStateId; } @@ -637,7 +647,7 @@ private void writeKVStateMetaData() throws IOException { KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy<>( stateBackend.getKeySerializer(), - metaInfoSnapshots, + stateMetaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator)); serializationProxy.write(outputView);