chore(transaction): Use PhasedBarrier for easier synchronization #2455

Merged · 2 commits · Jan 30, 2024
188 changes: 74 additions & 114 deletions src/server/transaction.cc
@@ -71,12 +71,51 @@ void RecordTxScheduleStats(const Transaction* tx) {
}
}

void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inline) {
DCHECK_EQ(tx->GetUniqueShardCnt(), 1u);
auto* ss = ServerState::tlocal();
if (was_ooo) {
ss->stats.tx_type_cnt[was_inline ? ServerState::INLINE : ServerState::QUICK]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
}
ss->stats.tx_width_freq_arr[0]++;
}

} // namespace

IntentLock::Mode Transaction::LockMode() const {
return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}

void Transaction::PhasedBarrier::Start(uint32_t count) {
DCHECK_EQ(DEBUG_Count(), 0u);
count_.store(count, memory_order_release);
}

bool Transaction::PhasedBarrier::Active() const {
return count_.load(memory_order_acquire) > 0;
}

void Transaction::PhasedBarrier::Dec(Transaction* keep_alive) {
// Prevent transaction from being destroyed after count was decreased and Wait() unlocked,
// but before this thread finished notifying.
::boost::intrusive_ptr guard(keep_alive);

uint32_t before = count_.fetch_sub(1);
CHECK_GE(before, 1u);
if (before == 1)
ec_.notify();
}

void Transaction::PhasedBarrier::Wait() {
ec_.await([this] { return count_.load(memory_order_acquire) == 0; });
}

uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
return count_.load(memory_order_relaxed);
}
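
The header-side declaration of PhasedBarrier is not part of this diff. As a rough mental model only (an illustrative sketch, not the Dragonfly implementation, which, as shown above, builds on an EventCount and takes a keep-alive pointer in Dec()), the same Start/Dec/Wait handshake can be expressed with standard primitives:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Illustrative stand-in for the Start/Dec/Wait handshake: the coordinator arms
// the barrier with the number of shard callbacks for the phase, each callback
// calls Dec() when done, and Wait() blocks until the count drops to zero.
class SimplePhasedBarrier {
 public:
  void Start(uint32_t count) {
    assert(count_.load(std::memory_order_relaxed) == 0);
    count_.store(count, std::memory_order_release);
  }

  bool Active() const {
    return count_.load(std::memory_order_acquire) > 0;
  }

  void Dec() {
    uint32_t before = count_.fetch_sub(1, std::memory_order_acq_rel);
    assert(before >= 1);
    (void)before;
    if (before == 1) {
      // Notify under the mutex so the waiter cannot miss the wake-up between
      // checking the predicate and going to sleep.
      std::lock_guard<std::mutex> lk(mu_);
      cv_.notify_all();
    }
  }

  void Wait() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return count_.load(std::memory_order_acquire) == 0; });
  }

 private:
  std::atomic<uint32_t> count_{0};
  std::mutex mu_;
  std::condition_variable cv_;
};
```
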

/**
* @brief Construct a new Transaction:: Transaction object
*
@@ -246,26 +285,6 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
}
}

/**
*
* There are 4 options that we consider here:
* a. T spans a single shard and it's not multi.
* unique_shard_id_ is predefined before the schedule() is called.
* In that case only a single thread will be scheduled and it will use shard_data[0] just because
* shard_data.size() = 1. Coordinator thread can access any data because there is a
* schedule barrier between InitByArgs and RunInShard/IsArmedInShard functions.
* b. T spans multiple shards and its not multi
* In that case multiple threads will be scheduled. Similarly they have a schedule barrier,
* and IsArmedInShard can read any variable from shard_data[x].
* c. Trans spans a single shard and it's multi. shard_data has size of ess_.size.
* IsArmedInShard will check shard_data[x].
* d. Trans spans multiple shards and it's multi. Similarly shard_data[x] will be checked.
* unique_shard_cnt_ and unique_shard_id_ are not accessed until shard_data[x] is armed, hence
* we have a barrier between coordinator and engine-threads. Therefore there should not be
* data races.
*
**/

void Transaction::InitByKeys(const KeyIndex& key_index) {
if (key_index.start == full_args_.size()) { // eval with 0 keys.
CHECK(absl::StartsWith(cid_->name(), "EVAL")) << cid_->name();
@@ -499,9 +518,9 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA

// Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
CHECK(cb_ptr_) << DebugId();
DCHECK(run_barrier_.Active());
DCHECK_GT(txid_, 0u);
CHECK(cb_ptr_) << DebugId();

// Unlike with regular transactions we do not acquire locks upon scheduling
// because Scheduling is done before multi-exec batch is executed. Therefore we
@@ -511,7 +530,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
auto& sd = shard_data_[idx];

CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
CHECK_GT(run_count_.load(memory_order_relaxed), 0u);
CHECK_GT(run_barrier_.DEBUG_Count(), 0u);

VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;

@@ -626,9 +645,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
}
}

DecreaseRunCnt();
// From this point on we can not access 'this'.

run_barrier_.Dec(this); // From this point on we can not access 'this'.
return !is_concluding;
}
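
The `run_barrier_.Dec(this)` call above comes with the "from this point on we can not access 'this'" rule: as soon as the count reaches zero, the coordinator's Wait() may return and release its references while the shard thread is still notifying. The guard inside PhasedBarrier::Dec() exists for exactly this window. A minimal illustration of the lifetime pattern, with std::shared_ptr standing in for boost::intrusive_ptr and all names hypothetical:

```cpp
#include <atomic>
#include <cstdio>
#include <memory>
#include <thread>

// Sketch of the keep-alive idea behind run_barrier_.Dec(this): the worker
// grabs its own reference before dropping the count, so the object stays
// alive even if the waiting side wakes up and releases its reference first.
struct Tx {
  std::atomic<uint32_t> count{1};
  ~Tx() { std::printf("Tx destroyed\n"); }
};

int main() {
  auto tx = std::make_shared<Tx>();

  std::thread worker([guard = tx] {
    // After this store the coordinator may stop waiting and drop its
    // reference at any moment; 'guard' keeps Tx alive until the callback
    // (including any notification still in flight) has fully returned.
    guard->count.fetch_sub(1, std::memory_order_release);
  });

  while (tx->count.load(std::memory_order_acquire) != 0) {
  }               // crude stand-in for run_barrier_.Wait()
  tx.reset();     // coordinator drops its reference right after waking up

  worker.join();  // Tx is destroyed only once the worker's guard goes away
  return 0;
}
```
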

@@ -718,71 +735,56 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
coordinator_state_ |= COORD_CONCLUDING; // Single hop means we conclude.
}

// If we run only on one shard and conclude, we can avoid scheduling at all
// and directly dispatch the task to its destination shard.
// If we run only on one shard and conclude, we can possibly avoid scheduling at all
// and directly run the callback on the destination thread if the locks are free.
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti();

bool was_ooo = false;
bool run_inline = false;
ServerState* ss = nullptr;
bool was_ooo = false, was_inline = false;

if (schedule_fast) {
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK(IsActive(unique_shard_id_));
DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);

// IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
run_count_.store(1, memory_order_release);

time_now_ms_ = GetCurrentTimeMs();
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);

// NOTE: schedule_cb cannot update data on stack when run_fast is false.
// This is because ScheduleSingleHop can finish before the callback returns.
// Start new phase, be careful with writes until phase end!
Review comment (Collaborator): "be careful with writes" is not clear :) what do you mean?

Reply (dranikpg, Contributor/Author, Jan 29, 2024): Just to be careful 😄 There are all kinds of writes below, so we can't say they're not allowed, it's just based on the single shard logic and/or fields that are accessed separately.

run_barrier_.Start(1);

// This happens when ScheduleUniqueShard schedules into TxQueue (hence run_fast is false), and
// then calls PollExecute that in turn runs the callback which calls DecreaseRunCnt. As a result
// WaitForShardCallbacks below is unblocked before schedule_cb returns. However, if run_fast is
// true, then we may mutate stack variables, but only before DecreaseRunCnt is called.
auto schedule_cb = [this, &was_ooo] {
bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
if (run_fast) {
// We didn't decrease the barrier, so the scope is valid UNTIL Dec() below
DCHECK_EQ(run_barrier_.DEBUG_Count(), 1u);
was_ooo = true;
// it's important to DecreaseRunCnt only for run_fast and after was_ooo is assigned.
// If DecreaseRunCnt were called before ScheduleUniqueShard finishes
// then WaitForShardCallbacks below could exit before schedule_cb assigns return value
// to was_ooo and cause stack corruption.
DecreaseRunCnt();
run_barrier_.Dec(this);
}
// Otherwise it's not safe to access the function scope, as
// ScheduleUniqueShard() -> PollExecution() might have unlocked the barrier below.
};

ss = ServerState::tlocal();
auto* ss = ServerState::tlocal();
if (ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) {
DVLOG(2) << "Inline scheduling a transaction";
schedule_cb();
run_inline = true;
was_inline = true;
} else {
shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
}
} else { // This transaction either spans multiple shards and/or is multi.
if (!IsAtomicMulti()) // Multi schedule in advance.
} else {
// This transaction either spans multiple shards and/or is multi, which schedule in advance.
if (!IsAtomicMulti())
ScheduleInternal();

ExecuteAsync();
}

DVLOG(2) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
WaitForShardCallbacks();
DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId();
run_barrier_.Wait();

if (schedule_fast) {
CHECK(!cb_ptr_); // we should have reset it within the callback.
if (was_ooo) {
ss->stats.tx_type_cnt[run_inline ? ServerState::INLINE : ServerState::QUICK]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
}
ss->stats.tx_width_freq_arr[0]++;
RecordTxScheduleFastStats(this, was_ooo, was_inline);
}
cb_ptr_ = nullptr;
return local_result_;
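
The warning inside schedule_cb ("it's important to Dec only for run_fast and after was_ooo is assigned") reduces to a publish-before-release rule: anything that lives on the coordinator's stack must be written before the final decrement, because Wait() can return immediately afterwards. A small self-contained sketch of that ordering, using std::latch (C++20) as a stand-in for the barrier:

```cpp
#include <cassert>
#include <latch>
#include <thread>

// Illustrates the ordering rule in schedule_cb: publish any result that lives
// on the coordinator's stack *before* the final barrier decrement, because
// Wait() may return (and the stack frame may be reused) right afterwards.
int main() {
  bool was_ooo = false;    // lives on the "coordinator" stack
  std::latch barrier(1);   // stands in for run_barrier_.Start(1)

  std::thread shard([&] {
    was_ooo = true;        // 1) write the stack-resident result first
    barrier.count_down();  // 2) only then unblock the coordinator
    // Touching was_ooo past this point would be a potential use-after-return
    // in the real code, where the coordinator may already have left the frame.
  });

  barrier.wait();          // analogous to run_barrier_.Wait()
  assert(was_ooo);         // safe: the write happened before count_down()

  shard.join();
  return 0;
}
```
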
@@ -814,9 +816,6 @@ void Transaction::UnlockMulti() {
unsigned shard_journals_cnt =
ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;

uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u);

use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
for (ShardId i = 0; i < shard_data_.size(); ++i) {
shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() {
@@ -867,56 +866,41 @@ void Transaction::Execute(RunnableType cb, bool conclude) {

ExecuteAsync();

DVLOG(1) << "Execute::WaitForCbs " << DebugId();
WaitForShardCallbacks();
DVLOG(1) << "Execute::WaitForCbs " << DebugId() << " completed";
run_barrier_.Wait();

cb_ptr_ = nullptr;
}

// Runs in coordinator thread.
void Transaction::ExecuteAsync() {
DVLOG(1) << "ExecuteAsync " << DebugId();

DCHECK_GT(unique_shard_cnt_, 0u);
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());

// We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be
// executed by the engine shard once it has been armed and coordinator thread will finish the
// transaction before engine shard thread stops accessing it. Therefore, we increase reference
// by number of callbacks accessing 'this' to allow callbacks to execute shard->Execute(this);
// safely.
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
// Set armed flags on all active shards
IterateActiveShards([](auto& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });

// We access sd.is_armed outside of shard-threads but we guard it with run_count_ release.
IterateActiveShards(
[](PerShardData& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });

// this fence prevents that a read or write operation before a release fence will be reordered
// with a write operation after a release fence. Specifically no writes below will be reordered
// upwards. Important, because it protects non-threadsafe local_mask from being accessed by
// IsArmedInShard in other threads.
run_count_.store(unique_shard_cnt_, memory_order_release);
// Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
// and thus picked by a foreign thread's PollExecution(). Careful with writes until phase end!
run_barrier_.Start(unique_shard_cnt_);

auto* ss = ServerState::tlocal();
if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
ss->AllowInlineScheduling()) {
DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
EngineShard::tlocal()->PollExecution("exec_cb", this);
intrusive_ptr_release(this); // against use_count_.fetch_add above.
return;
}

auto cb = [this] {
EngineShard::tlocal()->PollExecution("exec_cb", this);
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed); // for each pointer from poll_cb

auto poll_cb = [this] {
EngineShard::tlocal()->PollExecution("exec_cb", this);
DVLOG(3) << "ptr_release " << DebugId();
intrusive_ptr_release(this); // against use_count_.fetch_add above.
};

// IsArmedInShard is the protector of non-thread safe data.
IterateActiveShards([&cb](PerShardData& sd, auto i) { shard_set->Add(i, cb); });
IterateActiveShards([&poll_cb](PerShardData& sd, auto i) { shard_set->Add(i, poll_cb); });
}
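
For the multi-shard path the phase lifecycle is: arm every active shard, Start(unique_shard_cnt_), dispatch the poll callbacks, then Wait() in the coordinator. A self-contained simulation of that fan-out/fan-in shape, again with std::latch as a stand-in and a made-up shard count:

```cpp
#include <cstdio>
#include <latch>
#include <thread>
#include <vector>

// Simulates one ExecuteAsync() phase: the coordinator opens the barrier for
// the number of participating shards, each shard thread runs its poll
// callback and decrements, and the coordinator blocks until all are done.
int main() {
  constexpr int kShards = 4;   // hypothetical shard count
  std::latch phase(kShards);   // analogous to run_barrier_.Start(kShards)

  std::vector<std::thread> shards;
  for (int sid = 0; sid < kShards; ++sid) {
    shards.emplace_back([sid, &phase] {
      std::printf("shard %d: poll callback done\n", sid);
      phase.count_down();      // analogous to run_barrier_.Dec(this)
    });
  }

  phase.wait();                // analogous to run_barrier_.Wait()
  std::printf("coordinator: phase complete\n");

  for (auto& t : shards) t.join();
  return 0;
}
```
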

void Transaction::Conclude() {
@@ -990,16 +974,15 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
DCHECK(!IsGlobal());
DVLOG(1) << "ExpireBlocking " << DebugId();

run_count_.store(unique_shard_cnt_, memory_order_release);
run_barrier_.Start(unique_shard_cnt_);

auto expire_cb = [this, &wcb] {
EngineShard* es = EngineShard::tlocal();
ExpireShardCb(wcb(this, es), es);
};
IterateActiveShards([&expire_cb](PerShardData& sd, auto i) { shard_set->Add(i, expire_cb); });

WaitForShardCallbacks();
run_barrier_.Wait();
DVLOG(1) << "ExpireBlocking finished " << DebugId();
}

@@ -1302,8 +1285,7 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {

// Resume processing of transaction queue
shard->PollExecution("unwatchcb", nullptr);

DecreaseRunCnt();
run_barrier_.Dec(this);
}

OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
@@ -1363,28 +1345,6 @@ void Transaction::UnlockMultiShardCb(absl::Span<const std::string_view> sharded_
shard->blocking_controller()->NotifyPending();

shard->PollExecution("unlockmulti", nullptr);

this->DecreaseRunCnt();
}

void Transaction::DecreaseRunCnt() {
// to protect against cases where Transaction is destroyed before run_ec_.notify
// finishes running. We can not put it inside the (res == 1) block because then it's too late.
::boost::intrusive_ptr guard(this);

// We use release so that no stores will be reordered after.
// It's needed because we need to enforce that all stores executed before this point
// are visible right after run_count_ is unblocked in the coordinator thread.
// The fact that run_ec_.notify() does release operation is not enough, because
// WaitForCallbacks might skip reading run_ec_ if run_count_ is already 0.
uint32_t res = run_count_.fetch_sub(1, memory_order_release);

CHECK_GE(res, 1u) << unique_shard_cnt_ << " " << unique_shard_id_ << " " << cid_->name() << " "
<< use_count_.load(memory_order_relaxed) << " " << uint32_t(coordinator_state_);

if (res == 1) {
run_ec_.notify();
}
}

bool Transaction::IsGlobal() const {