Skip to content

Commit

Permalink
Merge pull request ceph#3742 from ceph/wip-10788
Browse files Browse the repository at this point in the history
osd: proxy features with proxied reads; only proxy reads to new peers

Reviewed-by: Samuel Just <sjust@redhat.com>
  • Loading branch information
Samuel Just committed Feb 24, 2015
2 parents 3857228 + 91cda52 commit 29861b1
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 31 deletions.
1 change: 1 addition & 0 deletions src/include/ceph_features.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
#define CEPH_FEATURE_OSD_MIN_SIZE_RECOVERY (1ULL<<49)
// duplicated since it was introduced at the same time as MIN_SIZE_RECOVERY
#define CEPH_FEATURE_OSD_DEGRADED_WRITES (1ULL<<49)
#define CEPH_FEATURE_OSD_PROXY_FEATURES (1ULL<<49) /* overlap w/ above */

#define CEPH_FEATURE_RESERVED2 (1ULL<<61) /* slow down, we are almost out... */
#define CEPH_FEATURE_RESERVED (1ULL<<62) /* DO NOT USE THIS ... last bit! */
Expand Down
20 changes: 15 additions & 5 deletions src/messages/MOSDBoot.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

class MOSDBoot : public PaxosServiceMessage {

static const int HEAD_VERSION = 5;
static const int HEAD_VERSION = 6;
static const int COMPAT_VERSION = 2;

public:
Expand All @@ -31,21 +31,24 @@ class MOSDBoot : public PaxosServiceMessage {
entity_addr_t cluster_addr;
epoch_t boot_epoch; // last epoch this daemon was added to the map (if any)
map<string,string> metadata; ///< misc metadata about this osd
uint64_t osd_features;

MOSDBoot()
: PaxosServiceMessage(MSG_OSD_BOOT, 0, HEAD_VERSION, COMPAT_VERSION),
boot_epoch(0)
boot_epoch(0), osd_features(0)
{ }
MOSDBoot(OSDSuperblock& s, epoch_t be,
const entity_addr_t& hb_back_addr_ref,
const entity_addr_t& hb_front_addr_ref,
const entity_addr_t& cluster_addr_ref)
const entity_addr_t& cluster_addr_ref,
uint64_t feat)
: PaxosServiceMessage(MSG_OSD_BOOT, s.current_epoch, HEAD_VERSION, COMPAT_VERSION),
sb(s),
hb_back_addr(hb_back_addr_ref),
hb_front_addr(hb_front_addr_ref),
cluster_addr(cluster_addr_ref),
boot_epoch(be)
boot_epoch(be),
osd_features(feat)
{ }

private:
Expand All @@ -54,7 +57,9 @@ class MOSDBoot : public PaxosServiceMessage {
public:
const char *get_type_name() const { return "osd_boot"; }
void print(ostream& out) const {
out << "osd_boot(osd." << sb.whoami << " booted " << boot_epoch << " v" << version << ")";
out << "osd_boot(osd." << sb.whoami << " booted " << boot_epoch
<< " features " << osd_features
<< " v" << version << ")";
}

void encode_payload(uint64_t features) {
Expand All @@ -65,6 +70,7 @@ class MOSDBoot : public PaxosServiceMessage {
::encode(boot_epoch, payload);
::encode(hb_front_addr, payload);
::encode(metadata, payload);
::encode(osd_features, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
Expand All @@ -79,6 +85,10 @@ class MOSDBoot : public PaxosServiceMessage {
::decode(hb_front_addr, p);
if (header.version >= 5)
::decode(metadata, p);
if (header.version >= 6)
::decode(osd_features, p);
else
osd_features = 0;
}
};

Expand Down
22 changes: 19 additions & 3 deletions src/messages/MOSDOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class OSD;

class MOSDOp : public Message {

static const int HEAD_VERSION = 4;
static const int HEAD_VERSION = 5;
static const int COMPAT_VERSION = 3;

private:
Expand All @@ -54,6 +54,8 @@ class MOSDOp : public Message {
snapid_t snap_seq;
vector<snapid_t> snaps;

uint64_t features;

public:
friend class MOSDOpReply;

Expand Down Expand Up @@ -94,11 +96,12 @@ class MOSDOp : public Message {
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { }
MOSDOp(int inc, long tid,
object_t& _oid, object_locator_t& _oloc, pg_t& _pgid, epoch_t _osdmap_epoch,
int _flags)
int _flags, uint64_t feat)
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
client_inc(inc),
osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
oid(_oid), oloc(_oloc), pgid(_pgid) {
oid(_oid), oloc(_oloc), pgid(_pgid),
features(feat) {
set_tid(tid);
}
private:
Expand Down Expand Up @@ -143,6 +146,12 @@ class MOSDOp : public Message {
add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
}

uint64_t get_features() const {
if (features)
return features;
return get_connection()->get_features();
}

// flags
int get_flags() const { return flags; }

Expand Down Expand Up @@ -251,6 +260,7 @@ struct ceph_osd_request_head {
::encode(snaps, payload);

::encode(retry_attempt, payload);
::encode(features, payload);
}
}

Expand Down Expand Up @@ -297,6 +307,7 @@ struct ceph_osd_request_head {
oid.name.length()));

retry_attempt = -1;
features = 0;
} else {
// new decode
::decode(client_inc, p);
Expand Down Expand Up @@ -332,6 +343,11 @@ struct ceph_osd_request_head {
::decode(retry_attempt, p);
else
retry_attempt = -1;

if (header.version >= 5)
::decode(features, p);
else
features = 0;
}

OSDOp::split_osd_op_vector_in_data(ops, data);
Expand Down
3 changes: 2 additions & 1 deletion src/mon/Monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2968,7 +2968,8 @@ void Monitor::forward_request_leader(PaxosServiceMessage *req)
routed_requests[rr->tid] = rr;
session->routed_request_tids.insert(rr->tid);

dout(10) << "forward_request " << rr->tid << " request " << *req << dendl;
dout(10) << "forward_request " << rr->tid << " request " << *req
<< " features " << rr->con_features << dendl;

MForward *forward = new MForward(rr->tid, req,
rr->con_features,
Expand Down
7 changes: 6 additions & 1 deletion src/mon/OSDMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1779,8 +1779,13 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m)
xi.laggy_probability * (1.0 - g_conf->mon_osd_laggy_weight);
dout(10) << " laggy, now xi " << xi << dendl;
}

// set features shared by the osd
xi.features = m->get_connection()->get_features();
if (m->osd_features)
xi.features = m->osd_features;
else
xi.features = m->get_connection()->get_features();

pending_inc.new_xinfo[from] = xi;

// wait
Expand Down
2 changes: 1 addition & 1 deletion src/msg/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ class Message : public RefCountedObject {
completion_hook->complete(0);
}
public:
inline const ConnectionRef& get_connection() { return connection; }
inline const ConnectionRef& get_connection() const { return connection; }
void set_connection(const ConnectionRef& c) {
connection = c;
}
Expand Down
3 changes: 2 additions & 1 deletion src/osd/OSD.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4414,7 +4414,8 @@ void OSD::_send_boot()
}

MOSDBoot *mboot = new MOSDBoot(superblock, service.get_boot_epoch(),
hb_back_addr, hb_front_addr, cluster_addr);
hb_back_addr, hb_front_addr, cluster_addr,
CEPH_FEATURES_ALL);
dout(10) << " client_addr " << client_messenger->get_myaddr()
<< ", cluster_addr " << cluster_addr
<< ", hb_back_addr " << hb_back_addr
Expand Down
17 changes: 12 additions & 5 deletions src/osd/OSDMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1090,22 +1090,26 @@ uint64_t OSDMap::get_features(int entity_type, uint64_t *pmask) const
return features;
}

uint64_t OSDMap::get_up_osd_features() const
void OSDMap::_calc_up_osd_features()
{
bool first = true;
uint64_t features = 0;
cached_up_osd_features = 0;
for (int osd = 0; osd < max_osd; ++osd) {
if (!is_up(osd))
continue;
const osd_xinfo_t &xi = get_xinfo(osd);
if (first) {
features = xi.features;
cached_up_osd_features = xi.features;
first = false;
} else {
features &= xi.features;
cached_up_osd_features &= xi.features;
}
}
return features;
}

uint64_t OSDMap::get_up_osd_features() const
{
return cached_up_osd_features;
}

void OSDMap::dedup(const OSDMap *o, OSDMap *n)
Expand Down Expand Up @@ -1258,6 +1262,7 @@ int OSDMap::apply_incremental(const Incremental &inc)
return -EINVAL;

assert(inc.epoch == epoch+1);

epoch++;
modified = inc.modified;

Expand Down Expand Up @@ -1439,6 +1444,7 @@ int OSDMap::apply_incremental(const Incremental &inc)
}

