Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend Transaction::GetForUpdate with do_validate #4680

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Expand Up @@ -3,6 +3,7 @@
### New Features

### Public API Change
* Transaction::GetForUpdate is extended with a do_validate parameter with a default value of true. If false, it skips validating the snapshot before doing the read. Similarly, ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with an assume_tracked parameter with a default value of false. If true, it indicates that the call is assumed to come after a ::GetForUpdate.

### Bug Fixes
* Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls.
Expand Down
44 changes: 29 additions & 15 deletions include/rocksdb/utilities/transaction.h
Expand Up @@ -208,8 +208,10 @@ class Transaction {
// Read this key and ensure that this transaction will only
// be able to be committed if this key is not written outside this
// transaction after it has first been read (or after the snapshot if a
// snapshot is set in this transaction). The transaction behavior is the
// same regardless of whether the key exists or not.
// snapshot is set in this transaction and do_validate is true). If
// do_validate is false, ReadOptions::snapshot is expected to be nullptr so
// that GetForUpdate returns the latest committed value. The transaction
// behavior is the same regardless of whether the key exists or not.
//
// Note: Currently, this function will return Status::MergeInProgress
// if the most recent write to the queried key in this batch is a Merge.
Expand All @@ -234,27 +236,31 @@ class Transaction {
virtual Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, std::string* value,
bool exclusive = true) = 0;
bool exclusive = true,
bool do_validate = true) = 0;

// An overload of the above method that receives a PinnableSlice
// For backward compatibility a default implementation is provided
virtual Status GetForUpdate(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val,
bool exclusive = true) {
bool exclusive = true,
const bool do_validate = true) {
if (pinnable_val == nullptr) {
std::string* null_str = nullptr;
return GetForUpdate(options, column_family, key, null_str, exclusive);
return GetForUpdate(options, column_family, key, null_str, exclusive,
do_validate);
} else {
auto s = GetForUpdate(options, column_family, key,
pinnable_val->GetSelf(), exclusive);
pinnable_val->GetSelf(), exclusive, do_validate);
pinnable_val->PinSelf();
return s;
}
}

virtual Status GetForUpdate(const ReadOptions& options, const Slice& key,
std::string* value, bool exclusive = true) = 0;
std::string* value, bool exclusive = true,
const bool do_validate = true) = 0;

virtual std::vector<Status> MultiGetForUpdate(
const ReadOptions& options,
Expand Down Expand Up @@ -287,6 +293,9 @@ class Transaction {
// functions in WriteBatch, but will also do conflict checking on the
// keys being written.
//
// assume_tracked=true expects the key to be already tracked. If valid then it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you meant assume_tracked=true here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops. will fix it.

yoshinorim marked this conversation as resolved.
Show resolved Hide resolved
// skips ValidateSnapshot. Returns error otherwise.
//
// If this Transaction was created on an OptimisticTransactionDB, these
// functions should always return Status::OK().
//
Expand All @@ -299,28 +308,33 @@ class Transaction {
// (See max_write_buffer_number_to_maintain)
// or other errors on unexpected failures.
virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
const Slice& value, const bool assume_tracked = false) = 0;
virtual Status Put(const Slice& key, const Slice& value) = 0;
virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) = 0;
const SliceParts& value,
const bool assume_tracked = false) = 0;
virtual Status Put(const SliceParts& key, const SliceParts& value) = 0;

virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0;
const Slice& value,
const bool assume_tracked = false) = 0;
virtual Status Merge(const Slice& key, const Slice& value) = 0;

virtual Status Delete(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
virtual Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
const bool assume_tracked = false) = 0;
virtual Status Delete(const Slice& key) = 0;
virtual Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) = 0;
const SliceParts& key,
const bool assume_tracked = false) = 0;
virtual Status Delete(const SliceParts& key) = 0;

virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
const Slice& key,
const bool assume_tracked = false) = 0;
virtual Status SingleDelete(const Slice& key) = 0;
virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) = 0;
const SliceParts& key,
const bool assume_tracked = false) = 0;
virtual Status SingleDelete(const SliceParts& key) = 0;

