From 918e739cba89b3bed54a24fde0890ba726b1164b Mon Sep 17 00:00:00 2001
From: Jiacheng Liu
Date: Fri, 21 Jan 2022 11:27:48 +0800
Subject: [PATCH] Close RocksDB objects that are AutoCloseable

### What changes are proposed in this pull request?

Fixes https://github.com/facebook/rocksdb/issues/9378

The `org.rocksdb.RocksObject` class extends `AutoCloseable`, so these objects should be closed properly to avoid native memory leaks.

### Why are the changes needed?

According to the [RocksDB documentation](https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management) and a sample [PR discussion](https://github.com/facebook/rocksdb/pull/7608#discussion_r515159270), the objects need to be closed explicitly or wrapped in a try-with-resources block.

### Does this PR introduce any user facing changes?

NA

pr-link: Alluxio/alluxio#14856
change-id: cid-69a164c547a14e0bb7710fc7a48731aecba4db88
---
 .../file/cache/store/RocksPageStore.java      | 41 +++++++++++-------
 .../metastore/rocks/RocksBlockStore.java      | 43 +++++++++++--------
 .../metastore/rocks/RocksInodeStore.java      | 25 ++++++-----
 .../master/metastore/rocks/RocksStore.java    | 24 ++++++++---
 .../master/metastore/rocks/RocksUtils.java    |  1 +
 .../metastore/rocks/RocksStoreTest.java       | 12 ++----
 6 files changed, 85 insertions(+), 61 deletions(-)

diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/store/RocksPageStore.java b/core/client/fs/src/main/java/alluxio/client/file/cache/store/RocksPageStore.java
index e6fc28c4ac6f..b5cd9392b76c 100644
--- a/core/client/fs/src/main/java/alluxio/client/file/cache/store/RocksPageStore.java
+++ b/core/client/fs/src/main/java/alluxio/client/file/cache/store/RocksPageStore.java
@@ -47,25 +47,27 @@ public class RocksPageStore implements PageStore {
   private final long mCapacity;
   private final RocksDB mDb;
-  private final RocksPageStoreOptions mOptions;
+  private final RocksPageStoreOptions mPageStoreOptions;
+  private final Options mDbOptions;
 
   /**
-   * @param options options for the rocks page store
+   * @param pageStoreOptions options for the rocks page store
    * @return a new instance of {@link PageStore} backed by RocksDB
    * @throws IOException if I/O error happens
    */
-  public static RocksPageStore open(RocksPageStoreOptions options) throws IOException {
-    Preconditions.checkArgument(options.getMaxPageSize() > 0);
+  public static RocksPageStore open(RocksPageStoreOptions pageStoreOptions) throws IOException {
+    Preconditions.checkArgument(pageStoreOptions.getMaxPageSize() > 0);
     RocksDB.loadLibrary();
-    Options rocksOptions = new Options();
-    rocksOptions.setCreateIfMissing(true);
-    rocksOptions.setWriteBufferSize(options.getWriteBufferSize());
-    rocksOptions.setCompressionType(options.getCompressionType());
+    // The RocksObject will be closed together with the RocksPageStore
+    Options rocksOptions = new Options()
+        .setCreateIfMissing(true)
+        .setWriteBufferSize(pageStoreOptions.getWriteBufferSize())
+        .setCompressionType(pageStoreOptions.getCompressionType());
     RocksDB db = null;
     try {
-      db = RocksDB.open(rocksOptions, options.getRootDir());
+      db = RocksDB.open(rocksOptions, pageStoreOptions.getRootDir());
       byte[] confData = db.get(CONF_KEY);
-      Cache.PRocksPageStoreOptions pOptions = options.toProto();
+      Cache.PRocksPageStoreOptions pOptions = pageStoreOptions.toProto();
       if (confData != null) {
         Cache.PRocksPageStoreOptions persistedOptions =
             Cache.PRocksPageStoreOptions.parseFrom(confData);
@@ -79,19 +81,25 @@ public static RocksPageStore open(RocksPageStoreOptions options) throws IOExcept
       if (db != null) {
        db.close();
      }
+
rocksOptions.close(); throw new IOException("Couldn't open rocksDB database", e); } - return new RocksPageStore(options, db); + return new RocksPageStore(pageStoreOptions, db, rocksOptions); } /** * Creates a new instance of {@link PageStore} backed by RocksDB. * - * @param options options for the rocks page store + * @param pageStoreOptions options for the rocks page store + * @param rocksDB RocksDB instance + * @param dbOptions the RocksDB options */ - private RocksPageStore(RocksPageStoreOptions options, RocksDB rocksDB) { - mOptions = options; - mCapacity = (long) (options.getCacheSize() / (1 + options.getOverheadRatio())); + private RocksPageStore(RocksPageStoreOptions pageStoreOptions, RocksDB rocksDB, + Options dbOptions) { + mPageStoreOptions = pageStoreOptions; + mDbOptions = dbOptions; + mCapacity = + (long) (pageStoreOptions.getCacheSize() / (1 + pageStoreOptions.getOverheadRatio())); mDb = rocksDB; } @@ -153,7 +161,10 @@ public void delete(PageId pageId) throws PageNotFoundException { @Override public void close() { + LOG.info("Closing RocksPageStore and recycling all RocksDB JNI objects"); mDb.close(); + mDbOptions.close(); + LOG.info("RocksPageStore closed"); } private static byte[] getKeyFromPageId(PageId pageId) { diff --git a/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksBlockStore.java b/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksBlockStore.java index d95418dfb2f8..b7be141116bb 100644 --- a/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksBlockStore.java +++ b/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksBlockStore.java @@ -24,7 +24,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompressionType; -import org.rocksdb.DBOptions; import org.rocksdb.HashLinkedListMemTableConfig; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; @@ -54,12 +53,15 @@ public class RocksBlockStore implements BlockStore { private static final String BLOCKS_DB_NAME = "blocks"; private static final String BLOCK_META_COLUMN = "block-meta"; private static final String BLOCK_LOCATIONS_COLUMN = "block-locations"; + private static final String ROCKS_STORE_NAME = "BlockStore"; // This is a field instead of a constant because it depends on the call to RocksDB.loadLibrary(). 
private final WriteOptions mDisableWAL; private final ReadOptions mIteratorOption; + private final ColumnFamilyOptions mColumnFamilyOptions; private final RocksStore mRocksStore; + // The handles are closed in RocksStore private final AtomicReference mBlockMetaColumn = new AtomicReference<>(); private final AtomicReference mBlockLocationsColumn = new AtomicReference<>(); private final LongAdder mSize = new LongAdder(); @@ -74,18 +76,12 @@ public RocksBlockStore(String baseDir) { mDisableWAL = new WriteOptions().setDisableWAL(true); mIteratorOption = new ReadOptions().setReadaheadSize( ServerConfiguration.getBytes(PropertyKey.MASTER_METASTORE_ITERATOR_READAHEAD_SIZE)); - ColumnFamilyOptions cfOpts = new ColumnFamilyOptions() + mColumnFamilyOptions = new ColumnFamilyOptions() .setMemTableConfig(new HashLinkedListMemTableConfig()) .setCompressionType(CompressionType.NO_COMPRESSION); - List columns = - Arrays.asList(new ColumnFamilyDescriptor(BLOCK_META_COLUMN.getBytes(), cfOpts), - new ColumnFamilyDescriptor(BLOCK_LOCATIONS_COLUMN.getBytes(), cfOpts)); - DBOptions dbOpts = new DBOptions() - // Concurrent memtable write is not supported for hash linked list memtable - .setAllowConcurrentMemtableWrite(false) - .setMaxOpenFiles(-1) - .setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true); + List columns = Arrays.asList( + new ColumnFamilyDescriptor(BLOCK_META_COLUMN.getBytes(), mColumnFamilyOptions), + new ColumnFamilyDescriptor(BLOCK_LOCATIONS_COLUMN.getBytes(), mColumnFamilyOptions)); String dbPath = PathUtils.concatPath(baseDir, BLOCKS_DB_NAME); String backupPath = PathUtils.concatPath(baseDir, BLOCKS_DB_NAME + "-backups"); // Create block store db path if it does not exist. @@ -96,7 +92,7 @@ public RocksBlockStore(String baseDir) { LOG.warn("Failed to create nonexistent db path at: {}. 
Error:{}", dbPath, e); } } - mRocksStore = new RocksStore(dbPath, backupPath, columns, dbOpts, + mRocksStore = new RocksStore(ROCKS_STORE_NAME, dbPath, backupPath, columns, Arrays.asList(mBlockMetaColumn, mBlockLocationsColumn)); } @@ -161,7 +157,12 @@ public long size() { @Override public void close() { mSize.reset(); + LOG.info("Closing RocksBlockStore and recycling all RocksDB JNI objects"); mRocksStore.close(); + mIteratorOption.close(); + mDisableWAL.close(); + mColumnFamilyOptions.close(); + LOG.info("RocksBlockStore closed"); } @Override @@ -169,10 +170,16 @@ public List getLocations(long id) { byte[] startKey = RocksUtils.toByteArray(id, 0); byte[] endKey = RocksUtils.toByteArray(id, Long.MAX_VALUE); - // Explicitly hold a reference to the ReadOptions object from the discussion in - // https://groups.google.com/g/rocksdb/c/PwapmWwyBbc/m/ecl7oW3AAgAJ - final ReadOptions readOptions = new ReadOptions().setIterateUpperBound(new Slice(endKey)); - try (RocksIterator iter = db().newIterator(mBlockLocationsColumn.get(), readOptions)) { + // References to the RocksObject need to be held explicitly and kept from GC + // In order to prevent segfaults in the native code execution + // Ref: https://github.com/facebook/rocksdb/issues/9378 + // All RocksObject should be closed properly at the end of usage + // When there are multiple resources declared in the try-with-resource block + // They are closed in the opposite order of declaration + // Ref: https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html + try (final Slice slice = new Slice(endKey); + final ReadOptions readOptions = new ReadOptions().setIterateUpperBound(slice); + final RocksIterator iter = db().newIterator(mBlockLocationsColumn.get(), readOptions)) { iter.seek(startKey); List locations = new ArrayList<>(); for (; iter.isValid(); iter.next()) { @@ -208,7 +215,9 @@ public void removeLocation(long blockId, long workerId) { @Override public Iterator iterator() { - return RocksUtils.createIterator(db().newIterator(mBlockMetaColumn.get(), mIteratorOption), + // TODO(jiacheng): close the iterator when we iterate the BlockStore for backup + RocksIterator iterator = db().newIterator(mBlockMetaColumn.get(), mIteratorOption); + return RocksUtils.createIterator(iterator, (iter) -> new Block(Longs.fromByteArray(iter.key()), BlockMeta.parseFrom(iter.value()))); } diff --git a/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksInodeStore.java b/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksInodeStore.java index 4795b7481fde..8684c5f336fa 100644 --- a/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksInodeStore.java +++ b/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksInodeStore.java @@ -30,7 +30,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompressionType; -import org.rocksdb.DBOptions; import org.rocksdb.HashLinkedListMemTableConfig; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; @@ -61,6 +60,7 @@ public class RocksInodeStore implements InodeStore { private static final String INODES_DB_NAME = "inodes"; private static final String INODES_COLUMN = "inodes"; private static final String EDGES_COLUMN = "edges"; + private static final String ROCKS_STORE_NAME = "InodeStore"; // These are fields instead of constants because they depend on the call to RocksDB.loadLibrary(). 
private final WriteOptions mDisableWAL; @@ -68,6 +68,7 @@ public class RocksInodeStore implements InodeStore { private final ReadOptions mIteratorOption; private final RocksStore mRocksStore; + private final ColumnFamilyOptions mColumnFamilyOpts; private final AtomicReference mInodesColumn = new AtomicReference<>(); private final AtomicReference mEdgesColumn = new AtomicReference<>(); @@ -85,20 +86,14 @@ public RocksInodeStore(String baseDir) { ServerConfiguration.getBytes(PropertyKey.MASTER_METASTORE_ITERATOR_READAHEAD_SIZE)); String dbPath = PathUtils.concatPath(baseDir, INODES_DB_NAME); String backupPath = PathUtils.concatPath(baseDir, INODES_DB_NAME + "-backup"); - ColumnFamilyOptions cfOpts = new ColumnFamilyOptions() + mColumnFamilyOpts = new ColumnFamilyOptions() .setMemTableConfig(new HashLinkedListMemTableConfig()) .setCompressionType(CompressionType.NO_COMPRESSION) .useFixedLengthPrefixExtractor(Longs.BYTES); // We always search using the initial long key List columns = Arrays.asList( - new ColumnFamilyDescriptor(INODES_COLUMN.getBytes(), cfOpts), - new ColumnFamilyDescriptor(EDGES_COLUMN.getBytes(), cfOpts)); - DBOptions dbOpts = new DBOptions() - // Concurrent memtable write is not supported for hash linked list memtable - .setAllowConcurrentMemtableWrite(false) - .setMaxOpenFiles(-1) - .setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true); - mRocksStore = new RocksStore(dbPath, backupPath, columns, dbOpts, + new ColumnFamilyDescriptor(INODES_COLUMN.getBytes(), mColumnFamilyOpts), + new ColumnFamilyDescriptor(EDGES_COLUMN.getBytes(), mColumnFamilyOpts)); + mRocksStore = new RocksStore(ROCKS_STORE_NAME, dbPath, backupPath, columns, Arrays.asList(mInodesColumn, mEdgesColumn)); } @@ -249,6 +244,7 @@ public Set> allInodes() { * @return an iterator over stored inodes */ public Iterator iterator() { + // TODO(jiacheng): close the iterator when we iterate the BlockStore for backup return RocksUtils.createIterator(db().newIterator(mInodesColumn.get(), mIteratorOption), (iter) -> getMutable(Longs.fromByteArray(iter.key()), ReadOption.defaults()).get()); } @@ -331,7 +327,10 @@ public void close() { @Override public void close() { + LOG.info("Closing RocksInodeStore and recycling all RocksDB JNI objects"); mRocksStore.close(); + mColumnFamilyOpts.close(); + LOG.info("RocksInodeStore closed"); } private RocksDB db() { @@ -344,8 +343,8 @@ private RocksDB db() { */ public String toStringEntries() { StringBuilder sb = new StringBuilder(); - try (RocksIterator inodeIter = - db().newIterator(mInodesColumn.get(), new ReadOptions().setTotalOrderSeek(true))) { + try (ReadOptions readOptions = new ReadOptions().setTotalOrderSeek(true); + RocksIterator inodeIter = db().newIterator(mInodesColumn.get(), readOptions)) { inodeIter.seekToFirst(); while (inodeIter.isValid()) { MutableInode inode; diff --git a/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksStore.java b/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksStore.java index 87493e565cb1..5d901d299ce6 100644 --- a/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksStore.java +++ b/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksStore.java @@ -50,7 +50,7 @@ public final class RocksStore implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(RocksStore.class); public static final int ROCKS_OPEN_RETRY_TIMEOUT = 20 * Constants.SECOND_MS; - + private final String mName; private final String mDbPath; private final String 
mDbCheckpointPath; private final Collection mColumnFamilyDescriptors; @@ -62,20 +62,26 @@ public final class RocksStore implements Closeable { private List> mColumnHandles; /** + * @param name a name to distinguish what store this is * @param dbPath a path for the rocks database * @param checkpointPath a path for taking database checkpoints * @param columnFamilyDescriptors columns to create within the rocks database - * @param dbOpts db options * @param columnHandles column handle references to populate */ - public RocksStore(String dbPath, String checkpointPath, - Collection columnFamilyDescriptors, DBOptions dbOpts, + public RocksStore(String name, String dbPath, String checkpointPath, + Collection columnFamilyDescriptors, List> columnHandles) { Preconditions.checkState(columnFamilyDescriptors.size() == columnHandles.size()); + mName = name; mDbPath = dbPath; mDbCheckpointPath = checkpointPath; mColumnFamilyDescriptors = columnFamilyDescriptors; - mDbOpts = dbOpts; + mDbOpts = new DBOptions() + // Concurrent memtable write is not supported for hash linked list memtable + .setAllowConcurrentMemtableWrite(false) + .setMaxOpenFiles(-1) + .setCreateIfMissing(true) + .setCreateMissingColumnFamilies(true); mColumnHandles = columnHandles; try { resetDb(); @@ -111,12 +117,15 @@ private void resetDb() throws RocksDBException { } private void stopDb() { + LOG.info("Closing {} rocks database", mName); if (mDb != null) { try { // Column handles must be closed before closing the db, or an exception gets thrown. mColumnHandles.forEach(handle -> { - handle.get().close(); - handle.set(null); + if (handle != null) { + handle.get().close(); + handle.set(null); + } }); mDb.close(); mCheckpoint.close(); @@ -217,6 +226,7 @@ public synchronized void restoreFromCheckpoint(CheckpointInputStream input) thro @Override public synchronized void close() { stopDb(); + mDbOpts.close(); LOG.info("Closed store at {}", mDbPath); } } diff --git a/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksUtils.java b/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksUtils.java index 59724bfe5106..3f33b767bc76 100644 --- a/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksUtils.java +++ b/core/server/master/src/main/java/alluxio/master/metastore/rocks/RocksUtils.java @@ -118,6 +118,7 @@ public boolean hasNext() { } @Override + // TODO(jiacheng): close this iterator properly on finish public T next() { try { return parser.next(rocksIterator); diff --git a/core/server/master/src/test/java/alluxio/master/metastore/rocks/RocksStoreTest.java b/core/server/master/src/test/java/alluxio/master/metastore/rocks/RocksStoreTest.java index 22a5daf11cbf..cd6be08c6cdd 100644 --- a/core/server/master/src/test/java/alluxio/master/metastore/rocks/RocksStoreTest.java +++ b/core/server/master/src/test/java/alluxio/master/metastore/rocks/RocksStoreTest.java @@ -23,7 +23,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompressionType; -import org.rocksdb.DBOptions; import org.rocksdb.HashLinkedListMemTableConfig; import org.rocksdb.RocksDB; import org.rocksdb.WriteOptions; @@ -44,12 +43,6 @@ public void backupRestore() throws Exception { .setMemTableConfig(new HashLinkedListMemTableConfig()) .setCompressionType(CompressionType.NO_COMPRESSION) .useFixedLengthPrefixExtractor(Longs.BYTES); // We always search using the initial long key - DBOptions dbOpts = new DBOptions() - // Concurrent memtable write is not supported for hash linked list 
memtable
-        .setAllowConcurrentMemtableWrite(false)
-        .setMaxOpenFiles(-1)
-        .setCreateIfMissing(true)
-        .setCreateMissingColumnFamilies(true);
     List<ColumnFamilyDescriptor> columnDescriptors =
         Arrays.asList(new ColumnFamilyDescriptor("test".getBytes(), cfOpts));
@@ -57,7 +50,7 @@ public void backupRestore() throws Exception {
     String backupsDir = mFolder.newFolder("rocks-backups").getAbsolutePath();
     AtomicReference<ColumnFamilyHandle> testColumn = new AtomicReference<>();
     RocksStore store =
-        new RocksStore(dbDir, backupsDir, columnDescriptors, dbOpts, Arrays.asList(testColumn));
+        new RocksStore("test", dbDir, backupsDir, columnDescriptors, Arrays.asList(testColumn));
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     RocksDB db = store.getDb();
     int count = 10;
@@ -69,7 +62,8 @@ public void backupRestore() throws Exception {
     String newBbDir = mFolder.newFolder("rocks-new").getAbsolutePath();
     store =
-        new RocksStore(newBbDir, backupsDir, columnDescriptors, dbOpts, Arrays.asList(testColumn));
+        new RocksStore("test-new", newBbDir, backupsDir, columnDescriptors,
+            Arrays.asList(testColumn));
     store.restoreFromCheckpoint(
         new CheckpointInputStream(new ByteArrayInputStream(baos.toByteArray())));
     db = store.getDb();
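
### Resource management pattern (illustrative)

For reviewers, the standalone sketch below illustrates the convention this change applies: long-lived `RocksObject`s (`Options`, `WriteOptions`, the `RocksDB` handle) are kept as fields and released in `close()`, while short-lived ones (`Slice`, `ReadOptions`, `RocksIterator`) are declared in a try-with-resources block, which closes them in reverse order of declaration. This is not Alluxio code: the class name, database path, and key encoding are illustrative assumptions, and the only dependency assumed is the `rocksdbjni` library on the classpath.

```java
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.WriteOptions;

import java.nio.charset.StandardCharsets;

/** Minimal sketch of the ownership pattern; names and paths are hypothetical. */
public class RocksResourceSketch implements AutoCloseable {
  // Long-lived native objects: created once, released in close().
  private final Options mOptions;
  private final WriteOptions mDisableWal;
  private final RocksDB mDb;

  public RocksResourceSketch(String dbPath) throws RocksDBException {
    RocksDB.loadLibrary();
    mOptions = new Options().setCreateIfMissing(true);
    mDisableWal = new WriteOptions().setDisableWAL(true);
    mDb = RocksDB.open(mOptions, dbPath);
  }

  /** Writes a key/value pair without the write-ahead log. */
  public void put(String key, String value) throws RocksDBException {
    mDb.put(mDisableWal, key.getBytes(StandardCharsets.UTF_8),
        value.getBytes(StandardCharsets.UTF_8));
  }

  /** Scans keys in [start, end); short-lived RocksObjects are scoped to the try block. */
  public void scan(byte[] start, byte[] end) {
    // try-with-resources closes resources in reverse order of declaration,
    // so the iterator is closed before the ReadOptions and Slice it depends on.
    try (Slice upperBound = new Slice(end);
         ReadOptions readOptions = new ReadOptions().setIterateUpperBound(upperBound);
         RocksIterator iter = mDb.newIterator(readOptions)) {
      for (iter.seek(start); iter.isValid(); iter.next()) {
        System.out.printf("%s=%s%n",
            new String(iter.key(), StandardCharsets.UTF_8),
            new String(iter.value(), StandardCharsets.UTF_8));
      }
    }
  }

  @Override
  public void close() {
    // Close the db first, then the option objects it was created with.
    mDb.close();
    mDisableWal.close();
    mOptions.close();
  }
}
```

A caller would typically wrap the store itself in try-with-resources, e.g. `try (RocksResourceSketch store = new RocksResourceSketch("/tmp/rocks-sketch")) { ... }`, so the native handles are released even when an exception is thrown.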