Skip to content

Commit

Permalink
osd/ECBackend: fix on_write ordering w/ sync onreadable callbacks
Browse files Browse the repository at this point in the history
When we call handle_sub_write after a write completion, we may
do a sync onreadable completion and then call back into check_ops().
Because the on_write events are attached to the op we're applying,
nothing ensures that the on_write event(s) happen before the next write
in the queue is submitted (when we call back into check_ops()).

For example, suppose the queue holds op A, on_write event W, then op B.
With a sync applied completion we would queue the write for A, call
back into SubWriteApplied -> handle_sub_write_reply -> check_ops, and then
process B... before ever getting to W.

Resolve this by attaching the on_write callback to a separate Op that is
placed into the queue, just like any other Op.  This keeps the ordering
logic clean, although it is a bit ugly with the polymorphism around Op
being either an Op or an on_write callback.

Signed-off-by: Sage Weil <sage@redhat.com>
  • Loading branch information
liewegas committed Dec 11, 2017
1 parent fef5fe8 commit f525d86
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 22 deletions.
59 changes: 38 additions & 21 deletions src/osd/ECBackend.cc
Expand Up @@ -1496,14 +1496,26 @@ void ECBackend::submit_transaction(
dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
}

void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
if (!waiting_state.empty()) {
waiting_state.back().on_write.emplace_back(std::move(cb));
} else if (!waiting_reads.empty()) {
waiting_reads.back().on_write.emplace_back(std::move(cb));
} else {
// Nothing earlier in the pipeline, just call it
void ECBackend::call_write_ordered(std::function<void(void)> &&cb)
{
if (waiting_state.empty() &&
waiting_reads.empty()) {
dout(10) << __func__ << " sync" << dendl;
cb();
} else {
ceph_tid_t tid = parent->get_tid();
Op& op = tid_to_op_map[tid];
op.tid = tid;
op.on_write = std::move(cb);
if (!waiting_state.empty()) {
dout(10) << __func__ << " tid " << tid << " waiting_state" << dendl;
waiting_state.push_back(op);
} else if (!waiting_reads.empty()) {
dout(10) << __func__ << " tid " << tid << " waiting_reads" << dendl;
waiting_reads.push_back(op);
} else {
ceph_abort();
}
}
}

Expand Down Expand Up @@ -1810,6 +1822,12 @@ bool ECBackend::try_state_to_reads()
return false;

Op *op = &(waiting_state.front());
if (op->on_write) {
waiting_state.pop_front();
op->on_write();
tid_to_op_map.erase(op->tid);
return true;
}
if (op->requires_rmw() && pipeline_state.cache_invalid()) {
assert(get_parent()->get_pool().allows_ecoverwrites());
dout(20) << __func__ << ": blocking " << *op
Expand Down Expand Up @@ -1884,6 +1902,12 @@ bool ECBackend::try_reads_to_commit()
if (waiting_reads.empty())
return false;
Op *op = &(waiting_reads.front());
if (op->on_write) {
waiting_reads.pop_front();
op->on_write();
tid_to_op_map.erase(op->tid);
return true;
}
if (op->read_in_progress())
return false;
waiting_reads.pop_front();
Expand Down Expand Up @@ -2021,21 +2045,14 @@ bool ECBackend::try_reads_to_commit()
}
}
if (should_write_local) {
handle_sub_write(
get_parent()->whoami_shard(),
op->client_op,
local_write_op,
op->trace,
op->on_local_applied_sync);
op->on_local_applied_sync = 0;
}

for (auto i = op->on_write.begin();
i != op->on_write.end();
op->on_write.erase(i++)) {
(*i)();
handle_sub_write(
get_parent()->whoami_shard(),
op->client_op,
local_write_op,
op->trace,
op->on_local_applied_sync);
op->on_local_applied_sync = 0;
}

return true;
}

Expand Down
6 changes: 5 additions & 1 deletion src/osd/ECBackend.h
Expand Up @@ -469,7 +469,7 @@ class ECBackend : public PGBackend {
map<hobject_t, ObjectContextRef> obc_map;

/// see call_write_ordered
std::list<std::function<void(void)> > on_write;
std::function<void(void)> on_write;

/// Generated internally
set<hobject_t> temp_added;
Expand Down Expand Up @@ -507,6 +507,10 @@ class ECBackend : public PGBackend {
Context *on_local_applied_sync = nullptr;
Context *on_all_applied = nullptr;
Context *on_all_commit = nullptr;

Op() {}
/// Construct an Op with the given tid and on_write callback.
/// cb is declared as an rvalue reference, but as a named parameter it is an
/// lvalue inside the constructor; std::move is required so the callback is
/// moved into on_write instead of being copied.
Op(ceph_tid_t t, std::function<void(void)>&& cb)
: tid(t), on_write(std::move(cb)) { }
~Op() {
delete on_local_applied_sync;
delete on_all_applied;
Expand Down

0 comments on commit f525d86

Please sign in to comment.