Skip to content

Commit

Permalink
Fixes: ceph#12018
Browse files Browse the repository at this point in the history
resend writes after pool loses full flag

Signed-off-by: xinxin shu <xinxin.shu@intel.com>
(cherry picked from commit dbcf2e4)

Conflicts:
	src/osdc/Objecter.cc
	src/osdc/Objecter.h
  • Loading branch information
xinxin shu authored and shinobu-x committed Mar 9, 2017
1 parent 4345de2 commit 2b0646a
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 18 deletions.
94 changes: 77 additions & 17 deletions src/osdc/Objecter.cc
Expand Up @@ -898,7 +898,8 @@ bool Objecter::ms_dispatch(Message *m)

void Objecter::_scan_requests(OSDSession *s,
bool force_resend,
bool force_resend_writes,
bool cluster_full,
map<int64_t, bool> *pool_full_map,
map<ceph_tid_t, Op*>& need_resend,
list<LingerOp*>& need_resend_linger,
map<ceph_tid_t, CommandOp*>& need_resend_command)
Expand All @@ -918,8 +919,10 @@ void Objecter::_scan_requests(OSDSession *s,
assert(op->session == s);
++lp; // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
bool unregister;
bool unregister, force_resend_writes = cluster_full;
int r = _recalc_linger_op_target(op, lc);
if (pool_full_map)
force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool];
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
if (!force_resend && !force_resend_writes)
Expand Down Expand Up @@ -947,6 +950,9 @@ void Objecter::_scan_requests(OSDSession *s,
Op *op = p->second;
++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
ldout(cct, 10) << " checking op " << op->tid << dendl;
bool force_resend_writes = cluster_full;
if (pool_full_map)
force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool];
int r = _calc_target(&op->target, &op->last_force_resend);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
Expand All @@ -973,6 +979,9 @@ void Objecter::_scan_requests(OSDSession *s,
CommandOp *c = cp->second;
++cp;
ldout(cct, 10) << " checking command " << c->tid << dendl;
bool force_resend_writes = cluster_full;
if (pool_full_map)
force_resend_writes = force_resend_writes || (*pool_full_map)[c->target_pg.pool()];
int r = _calc_command_target(c);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
Expand Down Expand Up @@ -1020,9 +1029,14 @@ void Objecter::handle_osd_map(MOSDMap *m)
}

bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
bool was_full = _osdmap_full_flag();
bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || was_full;
bool cluster_full = _osdmap_full_flag();
bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full || _osdmap_has_pool_full();
map<int64_t, bool> pool_full_map;
for (map<int64_t, pg_pool_t>::const_iterator it = osdmap->get_pools().begin();
it != osdmap->get_pools().end(); it++)
pool_full_map[it->first] = it->second.has_flag(pg_pool_t::FLAG_FULL);


list<LingerOp*> need_resend_linger;
map<ceph_tid_t, Op*> need_resend;
map<ceph_tid_t, CommandOp*> need_resend_command;
Expand Down Expand Up @@ -1073,18 +1087,19 @@ void Objecter::handle_osd_map(MOSDMap *m)
}
logger->set(l_osdc_map_epoch, osdmap->get_epoch());

was_full = was_full || _osdmap_full_flag();
_scan_requests(homeless_session, skipped_map, was_full,
need_resend, need_resend_linger,
need_resend_command);
cluster_full = cluster_full || _osdmap_full_flag();
update_pool_full_map(pool_full_map);
_scan_requests(homeless_session, skipped_map, cluster_full,
&pool_full_map, need_resend,
need_resend_linger, need_resend_command);

