Skip to content

Commit

Permalink
[FLINK-18242][state-backend-rocksdb] Separate RocksDBOptionsFactory f…
Browse files Browse the repository at this point in the history
…rom OptionsFactory
  • Loading branch information
carp84 committed Jun 11, 2020
1 parent 4aa6524 commit 389cda9
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.annotation.VisibleForTesting;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.ArrayList;

/**
* A conversion from {@link RocksDBOptionsFactory} to {@link OptionsFactory}.
*/
public class OptionsFactoryAdapter implements OptionsFactory {

private static final long serialVersionUID = 1L;

private final RocksDBOptionsFactory rocksDBOptionsFactory;

OptionsFactoryAdapter(RocksDBOptionsFactory rocksDBOptionsFactory) {
this.rocksDBOptionsFactory = rocksDBOptionsFactory;
}
@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
return rocksDBOptionsFactory.createDBOptions(currentOptions, new ArrayList<>());
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
return rocksDBOptionsFactory.createColumnOptions(currentOptions, new ArrayList<>());
}

@VisibleForTesting
RocksDBOptionsFactory getRocksDBOptionsFactory() {
return rocksDBOptionsFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.ArrayList;
import java.util.Collection;

/**
Expand All @@ -32,7 +31,7 @@
* <p>A typical pattern to use this OptionsFactory is as follows:
*
* <pre>{@code
* rocksDbBackend.setOptions(new RocksDBOptionsFactory() {
* rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
*
* public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
* return currentOptions.setMaxOpenFiles(1024);
Expand All @@ -49,8 +48,7 @@
* });
* }</pre>
*/
@SuppressWarnings("deprecation")
public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializable {
public interface RocksDBOptionsFactory extends java.io.Serializable {

/**
* This method should set the additional options on top of the current options object.
Expand Down Expand Up @@ -92,26 +90,4 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
return nativeMetricOptions;
}

// ------------------------------------------------------------------------
// for compatibility
// ------------------------------------------------------------------------

/**
* Do not override these methods, they are only to maintain interface compatibility with
* prior versions. They will be removed in one of the next versions.
*/
@Override
default DBOptions createDBOptions(DBOptions currentOptions) {
return createDBOptions(currentOptions, new ArrayList<>());
}

/**
* Do not override these methods, they are only to maintain interface compatibility with
* prior versions. They will be removed in one of the next versions.
*/
@Override
default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
return createColumnOptions(currentOptions, new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public RocksDBOptionsFactory configure(ReadableConfig configuration) {
}

@Nullable
public static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
return factory instanceof RocksDBOptionsFactoryAdapter
? ((RocksDBOptionsFactoryAdapter) factory).optionsFactory
: factory;
: new OptionsFactoryAdapter(factory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -811,9 +811,7 @@ public RocksDBOptionsFactory getRocksDBOptions() {
*/
@Deprecated
public void setOptions(OptionsFactory optionsFactory) {
this.rocksDbOptionsFactory = optionsFactory instanceof RocksDBOptionsFactory
? (RocksDBOptionsFactory) optionsFactory
: new RocksDBOptionsFactoryAdapter(optionsFactory);
this.rocksDbOptionsFactory = new RocksDBOptionsFactoryAdapter(optionsFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,14 @@ public void testOptionsFactory() throws Exception {
rocksDbBackend = rocksDbBackend.configure(config, getClass().getClassLoader());

assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory);
assertTrue(rocksDbBackend.getOptions() instanceof TestOptionsFactory);
OptionsFactory optionsFactory = rocksDbBackend.getOptions();
if (optionsFactory instanceof OptionsFactoryAdapter) {
RocksDBOptionsFactory rocksDBOptionsFactory =
((OptionsFactoryAdapter) optionsFactory).getRocksDBOptionsFactory();
assertTrue(rocksDBOptionsFactory instanceof TestOptionsFactory);
} else {
assertTrue(optionsFactory instanceof TestOptionsFactory);
}

try (RocksDBResourceContainer optionsContainer = rocksDbBackend.createOptionsAndResourceContainer()) {
DBOptions dbOptions = optionsContainer.getDbOptions();
Expand Down Expand Up @@ -640,7 +647,7 @@ public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {

assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
assertEquals(original.getOptions(), copy.getOptions());
assertEquals(original.getRocksDBOptions(), copy.getRocksDBOptions());
assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());

FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();
Expand Down

0 comments on commit 389cda9

Please sign in to comment.