Skip to content

Commit

Permalink
add public WriteWithCallback API and rename PostWalWriteCallback
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed May 24, 2024
1 parent b4e2291 commit cbce77d
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 64 deletions.
19 changes: 12 additions & 7 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_write_callback.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
Expand Down Expand Up @@ -231,6 +232,10 @@ class DBImpl : public DB {
using DB::Write;
Status Write(const WriteOptions& options, WriteBatch* updates) override;

using DB::WriteWithCallback;
Status WriteWithCallback(const WriteOptions& options, WriteBatch* updates,
WalWriteCallback* wal_write_cb) override;

using DB::Get;
Status Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family, const Slice& key,
Expand Down Expand Up @@ -686,12 +691,12 @@ class DBImpl : public DB {

// Similar to Write() but will call the callback once on the single write
// thread to determine whether it is safe to perform the write.
// And will call the `post_wal_write_cb` after wal write finishes in pipelined
// And will call the `wal_write_cb` after wal write finishes in pipelined
// write mode.
virtual Status WriteWithCallback(
const WriteOptions& write_options, WriteBatch* my_batch,
WriteCallback* callback,
PostWalWriteCallback* post_wal_write_cb = nullptr);
virtual Status WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback,
WalWriteCallback* wal_write_cb = nullptr);

// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into the current
Expand Down Expand Up @@ -1500,7 +1505,7 @@ class DBImpl : public DB {
// batch that does not have duplicate keys.
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
PostWalWriteCallback* post_wal_write_cb = nullptr,
WalWriteCallback* wal_write_cb = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false, uint64_t* seq_used = nullptr,
size_t batch_cnt = 0,
Expand All @@ -1509,7 +1514,7 @@ class DBImpl : public DB {

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
PostWalWriteCallback* post_wal_write_cb = nullptr,
WalWriteCallback* wal_write_cb = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false,
uint64_t* seq_used = nullptr);
Expand Down
51 changes: 34 additions & 17 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,22 +155,41 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, /*callback=*/nullptr,
/*post_wal_write_cb=*/nullptr,
/*wal_write_cb=*/nullptr,
/*log_used=*/nullptr);
}
return s;
}

Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
PostWalWriteCallback* post_wal_write_cb) {
WalWriteCallback* wal_write_cb) {
Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, callback, post_wal_write_cb);
s = WriteImpl(write_options, my_batch, callback, wal_write_cb);
}
return s;
}

Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
WalWriteCallback* wal_write_cb) {
if (!immutable_db_options_.enable_pipelined_write &&
wal_write_cb != nullptr) {
return Status::InvalidArgument(
"Only the pipelined write mode takes a wal write callback.");
}
Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, /*callback=*/nullptr, wal_write_cb);
}
return s;
}
Expand All @@ -180,10 +199,9 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
// published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
PostWalWriteCallback* post_wal_write_cb,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used,
size_t batch_cnt,
WalWriteCallback* wal_write_cb, uint64_t* log_used,
uint64_t log_ref, bool disable_memtable,
uint64_t* seq_used, size_t batch_cnt,
PreReleaseCallback* pre_release_callback,
PostMemTableCallback* post_memtable_callback) {
assert(!seq_per_batch_ || batch_cnt != 0);
Expand Down Expand Up @@ -320,15 +338,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}

if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback,
post_wal_write_cb, log_used, log_ref,
disable_memtable, seq_used);
return PipelinedWriteImpl(write_options, my_batch, callback, wal_write_cb,
log_used, log_ref, disable_memtable, seq_used);
}

PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback,
/*post_wal_write_cb=*/nullptr, log_ref,
disable_memtable, batch_cnt, pre_release_callback,
/*wal_write_cb=*/nullptr, log_ref, disable_memtable,
batch_cnt, pre_release_callback,
post_memtable_callback);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

Expand Down Expand Up @@ -683,15 +700,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,

Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
PostWalWriteCallback* post_wal_write_cb,
WalWriteCallback* wal_write_cb,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

WriteContext write_context;

