Skip to content

Commit

Permalink
osd/PrimaryLogPG: cancel all objecter ops atomically
Browse files Browse the repository at this point in the history
We want to avoid a situation like this:

- start proxy op A (epoch E)
- start proxy op B (epoch E)
- start proxy op C (epoch E)
- objecter sends none of these because target is down in epoch E
- osdmap update to E+1
- pg cancels requeues A, B
- objecter updates to E+1
- objecter sends C
- pg cancels/requeues C

Note that the key thing is that operations on each object are canceled
atomically.  On the interval change we do it all at once.  In the other
cases, we cancel everything on the given object together.

Fixes: http://tracker.ceph.com/issues/22123
Signed-off-by: Sage Weil <sage@redhat.com>
(cherry picked from commit 93fd56e)

Conflicts:
	src/osd/PrimaryLogPG.cc: - Resolved in cancel_copy and added
				  cancel_and_requeue_proxy_ops.
				 - Define io_tids to FlushOp
  • Loading branch information
liewegas authored and Prashant D committed Feb 27, 2018
1 parent b696021 commit 458c1f2
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 36 deletions.
106 changes: 77 additions & 29 deletions src/osd/PrimaryLogPG.cc
Expand Up @@ -2899,14 +2899,15 @@ void PrimaryLogPG::kick_proxy_ops_blocked(hobject_t& soid)
in_progress_proxy_ops.erase(p);
}

void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop,
vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << prdop->soid << dendl;
prdop->canceled = true;

// cancel objecter op, if we can
if (prdop->objecter_tid) {
osd->objecter->op_cancel(prdop->objecter_tid, -ECANCELED);
tids->push_back(prdop->objecter_tid);
for (uint32_t i = 0; i < prdop->ops.size(); i++) {
prdop->ops[i].outdata.clear();
}
Expand All @@ -2915,20 +2916,20 @@ void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
}
}

void PrimaryLogPG::cancel_proxy_ops(bool requeue)
void PrimaryLogPG::cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;

// cancel proxy reads
map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.begin();
while (p != proxyread_ops.end()) {
cancel_proxy_read((p++)->second);
cancel_proxy_read((p++)->second, tids);
}

// cancel proxy writes
map<ceph_tid_t, ProxyWriteOpRef>::iterator q = proxywrite_ops.begin();
while (q != proxywrite_ops.end()) {
cancel_proxy_write((q++)->second);
cancel_proxy_write((q++)->second, tids);
}

if (requeue) {
Expand Down Expand Up @@ -3082,14 +3083,15 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
pwop->ctx = NULL;
}

void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop)
void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop,
vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << pwop->soid << dendl;
pwop->canceled = true;

// cancel objecter op, if we can
if (pwop->objecter_tid) {
osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED);
tids->push_back(pwop->objecter_tid);
delete pwop->ctx;
pwop->ctx = NULL;
proxywrite_ops.erase(pwop->objecter_tid);
Expand Down Expand Up @@ -8026,7 +8028,9 @@ void PrimaryLogPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
// FIXME: if the src etc match, we could avoid restarting from the
// beginning.
CopyOpRef cop = copy_ops[dest];
cancel_copy(cop, false);
vector<ceph_tid_t> tids;
cancel_copy(cop, false, &tids);
osd->objecter->op_cancel(tids, -ECANCELED);
}

CopyOpRef cop(std::make_shared<CopyOp>(cb, obc, src, oloc, version, flags,
Expand Down Expand Up @@ -8106,6 +8110,7 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)

void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
{
vector<ceph_tid_t> tids;
dout(10) << __func__ << " " << oid << " tid " << tid
<< " " << cpp_strerror(r) << dendl;
map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
Expand Down Expand Up @@ -8292,25 +8297,48 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
it != proxyread_ops.end();) {
if (it->second->soid == cobc->obs.oi.soid) {
cancel_proxy_read((it++)->second);
cancel_proxy_read((it++)->second, &tids);
} else {
++it;
}
}
for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
it != proxywrite_ops.end();) {
if (it->second->soid == cobc->obs.oi.soid) {
cancel_proxy_write((it++)->second);
cancel_proxy_write((it++)->second, &tids);
} else {
++it;
}
}
osd->objecter->op_cancel(tids, -ECANCELED);
kick_proxy_ops_blocked(cobc->obs.oi.soid);
}