// PutUntracked() will write a Put to the batch of operations to be committed
Expand Down
104 changes: 57 additions & 47 deletions java/rocksjni/transaction.cc
Expand Up @@ -418,36 +418,38 @@ jobjectArray Java_org_rocksdb_Transaction_multiGet__JJ_3_3B(
/*
* Class: org_rocksdb_Transaction
* Method: getForUpdate
* Signature: (JJ[BIJZ)[B
* Signature: (JJ[BIJZZ)[B
*/
jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIJZ(
jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIJZZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jlong jread_options_handle,
jbyteArray jkey, jint jkey_part_len, jlong jcolumn_family_handle,
jboolean jexclusive) {
jboolean jexclusive, jboolean jdo_validate) {
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
FnGet fn_get_for_update = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
const rocksdb::ReadOptions&, rocksdb::ColumnFamilyHandle*,
const rocksdb::Slice&, std::string*, bool)>(
const rocksdb::Slice&, std::string*, bool, bool)>(
&rocksdb::Transaction::GetForUpdate, txn, _1, column_family_handle, _2,
_3, jexclusive);
_3, jexclusive, jdo_validate);
return txn_get_helper(env, fn_get_for_update, jread_options_handle, jkey,
jkey_part_len);
}

/*
* Class: org_rocksdb_Transaction
* Method: getForUpdate
* Signature: (JJ[BIZ)[B
* Signature: (JJ[BIZZ)[B
*/
jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIZ(
jbyteArray Java_org_rocksdb_Transaction_getForUpdate__JJ_3BIZZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jlong jread_options_handle,
jbyteArray jkey, jint jkey_part_len, jboolean jexclusive) {
jbyteArray jkey, jint jkey_part_len, jboolean jexclusive,
jboolean jdo_validate) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
FnGet fn_get_for_update = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
const rocksdb::ReadOptions&, const rocksdb::Slice&, std::string*, bool)>(
&rocksdb::Transaction::GetForUpdate, txn, _1, _2, _3, jexclusive);
const rocksdb::ReadOptions&, const rocksdb::Slice&, std::string*, bool,
bool)>(&rocksdb::Transaction::GetForUpdate, txn, _1, _2, _3, jexclusive,
jdo_validate);
return txn_get_helper(env, fn_get_for_update, jread_options_handle, jkey,
jkey_part_len);
}
Expand Down Expand Up @@ -568,19 +570,20 @@ void txn_write_kv_helper(JNIEnv* env, const FnWriteKV& fn_write_kv,
/*
* Class: org_rocksdb_Transaction
* Method: put
* Signature: (J[BI[BIJ)V
* Signature: (J[BI[BIJZ)V
*/
void Java_org_rocksdb_Transaction_put__J_3BI_3BIJ(
void Java_org_rocksdb_Transaction_put__J_3BI_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey,
jint jkey_part_len, jbyteArray jval, jint jval_len,
jlong jcolumn_family_handle) {
jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKV fn_put = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&,
const rocksdb::Slice&)>(&rocksdb::Transaction::Put, txn,
column_family_handle, _1, _2);
const rocksdb::Slice&, bool)>(&rocksdb::Transaction::Put, txn,
column_family_handle, _1, _2,
jassume_tracked);
txn_write_kv_helper(env, fn_put, jkey, jkey_part_len, jval, jval_len);
}

Expand Down Expand Up @@ -706,20 +709,21 @@ void txn_write_kv_parts_helper(JNIEnv* env,
/*
* Class: org_rocksdb_Transaction
* Method: put
* Signature: (J[[BI[[BIJ)V
* Signature: (J[[BI[[BIJZ)V
*/
void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BIJ(
void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts,
jint jkey_parts_len, jobjectArray jvalue_parts, jint jvalue_parts_len,
jlong jcolumn_family_handle) {
jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKVParts fn_put_parts =
std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&,
const rocksdb::SliceParts&)>(&rocksdb::Transaction::Put, txn,
column_family_handle, _1, _2);
const rocksdb::SliceParts&, bool)>(&rocksdb::Transaction::Put, txn,
column_family_handle, _1, _2,
jassume_tracked);
txn_write_kv_parts_helper(env, fn_put_parts, jkey_parts, jkey_parts_len,
jvalue_parts, jvalue_parts_len);
}
Expand All @@ -744,19 +748,20 @@ void Java_org_rocksdb_Transaction_put__J_3_3BI_3_3BI(
/*
* Class: org_rocksdb_Transaction
* Method: merge
* Signature: (J[BI[BIJ)V
* Signature: (J[BI[BIJZ)V
*/
void Java_org_rocksdb_Transaction_merge__J_3BI_3BIJ(
void Java_org_rocksdb_Transaction_merge__J_3BI_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey,
jint jkey_part_len, jbyteArray jval, jint jval_len,
jlong jcolumn_family_handle) {
jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKV fn_merge = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&,
const rocksdb::Slice&)>(&rocksdb::Transaction::Merge, txn,
column_family_handle, _1, _2);
const rocksdb::Slice&, bool)>(&rocksdb::Transaction::Merge, txn,
column_family_handle, _1, _2,
jassume_tracked);
txn_write_kv_helper(env, fn_merge, jkey, jkey_part_len, jval, jval_len);
}