// osd addr changes?
for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
p != osd_sessions.end(); ) {
OSDSession *s = p->second;
_scan_requests(s, skipped_map, was_full,
need_resend, need_resend_linger,
need_resend_command);
_scan_requests(s, skipped_map, cluster_full,
&pool_full_map, need_resend,
need_resend_linger, need_resend_command);
++p;
if (!osdmap->is_up(s->osd) ||
(s->con &&
Expand All @@ -1102,14 +1117,14 @@ void Objecter::handle_osd_map(MOSDMap *m)
for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
p != osd_sessions.end(); ++p) {
OSDSession *s = p->second;
_scan_requests(s, false, false, need_resend, need_resend_linger,
need_resend_command);
_scan_requests(s, false, false, NULL, need_resend,
need_resend_linger, need_resend_command);
}
ldout(cct, 3) << "handle_osd_map decoding full epoch "
<< m->get_last() << dendl;
osdmap->decode(m->maps[m->get_last()]);

_scan_requests(homeless_session, false, false,
_scan_requests(homeless_session, false, false, NULL,
need_resend, need_resend_linger,
need_resend_command);
} else {
Expand All @@ -1122,7 +1137,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
}

bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag();
bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() || _osdmap_has_pool_full();

// was/is paused?
if (was_pauserd || was_pausewr || pauserd || pausewr || osdmap->get_epoch() < epoch_barrier) {
Expand Down Expand Up @@ -2154,7 +2169,8 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
ldout(cct, 10) << " paused read " << op << " tid " << last_tid.read() << dendl;
op->target.paused = true;
_maybe_request_map();
} else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && _osdmap_full_flag()) {
} else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
(_osdmap_full_flag() || _osdmap_pool_full(op->target.base_oloc.pool))) {
ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid.read() << dendl;
op->target.paused = true;
_maybe_request_map();
Expand Down Expand Up @@ -2357,8 +2373,9 @@ bool Objecter::is_pg_changed(

bool Objecter::target_should_be_paused(op_target_t *t)
{
const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag();
bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() || pi->has_flag(pg_pool_t::FLAG_FULL);

return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
(t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
Expand All @@ -2375,6 +2392,38 @@ bool Objecter::osdmap_full_flag() const
return _osdmap_full_flag();
}

/**
 * Locked query: is the given pool blocked by a "full" condition?
 *
 * Takes an RWLock::RLocker on rwlock for the duration of the call, then
 * answers true when _osdmap_full_flag() is set, otherwise defers to
 * _osdmap_pool_full(pool_id).
 *
 * @param pool_id id of the pool to test
 * @return true if either check reports full, else false
 */
bool Objecter::osdmap_pool_full(const int64_t pool_id) const
{
  RWLock::RLocker rl(rwlock);

  // Short-circuit: the per-pool lookup only runs when the global flag is clear.
  return _osdmap_full_flag() || _osdmap_pool_full(pool_id);
}

bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
{
const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
if (pool == NULL) {
ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
return false;
}

return pool->has_flag(pg_pool_t::FLAG_FULL);
}

bool Objecter::_osdmap_has_pool_full() const
{
for (map<int64_t, pg_pool_t>::const_iterator it = osdmap->get_pools().begin();
it != osdmap->get_pools().end(); it++) {
if (it->second.has_flag(pg_pool_t::FLAG_FULL))
return true;
}
return false;
}

/**
* Wrapper around osdmap->test_flag for special handling of the FULL flag.
*/
Expand All @@ -2384,6 +2433,17 @@ bool Objecter::_osdmap_full_flag() const
return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
}

/**
 * Fold the current osdmap's per-pool FULL flags into pool_full_map.
 *
 * For every pool in osdmap->get_pools(), the map entry ends up true if the
 * pool has pg_pool_t::FLAG_FULL now OR the entry was already true; pools
 * not yet present in the map are added. Entries whose pool is absent from
 * the osdmap are left untouched.
 *
 * @param pool_full_map pool id -> accumulated "full" state, updated in place
 */
void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
{
  const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
  for (map<int64_t, pg_pool_t>::const_iterator it = pools.begin();
       it != pools.end(); ++it) {
    // operator[] value-initializes a missing entry to false, so a single
    // lookup handles both the "new pool" and "already tracked" cases.
    bool& full = pool_full_map[it->first];
    full = it->second.has_flag(pg_pool_t::FLAG_FULL) || full;
  }
}

int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
const string& ns)
Expand Down
14 changes: 13 additions & 1 deletion src/osdc/Objecter.h
Expand Up @@ -1711,6 +1711,16 @@ class Objecter : public md_config_obs_t, public Dispatcher {

bool osdmap_full_flag() const;

/**
* Test pg_pool_t::FLAG_FULL on a pool
*
* @return true if the pool exists and has the flag set, or
* the global full flag is set, else false
*/
bool osdmap_pool_full(const int64_t pool_id) const;
bool _osdmap_pool_full(const int64_t pool_id) const;
void update_pool_full_map(map<int64_t, bool>& pool_full_map);

private:
map<uint64_t, LingerOp*> linger_ops;
// we use this just to confirm a cookie is valid before dereferencing the ptr
Expand Down Expand Up @@ -1756,6 +1766,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
RECALC_OP_TARGET_OSD_DOWN,
};
bool _osdmap_full_flag() const;
bool _osdmap_has_pool_full() const;

bool target_should_be_paused(op_target_t *op);
int _calc_target(op_target_t *t, epoch_t *last_force_resend=0, bool any_change=false);
Expand Down Expand Up @@ -1919,7 +1930,8 @@ class Objecter : public md_config_obs_t, public Dispatcher {

void _scan_requests(OSDSession *s,
bool force_resend,
bool force_resend_writes,
bool cluster_full,
map<int64_t, bool> *pool_full_map,
map<ceph_tid_t, Op*>& need_resend,
list<LingerOp*>& need_resend_linger,
map<ceph_tid_t, CommandOp*>& need_resend_command);
Expand Down

0 comments on commit 2b0646a

Please sign in to comment.