[FLINK-3359] Make RocksDB File Copies Asynchronous
aljoscha committed Feb 12, 2016
1 parent c47cb7a commit 31310cd
Showing 16 changed files with 584 additions and 160 deletions.
15 changes: 12 additions & 3 deletions flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -57,15 +57,24 @@ under the License.
 			<artifactId>rocksdbjni</artifactId>
 			<version>4.1.0</version>
 		</dependency>

 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime_2.10</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>

+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.11</version>
+		</dependency>
 	</dependencies>

 </project>
@@ -24,11 +24,12 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;

 import org.apache.flink.util.HDFSCopyFromLocal;
 import org.apache.flink.util.HDFSCopyToLocal;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +49,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.UUID;

 import static java.util.Objects.requireNonNull;
@@ -58,6 +60,11 @@
  * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The
  * concrete subclasses just use the RocksDB handle to store/retrieve state.
  *
+ * <p>State is checkpointed asynchronously. The synchronous part is drawing the actual backup
+ * from RocksDB; this is done in {@link #snapshot(long, long)}, which returns an
+ * {@link AsyncRocksDBSnapshot} that then copies the backup to the remote
+ * file system.
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <S> The type of {@link State}.
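Editor's note: to make the split described in the javadoc concrete, here is a minimal, self-contained sketch of the pattern this commit introduces. The class and interface names below are illustrative stand-ins, not the actual Flink API: the synchronous part produces a cheap local artifact, and the returned handle performs the expensive copy only when the runtime later materializes it on a separate thread.

	import java.io.File;
	import java.nio.file.Files;
	import java.util.concurrent.Callable;
	import java.util.concurrent.ExecutorService;
	import java.util.concurrent.Executors;
	import java.util.concurrent.Future;

	public class AsyncSnapshotSketch {

		// Hypothetical stand-in for AsynchronousKvStateSnapshot#materialize().
		interface Snapshot {
			String materialize() throws Exception;
		}

		public static void main(String[] args) throws Exception {
			// Synchronous part: cheap local work, analogous to BackupEngine#createNewBackup().
			File localBackup = Files.createTempDirectory("local-chk-1").toFile();

			// Instead of copying right away, hand back a deferred handle.
			Snapshot snapshot = () -> "copied " + localBackup + " to the remote file system";

			// The runtime materializes the handle off the checkpointing thread.
			ExecutorService pool = Executors.newSingleThreadExecutor();
			Callable<String> task = snapshot::materialize;
			Future<String> result = pool.submit(task);
			System.out.println(result.get());
			pool.shutdown();
		}
	}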
@@ -81,51 +88,59 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	protected N currentNamespace;

 	/** Store it so that we can clean up in dispose() */
-	protected final File dbPath;
+	protected final File basePath;

+	/** FileSystem path where checkpoints are stored */
 	protected final String checkpointPath;

+	/** Directory in "basePath" where the actual RocksDB database instance stores its files */
+	protected final File rocksDbPath;
+
 	/** Our RocksDB instance */
 	protected final RocksDB db;

 	/**
 	 * Creates a new RocksDB backed state.
 	 *
 	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param basePath The path on the local system where RocksDB data should be stored.
 	 */
 	protected AbstractRocksDBState(
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
-			File dbPath,
+			File basePath,
 			String checkpointPath,
 			Options options) {

+		rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString());
+
 		this.keySerializer = requireNonNull(keySerializer);
 		this.namespaceSerializer = namespaceSerializer;
-		this.dbPath = dbPath;
+		this.basePath = basePath;
 		this.checkpointPath = checkpointPath;

 		RocksDB.loadLibrary();

-		if (!dbPath.exists()) {
-			if (!dbPath.mkdirs()) {
+		if (!basePath.exists()) {
+			if (!basePath.mkdirs()) {
 				throw new RuntimeException("Could not create RocksDB data directory.");
 			}
 		}

 		// clean it, this will remove the last part of the path but RocksDB will recreate it
 		try {
-			File db = new File(dbPath, "db");
-			LOG.warn("Deleting already existing db directory {}.", db);
-			FileUtils.deleteDirectory(db);
+			if (rocksDbPath.exists()) {
+				LOG.warn("Deleting already existing db directory {}.", rocksDbPath);
+				FileUtils.deleteDirectory(rocksDbPath);
+			}
 		} catch (IOException e) {
 			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
 		}

 		try {
-			db = RocksDB.open(options, new File(dbPath, "db").getAbsolutePath());
+			db = RocksDB.open(options, rocksDbPath.getAbsolutePath());
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error while opening RocksDB instance.", e);
 		}
@@ -137,39 +152,56 @@ protected AbstractRocksDBState(
 	 *
 	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
+	 * @param basePath The path on the local system where RocksDB data should be stored.
 	 * @param restorePath The path to a backup directory from which to restore RocksDb database.
 	 */
 	protected AbstractRocksDBState(
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
-			File dbPath,
+			File basePath,
 			String checkpointPath,
 			String restorePath,
 			Options options) {

+		rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString());
+
 		RocksDB.loadLibrary();

+		// clean it, this will remove the last part of the path but RocksDB will recreate it
+		try {
+			if (rocksDbPath.exists()) {
+				LOG.warn("Deleting already existing db directory {}.", rocksDbPath);
+				FileUtils.deleteDirectory(rocksDbPath);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+		}
+
 		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(restorePath + "/"))) {
-			backupEngine.restoreDbFromLatestBackup(new File(dbPath, "db").getAbsolutePath(), new File(dbPath, "db").getAbsolutePath(), new RestoreOptions(true));
-			FileUtils.deleteDirectory(new File(restorePath));
-		} catch (RocksDBException|IOException|IllegalArgumentException e) {
+			backupEngine.restoreDbFromLatestBackup(rocksDbPath.getAbsolutePath(), rocksDbPath.getAbsolutePath(), new RestoreOptions(true));
+		} catch (RocksDBException|IllegalArgumentException e) {
 			throw new RuntimeException("Error while restoring RocksDB state from " + restorePath, e);
+		} finally {
+			try {
+				FileUtils.deleteDirectory(new File(restorePath));
+			} catch (IOException e) {
+				LOG.error("Error cleaning up local restore directory " + restorePath, e);
+			}
 		}

 		this.keySerializer = requireNonNull(keySerializer);
 		this.namespaceSerializer = namespaceSerializer;
-		this.dbPath = dbPath;
+		this.basePath = basePath;
 		this.checkpointPath = checkpointPath;

-		if (!dbPath.exists()) {
-			if (!dbPath.mkdirs()) {
+		if (!basePath.exists()) {
+			if (!basePath.mkdirs()) {
 				throw new RuntimeException("Could not create RocksDB data directory.");
 			}
 		}

 		try {
-			db = RocksDB.open(options, new File(dbPath, "db").getAbsolutePath());
+			db = RocksDB.open(options, rocksDbPath.getAbsolutePath());
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error while opening RocksDB instance.", e);
 		}
@@ -209,49 +241,41 @@ final public void setCurrentNamespace(N namespace) {
 	protected abstract AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot(URI backupUri, long checkpointId);

 	@Override
-	public final AbstractRocksDBSnapshot<K, N, S, SD> snapshot(long checkpointId, long timestamp) throws Exception {
-		boolean success = false;
-
-		final File localBackupPath = new File(dbPath, "backup-" + checkpointId);
+	public final KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(final long checkpointId, long timestamp) throws Exception {
+		final File localBackupPath = new File(basePath, "local-chk-" + checkpointId);
 		final URI backupUri = new URI(checkpointPath + "/chk-" + checkpointId);

-		try {
-			if (!localBackupPath.exists()) {
-				if (!localBackupPath.mkdirs()) {
-					throw new RuntimeException("Could not create local backup path " + localBackupPath);
-				}
+		if (!localBackupPath.exists()) {
+			if (!localBackupPath.mkdirs()) {
+				throw new RuntimeException("Could not create local backup path " + localBackupPath);
 			}
+		}

-			try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) {
-				backupEngine.createNewBackup(db);
-			}
-
-			HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
-			AbstractRocksDBSnapshot<K, N, S, SD> result = createRocksDBSnapshot(backupUri, checkpointId);
-			success = true;
-			return result;
-		} finally {
-			FileUtils.deleteDirectory(localBackupPath);
-			if (!success) {
-				FileSystem fs = FileSystem.get(backupUri, new Configuration());
-				fs.delete(new Path(backupUri), true);
-			}
+		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) {
+			backupEngine.createNewBackup(db);
 		}
+
+		return new AsyncRocksDBSnapshot<>(
+			localBackupPath,
+			backupUri,
+			checkpointId,
+			this);
 	}


 	@Override
 	final public void dispose() {
 		db.dispose();
 		try {
-			FileUtils.deleteDirectory(dbPath);
+			FileUtils.deleteDirectory(basePath);
 		} catch (IOException e) {
 			throw new RuntimeException("Error disposing RocksDB data directory.", e);
 		}
 	}

-	public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>>
-			implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend>
-	{
+	protected static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>>
+			implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> {
 		private static final long serialVersionUID = 1L;

 		private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
@@ -287,6 +311,9 @@ public static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD e
 		/** Hash of the StateDescriptor, for sanity checks */
 		protected final SD stateDesc;

+		/**
+		 * Creates a new snapshot from the given state parameters.
+		 */
 		public AbstractRocksDBSnapshot(File dbPath,
 				String checkpointPath,
 				URI backupUri,
@@ -305,6 +332,9 @@ public AbstractRocksDBSnapshot(File dbPath,
 			this.namespaceSerializer = namespaceSerializer;
 		}

+		/**
+		 * Subclasses must implement this for creating a concrete RocksDB state.
+		 */
 		protected abstract KvState<K, N, S, SD, RocksDBStateBackend> createRocksDBState(
 				TypeSerializer<K> keySerializer,
 				TypeSerializer<N> namespaceSerializer,
@@ -336,8 +366,6 @@ public final KvState<K, N, S, SD, RocksDBStateBackend> restoreState(
 				}
 			}

-			FileSystem fs = FileSystem.get(backupUri, new Configuration());
-
 			final File localBackupPath = new File(dbPath, "chk-" + checkpointId);

 			if (localBackupPath.exists()) {
@@ -365,5 +393,42 @@ public final long getStateSize() throws Exception {
 			return 0;
 		}
 	}

+	/**
+	 * Upon snapshotting, the RocksDB backup is created synchronously. The asynchronous part is
+	 * copying the backup to a (possibly) remote file system. This is done in {@link #materialize()}
+	 * of this class.
+	 */
+	private static class AsyncRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>> extends AsynchronousKvStateSnapshot<K, N, S, SD, RocksDBStateBackend> {
+		private static final long serialVersionUID = 1L;
+		private final File localBackupPath;
+		private final URI backupUri;
+		private final long checkpointId;
+		private transient AbstractRocksDBState<K, N, S, SD> state;
+
+		public AsyncRocksDBSnapshot(File localBackupPath,
+				URI backupUri,
+				long checkpointId,
+				AbstractRocksDBState<K, N, S, SD> state) {
+			this.localBackupPath = localBackupPath;
+			this.backupUri = backupUri;
+			this.checkpointId = checkpointId;
+			this.state = state;
+		}
+
+		@Override
+		public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> materialize() throws Exception {
+			try {
+				HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+				return state.createRocksDBSnapshot(backupUri, checkpointId);
+			} catch (Exception e) {
+				FileSystem fs = FileSystem.get(backupUri, new Configuration());
+				fs.delete(new Path(backupUri), true);
+				throw e;
+			} finally {
+				FileUtils.deleteQuietly(localBackupPath);
+			}
+		}
+	}
 }


@@ -156,7 +156,7 @@ public void add(T value) throws IOException {
 	protected AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> createRocksDBSnapshot(
 			URI backupUri, long checkpointId) {

-		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+		return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
 	}

 	private static class Snapshot<K, N, T, ACC> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
@@ -154,7 +154,7 @@ protected AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>> cr
 			URI backupUri,
 			long checkpointId) {

-		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+		return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
 	}

 	private static class Snapshot<K, N, V> extends
@@ -152,7 +152,7 @@ protected AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescripto
 			URI backupUri,
 			long checkpointId) {

-		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+		return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
 	}

 	private static class Snapshot<K, N, V> extends
@@ -68,19 +68,17 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);

-
-	/** The checkpoint directory that copy the RocksDB backups to. */
+	/** The checkpoint directory that we copy the RocksDB backups to. */
 	private final Path checkpointDirectory;

 	/** The state backend that stores the non-partitioned state */
 	private final AbstractStateBackend nonPartitionedStateBackend;

-
 	/** Operator identifier that is used to uniqueify the RocksDB storage path. */
 	private String operatorIdentifier;

 	/** JobID for uniquifying backup paths. */
 	private JobID jobId;

 	// DB storage directories
@@ -140,7 +140,7 @@ protected AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>>
 			URI backupUri,
 			long checkpointId) {

-		return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+		return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
 	}

 	private static class Snapshot<K, N, V>
