Skip to content

Commit

Permalink
Merge branch 'wip-testrados'
Browse files Browse the repository at this point in the history
Lightly-reviewed-by: Josh Durgin <josh.durgin@inktank.com>
  • Loading branch information
Sage Weil committed Oct 21, 2012
2 parents 91a2492 + bcd6aea commit ddde1c8
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 86 deletions.
5 changes: 5 additions & 0 deletions qa/workunits/rados/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,9 @@ test_rados_api_snapshots
test_rados_api_stat
test_rados_api_watch_notify

testrados_list_parallel
testrados_open_pools_parallel
testrados_delete_pools_parallel
testrados_watch_notify

exit 0
2 changes: 1 addition & 1 deletion src/common/version.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const char *git_version_to_str(void)
std::string const pretty_version_to_str(void)
{
std::ostringstream oss;
oss << "ceph version " << CEPH_GIT_NICE_VER << " (commit:"
oss << "ceph version " << CEPH_GIT_NICE_VER << " ("
<< STRINGIFY(CEPH_GIT_VER) << ")";
return oss.str();
}
17 changes: 0 additions & 17 deletions src/mon/MonClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -690,23 +690,6 @@ int MonClient::wait_auth_rotating(double timeout)

// ---------

struct C_IsLatestMap : public Context {
Context *onfinish;
version_t newest;
version_t have;
C_IsLatestMap(Context *f, version_t h) : onfinish(f), newest(0), have(h) {}
void finish(int r) {
onfinish->complete(r == 0 ? (have != newest) : r);
}
};

void MonClient::is_latest_map(string map, version_t cur_ver, Context *onfinish)
{
ldout(cct, 10) << "is_latest_map " << map << " current " << cur_ver << dendl;;
C_IsLatestMap *c = new C_IsLatestMap(onfinish, cur_ver);
get_version(map, &c->newest, NULL, c);
}

