Skip to content

Commit

Permalink
Add general user write callback
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed May 24, 2024
1 parent cbce77d commit 477b6b5
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 107 deletions.
15 changes: 7 additions & 8 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
#include "rocksdb/status.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/user_write_callback.h"
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/wal_write_callback.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
Expand Down Expand Up @@ -234,7 +234,7 @@ class DBImpl : public DB {

using DB::WriteWithCallback;
Status WriteWithCallback(const WriteOptions& options, WriteBatch* updates,
WalWriteCallback* wal_write_cb) override;
UserWriteCallback* user_write_cb) override;

using DB::Get;
Status Get(const ReadOptions& _read_options,
Expand Down Expand Up @@ -691,12 +691,10 @@ class DBImpl : public DB {

// Similar to Write() but will call the callback once on the single write
// thread to determine whether it is safe to perform the write.
// And will call the `wal_write_cb` after wal write finishes in pipelined
// write mode.
virtual Status WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback,
WalWriteCallback* wal_write_cb = nullptr);
UserWriteCallback* user_write_cb = nullptr);

// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into the current
Expand Down Expand Up @@ -1505,7 +1503,7 @@ class DBImpl : public DB {
// batch that does not have duplicate keys.
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
WalWriteCallback* wal_write_cb = nullptr,
UserWriteCallback* user_write_cb = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false, uint64_t* seq_used = nullptr,
size_t batch_cnt = 0,
Expand All @@ -1514,7 +1512,7 @@ class DBImpl : public DB {

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
WalWriteCallback* wal_write_cb = nullptr,
UserWriteCallback* user_write_cb = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false,
uint64_t* seq_used = nullptr);
Expand All @@ -1541,7 +1539,8 @@ class DBImpl : public DB {
// marks start of a new sub-batch.
Status WriteImplWALOnly(
WriteThread* write_thread, const WriteOptions& options,
WriteBatch* updates, WriteCallback* callback, uint64_t* log_used,
WriteBatch* updates, WriteCallback* callback,
UserWriteCallback* user_write_cb, uint64_t* log_used,
const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
const PublishLastSeq publish_last_seq, const bool disable_memtable);
Expand Down
73 changes: 35 additions & 38 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,41 +155,36 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, /*callback=*/nullptr,
/*wal_write_cb=*/nullptr,
/*user_write_cb=*/nullptr,
/*log_used=*/nullptr);
}
return s;
}

Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
WalWriteCallback* wal_write_cb) {
UserWriteCallback* user_write_cb) {
Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, callback, wal_write_cb);
s = WriteImpl(write_options, my_batch, callback, user_write_cb);
}
return s;
}

Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch,
WalWriteCallback* wal_write_cb) {
if (!immutable_db_options_.enable_pipelined_write &&
wal_write_cb != nullptr) {
return Status::InvalidArgument(
"Only the pipelined write mode takes a wal write callback.");
}
UserWriteCallback* user_write_cb) {
Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, /*callback=*/nullptr, wal_write_cb);
s = WriteImpl(write_options, my_batch, /*callback=*/nullptr, user_write_cb);
}
return s;
}
Expand All @@ -199,7 +194,7 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
// published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
WalWriteCallback* wal_write_cb, uint64_t* log_used,
UserWriteCallback* user_write_cb, uint64_t* log_used,
uint64_t log_ref, bool disable_memtable,
uint64_t* seq_used, size_t batch_cnt,
PreReleaseCallback* pre_release_callback,
Expand Down Expand Up @@ -304,10 +299,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
// Otherwise it is WAL-only Prepare batches in WriteCommitted policy and
// they don't consume sequence.
return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch,
callback, log_used, log_ref, seq_used, batch_cnt,
pre_release_callback, assign_order,
kDontPublishLastSeq, disable_memtable);
return WriteImplWALOnly(
&nonmem_write_thread_, write_options, my_batch, callback, user_write_cb,
log_used, log_ref, seq_used, batch_cnt, pre_release_callback,
assign_order, kDontPublishLastSeq, disable_memtable);
}

