Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18242][state-backend-rocksdb] Separate RocksDBOptionsFactory from OptionsFactory #12605

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions flink-python/pyflink/datastream/state_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,11 +637,11 @@ def set_options(self, options_factory_class_name):
The options factory must have a default constructor.
"""
gateway = get_gateway()
JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.OptionsFactory
JOptionsFactory = gateway.jvm.org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory
j_options_factory_clz = load_java_class(options_factory_class_name)
if not get_java_class(JOptionsFactory).isAssignableFrom(j_options_factory_clz):
raise ValueError("The input class not implements OptionsFactory.")
self._j_rocks_db_state_backend.setOptions(j_options_factory_clz.newInstance())
raise ValueError("The input class not implements RocksDBOptionsFactory.")
self._j_rocks_db_state_backend.setRocksDBOptions(j_options_factory_clz.newInstance())

def get_options(self):
"""
Expand All @@ -650,7 +650,7 @@ def get_options(self):

:return: The fully-qualified class name of the options factory in Java.
"""
j_options_factory = self._j_rocks_db_state_backend.getOptions()
j_options_factory = self._j_rocks_db_state_backend.getRocksDBOptions()
if j_options_factory is not None:
return j_options_factory.getClass().getName()
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.annotation.VisibleForTesting;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.ArrayList;

/**
* A conversion from {@link RocksDBOptionsFactory} to {@link OptionsFactory}.
*/
public class OptionsFactoryAdapter implements OptionsFactory {

private static final long serialVersionUID = 1L;

private final RocksDBOptionsFactory rocksDBOptionsFactory;

OptionsFactoryAdapter(RocksDBOptionsFactory rocksDBOptionsFactory) {
this.rocksDBOptionsFactory = rocksDBOptionsFactory;
}

@Override
public DBOptions createDBOptions(DBOptions currentOptions) {
return rocksDBOptionsFactory.createDBOptions(currentOptions, new ArrayList<>());
}

@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
return rocksDBOptionsFactory.createColumnOptions(currentOptions, new ArrayList<>());
}

@VisibleForTesting
RocksDBOptionsFactory getRocksDBOptionsFactory() {
return rocksDBOptionsFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.ArrayList;
import java.util.Collection;

/**
Expand All @@ -32,7 +31,7 @@
* <p>A typical pattern to use this OptionsFactory is as follows:
*
* <pre>{@code
* rocksDbBackend.setOptions(new RocksDBOptionsFactory() {
* rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
*
* public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
* return currentOptions.setMaxOpenFiles(1024);
Expand All @@ -49,8 +48,7 @@
* });
* }</pre>
*/
@SuppressWarnings("deprecation")
public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializable {
public interface RocksDBOptionsFactory extends java.io.Serializable {

/**
* This method should set the additional options on top of the current options object.
Expand Down Expand Up @@ -92,26 +90,4 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
return nativeMetricOptions;
}

// ------------------------------------------------------------------------
// for compatibility
// ------------------------------------------------------------------------

/**
* Do not override these methods, they are only to maintain interface compatibility with
* prior versions. They will be removed in one of the next versions.
*/
@Override
default DBOptions createDBOptions(DBOptions currentOptions) {
return createDBOptions(currentOptions, new ArrayList<>());
}

/**
* Do not override these methods, they are only to maintain interface compatibility with
* prior versions. They will be removed in one of the next versions.
*/
@Override
default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
return createColumnOptions(currentOptions, new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public RocksDBOptionsFactory configure(ReadableConfig configuration) {
}

@Nullable
public static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
return factory instanceof RocksDBOptionsFactoryAdapter
? ((RocksDBOptionsFactoryAdapter) factory).optionsFactory
: factory;
: new OptionsFactoryAdapter(factory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -811,9 +811,7 @@ public RocksDBOptionsFactory getRocksDBOptions() {
*/
@Deprecated
public void setOptions(OptionsFactory optionsFactory) {
this.rocksDbOptionsFactory = optionsFactory instanceof RocksDBOptionsFactory
? (RocksDBOptionsFactory) optionsFactory
: new RocksDBOptionsFactoryAdapter(optionsFactory);
this.rocksDbOptionsFactory = new RocksDBOptionsFactoryAdapter(optionsFactory);
}

/**
Expand All @@ -824,7 +822,11 @@ public void setOptions(OptionsFactory optionsFactory) {
*/
@Deprecated
public OptionsFactory getOptions() {
return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
if (rocksDbOptionsFactory == null) {
return null;
} else {
return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,14 @@ public void testOptionsFactory() throws Exception {
rocksDbBackend = rocksDbBackend.configure(config, getClass().getClassLoader());

assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory);
assertTrue(rocksDbBackend.getOptions() instanceof TestOptionsFactory);
OptionsFactory optionsFactory = rocksDbBackend.getOptions();
if (optionsFactory instanceof OptionsFactoryAdapter) {
RocksDBOptionsFactory rocksDBOptionsFactory =
((OptionsFactoryAdapter) optionsFactory).getRocksDBOptionsFactory();
assertTrue(rocksDBOptionsFactory instanceof TestOptionsFactory);
} else {
assertTrue(optionsFactory instanceof TestOptionsFactory);
}

try (RocksDBResourceContainer optionsContainer = rocksDbBackend.createOptionsAndResourceContainer()) {
DBOptions dbOptions = optionsContainer.getDbOptions();
Expand Down Expand Up @@ -640,7 +647,7 @@ public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {

assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
assertEquals(original.getOptions(), copy.getOptions());
assertEquals(original.getRocksDBOptions(), copy.getRocksDBOptions());
assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());

FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();
Expand Down