Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

osdc/Objecter: resend pg commands on interval change #12910

Merged
merged 2 commits into from Jan 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 19 additions & 23 deletions src/osdc/Objecter.cc
Expand Up @@ -860,7 +860,7 @@ void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)

// Populate Op::target
OSDSession *s = NULL;
_calc_target(&info->target, &info->last_force_resend);
_calc_target(&info->target);

// Create LingerOp<->OSDSession relation
int r = _get_session(info->target.osd, &s, sul);
Expand Down Expand Up @@ -1068,7 +1068,7 @@ void Objecter::_scan_requests(OSDSession *s,
if (pool_full_map)
force_resend_writes = force_resend_writes ||
(*pool_full_map)[op->target.base_oloc.pool];
int r = _calc_target(&op->target, &op->last_force_resend);
int r = _calc_target(&op->target);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
if (!force_resend &&
Expand Down Expand Up @@ -1296,7 +1296,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
p != need_resend_linger.end(); ++p) {
LingerOp *op = *p;
if (!op->session) {
_calc_target(&op->target, &op->last_force_resend);
_calc_target(&op->target);
OSDSession *s = NULL;
int const r = _get_session(op->target.osd, &s, sul);
assert(r == 0);
Expand Down Expand Up @@ -2296,7 +2296,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
assert(op->session == NULL);
OSDSession *s = NULL;

bool check_for_latest_map = _calc_target(&op->target, &op->last_force_resend)
bool check_for_latest_map = _calc_target(&op->target)
== RECALC_OP_TARGET_POOL_DNE;

// Try to get a session, including a retry if we need to take write lock
Expand All @@ -2313,7 +2313,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
// map changed; recalculate mapping
ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
<< dendl;
check_for_latest_map = _calc_target(&op->target, &op->last_force_resend)
check_for_latest_map = _calc_target(&op->target)
== RECALC_OP_TARGET_POOL_DNE;
if (s) {
put_session(s);
Expand Down Expand Up @@ -2670,8 +2670,7 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
return p->raw_hash_to_pg(p->hash_key(key, ns));
}

int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend,
bool any_change)
int Objecter::_calc_target(op_target_t *t, bool any_change)
{
// rwlock is locked

Expand All @@ -2687,11 +2686,12 @@ int Objecter::_calc_target(op_target_t *t, epoch_t *last_force_resend,
bool force_resend = false;
bool need_check_tiering = false;
if (osdmap->get_epoch() == pi->last_force_op_resend) {
if (last_force_resend && *last_force_resend < pi->last_force_op_resend) {
*last_force_resend = pi->last_force_op_resend;
if (t->last_force_resend < pi->last_force_op_resend) {
t->last_force_resend = pi->last_force_op_resend;
force_resend = true;
} else if (last_force_resend == 0)
} else if (t->last_force_resend == 0) {
force_resend = true;
}
}
if (t->target_oid.name.empty() || force_resend) {
t->target_oid = t->base_oid;
Expand Down Expand Up @@ -2954,7 +2954,7 @@ int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
{
// rwlock is locked unique

int r = _calc_target(&linger_op->target, &linger_op->last_force_resend,
int r = _calc_target(&linger_op->target,
true);
if (r == RECALC_OP_TARGET_NEED_RESEND) {
ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
Expand Down Expand Up @@ -4750,26 +4750,22 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
c->map_check_error_str = "osd down";
return RECALC_OP_TARGET_OSD_DOWN;
}
c->osd = c->target_osd;
c->target.osd = c->target_osd;
} else {
if (!osdmap->have_pg_pool(c->target_pg.pool())) {
int ret = _calc_target(&(c->target), true);
if (ret == RECALC_OP_TARGET_POOL_DNE) {
c->map_check_error = -ENOENT;
c->map_check_error_str = "pool dne";
return RECALC_OP_TARGET_POOL_DNE;
}
vector<int> acting;
int acting_primary;
osdmap->pg_to_acting_osds(c->target_pg, &acting, &acting_primary);
if (acting_primary == -1) {
return ret;
} else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
c->map_check_error = -ENXIO;
c->map_check_error_str = "osd down";
return RECALC_OP_TARGET_OSD_DOWN;
return ret;
}
c->osd = acting_primary;
}

OSDSession *s;
int r = _get_session(c->osd, &s, sul);
int r = _get_session(c->target.osd, &s, sul);
assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

if (c->session != s) {
Expand All @@ -4791,7 +4787,7 @@ void Objecter::_assign_command_session(CommandOp *c,
assert(sul.owns_lock() && sul.mutex() == &rwlock);

OSDSession *s;
int r = _get_session(c->osd, &s, sul);
int r = _get_session(c->target.osd, &s, sul);
assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

if (c->session != s) {
Expand Down
160 changes: 91 additions & 69 deletions src/osdc/Objecter.h
Expand Up @@ -1166,48 +1166,49 @@ class Objecter : public md_config_obs_t, public Dispatcher {
struct OSDSession;

struct op_target_t {
int flags;
int flags = 0;
object_t base_oid;
object_locator_t base_oloc;
object_t target_oid;
object_locator_t target_oloc;

bool precalc_pgid; ///< true if we are directed at base_pgid, not base_oid
pg_t base_pgid; ///< explciti pg target, if any
///< true if we are directed at base_pgid, not base_oid
bool precalc_pgid = false;

///< explcit pg target, if any
pg_t base_pgid;

pg_t pgid; ///< last pg we mapped to
unsigned pg_num; ///< last pg_num we mapped to
unsigned pg_num_mask; ///< last pg_num_mask we mapped to
unsigned pg_num = 0; ///< last pg_num we mapped to
unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
vector<int> up; ///< set of up osds for last pg we mapped to
vector<int> acting; ///< set of acting osds for last pg we mapped to
int up_primary; ///< primary for last pg we mapped to based on the up set
int acting_primary; ///< primary for last pg we mapped to based on the
/// acting set
int size; ///< the size of the pool when were were last mapped
int min_size; ///< the min size of the pool when were were last mapped
bool sort_bitwise; ///< whether the hobject_t sort order is bitwise
int up_primary = -1; ///< last up_primary we mapped to
int acting_primary = -1; ///< last acting_primary we mapped to
int size = -1; ///< the size of the pool when were were last mapped
int min_size = -1; ///< the min size of the pool when were were last mapped
bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise

bool used_replica = false;
bool paused = false;

bool used_replica;
bool paused;
int osd = -1; ///< the final target osd, or -1

int osd; ///< the final target osd, or -1
epoch_t last_force_resend = 0;

op_target_t(object_t oid, object_locator_t oloc, int flags)
: flags(flags),
base_oid(oid),
base_oloc(oloc),
precalc_pgid(false),
pg_num(0),
pg_num_mask(0),
up_primary(-1),
acting_primary(-1),
size(-1),
min_size(-1),
sort_bitwise(false),
used_replica(false),
paused(false),
osd(-1)
{}
base_oloc(oloc)
{}

op_target_t(pg_t pgid)
: base_oloc(pgid.pool(), pgid.ps()),
precalc_pgid(true),
base_pgid(pgid)
{}

op_target_t() = default;

void dump(Formatter *f) const;
};
Expand Down Expand Up @@ -1261,8 +1262,6 @@ class Objecter : public md_config_obs_t, public Dispatcher {

int *data_offset;

epoch_t last_force_resend;

osd_reqid_t reqid; // explicitly setting reqid

Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
Expand All @@ -1286,8 +1285,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
budgeted(false),
should_resend(true),
ctx_budgeted(false),
data_offset(offset),
last_force_resend(0) {
data_offset(offset) {
ops.swap(op);

/* initialize out_* to match op vector */
Expand Down Expand Up @@ -1549,29 +1547,56 @@ class Objecter : public md_config_obs_t, public Dispatcher {

// -- osd commands --
struct CommandOp : public RefCountedObject {
OSDSession *session;
ceph_tid_t tid;
OSDSession *session = nullptr;
ceph_tid_t tid = 0;
vector<string> cmd;
bufferlist inbl;
bufferlist *poutbl;
string *prs;
int target_osd;
pg_t target_pg;
int osd; /* calculated osd for sending request */
epoch_t map_dne_bound;
int map_check_error; // error to return if map check fails
const char *map_check_error_str;
Context *onfinish;
uint64_t ontimeout;
bufferlist *poutbl = nullptr;
string *prs = nullptr;

// target_osd == -1 means target_pg is valid
const int target_osd = -1;
const pg_t target_pg;

op_target_t target;

epoch_t map_dne_bound = 0;
int map_check_error = 0; // error to return if map check fails
const char *map_check_error_str = nullptr;

Context *onfinish = nullptr;
uint64_t ontimeout = 0;
ceph::mono_time last_submit;

CommandOp()
: session(NULL),
tid(0), poutbl(NULL), prs(NULL), target_osd(-1), osd(-1),
map_dne_bound(0),
map_check_error(0),
map_check_error_str(NULL),
onfinish(NULL), ontimeout(0) {}
CommandOp(
int target_osd,
const vector<string> &cmd,
bufferlist inbl,
bufferlist *poutbl,
string *prs,
Context *onfinish)
: cmd(cmd),
inbl(inbl),
poutbl(poutbl),
prs(prs),
target_osd(target_osd),
onfinish(onfinish) {}

CommandOp(
pg_t pgid,
const vector<string> &cmd,
bufferlist inbl,
bufferlist *poutbl,
string *prs,
Context *onfinish)
: cmd(cmd),
inbl(inbl),
poutbl(poutbl),
prs(prs),
target_pg(pgid),
target(pgid),
onfinish(onfinish) {}

};

int submit_command(CommandOp *c, ceph_tid_t *ptid);
Expand Down Expand Up @@ -1640,8 +1665,6 @@ class Objecter : public md_config_obs_t, public Dispatcher {
ceph_tid_t ping_tid;
epoch_t map_dne_bound;

epoch_t last_force_resend;

void _queued_async() {
// watch_lock ust be locked unique
watch_pending_async.push_back(ceph::mono_clock::now());
Expand All @@ -1667,8 +1690,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
session(NULL),
register_tid(0),
ping_tid(0),
map_dne_bound(0),
last_force_resend(0) {}
map_dne_bound(0) {}

// no copy!
const LingerOp &operator=(const LingerOp& r);
Expand Down Expand Up @@ -1834,7 +1856,7 @@ class Objecter : public md_config_obs_t, public Dispatcher {
bool _osdmap_has_pool_full() const;

bool target_should_be_paused(op_target_t *op);
int _calc_target(op_target_t *t, epoch_t *last_force_resend = 0,
int _calc_target(op_target_t *t,
bool any_change = false);
int _map_session(op_target_t *op, OSDSession **s,
shunique_lock& lc);
Expand Down Expand Up @@ -2128,25 +2150,25 @@ class Objecter : public md_config_obs_t, public Dispatcher {
const bufferlist& inbl, ceph_tid_t *ptid,
bufferlist *poutbl, string *prs, Context *onfinish) {
assert(osd >= 0);
CommandOp *c = new CommandOp;
c->cmd = cmd;
c->inbl = inbl;
c->poutbl = poutbl;
c->prs = prs;
c->onfinish = onfinish;
c->target_osd = osd;
CommandOp *c = new CommandOp(
osd,
cmd,
inbl,
poutbl,
prs,
onfinish);
return submit_command(c, ptid);
}
int pg_command(pg_t pgid, vector<string>& cmd,
const bufferlist& inbl, ceph_tid_t *ptid,
bufferlist *poutbl, string *prs, Context *onfinish) {
CommandOp *c = new CommandOp;
c->cmd = cmd;
c->inbl = inbl;
c->poutbl = poutbl;
c->prs = prs;
c->onfinish = onfinish;
c->target_pg = pgid;
CommandOp *c = new CommandOp(
pgid,
cmd,
inbl,
poutbl,
prs,
onfinish);
return submit_command(c, ptid);
}

Expand Down