calc_num_osds();
_calc_up_osd_features();
return 0;
}

Expand Down Expand Up @@ -2196,6 +2202,7 @@ void OSDMap::post_decode()
}

calc_num_osds();
_calc_up_osd_features();
}

void OSDMap::dump_erasure_code_profiles(const map<string,map<string,string> > &profiles,
Expand Down
5 changes: 5 additions & 0 deletions src/osd/OSDMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,13 @@ class OSDMap {
string cluster_snapshot;
bool new_blacklist_entries;

mutable uint64_t cached_up_osd_features;

mutable bool crc_defined;
mutable uint32_t crc;

void _calc_up_osd_features();

public:
bool have_crc() const { return crc_defined; }
uint32_t get_crc() const { return crc; }
Expand All @@ -269,6 +273,7 @@ class OSDMap {
osd_uuid(new vector<uuid_d>),
cluster_snapshot_epoch(0),
new_blacklist_entries(false),
cached_up_osd_features(0),
crc_defined(false), crc(0),
crush(new CrushWrapper) {
memset(&fsid, 0, sizeof(fsid));
Expand Down
42 changes: 32 additions & 10 deletions src/osd/ReplicatedPG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1843,10 +1843,14 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op,
dout(25) << __func__ << " " << obc->obs.oi << " "
<< (obc->obs.exists ? "exists" : "DNE")
<< " missing_oid " << missing_oid
<< " must_promote " << (int)must_promote
<< " in_hit_set " << (int)in_hit_set
<< dendl;
else
dout(25) << __func__ << " (no obc)"
<< " missing_oid " << missing_oid
<< " must_promote " << (int)must_promote
<< " in_hit_set " << (int)in_hit_set
<< dendl;

// if it is write-ordered and blocked, stop now
Expand All @@ -1873,13 +1877,23 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op,
return true;
}

// older versions do not proxy the feature bits.
bool can_proxy_read = get_osdmap()->get_up_osd_features() &
CEPH_FEATURE_OSD_PROXY_FEATURES;
OpRequestRef promote_op;

switch (pool.info.cache_mode) {
case pg_pool_t::CACHEMODE_WRITEBACK:
if (agent_state &&
agent_state->evict_mode == TierAgentState::EVICT_MODE_FULL) {
if (!op->may_write() && !op->may_cache() && !write_ordered) {
dout(20) << __func__ << " cache pool full, proxying read" << dendl;
do_proxy_read(op);
if (can_proxy_read) {
dout(20) << __func__ << " cache pool full, proxying read" << dendl;
do_proxy_read(op);
} else {
dout(20) << __func__ << " cache pool full, redirect read" << dendl;
do_cache_redirect(op);
}
return true;
}
dout(20) << __func__ << " cache pool full, waiting" << dendl;
Expand All @@ -1894,8 +1908,10 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op,
return true;
}

// Always proxy
do_proxy_read(op);
if (can_proxy_read)
do_proxy_read(op);
else
promote_op = op; // for non-proxy case promote_object needs this

// Avoid duplicate promotion
if (obc.get() && obc->is_blocked()) {
Expand All @@ -1905,17 +1921,19 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op,
// Promote too?
switch (pool.info.min_read_recency_for_promote) {
case 0:
promote_object(obc, missing_oid, oloc, OpRequestRef());
promote_object(obc, missing_oid, oloc, promote_op);
break;
case 1:
// Check if in the current hit set
if (in_hit_set) {
promote_object(obc, missing_oid, oloc, OpRequestRef());
promote_object(obc, missing_oid, oloc, promote_op);
} else if (!can_proxy_read) {
do_cache_redirect(op);
}
break;
default:
if (in_hit_set) {
promote_object(obc, missing_oid, oloc, OpRequestRef());
promote_object(obc, missing_oid, oloc, promote_op);
} else {
// Check if in other hit sets
map<time_t,HitSetRef>::iterator itor;
Expand All @@ -1927,7 +1945,9 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op,
}
}
if (in_other_hit_sets) {
promote_object(obc, missing_oid, oloc, OpRequestRef());
promote_object(obc, missing_oid, oloc, promote_op);
} else if (!can_proxy_read) {
do_cache_redirect(op);
}
}
break;
Expand Down Expand Up @@ -2079,7 +2099,8 @@ void ReplicatedPG::do_proxy_read(OpRequestRef op)
m->get_snapid(), NULL,
flags, new C_OnFinisher(fin, &osd->objecter_finisher),
&prdop->user_version,
&prdop->data_offset);
&prdop->data_offset,
m->get_features());
fin->tid = tid;
prdop->objecter_tid = tid;
proxyread_ops[tid] = prdop;
Expand Down Expand Up @@ -6165,7 +6186,8 @@ int ReplicatedPG::fill_in_copy_get(
return result;
}

uint64_t features = ctx->op->get_req()->get_connection()->get_features();
MOSDOp *op = reinterpret_cast<MOSDOp*>(ctx->op->get_req());
uint64_t features = op->get_features();

bool async_read_started = false;
object_copy_data_t _reply_obj;
Expand Down
2 changes: 1 addition & 1 deletion src/osdc/Objecter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2719,7 +2719,7 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
op->target.target_oid, op->target.target_oloc,
op->target.pgid,
osdmap->get_epoch(),
flags);
flags, op->features);

m->set_snapid(op->snapid);
m->set_snap_seq(op->snapc.seq);
Expand Down
Loading

0 comments on commit 29861b1

Please sign in to comment.