Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract merge operator in Java + Thread JNI attachment automatic management + loader management for static variables #3432

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions env/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#define EXT4_SUPER_MAGIC 0xEF53
#endif

#include <iostream>
namespace rocksdb {

namespace {
Expand Down Expand Up @@ -127,6 +128,7 @@ class PosixEnv : public Env {
PosixEnv();

virtual ~PosixEnv() {

for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
}
Expand Down Expand Up @@ -923,6 +925,7 @@ struct StartThreadState {
};

static void* StartThreadWrapper(void* arg) {

StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
state->user_function(state->arg);
delete state;
Expand All @@ -942,6 +945,7 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
}

void PosixEnv::WaitForJoin() {

for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
}
Expand Down
64 changes: 47 additions & 17 deletions java/rocksjni/abstract_associative_merge_operator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* */


//#include <iostream>
#include <iostream>
#include <jni.h>
#include <rocksjni/init.h>
#include <assert.h>
Expand All @@ -24,6 +24,9 @@ namespace rocksdb {
namespace JNIAbstractAssociativeMergeOperator {

static jmethodID method;
static jclass rtClass;
static jfieldID rtField;
static jmethodID rtConstructor;


class JNIMergeOperator : public AssociativeMergeOperator {
Expand Down Expand Up @@ -63,22 +66,24 @@ namespace rocksdb {
buf2 = (jbyte *) value.data();
jb2 = env->NewByteArray(s2);
env->SetByteArrayRegion(jb2, 0, s2, buf2);



jbyteArray jresult = (jbyteArray) env->CallObjectMethod(obj, rocksdb::JNIAbstractAssociativeMergeOperator::method, jb0, jb1, jb2);
jobject rtobject = env->NewObject( rtClass, rtConstructor);
jbyteArray jresult = (jbyteArray) env->CallObjectMethod(obj, rocksdb::JNIAbstractAssociativeMergeOperator::method, jb0, jb1, jb2,rtobject);
jthrowable ex = env->ExceptionOccurred();

env->ReleaseByteArrayElements(jb0, buf0, JNI_COMMIT);
// env->DeleteLocalRef(jb0);
if (jb1 != NULL) env->ReleaseByteArrayElements(jb1, buf1, JNI_COMMIT);
env->ReleaseByteArrayElements(jb2, buf2, JNI_COMMIT);

env->DeleteLocalRef(rtobject);

if (ex) {

if (jresult!= nullptr) {
char *result = (char *) env->GetByteArrayElements(jresult, 0);
env->ReleaseByteArrayElements(jresult, (jbyte*)result, JNI_ABORT);
jboolean rtr=env->GetBooleanField(rtobject, rtField);
env->ReleaseByteArrayElements(jresult, (jbyte*)result, rtr?JNI_COMMIT:JNI_ABORT);
}
env->Throw(ex);

Expand All @@ -88,7 +93,8 @@ namespace rocksdb {
char *result = (char *) env->GetByteArrayElements(jresult, 0);
new_value->clear();
new_value->assign(result, len);
env->ReleaseByteArrayElements(jresult, (jbyte*)result, JNI_ABORT);
jboolean rtr=env->GetBooleanField(rtobject, rtField);
env->ReleaseByteArrayElements(jresult, (jbyte*)result, rtr?JNI_COMMIT:JNI_ABORT);

return true;
}
Expand All @@ -102,14 +108,14 @@ namespace rocksdb {

void destroy(JNIEnv *env) {

env->DeleteWeakGlobalRef(obj);
env->DeleteGlobalRef(obj);


}

void init(JNIEnv *e, jobject hook) {

obj = e->NewWeakGlobalRef(hook);
obj = e->NewGlobalRef(hook);

}

Expand All @@ -119,28 +125,52 @@ namespace rocksdb {

};// end of class

static void staticDestroy(JNIEnv *env){
env->DeleteGlobalRef(reinterpret_cast<jobject>(JNIAbstractAssociativeMergeOperator::rtClass));

}


static void staticInit(JNIEnv *env) {
static void staticInit(JNIEnv *env) {

jclass jc = env->FindClass("org/rocksdb/AbstractAssociativeMergeOperator");
if (jc==nullptr) {
jc = env->FindClass("java/lang/Error");
env->ThrowNew(jc, "unable to find AbstractAssociativeMergeOperator");
jclass cls = env->FindClass("org/rocksdb/AbstractAssociativeMergeOperator");
if (cls==nullptr) {
cls = env->FindClass("java/lang/Error");
env->ThrowNew(cls, "unable to find AbstractAssociativeMergeOperator");
}
method = env->GetMethodID(jc, "merge", "([B[B[B)[B");
method = env->GetMethodID(cls, "merge", "([B[B[BLorg/rocksdb/ReturnType;)[B");
if (method == 0) {
jc = env->FindClass("java/lang/Error");
env->ThrowNew(jc, "unable to find method merge");
cls = env->FindClass("java/lang/Error");
env->ThrowNew(cls, "unable to find method merge");
}
jclass a = env->FindClass("Lorg/rocksdb/ReturnType;");
if (a==0){
cls = env->FindClass("java/lang/Error");
env->ThrowNew(cls, "unable to find object org.rocksdb.ReturnType");

} else JNIAbstractAssociativeMergeOperator::rtClass= reinterpret_cast<jclass>(env->NewGlobalRef(a));
rtConstructor = env->GetMethodID( rtClass, "<init>", "()V");
if (rtConstructor==0){
cls = env->FindClass("java/lang/Error");
env->ThrowNew(cls, "unable to find field ReturnType.<init>");

}


JNIAbstractAssociativeMergeOperator::rtField = env->GetFieldID( rtClass, "isArgumentReference", "Z");
if (JNIAbstractAssociativeMergeOperator::rtField==0){
cls = env->FindClass("java/lang/Error");
env->ThrowNew(cls, "unable to find field ReturnType.isArgumentReference");

}
}
}

}
initialize {

rocksdb::setLoader( &rocksdb::JNIAbstractAssociativeMergeOperator::staticInit);

rocksdb::setUnloader( &rocksdb::JNIAbstractAssociativeMergeOperator::staticDestroy);
}


Expand Down