Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract merge operator in Java + Thread JNI attachment automatic management + loader management for static variables #3432

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ make_config.mk
CMakeCache.txt
CMakeFiles/
build/

cmake-build-debug/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed? Looks like it got included anyway.

ldb
manifest_dump
sst_dump
Expand Down
2 changes: 1 addition & 1 deletion java/rocksjni/abstract_not_associative_merge_operator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ namespace rocksdb {
JNIAbstractNotAssociativeMergeOperator::merge = env->GetMethodID(cls, "partialMerge", "([B[B[B)[B");

JNIAbstractNotAssociativeMergeOperator::multi_merge = env->GetMethodID(cls, "partialMultiMerge",
"([B[B[[B)[B");
"([B[[B)[B");

JNIAbstractNotAssociativeMergeOperator::should_merge = env->GetMethodID(cls, "shouldMerge", "([[B)Z");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public AbstractNotAssociativeMergeOperator(boolean allowSingleOperand,boolean al
protected final native void disposeInternal(final long handle);

abstract public byte[] fullMerge(byte[] key, byte[] oldvalue,byte[][] operands) throws RocksDBException;
abstract public byte[] partialMultiMerge(byte[] key, byte[] oldvalue,byte[][] operands)throws RocksDBException;
abstract public byte[] partialMultiMerge(byte[] key, byte[][] operands)throws RocksDBException;

abstract public byte[] partialMerge(byte[] key, byte[] left,byte[] right)throws RocksDBException;

Expand Down
50 changes: 29 additions & 21 deletions java/src/test/java/org/rocksdb/RocksDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public byte[] merge(byte[] key, byte[] oldvalue, byte[] newvalue) {
sb.append(',');
//if (1==1)throw new IndexOutOfBoundsException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove debug / commented lines in this file

sb.append(new String(newvalue));
System.out.println("----execute merge---");
System.out.println("----execute merge---"+new String(newvalue));
return sb.toString().getBytes();
}
}
Expand All @@ -233,7 +233,7 @@ private class M1 extends AbstractNotAssociativeMergeOperator{


public M1() throws RocksDBException {
super(true,false,false);
super(true,true,false);
}

private byte[] collect(byte[][] operands){
Expand All @@ -250,32 +250,36 @@ private byte[] collect(byte[][] operands){

@Override
public byte[] fullMerge(byte[] key, byte[] oldvalue, byte[][] operands) throws RocksDBException {
System.out.println("execute fullMerge"+oldvalue);
if (oldvalue==null) return collect(operands);

return (new String(oldvalue)+','+new String(collect(operands))).getBytes();
}

@Override
public byte[] partialMultiMerge(byte[] key, byte[] oldvalue, byte[][] operands) {
if (oldvalue==null) return collect(operands);
public byte[] partialMultiMerge(byte[] key, byte[][] operands) {
System.out.println("execute partialMultiMerge");

return (new String(oldvalue)+','+new String(collect(operands))).getBytes();

return (new String(collect(operands))).getBytes();

}

@Override
public byte[] partialMerge(byte[] key, byte[] left, byte[] right) {

System.out.println("execute partialMerge");
StringBuffer sb=new StringBuffer(new String(left));
sb.append(',');
sb.append(new String(right));
System.out.println("----execute merge---");

return sb.toString().getBytes();
}

@Override
public boolean shouldMerge(byte[][] operands) {
return false;
System.out.println("execute shouldMerge");

return true;
}
}

Expand All @@ -298,19 +302,17 @@ public void run() {


db.put("key1".getBytes(), "value".getBytes());
assertThat(db.get("key1".getBytes())).isEqualTo(
"value".getBytes());
assertThat(db.get("key1".getBytes())).isEqualTo("value".getBytes());
// merge key1 with another value portion
db.merge("key1".getBytes(), "value2".getBytes());
assertThat(db.get("key1".getBytes())).isEqualTo(
"value,value2".getBytes());
System.out.println(new String(db.get("key1".getBytes())));
// assertThat(db.get("key1".getBytes())).isEqualTo("value,value2".getBytes());
// merge key1 with another value portion
db.merge(wOpt, "key1".getBytes(), "value3".getBytes());
assertThat(db.get("key1".getBytes())).isEqualTo(
"value,value2,value3".getBytes());
// assertThat(db.get("key1".getBytes())).isEqualTo("value,value2,value3".getBytes());
db.merge(wOpt, "key1".getBytes(), "value4".getBytes());
assertThat(db.get("key1".getBytes())).isEqualTo(
"value,value2,value3,value4".getBytes());
System.out.println(new String(db.get("key1".getBytes())));
//assertThat(db.get("key1".getBytes())).isEqualTo("value,value2,value3,value4".getBytes());
// merge on non existent key shall insert the value
db.merge(wOpt, "key2".getBytes(), "xxxx".getBytes());
assertThat(db.get("key2".getBytes())).isEqualTo(
Expand All @@ -320,7 +322,7 @@ public void run() {
}catch (Exception e){
throw new RuntimeException(e);
}finally {
org.rocksdb.util.Environment.detachCurrentThreadIfPossible();

}

}
Expand All @@ -330,7 +332,7 @@ public void run() {

t.setDaemon(false);
t.start();
org.rocksdb.util.Environment.detachCurrentThreadIfPossible();

t.join();

}
Expand All @@ -353,18 +355,24 @@ public void run() {
) {


db.put("key1".getBytes(), "value".getBytes());
db.merge("key1".getBytes(), "value".getBytes());
assertThat(db.get("key1".getBytes())).isEqualTo(
"value".getBytes());
// merge key1 with another value portion
System.out.println("->merge value2");
db.merge("key1".getBytes(), "value2".getBytes());
System.out.println("->get");
assertThat(db.get("key1".getBytes())).isEqualTo(
"value,value2".getBytes());
System.out.println("->merge value3");
// merge key1 with another value portion
db.merge(wOpt, "key1".getBytes(), "value3".getBytes());
System.out.println("->get");
assertThat(db.get("key1".getBytes())).isEqualTo(
"value,value2,value3".getBytes());
System.out.println("->merge value4");
db.merge(wOpt, "key1".getBytes(), "value4".getBytes());
System.out.println("->get");
assertThat(db.get("key1".getBytes())).isEqualTo(
"value,value2,value3,value4".getBytes());
// merge on non existent key shall insert the value
Expand All @@ -376,7 +384,7 @@ public void run() {
}catch (Exception e){
throw new RuntimeException(e);
}finally {
org.rocksdb.util.Environment.detachCurrentThreadIfPossible();

}

}
Expand All @@ -386,7 +394,7 @@ public void run() {

t.setDaemon(false);
t.start();
org.rocksdb.util.Environment.detachCurrentThreadIfPossible();

t.join();

}
Expand Down