kick_object_context_blocked(cobc);
}

void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
vector<ceph_tid_t> tids;
for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
it != proxyread_ops.end();) {
if (it->second->soid == oid) {
cancel_proxy_read((it++)->second, &tids);
} else {
++it;
}
}
for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
it != proxywrite_ops.end();) {
if (it->second->soid == oid) {
cancel_proxy_write((it++)->second, &tids);
} else {
++it;
}
}
osd->objecter->op_cancel(tids, -ECANCELED);
kick_proxy_ops_blocked(oid);
}

void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
{
dout(20) << __func__ << " " << cop
Expand Down Expand Up @@ -8645,18 +8673,19 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
agent_choose_mode();
}

void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue,
vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << cop->obc->obs.oi.soid
<< " from " << cop->src << " " << cop->oloc
<< " v" << cop->results.user_version << dendl;

// cancel objecter op, if we can
if (cop->objecter_tid) {
osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
tids->push_back(cop->objecter_tid);
cop->objecter_tid = 0;
if (cop->objecter_tid2) {
osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED);
tids->push_back(cop->objecter_tid2);
cop->objecter_tid2 = 0;
}
}
Expand All @@ -8675,13 +8704,13 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
cop->obc = ObjectContextRef();
}

void PrimaryLogPG::cancel_copy_ops(bool requeue)
void PrimaryLogPG::cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
while (p != copy_ops.end()) {
// requeue this op? can I queue up all of them?
cancel_copy((p++)->second, requeue);
cancel_copy((p++)->second, requeue, tids);
}
}

Expand Down Expand Up @@ -8817,7 +8846,9 @@ int PrimaryLogPG::start_flush(
osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
fop->dup_ops.pop_front();
}
cancel_flush(fop, false);
vector<ceph_tid_t> tids;
cancel_flush(fop, false, &tids);
osd->objecter->op_cancel(tids, -ECANCELED);
}

