Skip to content

Commit

Permalink
[FLINK-35537] Fix exception when setting 'state.backend.rocksdb.compr…
Browse files Browse the repository at this point in the history
…ession.per.level' in yaml
  • Loading branch information
Zakelly committed Jun 7, 2024
1 parent 0305111 commit 20816c9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
Expand Down Expand Up @@ -890,14 +891,20 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon
base = new Configuration();
}
Configuration configuration = new Configuration();
Map<String, String> baseMap = base.toMap();
Map<String, String> onTopMap = onTop.toMap();
for (ConfigOption<?> option : RocksDBConfigurableOptions.CANDIDATE_CONFIGS) {
Optional<?> baseValue = base.getOptional(option);
Optional<?> topValue = onTop.getOptional(option);

if (topValue.isPresent() || baseValue.isPresent()) {
Object validValue = topValue.isPresent() ? topValue.get() : baseValue.get();
RocksDBConfigurableOptions.checkArgumentValid(option, validValue);
configuration.setString(option.key(), validValue.toString());
String valueString =
topValue.isPresent()
? onTopMap.get(option.key())
: baseMap.get(option.key());
configuration.setString(option.key(), valueString);
}
}
return configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
Expand Down Expand Up @@ -71,6 +72,7 @@

import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -287,6 +289,32 @@ public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
env.close();
}

@Test
public void testConfigureRocksDBCompressionPerLevel() throws Exception {
GlobalConfiguration.setStandardYaml(false);
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
CompressionType[] compressionTypes = {
CompressionType.NO_COMPRESSION, CompressionType.SNAPPY_COMPRESSION
};
Configuration conf = new Configuration();
conf.set(
RocksDBConfigurableOptions.COMPRESSION_PER_LEVEL,
new ArrayList<>(Arrays.asList(compressionTypes)));

rocksDbBackend =
rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());

RocksDBResourceContainer resourceContainer =
rocksDbBackend.createOptionsAndResourceContainer(tempFolder.newFile());
ColumnFamilyOptions columnFamilyOptions = resourceContainer.getColumnOptions();
assertArrayEquals(compressionTypes, columnFamilyOptions.compressionPerLevel().toArray());

resourceContainer.close();
env.close();
GlobalConfiguration.setStandardYaml(true);
}

@Test
public void testStoragePathWithFilePrefix() throws Exception {
final File folder = tempFolder.newFolder();
Expand Down

0 comments on commit 20816c9

Please sign in to comment.