void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
{
ldout(cct, 10) << "get_version " << map << dendl;
Expand Down
10 changes: 0 additions & 10 deletions src/mon/MonClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,6 @@ class MonClient : public Dispatcher {

// version requests
public:
/**
* check if we have the latest version of a map
*
* @param map name of map (e.g., 'osdmap')
* @param cur_ver version we have
* @param onfinish completion
* @return (via context) 1 if cur_ver is the latest, 0 if it is not, -EAGAIN if we need to retry
*/
void is_latest_map(string map, version_t cur_ver, Context *onfinish);

/**
* get latest known version(s) of cluster map
*
Expand Down
117 changes: 79 additions & 38 deletions src/osdc/Objecter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ void Objecter::send_linger(LingerOp *info)
if (info->session) {
int r = recalc_op_target(o);
if (r == RECALC_OP_TARGET_POOL_DNE) {
linger_check_for_latest_map(info);
check_linger_pool_dne(info);
}
}

Expand Down Expand Up @@ -408,7 +408,7 @@ void Objecter::scan_requests(bool skipped_map,
linger_cancel_map_check(op);
break;
case RECALC_OP_TARGET_POOL_DNE:
linger_check_for_latest_map(op);
check_linger_pool_dne(op);
break;
}
}
Expand All @@ -431,7 +431,7 @@ void Objecter::scan_requests(bool skipped_map,
op_cancel_map_check(op);
break;
case RECALC_OP_TARGET_POOL_DNE:
op_check_for_latest_map(op);
check_op_pool_dne(op);
break;
}
}
Expand Down Expand Up @@ -599,6 +599,8 @@ void Objecter::handle_osd_map(MOSDMap *m)

void Objecter::C_Op_Map_Latest::finish(int r)
{
lgeneric_subdout(objecter->cct, objecter, 10) << "op_map_latest r=" << r << " tid=" << tid
<< " latest " << latest << dendl;
if (r == -EAGAIN) {
// ignore callback; we will retry in resend_mon_ops()
return;
Expand All @@ -609,22 +611,50 @@ void Objecter::C_Op_Map_Latest::finish(int r)
map<tid_t, Op*>::iterator iter =
objecter->check_latest_map_ops.find(tid);
if (iter == objecter->check_latest_map_ops.end()) {
lgeneric_subdout(objecter->cct, objecter, 10) << "op_map_latest op " << tid << " not found" << dendl;
return;
}

Op *op = iter->second;
objecter->check_latest_map_ops.erase(iter);

if (r == 0) { // we had the latest map
if (op->onack) {
op->onack->complete(-ENOENT);
lgeneric_subdout(objecter->cct, objecter, 20) << "op_map_latest op " << op << dendl;

if (op->map_dne_bound == 0)
op->map_dne_bound = latest;

objecter->check_op_pool_dne(op);
}

void Objecter::check_op_pool_dne(Op *op)
{
ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
<< " current " << osdmap->get_epoch()
<< " map_dne_bound " << op->map_dne_bound
<< dendl;
if (op->map_dne_bound > 0) {
if (osdmap->get_epoch() >= op->map_dne_bound) {
// we had a new enough map
ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
<< " concluding pool " << op->pgid.pool() << " dne"
<< dendl;
if (op->onack) {
op->onack->complete(-ENOENT);
}
if (op->oncommit) {
op->oncommit->complete(-ENOENT);
}
op->session_item.remove_myself();
ops.erase(op->tid);
delete op;
}
if (op->oncommit) {
op->oncommit->complete(-ENOENT);
} else {
// ask the monitor
if (check_latest_map_ops.count(op->tid) == 0) {
check_latest_map_ops[op->tid] = op;
C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
monc->get_version("osdmap", &c->latest, NULL, c);
}
op->session_item.remove_myself();
objecter->ops.erase(op->tid);
delete op;
}
}

Expand All @@ -645,32 +675,39 @@ void Objecter::C_Linger_Map_Latest::finish(int r)

LingerOp *op = iter->second;
objecter->check_latest_map_lingers.erase(iter);

if (r == 0) { // we had the latest map
if (op->on_reg_ack) {
op->on_reg_ack->complete(-ENOENT);
}
if (op->on_reg_commit) {
op->on_reg_commit->complete(-ENOENT);
}
objecter->unregister_linger(op->linger_id);
}
op->put();
}

void Objecter::op_check_for_latest_map(Op *op)
{
check_latest_map_ops[op->tid] = op;
monc->is_latest_map("osdmap", osdmap->get_epoch(),
new C_Op_Map_Latest(this, op->tid));
if (op->map_dne_bound == 0)
op->map_dne_bound = latest;

objecter->check_linger_pool_dne(op);
}

void Objecter::linger_check_for_latest_map(LingerOp *op)
void Objecter::check_linger_pool_dne(LingerOp *op)
{
op->get();
check_latest_map_lingers[op->linger_id] = op;
monc->is_latest_map("osdmap", osdmap->get_epoch(),
new C_Linger_Map_Latest(this, op->linger_id));
ldout(cct, 10) << "check_linger_pool_dne linger_id " << op->linger_id
<< " current " << osdmap->get_epoch()
<< " map_dne_bound " << op->map_dne_bound
<< dendl;
if (op->map_dne_bound > 0) {
if (osdmap->get_epoch() >= op->map_dne_bound) {
if (op->on_reg_ack) {
op->on_reg_ack->complete(-ENOENT);
}
if (op->on_reg_commit) {
op->on_reg_commit->complete(-ENOENT);
}
unregister_linger(op->linger_id);
}
} else {
// ask the monitor
if (check_latest_map_lingers.count(op->linger_id) == 0) {
op->get();
check_latest_map_lingers[op->linger_id] = op;
C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
monc->get_version("osdmap", &c->latest, NULL, c);
}
}
}

void Objecter::op_cancel_map_check(Op *op)
Expand Down Expand Up @@ -891,15 +928,15 @@ void Objecter::resend_mon_ops()
for (map<tid_t, Op*>::iterator p = check_latest_map_ops.begin();
p != check_latest_map_ops.end();
++p) {
monc->is_latest_map("osdmap", osdmap->get_epoch(),
new C_Op_Map_Latest(this, p->second->tid));
C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
monc->get_version("osdmap", &c->latest, NULL, c);
}

for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
p != check_latest_map_lingers.end();
++p) {
monc->is_latest_map("osdmap", osdmap->get_epoch(),
new C_Linger_Map_Latest(this, p->second->linger_id));
C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, p->second->linger_id);
monc->get_version("osdmap", &c->latest, NULL, c);
}
}

Expand Down Expand Up @@ -1025,7 +1062,7 @@ tid_t Objecter::_op_submit(Op *op)
}

if (check_for_latest_map) {
op_check_for_latest_map(op);
check_op_pool_dne(op);
}

ldout(cct, 5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
Expand All @@ -1050,7 +1087,11 @@ int Objecter::recalc_op_target(Op *op)
{
vector<int> acting;
pg_t pgid = op->pgid;
if (!op->precalc_pgid) {
if (op->precalc_pgid) {
ldout(cct, 10) << "recalc_op_target have " << pgid << " pool " << osdmap->have_pg_pool(pgid.pool()) << dendl;
if (!osdmap->have_pg_pool(pgid.pool()))
return RECALC_OP_TARGET_POOL_DNE;
} else {
int ret = osdmap->object_locator_to_pg(op->oid, op->oloc, pgid);
if (ret == -ENOENT)
return RECALC_OP_TARGET_POOL_DNE;
Expand Down
16 changes: 11 additions & 5 deletions src/osdc/Objecter.h
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ class Objecter {
utime_t stamp;

bool precalc_pgid;
epoch_t map_dne_bound;

bool budgeted;

Expand All @@ -631,6 +632,7 @@ class Objecter {
flags(f), priority(0), onack(ac), oncommit(co),
tid(0), attempts(0),
paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false),
map_dne_bound(0),
budgeted(false),
should_resend(true) {
ops.swap(op);
Expand All @@ -657,7 +659,8 @@ class Objecter {
struct C_Op_Map_Latest : public Context {
Objecter *objecter;
tid_t tid;
C_Op_Map_Latest(Objecter *o, tid_t t) : objecter(o), tid(t) {}
version_t latest;
C_Op_Map_Latest(Objecter *o, tid_t t) : objecter(o), tid(t), latest(0) {}
void finish(int r);
};

Expand Down Expand Up @@ -800,12 +803,14 @@ class Objecter {
xlist<LingerOp*>::item session_item;

tid_t register_tid;
epoch_t map_dne_bound;

LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL),
registered(false),
on_reg_ack(NULL), on_reg_commit(NULL),
session(NULL), session_item(this),
register_tid(0) {}
register_tid(0),
map_dne_bound(0) {}

// no copy!
const LingerOp &operator=(const LingerOp& r);
Expand Down Expand Up @@ -845,8 +850,9 @@ class Objecter {
struct C_Linger_Map_Latest : public Context {
Objecter *objecter;
uint64_t linger_id;
version_t latest;
C_Linger_Map_Latest(Objecter *o, uint64_t id) :
objecter(o), linger_id(id) {}
objecter(o), linger_id(id), latest(0) {}
void finish(int r);
};

Expand Down Expand Up @@ -895,9 +901,9 @@ class Objecter {
void _linger_ack(LingerOp *info, int r);
void _linger_commit(LingerOp *info, int r);

void op_check_for_latest_map(Op *op);
void check_op_pool_dne(Op *op);
void op_cancel_map_check(Op *op);
void linger_check_for_latest_map(LingerOp *op);
void check_linger_pool_dne(LingerOp *op);
void linger_cancel_map_check(LingerOp *op);

void kick_requests(OSDSession *session);
Expand Down
9 changes: 6 additions & 3 deletions src/test/system/rados_list_parallel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "cross_process_sem.h"
#include "include/rados/librados.h"
#include "include/stringify.h"
#include "st_rados_create_pool.h"
#include "st_rados_list_objects.h"
#include "systest_runnable.h"
Expand All @@ -30,6 +31,8 @@
#include <string>
#include <time.h>
#include <vector>
#include <sys/types.h>
#include <unistd.h>

using std::ostringstream;
using std::string;
Expand Down Expand Up @@ -68,7 +71,7 @@ class RadosDeleteObjectsR : public SysTestRunnable
pool_setup_sem->post();

rados_ioctx_t io_ctx;
RETURN1_IF_NOT_VAL(-EEXIST, rados_pool_create(cl, m_pool_name.c_str()));
rados_pool_create(cl, m_pool_name.c_str());
RETURN1_IF_NONZERO(rados_ioctx_create(cl, m_pool_name.c_str(), &io_ctx));

std::map <int, std::string> to_delete;
Expand Down Expand Up @@ -149,7 +152,7 @@ class RadosAddObjectsR : public SysTestRunnable
pool_setup_sem->post();

rados_ioctx_t io_ctx;
RETURN1_IF_NOT_VAL(-EEXIST, rados_pool_create(cl, m_pool_name.c_str()));
rados_pool_create(cl, m_pool_name.c_str());
RETURN1_IF_NONZERO(rados_ioctx_create(cl, m_pool_name.c_str(), &io_ctx));

std::map <int, std::string> to_add;
Expand Down Expand Up @@ -210,7 +213,7 @@ const char *get_id_str()
int main(int argc, const char **argv)
{
const char *num_objects = getenv("NUM_OBJECTS");
std::string pool = "foo";
std::string pool = "foo." + stringify(getpid());
if (num_objects) {
g_num_objects = atoi(num_objects);
if (g_num_objects == 0)
Expand Down
2 changes: 1 addition & 1 deletion src/test/system/rados_open_pools_parallel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class StRadosOpenPool : public SysTestRunnable
m_pool_setup_sem->wait();

printf("%s: rados_pool_create.\n", get_id_str());
RETURN1_IF_NOT_VAL(-EEXIST, rados_pool_create(cl, "foo"));
rados_pool_create(cl, "foo");
rados_ioctx_t io_ctx;
printf("%s: rados_ioctx_create.\n", get_id_str());
RETURN1_IF_NOT_VAL(0, rados_ioctx_create(cl, "foo", &io_ctx));
Expand Down
Loading

0 comments on commit ddde1c8

Please sign in to comment.