Skip to content

Commit

Permalink
[FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB b…
Browse files Browse the repository at this point in the history
…ackend.

This also cleans up the generics in the RocksDB state classes.

This closes #1608
  • Loading branch information
StephanEwen committed Feb 9, 2016
1 parent 28c6254 commit 9ee1679
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 175 deletions.
@@ -1,44 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.commons.io.FileUtils;

import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.util.HDFSCopyFromLocal;
import org.apache.flink.util.HDFSCopyToLocal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.Env;
import org.rocksdb.Options;
import org.rocksdb.RestoreOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.StringAppendOperator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,10 +62,9 @@
* @param <N> The type of the namespace.
* @param <S> The type of {@link State}.
* @param <SD> The type of {@link StateDescriptor}.
* @param <Backend> The type of the backend that snapshots this key/value state.
*/
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
implements KvState<K, N, S, SD, Backend>, State {
public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
implements KvState<K, N, S, SD, RocksDBStateBackend>, State {

private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);

Expand Down Expand Up @@ -95,19 +96,18 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
* @param dbPath The path on the local system where RocksDB data should be stored.
*/
protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
File dbPath,
String checkpointPath) {
TypeSerializer<N> namespaceSerializer,
File dbPath,
String checkpointPath,
Options options) {

this.keySerializer = requireNonNull(keySerializer);
this.namespaceSerializer = namespaceSerializer;
this.dbPath = dbPath;
this.checkpointPath = checkpointPath;

RocksDB.loadLibrary();

Options options = new Options().setCreateIfMissing(true);
options.setMergeOperator(new StringAppendOperator());

if (!dbPath.exists()) {
if (!dbPath.mkdirs()) {
throw new RuntimeException("Could not create RocksDB data directory.");
Expand All @@ -128,9 +128,6 @@ protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
} catch (RocksDBException e) {
throw new RuntimeException("Error while opening RocksDB instance.", e);
}

options.dispose();

}

/**
Expand All @@ -143,10 +140,11 @@ protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
* @param restorePath The path to a backup directory from which to restore RocksDb database.
*/
protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
File dbPath,
String checkpointPath,
String restorePath) {
TypeSerializer<N> namespaceSerializer,
File dbPath,
String checkpointPath,
String restorePath,
Options options) {

RocksDB.loadLibrary();

Expand All @@ -162,9 +160,6 @@ protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
this.dbPath = dbPath;
this.checkpointPath = checkpointPath;

Options options = new Options().setCreateIfMissing(true);
options.setMergeOperator(new StringAppendOperator());

if (!dbPath.exists()) {
if (!dbPath.mkdirs()) {
throw new RuntimeException("Could not create RocksDB data directory.");
Expand All @@ -176,8 +171,6 @@ protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
} catch (RocksDBException e) {
throw new RuntimeException("Error while opening RocksDB instance.", e);
}

options.dispose();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -211,12 +204,10 @@ final public void setCurrentNamespace(N namespace) {
this.currentNamespace = namespace;
}

protected abstract KvStateSnapshot<K, N, S, SD, Backend> createRocksDBSnapshot(URI backupUri, long checkpointId);
protected abstract AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot(URI backupUri, long checkpointId);

@Override
final public KvStateSnapshot<K, N, S, SD, Backend> snapshot(
long checkpointId,
long timestamp) throws Exception {
public final AbstractRocksDBSnapshot<K, N, S, SD> snapshot(long checkpointId, long timestamp) throws Exception {
boolean success = false;

final File localBackupPath = new File(dbPath, "backup-" + checkpointId);
Expand All @@ -234,7 +225,7 @@ final public KvStateSnapshot<K, N, S, SD, Backend> snapshot(
}

HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
KvStateSnapshot<K, N, S, SD, Backend> result = createRocksDBSnapshot(backupUri, checkpointId);
AbstractRocksDBSnapshot<K, N, S, SD> result = createRocksDBSnapshot(backupUri, checkpointId);
success = true;
return result;
} finally {
Expand All @@ -256,7 +247,9 @@ final public void dispose() {
}
}

public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> implements KvStateSnapshot<K, N, S, SD, Backend> {
public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>>
implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend>
{
private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
Expand Down Expand Up @@ -293,12 +286,13 @@ public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD e
protected final SD stateDesc;

public AbstractRocksDBSnapshot(File dbPath,
String checkpointPath,
URI backupUri,
long checkpointId,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
SD stateDesc) {
String checkpointPath,
URI backupUri,
long checkpointId,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
SD stateDesc) {

this.dbPath = dbPath;
this.checkpointPath = checkpointPath;
this.backupUri = backupUri;
Expand All @@ -309,19 +303,21 @@ public AbstractRocksDBSnapshot(File dbPath,
this.namespaceSerializer = namespaceSerializer;
}

protected abstract KvState<K, N, S, SD, Backend> createRocksDBState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
SD stateDesc,
File dbPath,
String backupPath,
String restorePath) throws Exception;
protected abstract KvState<K, N, S, SD, RocksDBStateBackend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
SD stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) throws Exception;

@Override
public final KvState<K, N, S, SD, Backend> restoreState(
Backend stateBackend,
TypeSerializer<K> keySerializer,
ClassLoader classLoader,
long recoveryTimestamp) throws Exception {
public final KvState<K, N, S, SD, RocksDBStateBackend> restoreState(
RocksDBStateBackend stateBackend,
TypeSerializer<K> keySerializer,
ClassLoader classLoader,
long recoveryTimestamp) throws Exception {

// validity checks
if (!this.keySerializer.equals(keySerializer)) {
Expand Down Expand Up @@ -352,7 +348,8 @@ public final KvState<K, N, S, SD, Backend> restoreState(
}

HDFSCopyToLocal.copyToLocal(backupUri, dbPath);
return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, localBackupPath.getAbsolutePath());
return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath, localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions());
}

@Override
Expand Down
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.rocksdb.Options;

/**
* A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}.
* Options have to be created lazily by this factory, because the {@code Options}
* class is not serializable and holds pointers to native code.
*/
public interface OptionsFactory extends java.io.Serializable {

Options createOptions();
}
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -15,16 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;

import org.rocksdb.Options;
import org.rocksdb.RocksDBException;

import java.io.ByteArrayInputStream;
Expand All @@ -44,10 +45,9 @@
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of the values in the list state.
* @param <Backend> The type of the backend that snapshots this key/value state.
*/
public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, Backend>
public class RocksDBListState<K, N, V>
extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>>
implements ListState<V> {

/** Serializer for the values */
Expand All @@ -66,11 +66,13 @@ public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
* @param dbPath The path on the local system where RocksDB data should be stored.
*/
protected RocksDBListState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath) {
super(keySerializer, namespaceSerializer, dbPath, backupPath);
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
Options options) {

super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
}
Expand All @@ -85,12 +87,14 @@ protected RocksDBListState(TypeSerializer<K> keySerializer,
* @param dbPath The path on the local system where RocksDB data should be stored.
*/
protected RocksDBListState(TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath) {
super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) {

super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
this.stateDesc = requireNonNull(stateDesc);
this.valueSerializer = stateDesc.getSerializer();
}
Expand Down Expand Up @@ -143,13 +147,16 @@ public void add(V value) throws IOException {
}

@Override
protected KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBSnapshot(
URI backupUri,
long checkpointId) {
protected AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>> createRocksDBSnapshot(
URI backupUri,
long checkpointId) {

return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
}

private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, Backend> {
private static class Snapshot<K, N, V> extends
AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>>
{
private static final long serialVersionUID = 1L;

public Snapshot(File dbPath,
Expand All @@ -169,14 +176,17 @@ public Snapshot(File dbPath,
}

@Override
protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, Backend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath) throws Exception {
return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<V> stateDesc,
File dbPath,
String backupPath,
String restorePath,
Options options) throws Exception {

return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
checkpointPath, restorePath, options);
}
}
}
Expand Down

0 comments on commit 9ee1679

Please sign in to comment.