/**
Expand Down Expand Up @@ -9017,7 +9048,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
return -EAGAIN; // will retry
} else {
osd->logger->inc(l_osd_tier_try_flush_fail);
cancel_flush(fop, false);
vector<ceph_tid_t> tids;
cancel_flush(fop, false, &tids);
osd->objecter->op_cancel(tids, -ECANCELED);
return -ECANCELED;
}
}
Expand Down Expand Up @@ -9056,7 +9089,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
close_op_ctx(ctx.release());
osd->logger->inc(l_osd_tier_try_flush_fail);
cancel_flush(fop, false);
vector<ceph_tid_t> tids;
cancel_flush(fop, false, &tids);
osd->objecter->op_cancel(tids, -ECANCELED);
return -ECANCELED;
}

Expand Down Expand Up @@ -9096,15 +9131,22 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
return -EINPROGRESS;
}

void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
<< fop->objecter_tid << dendl;
if (fop->objecter_tid) {
osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
tids->push_back(fop->objecter_tid);
fop->objecter_tid = 0;
}
if (fop->blocking) {
if (fop->io_tids.size()) {
for (auto &p : fop->io_tids) {
tids->push_back(p.second);
p.second = 0;
}
}
if (fop->blocking && fop->obc->is_blocked()) {
fop->obc->stop_block();
kick_object_context_blocked(fop->obc);
}
Expand All @@ -9120,12 +9162,12 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
flush_ops.erase(fop->obc->obs.oi.soid);
}

void PrimaryLogPG::cancel_flush_ops(bool requeue)
void PrimaryLogPG::cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
while (p != flush_ops.end()) {
cancel_flush((p++)->second, requeue);
cancel_flush((p++)->second, requeue, tids);
}
}

Expand Down Expand Up @@ -11002,9 +11044,13 @@ void PrimaryLogPG::on_shutdown()
scrub_clear_state();

unreg_next_scrub();
cancel_copy_ops(false);
cancel_flush_ops(false);
cancel_proxy_ops(false);

vector<ceph_tid_t> tids;
cancel_copy_ops(false, &tids);
cancel_flush_ops(false, &tids);
cancel_proxy_ops(false, &tids);
osd->objecter->op_cancel(tids, -ECANCELED);

apply_and_flush_repops(false);
cancel_log_updates();
// we must remove PGRefs, so do this this prior to release_backoffs() callers
Expand Down Expand Up @@ -11109,9 +11155,11 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)

clear_scrub_reserved();

cancel_copy_ops(is_primary());
cancel_flush_ops(is_primary());
cancel_proxy_ops(is_primary());
vector<ceph_tid_t> tids;
cancel_copy_ops(is_primary(), &tids);
cancel_flush_ops(is_primary(), &tids);
cancel_proxy_ops(is_primary(), &tids);
osd->objecter->op_cancel(tids, -ECANCELED);

// requeue object waiters
for (auto& p : waiting_for_unreadable_object) {
Expand Down
17 changes: 10 additions & 7 deletions src/osd/PrimaryLogPG.h
Expand Up @@ -228,6 +228,8 @@ class PrimaryLogPG : public PG, public PGBackend::Listener {
bool blocking; ///< whether we are blocking updates
bool removal; ///< we are removing the backend object
boost::optional<std::function<void()>> on_flush; ///< callback, may be null
// for chunked object
map<uint64_t, ceph_tid_t> io_tids;

FlushOp()
: flushed_version(0), objecter_tid(0), rval(0),
Expand Down Expand Up @@ -1288,8 +1290,8 @@ class PrimaryLogPG : public PG, public PGBackend::Listener {
void _copy_some(ObjectContextRef obc, CopyOpRef cop);
void finish_copyfrom(CopyFromCallback *cb);
void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
void cancel_copy(CopyOpRef cop, bool requeue);
void cancel_copy_ops(bool requeue);
void cancel_copy(CopyOpRef cop, bool requeue, vector<ceph_tid_t> *tids);
void cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids);

friend struct C_Copyfrom;

Expand All @@ -1303,8 +1305,8 @@ class PrimaryLogPG : public PG, public PGBackend::Listener {
boost::optional<std::function<void()>> &&on_flush);
void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
int try_flush_mark_clean(FlushOpRef fop);
void cancel_flush(FlushOpRef fop, bool requeue);
void cancel_flush_ops(bool requeue);
void cancel_flush(FlushOpRef fop, bool requeue, vector<ceph_tid_t> *tids);
void cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids);

/// @return false if clone is has been evicted
bool is_present_clone(hobject_t coid);
Expand Down Expand Up @@ -1351,23 +1353,24 @@ class PrimaryLogPG : public PG, public PGBackend::Listener {

map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
void kick_proxy_ops_blocked(hobject_t& soid);
void cancel_proxy_ops(bool requeue);
void cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids);

// -- proxyread --
map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;

void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
void cancel_proxy_read(ProxyReadOpRef prdop);
void cancel_proxy_read(ProxyReadOpRef prdop, vector<ceph_tid_t> *tids);

friend struct C_ProxyRead;

// -- proxywrite --
map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;

void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
void cancel_and_requeue_proxy_ops(hobject_t oid);
void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
void cancel_proxy_write(ProxyWriteOpRef pwop);
void cancel_proxy_write(ProxyWriteOpRef pwop, vector<ceph_tid_t> *tids);

friend struct C_ProxyWrite_Commit;

Expand Down

0 comments on commit 458c1f2

Please sign in to comment.