Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow concurrent CF iteration and drop #6005

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions db/column_family.cc
Expand Up @@ -106,6 +106,10 @@ const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
return cfd()->user_comparator();
}

ColumnFamilyHandle* ColumnFamilyHandleImpl::CloneHandle() const {
return new ColumnFamilyHandleImpl(this->cfd_, this->db_, this->mutex_);
}

void GetIntTblPropCollectorFactory(
const ImmutableCFOptions& ioptions,
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
Expand Down
1 change: 1 addition & 0 deletions db/column_family.h
Expand Up @@ -169,6 +169,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
virtual const std::string& GetName() const override;
virtual Status GetDescriptor(ColumnFamilyDescriptor* desc) override;
virtual const Comparator* GetComparator() const override;
virtual ColumnFamilyHandle* CloneHandle() const override;

private:
ColumnFamilyData* cfd_;
Expand Down
3 changes: 3 additions & 0 deletions db/write_batch_test.cc
Expand Up @@ -612,6 +612,9 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
const Comparator* GetComparator() const override {
return BytewiseComparator();
}
ColumnFamilyHandle* CloneHandle() const override {
return new ColumnFamilyHandleImplDummy(id_);
}

private:
uint32_t id_;
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/db.h
Expand Up @@ -88,6 +88,8 @@ class ColumnFamilyHandle {
// Returns the comparator of the column family associated with the
// current handle.
virtual const Comparator* GetComparator() const = 0;
// Returns a copy of this handle used to pin column family
virtual ColumnFamilyHandle* CloneHandle() const = 0;
};

static const int kMajorVersion = __ROCKSDB_MAJOR__;
Expand Down
13 changes: 13 additions & 0 deletions java/rocksjni/columnfamilyhandle.cc
Expand Up @@ -58,6 +58,19 @@ jobject Java_org_rocksdb_ColumnFamilyHandle_getDescriptor(JNIEnv* env,
}
}

/*
* Class: org_rocksdb_ColumnFamilyHandle
* Method: cloneHandle
* Signature: (J)J
*/
JNIEXPORT jlong JNICALL Java_org_rocksdb_ColumnFamilyHandle_cloneHandle(JNIEnv* /*env*/,
jobject /*jobj*/,
jlong jhandle) {
auto* cfh = reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jhandle);
auto* clone = cfh->CloneHandle();
return reinterpret_cast<jlong>(clone);
}