WriteThread::Writer w(write_options, my_batch, callback, post_wal_write_cb,
WriteThread::Writer w(write_options, my_batch, callback, wal_write_cb,
log_ref, disable_memtable, /*_batch_cnt=*/0,
/*_pre_release_callback=*/nullptr);
write_thread_.JoinBatchGroup(&w);
Expand Down Expand Up @@ -880,7 +897,7 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

WriteThread::Writer w(write_options, my_batch, callback,
/*post_wal_write_cb=*/nullptr, log_ref,
/*wal_write_cb=*/nullptr, log_ref,
false /*disable_memtable*/);

if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
Expand Down Expand Up @@ -936,8 +953,8 @@ Status DBImpl::WriteImplWALOnly(
const PublishLastSeq publish_last_seq, const bool disable_memtable) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback,
/*post_wal_write_cb=*/nullptr, log_ref,
disable_memtable, sub_batch_cnt, pre_release_callback);
/*wal_write_cb=*/nullptr, log_ref, disable_memtable,
sub_batch_cnt, pre_release_callback);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

write_thread->JoinBatchGroup(&w);
Expand Down
10 changes: 0 additions & 10 deletions db/write_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,4 @@ class WriteCallback {
virtual bool AllowWriteBatching() = 0;
};

class PostWalWriteCallback {
public:
virtual ~PostWalWriteCallback() {}

// Will be called after wal write finishes.
// Note: this callback is only invoked in pipelined write mode after a write
// group's WAL write is finished.
virtual void OnWalWriteFinish() = 0;
};

} // namespace ROCKSDB_NAMESPACE
49 changes: 31 additions & 18 deletions db/write_callback_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).


#include "db/write_callback.h"

#include <atomic>
Expand All @@ -15,6 +13,7 @@
#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/wal_write_callback.h"
#include "rocksdb/write_batch.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
Expand Down Expand Up @@ -84,17 +83,17 @@ class MockWriteCallback : public WriteCallback {
bool AllowWriteBatching() override { return allow_batching_; }
};

class MockPostWalWriteCallback : public PostWalWriteCallback {
class MockWalWriteCallback : public WalWriteCallback {
public:
std::atomic<bool> was_called_{false};
std::atomic<bool> wal_write_done_{false};

MockPostWalWriteCallback() = default;
MockWalWriteCallback() = default;

MockPostWalWriteCallback(const MockPostWalWriteCallback& other) {
was_called_.store(other.was_called_.load());
MockWalWriteCallback(const MockWalWriteCallback& other) {
wal_write_done_.store(other.wal_write_done_.load());
}

void OnWalWriteFinish() { was_called_.store(true); }
void OnWalWriteFinish() { wal_write_done_.store(true); }
};

#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
Expand Down Expand Up @@ -132,11 +131,11 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
kvs_.clear();
write_batch_.Clear();
callback_.was_called_.store(false);
post_wal_write_cb_.was_called_.store(false);
wal_write_cb_.wal_write_done_.store(false);
}

MockWriteCallback callback_;
MockPostWalWriteCallback post_wal_write_cb_;
MockWalWriteCallback wal_write_cb_;
WriteBatch write_batch_;
std::vector<std::pair<string, string>> kvs_;
};
Expand Down Expand Up @@ -341,25 +340,25 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
// seq_per_batch_ requires a natural batch separator or Noop
ASSERT_OK(WriteBatchInternal::InsertNoop(&write_op.write_batch_));
const size_t ONE_BATCH = 1;
s = db_impl->WriteImpl(
woptions, &write_op.write_batch_, &write_op.callback_,
&write_op.post_wal_write_cb_, nullptr, 0, false, nullptr, ONE_BATCH,
two_queues_ ? &publish_seq_callback : nullptr);
s = db_impl->WriteImpl(woptions, &write_op.write_batch_,
&write_op.callback_, &write_op.wal_write_cb_,
nullptr, 0, false, nullptr, ONE_BATCH,
two_queues_ ? &publish_seq_callback : nullptr);
} else {
s = db_impl->WriteWithCallback(woptions, &write_op.write_batch_,
&write_op.callback_,
&write_op.post_wal_write_cb_);
&write_op.wal_write_cb_);
}