Expand Down Expand Up @@ -803,18 +808,18 @@ void txn_write_k_helper(JNIEnv* env, const FnWriteK& fn_write_k,
/*
* Class: org_rocksdb_Transaction
* Method: delete
* Signature: (J[BIJ)V
* Signature: (J[BIJZ)V
*/
void Java_org_rocksdb_Transaction_delete__J_3BIJ(JNIEnv* env, jobject /*jobj*/,
jlong jhandle, jbyteArray jkey,
jint jkey_part_len,
jlong jcolumn_family_handle) {
void Java_org_rocksdb_Transaction_delete__J_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey,
jint jkey_part_len, jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteK fn_delete = std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&)>(
&rocksdb::Transaction::Delete, txn, column_family_handle, _1);
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&, bool)>(
&rocksdb::Transaction::Delete, txn, column_family_handle, _1,
jassume_tracked);
txn_write_k_helper(env, fn_delete, jkey, jkey_part_len);
}

Expand Down Expand Up @@ -892,18 +897,20 @@ void txn_write_k_parts_helper(JNIEnv* env,
/*
* Class: org_rocksdb_Transaction
* Method: delete
* Signature: (J[[BIJ)V
* Signature: (J[[BIJZ)V
*/
void Java_org_rocksdb_Transaction_delete__J_3_3BIJ(
void Java_org_rocksdb_Transaction_delete__J_3_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts,
jint jkey_parts_len, jlong jcolumn_family_handle) {
jint jkey_parts_len, jlong jcolumn_family_handle,
jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKParts fn_delete_parts =
std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&)>(
&rocksdb::Transaction::Delete, txn, column_family_handle, _1);
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&, bool)>(
&rocksdb::Transaction::Delete, txn, column_family_handle, _1,
jassume_tracked);
txn_write_k_parts_helper(env, fn_delete_parts, jkey_parts, jkey_parts_len);
}

Expand All @@ -926,18 +933,19 @@ void Java_org_rocksdb_Transaction_delete__J_3_3BI(JNIEnv* env, jobject /*jobj*/,
/*
* Class: org_rocksdb_Transaction
* Method: singleDelete
* Signature: (J[BIJ)V
* Signature: (J[BIJZ)V
*/
void Java_org_rocksdb_Transaction_singleDelete__J_3BIJ(
void Java_org_rocksdb_Transaction_singleDelete__J_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jbyteArray jkey,
jint jkey_part_len, jlong jcolumn_family_handle) {
jint jkey_part_len, jlong jcolumn_family_handle, jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteK fn_single_delete =
std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&)>(
&rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1);
rocksdb::ColumnFamilyHandle*, const rocksdb::Slice&, bool)>(
&rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1,
jassume_tracked);
txn_write_k_helper(env, fn_single_delete, jkey, jkey_part_len);
}

Expand All @@ -961,18 +969,20 @@ void Java_org_rocksdb_Transaction_singleDelete__J_3BI(JNIEnv* env,
/*
* Class: org_rocksdb_Transaction
* Method: singleDelete
* Signature: (J[[BIJ)V
* Signature: (J[[BIJZ)V
*/
void Java_org_rocksdb_Transaction_singleDelete__J_3_3BIJ(
void Java_org_rocksdb_Transaction_singleDelete__J_3_3BIJZ(
JNIEnv* env, jobject /*jobj*/, jlong jhandle, jobjectArray jkey_parts,
jint jkey_parts_len, jlong jcolumn_family_handle) {
jint jkey_parts_len, jlong jcolumn_family_handle,
jboolean jassume_tracked) {
auto* txn = reinterpret_cast<rocksdb::Transaction*>(jhandle);
auto* column_family_handle =
reinterpret_cast<rocksdb::ColumnFamilyHandle*>(jcolumn_family_handle);
FnWriteKParts fn_single_delete_parts =
std::bind<rocksdb::Status (rocksdb::Transaction::*)(
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&)>(
&rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1);
rocksdb::ColumnFamilyHandle*, const rocksdb::SliceParts&, bool)>(
&rocksdb::Transaction::SingleDelete, txn, column_family_handle, _1,
jassume_tracked);
txn_write_k_parts_helper(env, fn_single_delete_parts, jkey_parts,
jkey_parts_len);
}
Expand Down