Skip to content

Commit

Permalink
Merge pull request #11680 from tchaikov/wip-17728
Browse files Browse the repository at this point in the history
test/ceph_test_msgr: do not use Message::middle for holding transient…

Reviewed-by: Haomai Wang <haomai@xsky.com>
  • Loading branch information
tchaikov committed Oct 29, 2016
2 parents d22d6ca + 56896a7 commit 15ebffa
Showing 1 changed file with 57 additions and 43 deletions.
100 changes: 57 additions & 43 deletions src/test/msgr/test_msgr.cc
Expand Up @@ -769,6 +769,34 @@ TEST_P(MessengerTest, MessageTest) {

class SyntheticWorkload;

struct Payload {
enum Who : uint8_t {
PING = 0,
PONG = 1,
};
uint8_t who;
uint64_t seq;
bufferlist data;

Payload(Who who, uint64_t seq, const bufferlist& data)
: who(who), seq(seq), data(data)
{}
Payload() = default;
DENC(Payload, v, p) {
DENC_START(1, 1, p);
denc(v.who, p);
denc(v.seq, p);
denc(v.data, p);
DENC_FINISH(p);
}
};
WRITE_CLASS_DENC(Payload)

ostream& operator<<(ostream& out, const Payload &pl)
{
return out << "reply=" << pl.who << " i = " << pl.seq;
}

class SyntheticDispatcher : public Dispatcher {
public:
Mutex lock;
Expand All @@ -779,7 +807,7 @@ class SyntheticDispatcher : public Dispatcher {
bool got_connect;
map<ConnectionRef, list<uint64_t> > conn_sent;
map<uint64_t, bufferlist> sent;
atomic_t index;
atomic<uint64_t> index;
SyntheticWorkload *workload;

SyntheticDispatcher(bool s, SyntheticWorkload *wl):
Expand Down Expand Up @@ -838,27 +866,24 @@ class SyntheticDispatcher : public Dispatcher {
return ;
}

uint64_t i;
bool reply;
assert(m->get_middle().length());
bufferlist::iterator blp = m->get_middle().begin();
::decode(i, blp);
::decode(reply, blp);
if (reply) {
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
reply_message(m, i);
Payload pl;
auto p = m->get_data().begin();
::decode(pl, p);
if (pl.who == Payload::PING) {
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
reply_message(m, pl);
m->put();
Mutex::Locker l(lock);
got_new = true;
cond.Signal();
} else {
Mutex::Locker l(lock);
if (sent.count(i)) {
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply=" << reply << " i=" << i << dendl;
ASSERT_EQ(conn_sent[m->get_connection()].front(), i);
ASSERT_TRUE(m->get_data().contents_equal(sent[i]));
if (sent.count(pl.seq)) {
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
conn_sent[m->get_connection()].pop_front();
sent.erase(i);
sent.erase(pl.seq);
}
m->put();
got_new = true;
Expand All @@ -873,35 +898,29 @@ class SyntheticDispatcher : public Dispatcher {
return true;
}

void reply_message(Message *m, uint64_t i) {
void reply_message(const Message *m, Payload& pl) {
pl.who = Payload::PONG;
bufferlist bl;
::encode(i, bl);
::encode(false, bl);
::encode(pl, bl);
MPing *rm = new MPing();
if (m->get_data_len())
rm->set_data(m->get_data());
if (m->get_middle().length())
rm->set_middle(bl);
rm->set_data(bl);
m->get_connection()->send_message(rm);
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << i << dendl;
lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
}

void send_message_wrap(ConnectionRef con, Message *m) {
{
void send_message_wrap(ConnectionRef con, const bufferlist& data) {
Message *m = new MPing();
Payload pl{Payload::PING, index++, data};
bufferlist bl;
::encode(pl, bl);
m->set_data(bl);
if (!con->get_messenger()->get_default_policy().lossy) {
Mutex::Locker l(lock);
bufferlist bl;
uint64_t i = index.read();
index.inc();
::encode(i, bl);
::encode(true, bl);
m->set_middle(bl);
if (!con->get_messenger()->get_default_policy().lossy) {
sent[i] = m->get_data();
conn_sent[con].push_back(i);
}
lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << i << dendl;
sent[pl.seq] = pl.data;
conn_sent[con].push_back(pl.seq);
}
ASSERT_EQ(con->send_message(m), 0);
lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
ASSERT_EQ(0, con->send_message(m));
}

uint64_t get_pending() {
Expand Down Expand Up @@ -1067,13 +1086,8 @@ class SyntheticWorkload {
m->set_priority(200);
conn->send_message(m);
} else {
Message *m = new MPing();
bufferlist bl;
boost::uniform_int<> u(0, rand_data.size()-1);
uint64_t index = u(rng);
bl = rand_data[index];
m->set_data(bl);
dispatcher.send_message_wrap(conn, m);
dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
}
}

Expand Down

0 comments on commit 15ebffa

Please sign in to comment.