Skip to content

Commit

Permalink
chore: improve Migration() (#3033)
Browse files Browse the repository at this point in the history
  • Loading branch information
kostasrim committed May 28, 2024
1 parent b02a789 commit b89997c
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,8 @@ void Connection::OnPostMigrateThread() {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
}

DCHECK(!dispatch_fb_.IsJoinable());

// Update tl variables
queue_backpressure_ = &tl_queue_backpressure_;

Expand Down Expand Up @@ -1116,11 +1118,17 @@ void Connection::HandleMigrateRequest() {

DecreaseStatsOnClose();

this->Migrate(dest);
// We need to return early as the socket is closing and IoLoop will clean up.
// The reason that this is true is because of the following DCHECK
DCHECK(!dispatch_fb_.IsJoinable());
// which can never trigger since we Joined on the dispatch_fb_ above and we are
// atomic in respect to our proactor meaning that no other fiber will
// launch the DispatchFiber.
if (!this->Migrate(dest)) {
return;
}
}

DCHECK(dispatch_q_.empty());

// In case we Yield()ed in Migrate() above, dispatch_fb_ might have been started.
LaunchDispatchFiberIfNeeded();
}
Expand Down Expand Up @@ -1331,6 +1339,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {

uint64_t prev_epoch = fb2::FiberSwitchEpoch();
while (!builder->GetError()) {
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
evc_.await(
[this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); });
if (cc_->conn_closing)
Expand Down Expand Up @@ -1457,7 +1466,10 @@ bool Connection::Migrate(util::fb2::ProactorBase* dest) {
CHECK(!cc_->async_dispatch);
CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches
CHECK_EQ(self_.use_count(), 1u); // references cache our thread and backpressure
if (dispatch_fb_.IsJoinable() || cc_->conn_closing) { // can't move once it started
// Migrate is only used by DFLY Thread and Flow command which both check against
// the result of Migration and handle it explicitly in their flows so this can act
// as a weak if condition instead of a crash prone CHECK.
if (dispatch_fb_.IsJoinable() || cc_->conn_closing) {
return false;
}

Expand Down

0 comments on commit b89997c

Please sign in to comment.