if (immutable_db_options_.unordered_write) {
Expand All @@ -319,9 +314,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// Use a write thread to i) optimize for WAL write, ii) publish last
// sequence in increasing order, iii) call pre_release_callback serially
Status status = WriteImplWALOnly(
&write_thread_, write_options, my_batch, callback, log_used, log_ref,
&seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder,
kDoPublishLastSeq, disable_memtable);
&write_thread_, write_options, my_batch, callback, user_write_cb,
log_used, log_ref, &seq, sub_batch_cnt, pre_release_callback,
kDoAssignOrder, kDoPublishLastSeq, disable_memtable);
TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
if (!status.ok()) {
return status;
Expand All @@ -338,15 +333,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}

if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, wal_write_cb,
return PipelinedWriteImpl(write_options, my_batch, callback, user_write_cb,
log_used, log_ref, disable_memtable, seq_used);
}

PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback,
/*wal_write_cb=*/nullptr, log_ref, disable_memtable,
batch_cnt, pre_release_callback,
post_memtable_callback);
WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
log_ref, disable_memtable, batch_cnt,
pre_release_callback, post_memtable_callback);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

write_thread_.JoinBatchGroup(&w);
Expand Down Expand Up @@ -700,15 +694,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,

Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
WalWriteCallback* wal_write_cb,
UserWriteCallback* user_write_cb,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

WriteContext write_context;

WriteThread::Writer w(write_options, my_batch, callback, wal_write_cb,
WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
log_ref, disable_memtable, /*_batch_cnt=*/0,
/*_pre_release_callback=*/nullptr);
write_thread_.JoinBatchGroup(&w);
Expand Down Expand Up @@ -802,14 +796,6 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
WriteStatusCheck(w.status);
}

if (w.status.ok() && !write_options.disableWAL) {
for (auto* writer : wal_write_group) {
if (!writer->CallbackFailed()) {
writer->CheckPostWalWriteCallback();
}
}
}

VersionEdit synced_wals;
if (log_context.need_log_sync) {
InstrumentedMutexLock l(&log_write_mutex_);
Expand Down Expand Up @@ -897,7 +883,7 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

WriteThread::Writer w(write_options, my_batch, callback,
/*wal_write_cb=*/nullptr, log_ref,
/*user_write_cb=*/nullptr, log_ref,
false /*disable_memtable*/);

if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {
Expand Down Expand Up @@ -947,14 +933,15 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
// applicable in a two-queue setting.
Status DBImpl::WriteImplWALOnly(
WriteThread* write_thread, const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used,
WriteBatch* my_batch, WriteCallback* callback,
UserWriteCallback* user_write_cb, uint64_t* log_used,
const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
const PublishLastSeq publish_last_seq, const bool disable_memtable) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback,
/*wal_write_cb=*/nullptr, log_ref, disable_memtable,
sub_batch_cnt, pre_release_callback);
WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
log_ref, disable_memtable, sub_batch_cnt,
pre_release_callback);
StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

write_thread->JoinBatchGroup(&w);
Expand Down Expand Up @@ -1521,6 +1508,11 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
RecordTick(stats_, WAL_FILE_BYTES, log_size);
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
for (auto* writer : write_group) {
if (!writer->CallbackFailed()) {
writer->CheckPostWalWriteCallback();
}
}
}
return io_s;
}
Expand Down Expand Up @@ -1585,6 +1577,11 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
concurrent);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
for (auto* writer : write_group) {
if (!writer->CallbackFailed()) {
writer->CheckPostWalWriteCallback();
}
}
}
return io_s;
}
Expand Down
48 changes: 27 additions & 21 deletions db/write_callback_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include "db/db_impl/db_impl.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/wal_write_callback.h"
#include "rocksdb/user_write_callback.h"
#include "rocksdb/write_batch.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
Expand Down Expand Up @@ -83,17 +83,26 @@ class MockWriteCallback : public WriteCallback {
bool AllowWriteBatching() override { return allow_batching_; }
};

