Skip to content

Commit

Permalink
Merge pull request #6503 from XinzeChi/wip-repop-worker
Browse files Browse the repository at this point in the history
osd: defer decoding of MOSDRepOp/MOSDRepOpReply

Reviewed-by: Haomai Wang <haomai@xsky.com>
Reviewed-by: Sage Weil <sage@redhat.com>
  • Loading branch information
liewegas committed Nov 28, 2015
2 parents 2a9d6cb + 22d2732 commit dc217d4
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 20 deletions.
33 changes: 24 additions & 9 deletions src/messages/MOSDRepOp.h
Expand Up @@ -34,9 +34,14 @@ class MOSDRepOp : public Message {
// metadata from original request
osd_reqid_t reqid;

spg_t pgid;

bufferlist::iterator p;
// Decoding flags. Decoding is only needed for messages catched by pipe reader.
bool final_decode_needed;

// subop
pg_shard_t from;
spg_t pgid;
hobject_t poid;

__u8 acks_wanted;
Expand Down Expand Up @@ -64,10 +69,16 @@ class MOSDRepOp : public Message {
}

virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
p = payload.begin();
// splitted to partial and final
::decode(map_epoch, p);
::decode(reqid, p);
::decode(pgid, p);
}

void finish_decode() {
if (!final_decode_needed)
return; // Message is already final decoded
::decode(poid, p);

::decode(acks_wanted, p);
Expand All @@ -83,6 +94,7 @@ class MOSDRepOp : public Message {
::decode(from, p);
::decode(updated_hit_set_history, p);
::decode(pg_trim_rollback_to, p);
final_decode_needed = false;
}

virtual void encode_payload(uint64_t features) {
Expand All @@ -105,15 +117,17 @@ class MOSDRepOp : public Message {

MOSDRepOp()
: Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
map_epoch(0), acks_wanted (0) {}
map_epoch(0),
final_decode_needed(true), acks_wanted (0) {}
MOSDRepOp(osd_reqid_t r, pg_shard_t from,
spg_t p, const hobject_t& po, int aw,
epoch_t mape, ceph_tid_t rtid, eversion_t v)
: Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
map_epoch(mape),
reqid(r),
from(from),
pgid(p),
final_decode_needed(false),
from(from),
poid(po),
acks_wanted(aw),
version(v) {
Expand All @@ -126,11 +140,12 @@ class MOSDRepOp : public Message {
const char *get_type_name() const { return "osd_repop"; }
void print(ostream& out) const {
out << "osd_repop(" << reqid
<< " " << pgid
<< " " << poid;
out << " v " << version;
if (updated_hit_set_history)
out << ", has_updated_hit_set_history";
<< " " << pgid;
if (!final_decode_needed) {
out << " " << poid << " v " << version;
if (updated_hit_set_history)
out << ", has_updated_hit_set_history";
}
out << ")";
}
};
Expand Down
34 changes: 23 additions & 11 deletions src/messages/MOSDRepOpReply.h
Expand Up @@ -46,18 +46,26 @@ class MOSDRepOpReply : public Message {
// piggybacked osd state
eversion_t last_complete_ondisk;

bufferlist::iterator p;
// Decoding flags. Decoding is only needed for messages catched by pipe reader.
bool final_decode_needed;

virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
p = payload.begin();
::decode(map_epoch, p);
::decode(reqid, p);
::decode(pgid, p);
}

void finish_decode() {
if (!final_decode_needed)
return; // Message is already final decoded
::decode(ack_type, p);
::decode(result, p);
::decode(last_complete_ondisk, p);

::decode(from, p);
final_decode_needed = false;
}
virtual void encode_payload(uint64_t features) {
::encode(map_epoch, payload);
Expand Down Expand Up @@ -91,12 +99,14 @@ class MOSDRepOpReply : public Message {
from(from),
pgid(req->pgid.pgid, req->from.shard),
ack_type(at),
result(result_) {
result(result_),
final_decode_needed(false) {
set_tid(req->get_tid());
}
MOSDRepOpReply()
: Message(MSG_OSD_REPOPREPLY), map_epoch(0),
ack_type(0), result(0) {}
ack_type(0), result(0),
final_decode_needed(true) {}
private:
~MOSDRepOpReply() {}

Expand All @@ -105,14 +115,16 @@ class MOSDRepOpReply : public Message {

void print(ostream& out) const {
out << "osd_repop_reply(" << reqid
<< " " << pgid;
if (ack_type & CEPH_OSD_FLAG_ONDISK)
out << " ondisk";
if (ack_type & CEPH_OSD_FLAG_ONNVRAM)
out << " onnvram";
if (ack_type & CEPH_OSD_FLAG_ACK)
out << " ack";
out << ", result = " << result;
<< " " << pgid;
if (!final_decode_needed) {
if (ack_type & CEPH_OSD_FLAG_ONDISK)
out << " ondisk";
if (ack_type & CEPH_OSD_FLAG_ONNVRAM)
out << " onnvram";
if (ack_type & CEPH_OSD_FLAG_ACK)
out << " ack";
out << ", result = " << result;
}
out << ")";
}

Expand Down
2 changes: 2 additions & 0 deletions src/messages/MOSDSubOp.h
Expand Up @@ -172,6 +172,8 @@ class MOSDSubOp : public Message {
}
}

void finish_decode() { }

virtual void encode_payload(uint64_t features) {
::encode(map_epoch, payload);
::encode(reqid, payload);
Expand Down
3 changes: 3 additions & 0 deletions src/messages/MOSDSubOpReply.h
Expand Up @@ -85,6 +85,9 @@ class MOSDSubOpReply : public Message {
pgid.shard = shard_id_t::NO_SHARD;
}
}

void finish_decode() { }

virtual void encode_payload(uint64_t features) {
::encode(map_epoch, payload);
::encode(reqid, payload);
Expand Down
2 changes: 2 additions & 0 deletions src/osd/ReplicatedBackend.cc
Expand Up @@ -674,6 +674,7 @@ template<typename T, int MSGTYPE>
void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
{
T *r = static_cast<T *>(op->get_req());
r->finish_decode();
assert(r->get_header().type == MSGTYPE);
assert(MSGTYPE == MSG_OSD_SUBOPREPLY || MSGTYPE == MSG_OSD_REPOPREPLY);

Expand Down Expand Up @@ -1127,6 +1128,7 @@ template<typename T, int MSGTYPE>
void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
{
T *m = static_cast<T *>(op->get_req());
m->finish_decode();
int msg_type = m->get_type();
assert(MSGTYPE == msg_type);
assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP);
Expand Down

0 comments on commit dc217d4

Please sign in to comment.