/*
* Class: org_rocksdb_ColumnFamilyHandle
* Method: disposeInternal
Expand Down
18 changes: 14 additions & 4 deletions java/src/main/java/org/rocksdb/AbstractRocksIterator.java
Expand Up @@ -21,11 +21,18 @@
*/
public abstract class AbstractRocksIterator<P extends RocksObject>
extends RocksObject implements RocksIteratorInterface {
final RocksObject owner_;
final P parent_;

protected AbstractRocksIterator(final P parent,
protected AbstractRocksIterator(final P parent, final long nativeHandle) {
this(null, parent, nativeHandle);
}

protected AbstractRocksIterator(final RocksObject owner, final P parent,
final long nativeHandle) {
super(nativeHandle);
// owner for pinning underlying C++ owner object, or it can be null
owner_ = owner;
// parent must point to a valid RocksDB instance.
assert (parent != null);
// RocksIterator must hold a reference to the related parent instance
Expand Down Expand Up @@ -92,9 +99,12 @@ public void status() throws RocksDBException {
*/
@Override
protected void disposeInternal() {
if (parent_.isOwningHandle()) {
disposeInternal(nativeHandle_);
}
if (parent_.isOwningHandle()) {
disposeInternal(nativeHandle_);
}
if (owner_ != null) {
owner_.close();
}
}

abstract boolean isValid0(long handle);
Expand Down
33 changes: 28 additions & 5 deletions java/src/main/java/org/rocksdb/ColumnFamilyHandle.java
Expand Up @@ -13,8 +13,9 @@
* ColumnFamily Pointers.
*/
public class ColumnFamilyHandle extends RocksObject {
ColumnFamilyHandle(final RocksDB rocksDB,
final long nativeHandle) {
final RocksDB rocksDB_;

ColumnFamilyHandle(final RocksDB rocksDB, final long nativeHandle) {
super(nativeHandle);
// rocksDB must point to a valid RocksDB instance;
assert(rocksDB != null);
Expand All @@ -32,6 +33,7 @@ public class ColumnFamilyHandle extends RocksObject {
* @throws RocksDBException if an error occurs whilst retrieving the name.
*/
public byte[] getName() throws RocksDBException {
assert(isOwningHandle() || isDefaultColumnFamily());
return getName(nativeHandle_);
}

Expand All @@ -41,6 +43,7 @@ public byte[] getName() throws RocksDBException {
* @return the ID of the Column Family.
*/
public int getID() {
assert(isOwningHandle() || isDefaultColumnFamily());
return getID(nativeHandle_);
}

Expand All @@ -59,10 +62,22 @@ public int getID() {
* descriptor.
*/
public ColumnFamilyDescriptor getDescriptor() throws RocksDBException {
assert(isOwningHandle());
assert(isOwningHandle() || isDefaultColumnFamily());
return getDescriptor(nativeHandle_);
}

/**
* Gets a copy of the Column Family Handle, used to prevent Column Family
* from being released before some operations based on the Column Family,
* such as iterators for querying the Column Family.
*
* @return a copy of ColumnFamilyHandle
*/
public ColumnFamilyHandle cloneHandle() {
assert(isOwningHandle() || isDefaultColumnFamily());
return new ColumnFamilyHandle(rocksDB_, cloneHandle(nativeHandle_));
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -91,6 +106,10 @@ public int hashCode() {
}
}

protected boolean isDefaultColumnFamily() {
return nativeHandle_ == rocksDB_.getDefaultColumnFamily().nativeHandle_;
}

/**
* <p>Deletes underlying C++ iterator pointer.</p>
*
Expand All @@ -101,15 +120,19 @@ public int hashCode() {
*/
@Override
protected void disposeInternal() {
if (iter_ != null) {
iter_.close();
}
if(rocksDB_.isOwningHandle()) {
disposeInternal(nativeHandle_);
}
}

protected AbstractRocksIterator iter_ = null;

private native byte[] getName(final long handle) throws RocksDBException;
private native int getID(final long handle);
private native ColumnFamilyDescriptor getDescriptor(final long handle) throws RocksDBException;
private native long cloneHandle(final long handle);
@Override protected final native void disposeInternal(final long handle);

private final RocksDB rocksDB_;
}
28 changes: 16 additions & 12 deletions java/src/main/java/org/rocksdb/RocksDB.java
Expand Up @@ -2406,8 +2406,9 @@ public RocksIterator newIterator(final ReadOptions readOptions) {
*/
public RocksIterator newIterator(
final ColumnFamilyHandle columnFamilyHandle) {
return new RocksIterator(this, iteratorCF(nativeHandle_,
columnFamilyHandle.nativeHandle_));
ColumnFamilyHandle cfHandle = columnFamilyHandle.cloneHandle();
return new RocksIterator(cfHandle,
iteratorCF(nativeHandle_, cfHandle.nativeHandle_));
}

/**
Expand All @@ -2427,8 +2428,9 @@ public RocksIterator newIterator(
*/
public RocksIterator newIterator(final ColumnFamilyHandle columnFamilyHandle,
final ReadOptions readOptions) {
return new RocksIterator(this, iteratorCF(nativeHandle_,
columnFamilyHandle.nativeHandle_, readOptions.nativeHandle_));
ColumnFamilyHandle cfHandle = columnFamilyHandle.cloneHandle();
return new RocksIterator(cfHandle, iteratorCF(nativeHandle_,
cfHandle.nativeHandle_, readOptions.nativeHandle_));
}

/**
Expand Down Expand Up @@ -2468,18 +2470,20 @@ public List<RocksIterator> newIterators(
final List<ColumnFamilyHandle> columnFamilyHandleList,
final ReadOptions readOptions) throws RocksDBException {

final long[] columnFamilyHandles = new long[columnFamilyHandleList.size()];
for (int i = 0; i < columnFamilyHandleList.size(); i++) {
columnFamilyHandles[i] = columnFamilyHandleList.get(i).nativeHandle_;
int size = columnFamilyHandleList.size();
final ColumnFamilyHandle[] cloneCfHandles = new ColumnFamilyHandle[size];
final long[] cfRefs = new long[size];
for (int i = 0; i < size; i++) {
cloneCfHandles[i] = columnFamilyHandleList.get(i).cloneHandle();
cfRefs[i] = cloneCfHandles[i].nativeHandle_;
}

final long[] iteratorRefs = iterators(nativeHandle_, columnFamilyHandles,
final long[] iteratorRefs = iterators(nativeHandle_, cfRefs,
readOptions.nativeHandle_);

final List<RocksIterator> iterators = new ArrayList<>(
columnFamilyHandleList.size());
for (int i=0; i<columnFamilyHandleList.size(); i++){
iterators.add(new RocksIterator(this, iteratorRefs[i]));
final List<RocksIterator> iterators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
iterators.add(new RocksIterator(cloneCfHandles[i], iteratorRefs[i]));
}
return iterators;
}
Expand Down
15 changes: 14 additions & 1 deletion java/src/main/java/org/rocksdb/RocksIterator.java
Expand Up @@ -19,10 +19,23 @@
* @see org.rocksdb.RocksObject
*/
public class RocksIterator extends AbstractRocksIterator<RocksDB> {
protected RocksIterator(RocksDB rocksDB, long nativeHandle) {

protected RocksIterator(final RocksDB rocksDB, final long nativeHandle) {
super(rocksDB, nativeHandle);
}

protected RocksIterator(final ColumnFamilyHandle cfHandle, final long nativeHandle) {
super(cfHandle, cfHandle.rocksDB_, nativeHandle);
cfHandle.iter_ = this;
}

protected RocksIterator(final RocksIterator base, final long nativeHandle) {
super(base.owner_, base.parent_, nativeHandle);
if (base.owner_ != null) {
((ColumnFamilyHandle) base.owner_).iter_ = this;
}
}

/**
* <p>Return the key for the current entry. The underlying storage for
* the returned slice is valid only until the next modification of
Expand Down
5 changes: 2 additions & 3 deletions java/src/main/java/org/rocksdb/WriteBatchWithIndex.java
Expand Up @@ -129,9 +129,8 @@ public WBWIRocksIterator newIterator() {
public RocksIterator newIteratorWithBase(
final ColumnFamilyHandle columnFamilyHandle,
final RocksIterator baseIterator) {
RocksIterator iterator = new RocksIterator(baseIterator.parent_,
iteratorWithBase(
nativeHandle_, columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_));
RocksIterator iterator = new RocksIterator(baseIterator, iteratorWithBase(
nativeHandle_, columnFamilyHandle.nativeHandle_, baseIterator.nativeHandle_));
// when the iterator is deleted it will also delete the baseIterator
baseIterator.disOwnNativeHandle();
return iterator;
Expand Down
62 changes: 60 additions & 2 deletions java/src/test/java/org/rocksdb/RocksIteratorTest.java
Expand Up @@ -10,6 +10,7 @@
import org.junit.rules.TemporaryFolder;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

public class RocksIteratorTest {

Expand All @@ -23,8 +24,8 @@ public class RocksIteratorTest {
@Test
public void rocksIterator() throws RocksDBException {
try (final Options options = new Options()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(options,
dbFolder.getRoot().getAbsolutePath())) {
db.put("key1".getBytes(), "value1".getBytes());
Expand Down Expand Up @@ -97,4 +98,61 @@ public void rocksIterator() throws RocksDBException {
}
}
}

@Test
public void rocksIteratorReleaseAfterCfClose() throws RocksDBException {
try (final Options options = new Options()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true);
final RocksDB db = RocksDB.open(options,
this.dbFolder.getRoot().getAbsolutePath())) {
db.put("key".getBytes(), "value".getBytes());

// Release iterator after default CF close
try (final RocksIterator iterator = db.newIterator()) {
// In fact, calling close() on default CF has no effect
db.getDefaultColumnFamily().close();

iterator.seekToFirst();
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key".getBytes());
assertThat(iterator.value()).isEqualTo("value".getBytes());
}

// Release iterator after custom CF close
ColumnFamilyDescriptor cfd1 = new ColumnFamilyDescriptor("cf1".getBytes());
ColumnFamilyHandle cfHandle1 = db.createColumnFamily(cfd1);
db.put(cfHandle1, "key1".getBytes(), "value1".getBytes());

try (final RocksIterator iterator = db.newIterator(cfHandle1)) {
cfHandle1.close();

iterator.seekToFirst();
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key1".getBytes());
assertThat(iterator.value()).isEqualTo("value1".getBytes());
}

// Release iterator after custom CF drop & close
ColumnFamilyDescriptor cfd2 = new ColumnFamilyDescriptor("cf2".getBytes());
ColumnFamilyHandle cfHandle2 = db.createColumnFamily(cfd2);
db.put(cfHandle2, "key2".getBytes(), "value2".getBytes());

try (final RocksIterator iterator = db.newIterator(cfHandle2)) {
db.dropColumnFamily(cfHandle2);
cfHandle2.close();

iterator.seekToFirst();
assertThat(iterator.isValid()).isTrue();
assertThat(iterator.key()).isEqualTo("key2".getBytes());
assertThat(iterator.value()).isEqualTo("value2".getBytes());
}

try (RocksIterator iter = db.newIterator(cfHandle2)){
fail();
} catch (AssertionError e) {
// assert(isOwningHandle())
}
}
}
}
Expand Up @@ -31,6 +31,9 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
comparator_(comparator) {}
uint32_t GetID() const override { return id_; }
const Comparator* GetComparator() const override { return comparator_; }
ColumnFamilyHandle* CloneHandle() const override {
return new ColumnFamilyHandleImplDummy(id_, comparator_);
}

private:
uint32_t id_;
Expand Down