class MockWalWriteCallback : public WalWriteCallback {
class MockUserWriteCallback : public UserWriteCallback {
public:
std::atomic<bool> write_enqueued_{false};
std::atomic<bool> wal_write_done_{false};

MockWalWriteCallback() = default;
MockUserWriteCallback() = default;

MockWalWriteCallback(const MockWalWriteCallback& other) {
MockUserWriteCallback(const MockUserWriteCallback& other) {
write_enqueued_.store(other.write_enqueued_.load());
wal_write_done_.store(other.wal_write_done_.load());
}

void OnWalWriteFinish() { wal_write_done_.store(true); }
void OnWriteEnqueued() override { write_enqueued_.store(true); }

void OnWalWriteFinish() override { wal_write_done_.store(true); }

void Reset() {
write_enqueued_.store(false);
wal_write_done_.store(false);
}
};

#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
Expand Down Expand Up @@ -131,11 +140,11 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
kvs_.clear();
write_batch_.Clear();
callback_.was_called_.store(false);
wal_write_cb_.wal_write_done_.store(false);
user_write_cb_.Reset();
}

MockWriteCallback callback_;
MockWalWriteCallback wal_write_cb_;
MockUserWriteCallback user_write_cb_;
WriteBatch write_batch_;
std::vector<std::pair<string, string>> kvs_;
};
Expand Down Expand Up @@ -341,24 +350,25 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
ASSERT_OK(WriteBatchInternal::InsertNoop(&write_op.write_batch_));
const size_t ONE_BATCH = 1;
s = db_impl->WriteImpl(woptions, &write_op.write_batch_,
&write_op.callback_, &write_op.wal_write_cb_,
&write_op.callback_, &write_op.user_write_cb_,
nullptr, 0, false, nullptr, ONE_BATCH,
two_queues_ ? &publish_seq_callback : nullptr);
} else {
s = db_impl->WriteWithCallback(woptions, &write_op.write_batch_,
&write_op.callback_,
&write_op.wal_write_cb_);
&write_op.user_write_cb_);
}

ASSERT_TRUE(write_op.user_write_cb_.write_enqueued_.load());
if (write_op.callback_.should_fail_) {
ASSERT_TRUE(s.IsBusy());
ASSERT_FALSE(write_op.wal_write_cb_.wal_write_done_.load());
ASSERT_FALSE(write_op.user_write_cb_.wal_write_done_.load());
} else {
ASSERT_OK(s);
if (enable_pipelined_write_ && enable_WAL_) {
ASSERT_TRUE(write_op.wal_write_cb_.wal_write_done_.load());
if (enable_WAL_) {
ASSERT_TRUE(write_op.user_write_cb_.wal_write_done_.load());
} else {
ASSERT_FALSE(write_op.wal_write_cb_.wal_write_done_.load());
ASSERT_FALSE(write_op.user_write_cb_.wal_write_done_.load());
}
}
};
Expand Down Expand Up @@ -461,19 +471,15 @@ TEST_F(WriteCallbackTest, WriteCallBackTest) {
ASSERT_OK(s);
ASSERT_EQ("value.a2", value);

MockWalWriteCallback wal_write_cb;
MockUserWriteCallback user_write_cb;
WriteBatch wb4;
ASSERT_OK(wb4.Put("a", "value.a4"));

ASSERT_NOK(db->WriteWithCallback(write_options, &wb4, &wal_write_cb));

delete db;
options.enable_pipelined_write = true;
ASSERT_OK(DB::Open(options, dbname, &db));
ASSERT_OK(db->WriteWithCallback(write_options, &wb4, &wal_write_cb));
ASSERT_OK(db->WriteWithCallback(write_options, &wb4, &user_write_cb));
ASSERT_OK(db->Get(read_options, "a", &value));
ASSERT_EQ(value, "value.a4");
ASSERT_TRUE(wal_write_cb.wal_write_done_.load());
ASSERT_TRUE(user_write_cb.write_enqueued_.load());
ASSERT_TRUE(user_write_cb.wal_write_done_.load());

delete db;
ASSERT_OK(DestroyDB(dbname, options));
Expand Down
2 changes: 2 additions & 0 deletions db/write_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ void WriteThread::JoinBatchGroup(Writer* w) {

bool linked_as_leader = LinkOne(w, &newest_writer_);

w->CheckWriteEnqueuedCallback();

if (linked_as_leader) {
SetState(w, STATE_GROUP_LEADER);
}
Expand Down

0 comments on commit 477b6b5

Please sign in to comment.