Skip to content

Commit

Permalink
Merge pull request #5373: pg_interval_t::check_new_interval - for ec …
Browse files Browse the repository at this point in the history
…pool, should not rely on min_size to determine if the PG was active at the interval

Reviewed-by: Loic Dachary <ldachary@redhat.com>
  • Loading branch information
ldachary committed Aug 30, 2015
2 parents 5044bb4 + cd11b88 commit 07d3717
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 36 deletions.
8 changes: 4 additions & 4 deletions src/osd/ECBackend.h
Expand Up @@ -385,7 +385,7 @@ class ECBackend : public PGBackend {
*
* Determines whether _have is sufficient to recover an object
*/
class ECRecPred : public IsRecoverablePredicate {
class ECRecPred : public IsPGRecoverablePredicate {
set<int> want;
ErasureCodeInterfaceRef ec_impl;
public:
Expand All @@ -405,7 +405,7 @@ class ECBackend : public PGBackend {
return ec_impl->minimum_to_decode(want, have, &min) == 0;
}
};
IsRecoverablePredicate *get_is_recoverable_predicate() {
IsPGRecoverablePredicate *get_is_recoverable_predicate() {
return new ECRecPred(ec_impl);
}

Expand All @@ -414,7 +414,7 @@ class ECBackend : public PGBackend {
*
* Determines whether _have is sufficient to read an object
*/
class ECReadPred : public IsReadablePredicate {
class ECReadPred : public IsPGReadablePredicate {
pg_shard_t whoami;
ECRecPred rec_pred;
public:
Expand All @@ -425,7 +425,7 @@ class ECBackend : public PGBackend {
return _have.count(whoami) && rec_pred(_have);
}
};
IsReadablePredicate *get_is_readable_predicate() {
IsPGReadablePredicate *get_is_readable_predicate() {
return new ECReadPred(get_parent()->whoami_shard(), ec_impl);
}

Expand Down
3 changes: 3 additions & 0 deletions src/osd/OSD.cc
Expand Up @@ -3012,6 +3012,8 @@ void OSD::build_past_intervals_parallel()
}
assert(last_map);

boost::scoped_ptr<IsPGRecoverablePredicate> recoverable(
pg->get_is_recoverable_predicate());
std::stringstream debug;
bool new_interval = pg_interval_t::check_new_interval(
p.primary,
Expand All @@ -3024,6 +3026,7 @@ void OSD::build_past_intervals_parallel()
pg->info.history.last_epoch_clean,
cur_map, last_map,
pgid,
recoverable.get(),
&pg->past_intervals,
&debug);
if (new_interval) {
Expand Down
10 changes: 8 additions & 2 deletions src/osd/PG.cc
Expand Up @@ -694,6 +694,8 @@ void PG::generate_past_intervals()
pgid = pgid.get_ancestor(last_map->get_pg_num(pgid.pool()));
cur_map->pg_to_up_acting_osds(pgid, &up, &up_primary, &acting, &primary);

boost::scoped_ptr<IsPGRecoverablePredicate> recoverable(
get_is_recoverable_predicate());
std::stringstream debug;
bool new_interval = pg_interval_t::check_new_interval(
old_primary,
Expand All @@ -709,6 +711,7 @@ void PG::generate_past_intervals()
cur_map,
last_map,
pgid,
recoverable.get(),
&past_intervals,
&debug);
if (new_interval) {
Expand Down Expand Up @@ -1336,7 +1339,7 @@ bool PG::choose_acting(pg_shard_t &auth_log_shard_id)
}

/* Check whether we have enough acting shards to later perform recovery */
boost::scoped_ptr<PGBackend::IsRecoverablePredicate> recoverable_predicate(
boost::scoped_ptr<IsPGRecoverablePredicate> recoverable_predicate(
get_pgbackend()->get_is_recoverable_predicate());
set<pg_shard_t> have;
for (int i = 0; i < (int)want.size(); ++i) {
Expand Down Expand Up @@ -4742,6 +4745,8 @@ void PG::start_peering_interval(
info.history.same_interval_since = osdmap->get_epoch();
} else {
std::stringstream debug;
boost::scoped_ptr<IsPGRecoverablePredicate> recoverable(
get_is_recoverable_predicate());
bool new_interval = pg_interval_t::check_new_interval(
old_acting_primary.osd,
new_acting_primary,
Expand All @@ -4754,6 +4759,7 @@ void PG::start_peering_interval(
osdmap,
lastmap,
info.pgid.pgid,
recoverable.get(),
&past_intervals,
&debug);
dout(10) << __func__ << ": check_new_interval output: "
Expand Down Expand Up @@ -7471,7 +7477,7 @@ void PG::RecoveryState::RecoveryMachine::log_exit(const char *state_name, utime_
#define dout_prefix (*_dout << (debug_pg ? debug_pg->gen_prefix() : string()) << " PriorSet: ")

PG::PriorSet::PriorSet(bool ec_pool,
PGBackend::IsRecoverablePredicate *c,
IsPGRecoverablePredicate *c,
const OSDMap &osdmap,
const map<epoch_t, pg_interval_t> &past_intervals,
const vector<int> &up,
Expand Down
16 changes: 10 additions & 6 deletions src/osd/PG.h
Expand Up @@ -197,6 +197,10 @@ class PG {
void update_snap_mapper_bits(uint32_t bits) {
snap_mapper.update_bits(bits);
}
/**
 * get_is_recoverable_predicate
 *
 * Forwards to the PG backend's get_is_recoverable_predicate() and returns
 * the result unchanged.
 *
 * @return predicate pointer; the caller owns the returned pointer and must
 *         delete it when done (e.g. by holding it in a boost::scoped_ptr)
 */
IsPGRecoverablePredicate *get_is_recoverable_predicate() {
return get_pgbackend()->get_is_recoverable_predicate();
}
protected:
// Ops waiting for map, should be queued at back
Mutex map_lock;
Expand Down Expand Up @@ -315,13 +319,13 @@ class PG {
PG *pg;
set<pg_shard_t> empty_set;
public:
boost::scoped_ptr<PGBackend::IsReadablePredicate> is_readable;
boost::scoped_ptr<PGBackend::IsRecoverablePredicate> is_recoverable;
boost::scoped_ptr<IsPGReadablePredicate> is_readable;
boost::scoped_ptr<IsPGRecoverablePredicate> is_recoverable;
MissingLoc(PG *pg)
: pg(pg) {}
void set_backend_predicates(
PGBackend::IsReadablePredicate *_is_readable,
PGBackend::IsRecoverablePredicate *_is_recoverable) {
IsPGReadablePredicate *_is_readable,
IsPGRecoverablePredicate *_is_recoverable) {
is_readable.reset(_is_readable);
is_recoverable.reset(_is_recoverable);
}
Expand Down Expand Up @@ -492,9 +496,9 @@ class PG {
map<int, epoch_t> blocked_by; /// current lost_at values for any OSDs in cur set for which (re)marking them lost would affect cur set

bool pg_down; /// some down osds are included in @a cur; the DOWN pg state bit should be set.
boost::scoped_ptr<PGBackend::IsRecoverablePredicate> pcontdec;
boost::scoped_ptr<IsPGRecoverablePredicate> pcontdec;
PriorSet(bool ec_pool,
PGBackend::IsRecoverablePredicate *c,
IsPGRecoverablePredicate *c,
const OSDMap &osdmap,
const map<epoch_t, pg_interval_t> &past_intervals,
const vector<int> &up,
Expand Down
21 changes: 2 additions & 19 deletions src/osd/PGBackend.h
Expand Up @@ -318,25 +318,8 @@

virtual void on_flushed() = 0;

class IsRecoverablePredicate {
public:
/**
* have encodes the shards available
*/
virtual bool operator()(const set<pg_shard_t> &have) const = 0;
virtual ~IsRecoverablePredicate() {}
};
virtual IsRecoverablePredicate *get_is_recoverable_predicate() = 0;

class IsReadablePredicate {
public:
/**
* have encodes the shards available
*/
virtual bool operator()(const set<pg_shard_t> &have) const = 0;
virtual ~IsReadablePredicate() {}
};
virtual IsReadablePredicate *get_is_readable_predicate() = 0;
virtual IsPGRecoverablePredicate *get_is_recoverable_predicate() = 0;
virtual IsPGReadablePredicate *get_is_readable_predicate() = 0;

void temp_colls(list<coll_t> *out) {
if (temp_created)
Expand Down
8 changes: 4 additions & 4 deletions src/osd/ReplicatedBackend.h
Expand Up @@ -73,25 +73,25 @@ class ReplicatedBackend : public PGBackend {
void clear_recovery_state();
void on_flushed();

class RPCRecPred : public IsRecoverablePredicate {
class RPCRecPred : public IsPGRecoverablePredicate {
public:
/// The predicate holds as soon as at least one shard is available.
bool operator()(const set<pg_shard_t> &have) const {
  if (have.empty())
    return false;  // nothing available at all
  return true;
}
};
IsRecoverablePredicate *get_is_recoverable_predicate() {
IsPGRecoverablePredicate *get_is_recoverable_predicate() {
return new RPCRecPred;
}

class RPCReadPred : public IsReadablePredicate {
class RPCReadPred : public IsPGReadablePredicate {
pg_shard_t whoami;
public:
RPCReadPred(pg_shard_t whoami) : whoami(whoami) {}
/// The predicate holds iff this shard (whoami) is among the available shards.
bool operator()(const set<pg_shard_t> &have) const {
  return have.find(whoami) != have.end();
}
};
IsReadablePredicate *get_is_readable_predicate() {
IsPGReadablePredicate *get_is_readable_predicate() {
return new RPCReadPred(get_parent()->whoami_shard());
}

Expand Down
18 changes: 17 additions & 1 deletion src/osd/osd_types.cc
Expand Up @@ -931,6 +931,16 @@ void pg_pool_t::dump(Formatter *f) const
f->dump_unsigned("expected_num_objects", expected_num_objects);
}

void pg_pool_t::convert_to_pg_shards(const vector<int> &from, set<pg_shard_t>* to) const {
for (size_t i = 0; i < from.size(); ++i) {
if (from[i] != CRUSH_ITEM_NONE) {
to->insert(
pg_shard_t(
from[i],
ec_pool() ? shard_id_t(i) : shard_id_t::NO_SHARD));
}
}
}

int pg_pool_t::calc_bits_of(int t)
{
Expand Down Expand Up @@ -2607,6 +2617,7 @@ bool pg_interval_t::check_new_interval(
OSDMapRef osdmap,
OSDMapRef lastmap,
pg_t pgid,
IsPGRecoverablePredicate *could_have_gone_active,
map<epoch_t, pg_interval_t> *past_intervals,
std::ostream *out)
{
Expand Down Expand Up @@ -2640,9 +2651,14 @@ bool pg_interval_t::check_new_interval(
if (*p != CRUSH_ITEM_NONE)
++num_acting;

const pg_pool_t& old_pg_pool = lastmap->get_pools().find(pgid.pool())->second;
set<pg_shard_t> old_acting_shards;
old_pg_pool.convert_to_pg_shards(old_acting, &old_acting_shards);

if (num_acting &&
i.primary != -1 &&
num_acting >= lastmap->get_pools().find(pgid.pool())->second.min_size) {
num_acting >= old_pg_pool.min_size &&
(*could_have_gone_active)(old_acting_shards)) {
if (out)
*out << "generate_past_intervals " << i
<< ": not rw,"
Expand Down
22 changes: 22 additions & 0 deletions src/osd/osd_types.h
Expand Up @@ -95,6 +95,24 @@ WRITE_EQ_OPERATORS_2(pg_shard_t, osd, shard)
WRITE_CMP_OPERATORS_2(pg_shard_t, osd, shard)
ostream &operator<<(ostream &lhs, const pg_shard_t &rhs);

/**
 * IsPGRecoverablePredicate
 *
 * Abstract functor interface: given the set of shards that are available,
 * an implementation decides whether that set is sufficient for recovery.
 * Instances are deleted through this base type, hence the virtual destructor.
 */
class IsPGRecoverablePredicate {
public:
  virtual ~IsPGRecoverablePredicate() {}

  /**
   * @param have [in] encodes the shards available
   * @return true if the implementation considers have sufficient
   */
  virtual bool operator()(const set<pg_shard_t> &have) const = 0;
};

/**
 * IsPGReadablePredicate
 *
 * Abstract functor interface: given the set of shards that are available,
 * an implementation decides whether that set is sufficient for reading.
 * Instances are deleted through this base type, hence the virtual destructor.
 */
class IsPGReadablePredicate {
public:
  virtual ~IsPGReadablePredicate() {}

  /**
   * @param have [in] encodes the shards available
   * @return true if the implementation considers have sufficient
   */
  virtual bool operator()(const set<pg_shard_t> &have) const = 0;
};

inline ostream& operator<<(ostream& out, const osd_reqid_t& r) {
return out << r.name << "." << r.inc << ":" << r.tid;
}
Expand Down Expand Up @@ -879,6 +897,9 @@ struct pg_pool_t {
return 0;
}

/// converts the acting/up vector to a set of pg shards
void convert_to_pg_shards(const vector<int> &from, set<pg_shard_t>* to) const;

typedef enum {
CACHEMODE_NONE = 0, ///< no caching
CACHEMODE_WRITEBACK = 1, ///< write to cache, flush later
Expand Down Expand Up @@ -1889,6 +1910,7 @@ struct pg_interval_t {
ceph::shared_ptr<const OSDMap> osdmap, ///< [in] current map
ceph::shared_ptr<const OSDMap> lastmap, ///< [in] last map
pg_t pgid, ///< [in] pgid for pg
IsPGRecoverablePredicate *could_have_gone_active, ///< [in] predicate whether the pg can be active
map<epoch_t, pg_interval_t> *past_intervals,///< [out] intervals
ostream *out = 0 ///< [out] debug ostream
);
Expand Down
14 changes: 14 additions & 0 deletions src/test/osd/types.cc
Expand Up @@ -20,6 +20,7 @@
#include "osd/OSDMap.h"
#include "gtest/gtest.h"
#include "common/Thread.h"
#include "osd/ReplicatedBackend.h"

#include <sstream>

Expand Down Expand Up @@ -139,6 +140,7 @@ TEST(pg_interval_t, check_new_interval)
int64_t pool_id = 200;
int pg_num = 4;
__u8 min_size = 2;
boost::scoped_ptr<IsPGRecoverablePredicate> recoverable(new ReplicatedBackend::RPCRecPred());
{
OSDMap::Incremental inc(epoch + 1);
inc.new_pools[pool_id].min_size = min_size;
Expand Down Expand Up @@ -183,6 +185,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals));
ASSERT_TRUE(past_intervals.empty());
}
Expand Down Expand Up @@ -212,6 +215,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals));
ASSERT_EQ((unsigned int)1, past_intervals.size());
ASSERT_EQ(same_interval_since, past_intervals[same_interval_since].first);
Expand Down Expand Up @@ -244,6 +248,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals));
old_primary = new_primary;
ASSERT_EQ((unsigned int)1, past_intervals.size());
Expand Down Expand Up @@ -277,6 +282,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals));
ASSERT_EQ((unsigned int)1, past_intervals.size());
ASSERT_EQ(same_interval_since, past_intervals[same_interval_since].first);
Expand Down Expand Up @@ -308,6 +314,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals));
ASSERT_EQ((unsigned int)1, past_intervals.size());
ASSERT_EQ(same_interval_since, past_intervals[same_interval_since].first);
Expand Down Expand Up @@ -346,6 +353,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals));
ASSERT_EQ((unsigned int)1, past_intervals.size());
ASSERT_EQ(same_interval_since, past_intervals[same_interval_since].first);
Expand Down Expand Up @@ -384,6 +392,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals));
ASSERT_EQ((unsigned int)1, past_intervals.size());
ASSERT_EQ(same_interval_since, past_intervals[same_interval_since].first);
Expand Down Expand Up @@ -417,6 +426,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals,
&out));
ASSERT_EQ((unsigned int)1, past_intervals.size());
Expand Down Expand Up @@ -468,6 +478,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals,
&out));
ASSERT_EQ((unsigned int)1, past_intervals.size());
Expand Down Expand Up @@ -502,6 +513,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals,
&out));
ASSERT_EQ((unsigned int)1, past_intervals.size());
Expand Down Expand Up @@ -546,6 +558,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals,
&out));
ASSERT_EQ((unsigned int)1, past_intervals.size());
Expand Down Expand Up @@ -594,6 +607,7 @@ TEST(pg_interval_t, check_new_interval)
osdmap,
lastmap,
pgid,
recoverable.get(),
&past_intervals,
&out));
ASSERT_EQ((unsigned int)1, past_intervals.size());
Expand Down

0 comments on commit 07d3717

Please sign in to comment.