chore(transaction): Use PhasedBarrier for easier synchronization #2455
src/server/transaction.h
Outdated
// Barrier akin to helio's BlockingCounter, but with phase counting and proper acquire semantics
// for polling work from other threads. And without heap allocation.
struct PhasedBarrier {
  uint32_t Start(uint32_t count);  // Release: Store count, return phase
  bool Active() const;             // Acquire: Check if count > 0. Use for polling for work

  void Dec(Transaction* keep_alive);  // Release: Decrease count, notify ec on 0
  void Wait();                        // Acquire: Wait until count = 0, increment phase

  bool IsMyPhase(uint32_t phase) const;  // Assumes already Acquired. Return true if same phase
  uint32_t DEBUG_Get() const;

 private:
  std::atomic_uint32_t phase_{0};
  std::atomic_uint32_t count_{0};
  EventCount ec_{};
};
My take is that extracting something into a simple abstraction (but with a clearly defined interface) makes the code much easier to reason about, because we now think in terms of the 4 operations, instead of raw atomics and events, mixed with regular code
It's sad we can't reuse helio's BlockingCounter for this. We could, but we have to get rid of heap allocation and allow peeking with acquire semantics... Which is more complicated than counting manually
src/server/transaction.cc
Outdated
EngineShard* shard = EngineShard::tlocal();
if (IsArmedInShard(shard->shard_id())) {
  if (run_barrier_.IsMyPhase(phase)) {
    // PollExecution doesn't necessarily run this transaction, so execution and cleanup code
    // belongs to RunInShard
    shard->PollExecution("exec_cb", this);
  } else {
    VLOG(1) << "Skipping PollExecution " << DebugId() << " sid(" << shard->shard_id() << ")";
  }
}
My radical opinion: is_armed doesn't have to be a separate atomic and can be a regular shard flag.
How do we read safely from it without exiting the execution phase at the same time?
- Atomically increment use_count and enter the section if it's larger than 1: because we added an artificial job, Wait() is guaranteed to be waiting for us
- Safely read is_armed and determine here whether we need to run -> profit
Same logic applies to the phase, it's changed only after the execution phase, so it's safe to read while inside it
That would be a goal for another PR
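A rough sketch of the idea in this comment, for illustration only. The names `HopStateSketch`, `TryEnter`, `Leave` and `ShouldRun` are hypothetical and not part of the codebase. The sketch also swaps the plain increment for a compare-exchange loop that increments only when the counter is already non-zero, so a poller never bumps the count of an idle barrier; that is an assumption of this sketch, not something the comment proposes.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical state: use_count > 0 while a hop is executing.
struct HopStateSketch {
  std::atomic_uint32_t use_count{0};
  bool is_armed = false;  // plain flag, assumed to be written only between hops
};

// Add an artificial job, but only if a hop is already in flight.
inline bool TryEnter(HopStateSketch& s) {
  uint32_t cur = s.use_count.load(std::memory_order_acquire);
  while (cur > 0) {
    // On failure, cur is reloaded with the current value and the loop re-checks it.
    if (s.use_count.compare_exchange_weak(cur, cur + 1, std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      return true;
    }
  }
  return false;
}

// Drop the artificial job. A real version would notify the waiter if this reaches 0.
inline void Leave(HopStateSketch& s) {
  s.use_count.fetch_sub(1, std::memory_order_release);
}

// Returns true if the caller should run the transaction in this shard.
inline bool ShouldRun(HopStateSketch& s) {
  if (!TryEnter(s)) return false;  // no hop in flight, nothing to read
  bool armed = s.is_armed;         // the waiter cannot end the hop while we hold a job
  Leave(s);
  return armed;
}
```

While the artificial job is held, the count cannot drop to zero, so the coordinator cannot leave the execution phase and start writing to the flag.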
src/server/transaction.cc
Outdated
uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u);

run_barrier_.Start(shard_data_.size());
why do we do it here if we never wait for the barrier? (reminder, UnlockMulti is supposed to be async) 🤔
80d384d to 0a8484e
src/server/transaction.h
Outdated
// Main synchronization point for dispatching hop callbacks and waiting for them to finish.
// After scheduling, sequential hops are executed as follows:
//    coordinator: Prepare hop, then Start(num_shards), dispatch poll jobs and Wait()
//    tx queue:    Once IsArmedInShard() /* checks Active() */ -> run in shard and Dec()
// As long as barrier is active any writes by the coordinator are prohibited, so shard threads
// can safely read transaction state and modify per-shard state belonging to them.
// Inter-thread synchronization is provided by the barrier's acquire/release pairs.
PhasedBarrier run_barrier_;
Now our inter-thread coordination during hops is clearly defined in a single place in simple terms
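To make the coordinator / tx queue protocol above concrete, here is a minimal self-contained sketch. It is not the implementation from this PR: `PhasedBarrierSketch` and `RunHops` are hypothetical names, a `std::mutex` plus `std::condition_variable` stands in for helio's `EventCount`, and `Dec()` takes no `keep_alive` argument because the sketch joins all threads before the barrier is destroyed.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

class PhasedBarrierSketch {
 public:
  // Release: publishes the coordinator's earlier writes together with the count.
  uint32_t Start(uint32_t count) {
    count_.store(count, std::memory_order_release);
    return phase_.load(std::memory_order_relaxed);
  }

  // Acquire: a poller that sees count > 0 also sees the prepared hop state.
  bool Active() const { return count_.load(std::memory_order_acquire) > 0; }

  // Release: publishes this worker's writes; the last worker wakes the waiter.
  void Dec() {
    if (count_.fetch_sub(1, std::memory_order_release) == 1) {
      std::lock_guard<std::mutex> lk(mu_);
      cv_.notify_all();
    }
  }

  // Acquire: blocks until the count reaches 0, then advances the phase.
  void Wait() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return count_.load(std::memory_order_acquire) == 0; });
    phase_.fetch_add(1, std::memory_order_relaxed);
  }

  // Assumes the caller already observed Active().
  bool IsMyPhase(uint32_t phase) const {
    return phase_.load(std::memory_order_relaxed) == phase;
  }

 private:
  std::atomic_uint32_t phase_{0};
  std::atomic_uint32_t count_{0};
  std::mutex mu_;
  std::condition_variable cv_;
};

// Runs num_hops sequential hops over num_shards threads, returns the sum of shard results.
inline int RunHops(uint32_t num_shards, uint32_t num_hops) {
  PhasedBarrierSketch barrier;
  std::vector<int> shard_result(num_shards, 0);
  int payload = 0;  // coordinator-owned state, read-only for shards during a hop

  for (uint32_t hop = 0; hop < num_hops; ++hop) {
    payload = static_cast<int>(hop) + 1;          // prepare hop
    uint32_t phase = barrier.Start(num_shards);   // no coordinator writes until Wait() returns
    std::vector<std::thread> workers;
    for (uint32_t sid = 0; sid < num_shards; ++sid) {
      workers.emplace_back([&, sid, phase] {
        if (barrier.Active() && barrier.IsMyPhase(phase)) {
          shard_result[sid] += payload;  // per-shard state belongs to this thread
          barrier.Dec();
        }
      });
    }
    barrier.Wait();
    for (auto& t : workers) t.join();
  }

  int total = 0;
  for (int r : shard_result) total += r;
  return total;
}
```

Spawning a thread per hop keeps the sketch short; in the real code the shard threads are long-lived and poll their transaction queues, which is where the acquire load in `Active()` actually matters.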
src/server/transaction.h
Outdated
// Barrier akin to helio's BlockingCounter, but with phase counting and proper acquire semantics
// for polling work from other threads. And without heap allocation.
struct PhasedBarrier {
  uint32_t Start(uint32_t count);  // Release: Store count, return current phase
phase is like seqlock?
add a comment that it corresponds to a specific hop in a transaction
phase is seqlock, I added a comment in transaction.h that a phase is a hop
not that I suggest doing it in this PR but did you consider merging phase and count into the same atomic_uint32?
No need, because #2457 will get rid of it altogether
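For reference, the merged layout asked about above could look roughly like the sketch below. It is purely illustrative: the class name `PackedPhaseCount`, the 16/16 bit split and the method names are assumptions, and nothing like it is part of this PR.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical packing: upper 16 bits hold the phase, lower 16 bits hold the count.
class PackedPhaseCount {
 public:
  static constexpr uint32_t kCountBits = 16;
  static constexpr uint32_t kCountMask = (1u << kCountBits) - 1;

  // Release: store the count next to the current phase, return that phase.
  uint32_t Start(uint32_t count) {
    assert(count <= kCountMask);
    uint32_t phase = state_.load(std::memory_order_relaxed) >> kCountBits;
    state_.store((phase << kCountBits) | count, std::memory_order_release);
    return phase;
  }

  // Acquire: a single load answers both "is it active?" and "is it my phase?".
  bool ActiveInPhase(uint32_t phase) const {
    uint32_t v = state_.load(std::memory_order_acquire);
    return (v >> kCountBits) == phase && (v & kCountMask) > 0;
  }

  // Returns true for the caller that brought the count to 0.
  bool Dec() {
    return (state_.fetch_sub(1, std::memory_order_acq_rel) & kCountMask) == 1;
  }

  bool Done() const {
    return (state_.load(std::memory_order_acquire) & kCountMask) == 0;
  }

  // Called by the waiter once Done(); the phase wraps around after 2^16 hops.
  void NextPhase() { state_.fetch_add(1u << kCountBits, std::memory_order_relaxed); }

 private:
  std::atomic_uint32_t state_{0};
};
```

The benefit of packing would be that a poller gets a consistent (phase, count) pair from one load; the cost is a hard cap on the shard count and a wrapping phase.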
Interesting PR. Please add to the PR description not only the motivation but also a description of the major changes you performed. It took me a while to understand that the state variables have not changed, they just got renamed (or maybe I am wrong, and then you can clear this up)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
fix: fixes
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
chore: Update, remove and add comments
fix: fix comments
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
// NOTE: schedule_cb cannot update data on stack when run_fast is false.
// This is because ScheduleSingleHop can finish before the callback returns.
// Start new phase, be careful with writes until phase end!
"be careful with writes" is not clear :)
what do you mean?
Just to be careful 😄 There are all kinds of writes below, so we can't say they're not allowed, it's just based on the single shard logic and/or fields that are accessed separately
Looks good, added minor comments. Also, please add to the PR description whether this PR has functional changes and, if it has, where, and I will double check. I suggest running this PR on the sidekiq benchmark + regression tests too
Ran the loadtest 10 times (half an hour) on Graviton, seems to work. Updated PR description
This PR introduces a PhasedBarrier primitive to replace the manual control of callback dispatches within a transaction. A separate primitive makes for simpler code, as well as more clearly defined state semantics.
This PR has no functional changes.