if (write_op.callback_.should_fail_) {
ASSERT_TRUE(s.IsBusy());
ASSERT_FALSE(write_op.post_wal_write_cb_.was_called_.load());
ASSERT_FALSE(write_op.wal_write_cb_.wal_write_done_.load());
} else {
ASSERT_OK(s);
if (enable_pipelined_write_ && enable_WAL_) {
ASSERT_TRUE(write_op.post_wal_write_cb_.was_called_.load());
ASSERT_TRUE(write_op.wal_write_cb_.wal_write_done_.load());
} else {
ASSERT_FALSE(write_op.post_wal_write_cb_.was_called_.load());
ASSERT_FALSE(write_op.wal_write_cb_.wal_write_done_.load());
}
}
};
Expand Down Expand Up @@ -462,6 +461,20 @@ TEST_F(WriteCallbackTest, WriteCallBackTest) {
ASSERT_OK(s);
ASSERT_EQ("value.a2", value);

MockWalWriteCallback wal_write_cb;
WriteBatch wb4;
ASSERT_OK(wb4.Put("a", "value.a4"));

ASSERT_NOK(db->WriteWithCallback(write_options, &wb4, &wal_write_cb));

delete db;
options.enable_pipelined_write = true;
ASSERT_OK(DB::Open(options, dbname, &db));
ASSERT_OK(db->WriteWithCallback(write_options, &wb4, &wal_write_cb));
ASSERT_OK(db->Get(read_options, "a", &value));
ASSERT_EQ(value, "value.a4");
ASSERT_TRUE(wal_write_cb.wal_write_done_.load());

delete db;
ASSERT_OK(DestroyDB(dbname, options));
}
Expand Down
13 changes: 7 additions & 6 deletions db/write_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/wal_write_callback.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"

Expand Down Expand Up @@ -127,7 +128,7 @@ class WriteThread {
uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference
WriteCallback* callback;
PostWalWriteCallback* post_wal_write_cb;
WalWriteCallback* wal_write_cb;
bool made_waitable; // records lazy construction of mutex and cv
std::atomic<uint8_t> state; // write under StateMutex() or pre-link
WriteGroup* write_group;
Expand All @@ -154,7 +155,7 @@ class WriteThread {
log_used(0),
log_ref(0),
callback(nullptr),
post_wal_write_cb(nullptr),
wal_write_cb(nullptr),
made_waitable(false),
state(STATE_INIT),
write_group(nullptr),
Expand All @@ -163,7 +164,7 @@ class WriteThread {
link_newer(nullptr) {}

Writer(const WriteOptions& write_options, WriteBatch* _batch,
WriteCallback* _callback, PostWalWriteCallback* _post_wal_write_cb,
WriteCallback* _callback, WalWriteCallback* _wal_write_cb,
uint64_t _log_ref, bool _disable_memtable, size_t _batch_cnt = 0,
PreReleaseCallback* _pre_release_callback = nullptr,
PostMemTableCallback* _post_memtable_callback = nullptr)
Expand All @@ -182,7 +183,7 @@ class WriteThread {
log_used(0),
log_ref(_log_ref),
callback(_callback),
post_wal_write_cb(_post_wal_write_cb),
wal_write_cb(_wal_write_cb),
made_waitable(false),
state(STATE_INIT),
write_group(nullptr),
Expand All @@ -207,8 +208,8 @@ class WriteThread {
}

void CheckPostWalWriteCallback() {
if (post_wal_write_cb != nullptr) {
post_wal_write_cb->OnWalWriteFinish();
if (wal_write_cb != nullptr) {
wal_write_cb->OnWalWriteFinish();
}
}

Expand Down
10 changes: 10 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
#include "rocksdb/version.h"
#include "rocksdb/wal_write_callback.h"
#include "rocksdb/wide_columns.h"

#ifdef _WIN32
Expand Down Expand Up @@ -583,6 +584,15 @@ class DB {
// Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

// Same as DB::Write, and takes a callback argument for wal writes in the
// pipelined write mode.
virtual Status WriteWithCallback(const WriteOptions& /*options*/,
WriteBatch* /*updates*/,
WalWriteCallback* /*wal_write_cb*/) {
return Status::NotSupported(
"WriteWithCallback not implemented for this interface.");
}

// If the column family specified by "column_family" contains an entry for
// "key", return the corresponding value in "*value". If the entry is a plain
// key-value, return the value as-is; if it is a wide-column entity, return
Expand Down
Loading

0 comments on commit cbce77d

Please sign in to comment.