Skip to content

Commit

Permalink
MOSDOp::decode : Splitting message decoding, new version
Browse files Browse the repository at this point in the history
Signed-off-by: Jacek J. Lakis <jacek.lakis@intel.com>
  • Loading branch information
Jacek J. Lakis authored and root committed Aug 6, 2015
1 parent abd453c commit b3aee8c
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 67 deletions.
2 changes: 2 additions & 0 deletions src/include/ceph_features.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
// duplicated since it was introduced at the same time as MIN_SIZE_RECOVERY
#define CEPH_FEATURE_OSD_PROXY_FEATURES (1ULL<<49) /* overlap w/ above */
#define CEPH_FEATURE_MON_METADATA (1ULL<<50)
#define CEPH_FEATURE_NEW_OSDOP_ENCODING (1ULL<<51) /* New, v6 encoding */

#define CEPH_FEATURE_RESERVED2 (1ULL<<61) /* slow down, we are almost out... */
#define CEPH_FEATURE_RESERVED (1ULL<<62) /* DO NOT USE THIS ... last bit! */
Expand Down Expand Up @@ -126,6 +127,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) {
CEPH_FEATURE_OSD_HBMSGS | \
CEPH_FEATURE_MDSENC | \
CEPH_FEATURE_OSDHASHPSPOOL | \
CEPH_FEATURE_NEW_OSDOP_ENCODING | \
CEPH_FEATURE_MON_SINGLE_PAXOS | \
CEPH_FEATURE_OSD_SNAPMAPPER | \
CEPH_FEATURE_MON_SCRUB | \
Expand Down
185 changes: 125 additions & 60 deletions src/messages/MOSDOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class OSD;

class MOSDOp : public Message {

static const int HEAD_VERSION = 5;
static const int HEAD_VERSION = 6;
static const int COMPAT_VERSION = 3;

private:
Expand All @@ -46,6 +46,11 @@ class MOSDOp : public Message {
object_t oid;
object_locator_t oloc;
pg_t pgid;
bufferlist::iterator p;
// Decoding flags. Decoding is only needed for messages caught by the pipe reader.
bool partial_decode_needed;
bool final_decode_needed;
//
public:
vector<OSDOp> ops;
private:
Expand All @@ -59,48 +64,57 @@ class MOSDOp : public Message {
public:
friend class MOSDOpReply;

// read
const snapid_t& get_snapid() { return snapid; }
void set_snapid(const snapid_t& s) { snapid = s; }
// write
const snapid_t& get_snap_seq() const { return snap_seq; }
const vector<snapid_t> &get_snaps() const { return snaps; }
void set_snaps(const vector<snapid_t>& i) {
snaps = i;
}
void set_snap_seq(const snapid_t& s) { snap_seq = s; }

osd_reqid_t get_reqid() const {
return osd_reqid_t(get_orig_source(),
client_inc,
header.tid);
}
int get_client_inc() { return client_inc; }
ceph_tid_t get_client_tid() { return header.tid; }

object_t& get_oid() { return oid; }

const pg_t& get_pg() const { return pgid; }

const object_locator_t& get_object_locator() const {
return oloc;
void set_snapid(const snapid_t& s) { snapid = s; }
void set_snaps(const vector<snapid_t>& i) {
snaps = i;
}
void set_snap_seq(const snapid_t& s) { snap_seq = s; }

epoch_t get_map_epoch() { return osdmap_epoch; }

const eversion_t& get_version() { return reassert_version; }

utime_t get_mtime() { return mtime; }
// Fields decoded in partial decoding
// Each accessor asserts that partial_decode_needed has been cleared, i.e.
// that decode_payload() has run or the message was built with the full
// constructor (which starts with both decode flags false).
// Placement group id of this op.
const pg_t& get_pg() const { assert(!partial_decode_needed); return pgid; }
// OSD map epoch carried in the message.
epoch_t get_map_epoch() { assert(!partial_decode_needed); return osdmap_epoch; }
// Raw flags word; individual bits are tested by has_flag() and the wants_*() helpers.
int get_flags() const { assert(!partial_decode_needed); return flags; }

// Fields decoded in final decoding
// Each accessor asserts that final_decode_needed has been cleared, i.e. that
// finish_decode() has run, the message used a pre-v2 encoding (decoded in one
// step by decode_payload()), or it was built with the full constructor.
int get_client_inc() { assert(!final_decode_needed); return client_inc; }
utime_t get_mtime() { assert(!final_decode_needed); return mtime; }
const eversion_t& get_version() { assert(!final_decode_needed); return reassert_version; }
const object_locator_t& get_object_locator() const { assert(!final_decode_needed); return oloc; }
// Returns a mutable reference to the object id.
object_t& get_oid() { assert(!final_decode_needed); return oid; }
const snapid_t& get_snapid() { assert(!final_decode_needed); return snapid; }
const snapid_t& get_snap_seq() const { assert(!final_decode_needed); return snap_seq; }
const vector<snapid_t> &get_snaps() const { assert(!final_decode_needed); return snaps; }
/**
* get retry attempt
*
* 0 is the first attempt.
*
* The value is -1 when the message was built with the full constructor or
* decoded from an encoding older than v4, which does not carry this field.
* Unlike the decoded-field accessors, this getter does not assert on the
* decode state.
*
* @return retry attempt, or -1 if we don't know
*/
int get_retry_attempt() const {
return retry_attempt;
}
// Feature bits for this op: the value stored in the message when it is
// non-zero, otherwise the feature set reported by the connection.
uint64_t get_features() const {
  return features ? features : get_connection()->get_features();
}

MOSDOp()
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { }
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), partial_decode_needed(true), final_decode_needed(true) { }
MOSDOp(int inc, long tid,
object_t& _oid, object_locator_t& _oloc, pg_t& _pgid, epoch_t _osdmap_epoch,
int _flags, uint64_t feat)
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
client_inc(inc),
osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
oid(_oid), oloc(_oloc), pgid(_pgid),
oid(_oid), oloc(_oloc), pgid(_pgid), partial_decode_needed(false), final_decode_needed(false),
features(feat) {
set_tid(tid);
}
Expand Down Expand Up @@ -146,16 +160,7 @@ class MOSDOp : public Message {
add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
}

uint64_t get_features() const {
if (features)
return features;
return get_connection()->get_features();
}

// flags
int get_flags() const { return flags; }
// Returns true when at least one bit of `flag` is set in this op's flags.
// Declared const (it only reads state) so it can be called on const
// messages, like the wants_*() helpers below.
bool has_flag(__u32 flag) const { return (flags & flag) != 0; }

// Flag predicates: each returns true when the named CEPH_OSD_FLAG_* bit is
// set in this op's flags word.
bool wants_ack() const { return flags & CEPH_OSD_FLAG_ACK; }
bool wants_ondisk() const { return flags & CEPH_OSD_FLAG_ONDISK; }
bool wants_onnvram() const { return flags & CEPH_OSD_FLAG_ONNVRAM; }
Expand All @@ -173,17 +178,6 @@ class MOSDOp : public Message {
retry_attempt = a;
}

/**
* get retry attempt
*
* 0 is the first attempt.
*
* @return retry attempt, or -1 if we don't know
*/
int get_retry_attempt() const {
return retry_attempt;
}

// marshalling
virtual void encode_payload(uint64_t features) {

Expand Down Expand Up @@ -240,16 +234,39 @@ struct ceph_osd_request_head {

::encode_nohead(oid.name, payload);
::encode_nohead(snaps, payload);
} else {
header.version = HEAD_VERSION;
} else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
header.version = 5;
::encode(client_inc, payload);
::encode(osdmap_epoch, payload);
::encode(flags, payload);
::encode(mtime, payload);
::encode(reassert_version, payload);

::encode(oloc, payload);
::encode(pgid, payload);

::encode(oid, payload);

__u16 num_ops = ops.size();
::encode(num_ops, payload);
for (unsigned i = 0; i < ops.size(); i++)
::encode(ops[i].op, payload);

::encode(snapid, payload);
::encode(snap_seq, payload);
::encode(snaps, payload);

::encode(retry_attempt, payload);
::encode(features, payload);
} else {
// new, reordered, v6 message encoding
header.version = HEAD_VERSION;
::encode(pgid, payload);
::encode(osdmap_epoch, payload);
::encode(flags, payload);
::encode(client_inc, payload);
::encode(mtime, payload);
::encode(reassert_version, payload);
::encode(oloc, payload);
::encode(oid, payload);

__u16 num_ops = ops.size();
Expand All @@ -267,7 +284,8 @@ struct ceph_osd_request_head {
}

virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
assert(partial_decode_needed && final_decode_needed);
p = payload.begin();

if (header.version < 2) {
// old decode
Expand Down Expand Up @@ -310,8 +328,11 @@ struct ceph_osd_request_head {

retry_attempt = -1;
features = 0;
} else {
// new decode
OSDOp::split_osd_op_vector_in_data(ops, data);

// In old versions, final decoding is done in first step
final_decode_needed = false;
} else if (header.version < 6) {
::decode(client_inc, p);
::decode(osdmap_epoch, p);
::decode(flags, p);
Expand All @@ -328,31 +349,75 @@ struct ceph_osd_request_head {
::decode(pgid, p);
}

} else {
// new v6 decode, split into partial and final steps
::decode(pgid, p);
::decode(osdmap_epoch, p);
::decode(flags, p);
}

partial_decode_needed = false;

}

void finish_decode() {
assert(!partial_decode_needed); // partial decoding required

if (!final_decode_needed)
return; //Message is already final decoded

if (header.version < 6) {
::decode(oid, p);

//::decode(ops, p);
__u16 num_ops;
::decode(num_ops, p);
ops.resize(num_ops);
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);
::decode(ops[i].op, p);

::decode(snapid, p);
::decode(snap_seq, p);
::decode(snaps, p);

if (header.version >= 4)
::decode(retry_attempt, p);
::decode(retry_attempt, p);
else
retry_attempt = -1;
retry_attempt = -1;

if (header.version >= 5)
::decode(features, p);
::decode(features, p);
else
features = 0;
features = 0;

OSDOp::split_osd_op_vector_in_data(ops, data);

} else {
::decode(client_inc, p);
::decode(mtime, p);
::decode(reassert_version, p);
::decode(oloc, p);
::decode(oid, p);

__u16 num_ops;
::decode(num_ops, p);
ops.resize(num_ops);
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);

::decode(snapid, p);
::decode(snap_seq, p);
::decode(snaps, p);

::decode(retry_attempt, p);

::decode(features, p);

OSDOp::split_osd_op_vector_in_data(ops, data);

}

OSDOp::split_osd_op_vector_in_data(ops, data);
final_decode_needed = false;
}

void clear_buffers() {
Expand Down Expand Up @@ -386,7 +451,7 @@ struct ceph_osd_request_head {
out << " RETRY=" << get_retry_attempt();
if (reassert_version != eversion_t())
out << " reassert_version=" << reassert_version;
if (get_snap_seq())
if (!final_decode_needed)
out << " snapc " << get_snap_seq() << "=" << snaps;
out << " " << ceph_osd_flag_string(get_flags());
out << " e" << osdmap_epoch;
Expand Down
3 changes: 0 additions & 3 deletions src/osd/OSD.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7959,9 +7959,6 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
return;
}

// we don't need encoded payload anymore
m->clear_payload();

// set up a map send if the Op gets blocked for some reason
send_map_on_destruct share_map(this, m, osdmap, m->get_map_epoch());
Session *client_session =
Expand Down
12 changes: 8 additions & 4 deletions src/osd/ReplicatedPG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1245,10 +1245,6 @@ void ReplicatedPG::do_request(
OpRequestRef& op,
ThreadPool::TPHandle &handle)
{
if (!op_has_sufficient_caps(op)) {
osd->reply_op_error(op, -EPERM);
return;
}
assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op));
if (can_discard_request(op)) {
return;
Expand Down Expand Up @@ -1373,6 +1369,9 @@ void ReplicatedPG::do_op(OpRequestRef& op)
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
assert(m->get_type() == CEPH_MSG_OSD_OP);

m->finish_decode();
m->clear_payload();

if (op->rmw_flags == 0) {
int r = osd->osd->init_op_flags(op);
if (r) {
Expand All @@ -1389,6 +1388,11 @@ void ReplicatedPG::do_op(OpRequestRef& op)
return do_pg_op(op);
}

if (!op_has_sufficient_caps(op)) {
osd->reply_op_error(op, -EPERM);
return;
}

// object name too long?
unsigned max_name_len = MIN(g_conf->osd_max_object_name_len,
osd->osd->store->get_max_object_name_length());
Expand Down

0 comments on commit b3aee8c

Please sign in to comment.