diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 4dd4c26c8fc60..8b9903620a891 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -66,6 +66,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.UUID; @@ -890,6 +891,8 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon base = new Configuration(); } Configuration configuration = new Configuration(); + Map baseMap = base.toMap(); + Map onTopMap = onTop.toMap(); for (ConfigOption option : RocksDBConfigurableOptions.CANDIDATE_CONFIGS) { Optional baseValue = base.getOptional(option); Optional topValue = onTop.getOptional(option); @@ -897,7 +900,11 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon if (topValue.isPresent() || baseValue.isPresent()) { Object validValue = topValue.isPresent() ? topValue.get() : baseValue.get(); RocksDBConfigurableOptions.checkArgumentValid(option, validValue); - configuration.setString(option.key(), validValue.toString()); + String valueString = + topValue.isPresent() + ? onTopMap.get(option.key()) + : baseMap.get(option.key()); + configuration.setString(option.key(), valueString); } } return configuration; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 484b24e4dc5aa..bcd9023845c87 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.CloseableRegistry; @@ -71,6 +72,7 @@ import java.io.File; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -287,6 +289,32 @@ public void testConfigureTimerServiceLoadingFromApplication() throws Exception { env.close(); } + @Test + public void testConfigureRocksDBCompressionPerLevel() throws Exception { + GlobalConfiguration.setStandardYaml(false); + final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); + EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); + CompressionType[] compressionTypes = { + CompressionType.NO_COMPRESSION, CompressionType.SNAPPY_COMPRESSION + }; + Configuration conf = new Configuration(); + conf.set( + RocksDBConfigurableOptions.COMPRESSION_PER_LEVEL, + new ArrayList<>(Arrays.asList(compressionTypes))); + + rocksDbBackend = + rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader()); + + RocksDBResourceContainer resourceContainer = + rocksDbBackend.createOptionsAndResourceContainer(tempFolder.newFile()); + ColumnFamilyOptions columnFamilyOptions = resourceContainer.getColumnOptions(); + assertArrayEquals(compressionTypes, columnFamilyOptions.compressionPerLevel().toArray()); + + resourceContainer.close(); + env.close(); + GlobalConfiguration.setStandardYaml(true); + } + @Test public void testStoragePathWithFilePrefix() throws Exception { final File folder = tempFolder.newFolder();