From 72267c47fa98b9110133328a44cbd49cfee827e9 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Thu, 6 Jun 2024 16:55:11 +0800 Subject: [PATCH] [FLINK-35537] Fix exception when setting 'state.backend.rocksdb.compression.per.level' in yaml --- .../state/EmbeddedRocksDBStateBackend.java | 9 +++++- .../state/RocksDBStateBackendConfigTest.java | 28 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index 6d15c53c1a287..3bb5e7f0d3d94 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -64,6 +64,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.UUID; @@ -814,6 +815,8 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon base = new Configuration(); } Configuration configuration = new Configuration(); + Map baseMap = base.toMap(); + Map onTopMap = onTop.toMap(); for (ConfigOption option : RocksDBConfigurableOptions.CANDIDATE_CONFIGS) { Optional baseValue = base.getOptional(option); Optional topValue = onTop.getOptional(option); @@ -821,7 +824,11 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon if (topValue.isPresent() || baseValue.isPresent()) { Object validValue = topValue.isPresent() ? topValue.get() : baseValue.get(); RocksDBConfigurableOptions.checkArgumentValid(option, validValue); - configuration.setString(option.key(), validValue.toString()); + String valueString = + topValue.isPresent() + ? onTopMap.get(option.key()) + : baseMap.get(option.key()); + configuration.setString(option.key(), valueString); } } return configuration; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 85678474e7ba9..c1c4169864767 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.CloseableRegistry; @@ -71,6 +72,7 @@ import java.io.File; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -287,6 +289,32 @@ public void testConfigureTimerServiceLoadingFromApplication() throws Exception { env.close(); } + @Test + public void testConfigureRocksDBCompressionPerLevel() throws Exception { + GlobalConfiguration.setStandardYaml(false); + final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); + EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); + CompressionType[] compressionTypes = { + CompressionType.NO_COMPRESSION, CompressionType.SNAPPY_COMPRESSION + }; + Configuration conf = new Configuration(); + conf.set( + RocksDBConfigurableOptions.COMPRESSION_PER_LEVEL, + new ArrayList<>(Arrays.asList(compressionTypes))); + + rocksDbBackend = + rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader()); + + RocksDBResourceContainer resourceContainer = + rocksDbBackend.createOptionsAndResourceContainer(tempFolder.newFile()); + ColumnFamilyOptions columnFamilyOptions = resourceContainer.getColumnOptions(); + assertArrayEquals(compressionTypes, columnFamilyOptions.compressionPerLevel().toArray()); + + resourceContainer.close(); + env.close(); + GlobalConfiguration.setStandardYaml(true); + } + @Test public void testStoragePathWithFilePrefix() throws Exception { final File folder = tempFolder.newFolder();