Close RocksDB objects that are AutoCloseable
### What changes are proposed in this pull request?

Fixes facebook/rocksdb#9378
The `org.rocksdb.RocksObject` extends `AutoCloseable`, so these objects should be
closed properly to avoid leaks.

### Why are the changes needed?

According to the [RocksDB
documentation](https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management)
and a sample [PR
discussion](facebook/rocksdb#7608 (comment)),
the objects need to be closed or wrapped in a try-with-resources block.
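
As a quick illustration of both idioms (the path and keys below are placeholders, not code from this PR):

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class RocksCloseExample {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    // RocksObjects are AutoCloseable; try-with-resources guarantees the
    // native handles are released even if an exception is thrown.
    try (Options options = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(options, "/tmp/rocks-example")) {
      db.put("key".getBytes(), "value".getBytes());
      // Iterators hold native resources too and must be closed.
      try (RocksIterator iter = db.newIterator()) {
        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
          System.out.println(new String(iter.key()) + " -> " + new String(iter.value()));
        }
      }
    } // closed here in reverse order of declaration: db first, then options
  }
}
```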

### Does this PR introduce any user-facing changes?

NA

pr-link: #14856
change-id: cid-69a164c547a14e0bb7710fc7a48731aecba4db88
jiacheliu3 committed Jan 21, 2022
1 parent a16bc95 commit 918e739
Showing 6 changed files with 85 additions and 61 deletions.
@@ -47,25 +47,27 @@ public class RocksPageStore implements PageStore {

private final long mCapacity;
private final RocksDB mDb;
private final RocksPageStoreOptions mOptions;
private final RocksPageStoreOptions mPageStoreOptions;
private final Options mDbOptions;

/**
* @param options options for the rocks page store
* @param pageStoreOptions options for the rocks page store
* @return a new instance of {@link PageStore} backed by RocksDB
* @throws IOException if I/O error happens
*/
public static RocksPageStore open(RocksPageStoreOptions options) throws IOException {
Preconditions.checkArgument(options.getMaxPageSize() > 0);
public static RocksPageStore open(RocksPageStoreOptions pageStoreOptions) throws IOException {
Preconditions.checkArgument(pageStoreOptions.getMaxPageSize() > 0);
RocksDB.loadLibrary();
Options rocksOptions = new Options();
rocksOptions.setCreateIfMissing(true);
rocksOptions.setWriteBufferSize(options.getWriteBufferSize());
rocksOptions.setCompressionType(options.getCompressionType());
// The RocksObject will be closed together with the RocksPageStore
Options rocksOptions = new Options()
.setCreateIfMissing(true)
.setWriteBufferSize(pageStoreOptions.getWriteBufferSize())
.setCompressionType(pageStoreOptions.getCompressionType());
RocksDB db = null;
try {
db = RocksDB.open(rocksOptions, options.getRootDir());
db = RocksDB.open(rocksOptions, pageStoreOptions.getRootDir());
byte[] confData = db.get(CONF_KEY);
Cache.PRocksPageStoreOptions pOptions = options.toProto();
Cache.PRocksPageStoreOptions pOptions = pageStoreOptions.toProto();
if (confData != null) {
Cache.PRocksPageStoreOptions persistedOptions =
Cache.PRocksPageStoreOptions.parseFrom(confData);
@@ -79,19 +81,25 @@ public static RocksPageStore open(RocksPageStoreOptions options) throws IOException {
if (db != null) {
db.close();
}
rocksOptions.close();
throw new IOException("Couldn't open rocksDB database", e);
}
return new RocksPageStore(options, db);
return new RocksPageStore(pageStoreOptions, db, rocksOptions);
}

/**
* Creates a new instance of {@link PageStore} backed by RocksDB.
*
* @param options options for the rocks page store
* @param pageStoreOptions options for the rocks page store
* @param rocksDB RocksDB instance
* @param dbOptions the RocksDB options
*/
private RocksPageStore(RocksPageStoreOptions options, RocksDB rocksDB) {
mOptions = options;
mCapacity = (long) (options.getCacheSize() / (1 + options.getOverheadRatio()));
private RocksPageStore(RocksPageStoreOptions pageStoreOptions, RocksDB rocksDB,
Options dbOptions) {
mPageStoreOptions = pageStoreOptions;
mDbOptions = dbOptions;
mCapacity =
(long) (pageStoreOptions.getCacheSize() / (1 + pageStoreOptions.getOverheadRatio()));
mDb = rocksDB;
}

@@ -153,7 +161,10 @@ public void delete(PageId pageId) throws PageNotFoundException {

@Override
public void close() {
LOG.info("Closing RocksPageStore and recycling all RocksDB JNI objects");
mDb.close();
mDbOptions.close();
LOG.info("RocksPageStore closed");
}

private static byte[] getKeyFromPageId(PageId pageId) {
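
Distilled, the ownership pattern these hunks establish: the factory method owns the native Options until construction succeeds, then the store takes over and releases everything in close(). A minimal sketch of that lifecycle (class and method names simplified for illustration, not the actual Alluxio code):

```java
import java.io.IOException;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class OwnedRocksStore implements AutoCloseable {
  private final RocksDB mDb;
  private final Options mDbOptions;

  public static OwnedRocksStore open(String rootDir) throws IOException {
    RocksDB.loadLibrary();
    // Options is a RocksObject too; someone must eventually close it.
    Options options = new Options().setCreateIfMissing(true);
    try {
      RocksDB db = RocksDB.open(options, rootDir);
      return new OwnedRocksStore(db, options); // ownership moves to the store
    } catch (RocksDBException e) {
      options.close(); // construction failed; nobody else will release it
      throw new IOException("Couldn't open rocksDB database", e);
    }
  }

  private OwnedRocksStore(RocksDB db, Options options) {
    mDb = db;
    mDbOptions = options;
  }

  @Override
  public void close() {
    mDb.close();
    mDbOptions.close();
  }
}
```
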
@@ -24,7 +24,6 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.HashLinkedListMemTableConfig;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@@ -54,12 +53,15 @@ public class RocksBlockStore implements BlockStore {
private static final String BLOCKS_DB_NAME = "blocks";
private static final String BLOCK_META_COLUMN = "block-meta";
private static final String BLOCK_LOCATIONS_COLUMN = "block-locations";
private static final String ROCKS_STORE_NAME = "BlockStore";

// This is a field instead of a constant because it depends on the call to RocksDB.loadLibrary().
private final WriteOptions mDisableWAL;
private final ReadOptions mIteratorOption;
private final ColumnFamilyOptions mColumnFamilyOptions;

private final RocksStore mRocksStore;
// The handles are closed in RocksStore
private final AtomicReference<ColumnFamilyHandle> mBlockMetaColumn = new AtomicReference<>();
private final AtomicReference<ColumnFamilyHandle> mBlockLocationsColumn = new AtomicReference<>();
private final LongAdder mSize = new LongAdder();
@@ -74,18 +76,12 @@ public RocksBlockStore(String baseDir) {
mDisableWAL = new WriteOptions().setDisableWAL(true);
mIteratorOption = new ReadOptions().setReadaheadSize(
ServerConfiguration.getBytes(PropertyKey.MASTER_METASTORE_ITERATOR_READAHEAD_SIZE));
ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
mColumnFamilyOptions = new ColumnFamilyOptions()
.setMemTableConfig(new HashLinkedListMemTableConfig())
.setCompressionType(CompressionType.NO_COMPRESSION);
List<ColumnFamilyDescriptor> columns =
Arrays.asList(new ColumnFamilyDescriptor(BLOCK_META_COLUMN.getBytes(), cfOpts),
new ColumnFamilyDescriptor(BLOCK_LOCATIONS_COLUMN.getBytes(), cfOpts));
DBOptions dbOpts = new DBOptions()
// Concurrent memtable write is not supported for hash linked list memtable
.setAllowConcurrentMemtableWrite(false)
.setMaxOpenFiles(-1)
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
List<ColumnFamilyDescriptor> columns = Arrays.asList(
new ColumnFamilyDescriptor(BLOCK_META_COLUMN.getBytes(), mColumnFamilyOptions),
new ColumnFamilyDescriptor(BLOCK_LOCATIONS_COLUMN.getBytes(), mColumnFamilyOptions));
String dbPath = PathUtils.concatPath(baseDir, BLOCKS_DB_NAME);
String backupPath = PathUtils.concatPath(baseDir, BLOCKS_DB_NAME + "-backups");
// Create block store db path if it does not exist.
@@ -96,7 +92,7 @@ public RocksBlockStore(String baseDir) {
LOG.warn("Failed to create nonexistent db path at: {}. Error:{}", dbPath, e);
}
}
mRocksStore = new RocksStore(dbPath, backupPath, columns, dbOpts,
mRocksStore = new RocksStore(ROCKS_STORE_NAME, dbPath, backupPath, columns,
Arrays.asList(mBlockMetaColumn, mBlockLocationsColumn));
}

@@ -161,18 +157,29 @@ public long size() {
@Override
public void close() {
mSize.reset();
LOG.info("Closing RocksBlockStore and recycling all RocksDB JNI objects");
mRocksStore.close();
mIteratorOption.close();
mDisableWAL.close();
mColumnFamilyOptions.close();
LOG.info("RocksBlockStore closed");
}

@Override
public List<BlockLocation> getLocations(long id) {
byte[] startKey = RocksUtils.toByteArray(id, 0);
byte[] endKey = RocksUtils.toByteArray(id, Long.MAX_VALUE);

// Explicitly hold a reference to the ReadOptions object from the discussion in
// https://groups.google.com/g/rocksdb/c/PwapmWwyBbc/m/ecl7oW3AAgAJ
final ReadOptions readOptions = new ReadOptions().setIterateUpperBound(new Slice(endKey));
try (RocksIterator iter = db().newIterator(mBlockLocationsColumn.get(), readOptions)) {
// References to the RocksObject need to be held explicitly and kept from GC
// in order to prevent segfaults in the native code execution.
// Ref: https://github.com/facebook/rocksdb/issues/9378
// All RocksObjects should be closed properly at the end of usage.
// When there are multiple resources declared in the try-with-resources block,
// they are closed in the opposite order of declaration.
// Ref: https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
try (final Slice slice = new Slice(endKey);
final ReadOptions readOptions = new ReadOptions().setIterateUpperBound(slice);
final RocksIterator iter = db().newIterator(mBlockLocationsColumn.get(), readOptions)) {
iter.seek(startKey);
List<BlockLocation> locations = new ArrayList<>();
for (; iter.isValid(); iter.next()) {
@@ -208,7 +215,9 @@ public void removeLocation(long blockId, long workerId) {

@Override
public Iterator<Block> iterator() {
return RocksUtils.createIterator(db().newIterator(mBlockMetaColumn.get(), mIteratorOption),
// TODO(jiacheng): close the iterator when we iterate the BlockStore for backup
RocksIterator iterator = db().newIterator(mBlockMetaColumn.get(), mIteratorOption);
return RocksUtils.createIterator(iterator,
(iter) -> new Block(Longs.fromByteArray(iter.key()), BlockMeta.parseFrom(iter.value())));
}

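
The rewritten getLocations() leans on a language guarantee worth spelling out: resources in a try-with-resources block are closed in the reverse of declaration order, so the RocksIterator is released before the ReadOptions, which is released before the Slice it points to. A tiny self-contained demonstration of that ordering:

```java
public class CloseOrderDemo {
  public static void main(String[] args) throws Exception {
    // Prints "body", then "closing C", "closing B", "closing A":
    // resources close in the opposite order of their declaration.
    try (AutoCloseable a = () -> System.out.println("closing A");
         AutoCloseable b = () -> System.out.println("closing B");
         AutoCloseable c = () -> System.out.println("closing C")) {
      System.out.println("body");
    }
  }
}
```
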
@@ -30,7 +30,6 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.HashLinkedListMemTableConfig;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@@ -61,13 +60,15 @@ public class RocksInodeStore implements InodeStore {
private static final String INODES_DB_NAME = "inodes";
private static final String INODES_COLUMN = "inodes";
private static final String EDGES_COLUMN = "edges";
private static final String ROCKS_STORE_NAME = "InodeStore";

// These are fields instead of constants because they depend on the call to RocksDB.loadLibrary().
private final WriteOptions mDisableWAL;
private final ReadOptions mReadPrefixSameAsStart;
private final ReadOptions mIteratorOption;

private final RocksStore mRocksStore;
private final ColumnFamilyOptions mColumnFamilyOpts;

private final AtomicReference<ColumnFamilyHandle> mInodesColumn = new AtomicReference<>();
private final AtomicReference<ColumnFamilyHandle> mEdgesColumn = new AtomicReference<>();
@@ -85,20 +86,14 @@ public RocksInodeStore(String baseDir) {
ServerConfiguration.getBytes(PropertyKey.MASTER_METASTORE_ITERATOR_READAHEAD_SIZE));
String dbPath = PathUtils.concatPath(baseDir, INODES_DB_NAME);
String backupPath = PathUtils.concatPath(baseDir, INODES_DB_NAME + "-backup");
ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
mColumnFamilyOpts = new ColumnFamilyOptions()
.setMemTableConfig(new HashLinkedListMemTableConfig())
.setCompressionType(CompressionType.NO_COMPRESSION)
.useFixedLengthPrefixExtractor(Longs.BYTES); // We always search using the initial long key
List<ColumnFamilyDescriptor> columns = Arrays.asList(
new ColumnFamilyDescriptor(INODES_COLUMN.getBytes(), cfOpts),
new ColumnFamilyDescriptor(EDGES_COLUMN.getBytes(), cfOpts));
DBOptions dbOpts = new DBOptions()
// Concurrent memtable write is not supported for hash linked list memtable
.setAllowConcurrentMemtableWrite(false)
.setMaxOpenFiles(-1)
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
mRocksStore = new RocksStore(dbPath, backupPath, columns, dbOpts,
new ColumnFamilyDescriptor(INODES_COLUMN.getBytes(), mColumnFamilyOpts),
new ColumnFamilyDescriptor(EDGES_COLUMN.getBytes(), mColumnFamilyOpts));
mRocksStore = new RocksStore(ROCKS_STORE_NAME, dbPath, backupPath, columns,
Arrays.asList(mInodesColumn, mEdgesColumn));
}

@@ -249,6 +244,7 @@ public Set<MutableInode<?>> allInodes() {
* @return an iterator over stored inodes
*/
public Iterator<InodeView> iterator() {
// TODO(jiacheng): close the iterator when we iterate the InodeStore for backup
return RocksUtils.createIterator(db().newIterator(mInodesColumn.get(), mIteratorOption),
(iter) -> getMutable(Longs.fromByteArray(iter.key()), ReadOption.defaults()).get());
}
@@ -331,7 +327,10 @@ public void close() {

@Override
public void close() {
LOG.info("Closing RocksInodeStore and recycling all RocksDB JNI objects");
mRocksStore.close();
mColumnFamilyOpts.close();
LOG.info("RocksInodeStore closed");
}

private RocksDB db() {
@@ -344,8 +343,8 @@ private RocksDB db() {
*/
public String toStringEntries() {
StringBuilder sb = new StringBuilder();
try (RocksIterator inodeIter =
db().newIterator(mInodesColumn.get(), new ReadOptions().setTotalOrderSeek(true))) {
try (ReadOptions readOptions = new ReadOptions().setTotalOrderSeek(true);
RocksIterator inodeIter = db().newIterator(mInodesColumn.get(), readOptions)) {
inodeIter.seekToFirst();
while (inodeIter.isValid()) {
MutableInode<?> inode;
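
Both toStringEntries() here and getLocations() in RocksBlockStore bind each RocksObject to a named local before use. Per the rocksdb-users thread linked above, an unreferenced temporary like new Slice(endKey) can be garbage-collected, and its native memory freed, while the iterator still reads it. A sketch of the safe shape (the helper class and consumer parameter are illustrative, not from this PR):

```java
import java.util.function.BiConsumer;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;

final class PinnedRangeScan {
  // Risky anti-pattern (avoid): the Slice below has no reachable Java
  // reference, so the GC may finalize it mid-iteration:
  //   db.newIterator(column, new ReadOptions().setIterateUpperBound(new Slice(endKey)))
  static void scan(RocksDB db, ColumnFamilyHandle column,
      byte[] startKey, byte[] endKey, BiConsumer<byte[], byte[]> consumer) {
    // Named locals pin every RocksObject until the try block exits.
    try (Slice upperBound = new Slice(endKey);
         ReadOptions readOptions = new ReadOptions().setIterateUpperBound(upperBound);
         RocksIterator iter = db.newIterator(column, readOptions)) {
      for (iter.seek(startKey); iter.isValid(); iter.next()) {
        consumer.accept(iter.key(), iter.value());
      }
    }
  }
}
```
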
@@ -50,7 +50,7 @@
public final class RocksStore implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RocksStore.class);
public static final int ROCKS_OPEN_RETRY_TIMEOUT = 20 * Constants.SECOND_MS;

private final String mName;
private final String mDbPath;
private final String mDbCheckpointPath;
private final Collection<ColumnFamilyDescriptor> mColumnFamilyDescriptors;
@@ -62,20 +62,26 @@ public final class RocksStore implements Closeable {
private List<AtomicReference<ColumnFamilyHandle>> mColumnHandles;

/**
* @param name a name to distinguish what store this is
* @param dbPath a path for the rocks database
* @param checkpointPath a path for taking database checkpoints
* @param columnFamilyDescriptors columns to create within the rocks database
* @param dbOpts db options
* @param columnHandles column handle references to populate
*/
public RocksStore(String dbPath, String checkpointPath,
Collection<ColumnFamilyDescriptor> columnFamilyDescriptors, DBOptions dbOpts,
public RocksStore(String name, String dbPath, String checkpointPath,
Collection<ColumnFamilyDescriptor> columnFamilyDescriptors,
List<AtomicReference<ColumnFamilyHandle>> columnHandles) {
Preconditions.checkState(columnFamilyDescriptors.size() == columnHandles.size());
mName = name;
mDbPath = dbPath;
mDbCheckpointPath = checkpointPath;
mColumnFamilyDescriptors = columnFamilyDescriptors;
mDbOpts = dbOpts;
mDbOpts = new DBOptions()
// Concurrent memtable write is not supported for hash linked list memtable
.setAllowConcurrentMemtableWrite(false)
.setMaxOpenFiles(-1)
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
mColumnHandles = columnHandles;
try {
resetDb();
@@ -111,12 +117,15 @@ private void resetDb() throws RocksDBException {
}

private void stopDb() {
LOG.info("Closing {} rocks database", mName);
if (mDb != null) {
try {
// Column handles must be closed before closing the db, or an exception gets thrown.
mColumnHandles.forEach(handle -> {
handle.get().close();
handle.set(null);
if (handle != null) {
handle.get().close();
handle.set(null);
}
});
mDb.close();
mCheckpoint.close();
@@ -217,6 +226,7 @@ public synchronized void restoreFromCheckpoint(CheckpointInputStream input) thro
@Override
public synchronized void close() {
stopDb();
mDbOpts.close();
LOG.info("Closed store at {}", mDbPath);
}
}
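
The teardown order RocksStore now enforces can be summarized in one sketch (the method shape and names are illustrative; only the ordering mirrors stopDb() and close()): column family handles first, then the DB and checkpoint, and finally the DBOptions.

```java
import java.util.List;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

final class RocksShutdownOrder {
  static void shutdown(List<ColumnFamilyHandle> handles, RocksDB db,
      Checkpoint checkpoint, DBOptions dbOpts) {
    // 1. Column handles must be closed before the db, or an exception is thrown.
    for (ColumnFamilyHandle handle : handles) {
      if (handle != null) {
        handle.close();
      }
    }
    // 2. Then the database itself and the objects derived from it.
    db.close();
    checkpoint.close();
    // 3. The DBOptions outlives the db it configured, so it goes last.
    dbOpts.close();
  }
}
```
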
@@ -118,6 +118,7 @@ public boolean hasNext() {
}

@Override
// TODO(jiacheng): close this iterator properly on finish
public T next() {
try {
return parser.next(rocksIterator);
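
One possible shape for that TODO, sketched here rather than taken from the commit: make the wrapping iterator itself Closeable, so whoever drives the iteration can release the underlying RocksIterator.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.function.Function;
import org.rocksdb.RocksIterator;

// Hypothetical helper, not part of this PR: an Iterator that exposes close()
// so callers can release the native RocksIterator when they finish early.
public class CloseableRocksIterator<T> implements Iterator<T>, Closeable {
  private final RocksIterator mRocksIter;
  private final Function<RocksIterator, T> mParser;

  public CloseableRocksIterator(RocksIterator rocksIter, Function<RocksIterator, T> parser) {
    mRocksIter = rocksIter;
    mParser = parser;
    mRocksIter.seekToFirst();
  }

  @Override
  public boolean hasNext() {
    return mRocksIter.isValid();
  }

  @Override
  public T next() {
    T result = mParser.apply(mRocksIter);
    mRocksIter.next();
    return result;
  }

  @Override
  public void close() {
    mRocksIter.close();
  }
}
```

Callers would then drive it inside try-with-resources, closing the native handle even if they stop before exhausting the iterator.
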
@@ -23,7 +23,6 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.HashLinkedListMemTableConfig;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;
@@ -44,20 +43,14 @@ public void backupRestore() throws Exception {
.setMemTableConfig(new HashLinkedListMemTableConfig())
.setCompressionType(CompressionType.NO_COMPRESSION)
.useFixedLengthPrefixExtractor(Longs.BYTES); // We always search using the initial long key
DBOptions dbOpts = new DBOptions()
// Concurrent memtable write is not supported for hash linked list memtable
.setAllowConcurrentMemtableWrite(false)
.setMaxOpenFiles(-1)
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);

List<ColumnFamilyDescriptor> columnDescriptors =
Arrays.asList(new ColumnFamilyDescriptor("test".getBytes(), cfOpts));
String dbDir = mFolder.newFolder("rocks").getAbsolutePath();
String backupsDir = mFolder.newFolder("rocks-backups").getAbsolutePath();
AtomicReference<ColumnFamilyHandle> testColumn = new AtomicReference<>();
RocksStore store =
new RocksStore(dbDir, backupsDir, columnDescriptors, dbOpts, Arrays.asList(testColumn));
new RocksStore("test", dbDir, backupsDir, columnDescriptors, Arrays.asList(testColumn));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
RocksDB db = store.getDb();
int count = 10;
@@ -69,7 +62,8 @@ public void backupRestore() throws Exception {

String newBbDir = mFolder.newFolder("rocks-new").getAbsolutePath();
store =
new RocksStore(newBbDir, backupsDir, columnDescriptors, dbOpts, Arrays.asList(testColumn));
new RocksStore("test-new", newBbDir, backupsDir, columnDescriptors,
Arrays.asList(testColumn));
store.restoreFromCheckpoint(
new CheckpointInputStream(new ByteArrayInputStream(baos.toByteArray())));
db = store.getDb();
