osd: recover partial content
Signed-off-by: Ning Yao <yaoning@ruijie.com.cn>
Ning Yao committed Jan 22, 2016
1 parent f0756c3 commit 8a5c25e
Showing 6 changed files with 80 additions and 23 deletions.
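This commit lets recovery copy only the byte ranges that were actually modified instead of the whole object: each eligible log entry records a can_recover_partial flag and a dirty_data_interval, pg_missing_t accumulates them per missing object in missing_range, and calc_head_subsets() / prepare_pull() intersect the object's extent with those dirty ranges. The sketch below illustrates just that intersection step with a simplified interval type; it is not Ceph's interval_set API and the names are illustrative only.

```cpp
// Minimal sketch of the partial-recovery idea: intersect the object's full
// extent with the dirty ranges recorded for the missing object, and recover
// only the result. Simplified stand-in types, NOT Ceph's interval_set.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

using Extent  = std::pair<uint64_t, uint64_t>;  // (offset, length)
using Extents = std::vector<Extent>;            // sorted, non-overlapping

// Intersect two extent lists (stand-in for interval_set::intersection_of).
Extents intersect(const Extents& a, const Extents& b) {
  Extents out;
  for (const auto& [ao, al] : a) {
    for (const auto& [bo, bl] : b) {
      uint64_t lo = std::max(ao, bo);
      uint64_t hi = std::min(ao + al, bo + bl);
      if (lo < hi)
        out.emplace_back(lo, hi - lo);
    }
  }
  return out;
}

int main() {
  uint64_t object_size = 4 * 1024 * 1024;        // full object: 4 MiB
  Extents full = {{0, object_size}};
  // Dirty ranges accumulated from the missing entry's log history
  // (what the commit keeps per object as missing_range / dirty_data_interval).
  Extents dirty = {{0, 4096}, {1048576, 8192}};
  bool can_recover_partial = true;               // all contributing ops were plain writes

  Extents to_recover = can_recover_partial ? intersect(full, dirty) : full;
  for (const auto& [off, len] : to_recover)
    std::cout << "recover [" << off << ", " << off + len << ")\n";
  return 0;
}
```

If any contributing op cannot be treated as a pure overwrite (truncation, or the erasure-coded append path), the flag ends up false and recovery falls back to copying the full object as before.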
50 changes: 38 additions & 12 deletions src/osd/ReplicatedBackend.cc
@@ -1308,14 +1308,20 @@ void ReplicatedBackend::calc_head_subsets(
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets, bool& can_recover_partial)
{
dout(10) << "calc_head_subsets " << head
<< " clone_overlap " << snapset.clone_overlap << dendl;

uint64_t size = obc->obs.oi.size;
if (size)
data_subset.insert(0, size);
map<hobject_t, pair<bool, interval_set<uint64_t> >, hobject_t::ComparatorWithDefault>::const_iterator it
= missing.missing_range.find(head);
assert(it != missing.missing_range.end());
can_recover_partial = it->second.first;
if (can_recover_partial)
data_subset.intersection_of(it->second.second);

if (get_parent()->get_pool().allow_incomplete_clones()) {
dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
@@ -1520,7 +1526,14 @@ void ReplicatedBackend::prepare_pull(
} else {
// pulling head or unversioned object.
// always pull the whole thing.
recovery_info.copy_subset.insert(0, (uint64_t)-1);
map<hobject_t, pair<bool, interval_set<uint64_t> >, hobject_t::ComparatorWithDefault>::const_iterator it =
get_parent()->get_local_missing().missing_range.find(soid);
assert(it != get_parent()->get_local_missing().missing_range.end());
recovery_info.can_recover_partial = it->second.first;
if (recovery_info.can_recover_partial)
recovery_info.copy_subset.insert(it->second.second);
else
recovery_info.copy_subset.insert(0, (uint64_t)-1);
recovery_info.size = ((uint64_t)-1);
}

@@ -1561,6 +1574,7 @@ void ReplicatedBackend::prep_push_to_replica(

map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator> clone_subsets;
interval_set<uint64_t> data_subset;
bool can_recover_partial = false;

// are we doing a clone on the replica?
if (soid.snap && soid.snap < CEPH_NOSNAP) {
@@ -1604,10 +1618,9 @@
obc,
ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
get_parent()->get_shard_info().find(peer)->second.last_backfill,
data_subset, clone_subsets);
data_subset, clone_subsets, can_recover_partial);
}

prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop, cache_dont_need);
prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop, cache_dont_need, can_recover_partial);
}

void ReplicatedBackend::prep_push(ObjectContextRef obc,
@@ -1631,7 +1644,7 @@ void ReplicatedBackend::prep_push(
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
PushOp *pop,
bool cache_dont_need)
bool cache_dont_need, bool can_recover_partial)
{
get_parent()->begin_peer_recover(peer, soid);
// take note.
@@ -1643,6 +1656,7 @@
pi.recovery_info.soid = soid;
pi.recovery_info.oi = obc->obs.oi;
pi.recovery_info.version = version;
pi.recovery_info.can_recover_partial = can_recover_partial;
pi.recovery_progress.first = true;
pi.recovery_progress.data_recovered_to = 0;
pi.recovery_progress.data_complete = 0;
@@ -1706,6 +1720,7 @@ void ReplicatedBackend::submit_push_data(
ObjectStore::Transaction *t)
{
hobject_t target_oid;
bool can_recover_partial = recovery_info.can_recover_partial;
if (first && complete) {
target_oid = recovery_info.soid;
} else {
@@ -1715,12 +1730,17 @@
dout(10) << __func__ << ": Adding oid "
<< target_oid << " in the temp collection" << dendl;
add_temp_obj(target_oid);
if (can_recover_partial)
t->collection_move_rename(coll, ghobject_t(recovery_info.soid),
coll, ghobject_t(target_oid));
}
}

if (first) {
t->remove(coll, ghobject_t(target_oid));
t->touch(coll, ghobject_t(target_oid));
if (!can_recover_partial) {
t->remove(coll, ghobject_t(target_oid));
t->touch(coll, ghobject_t(target_oid));
}
t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
t->omap_setheader(coll, ghobject_t(target_oid), omap_header);
}
@@ -1748,7 +1768,8 @@ void ReplicatedBackend::submit_push_data(
dout(10) << __func__ << ": Removing oid "
<< target_oid << " from the temp collection" << dendl;
clear_temp_obj(target_oid);
t->remove(coll, ghobject_t(recovery_info.soid));
if (!can_recover_partial)
t->remove(coll, ghobject_t(recovery_info.soid));
t->collection_move_rename(coll, ghobject_t(target_oid),
coll, ghobject_t(recovery_info.soid));
}
@@ -2280,9 +2301,14 @@ void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
if (progress.first && recovery_info.size == ((uint64_t)-1)) {
// Adjust size and copy_subset
recovery_info.size = st.st_size;
recovery_info.copy_subset.clear();
if (st.st_size)
recovery_info.copy_subset.insert(0, st.st_size);
assert(!recovery_info.copy_subset.empty());
if (st.st_size) {
interval_set<uint64_t> temp;
temp.insert(0, st.st_size);
recovery_info.copy_subset.intersection_of(temp);
}
else
recovery_info.copy_subset.clear();
assert(recovery_info.clone_subset.empty());
}

5 changes: 3 additions & 2 deletions src/osd/ReplicatedBackend.h
@@ -305,12 +305,13 @@ class ReplicatedBackend : public PGBackend {
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
PushOp *op,
bool cache = false);
bool cache = false,
bool can_recover_partial = false);
void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets);
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets, bool& can_recover_partial);
ObjectRecoveryInfo recalc_subsets(
const ObjectRecoveryInfo& recovery_info,
SnapSetContext *ssc
11 changes: 9 additions & 2 deletions src/osd/ReplicatedPG.cc
@@ -3942,7 +3942,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
OSDOp& osd_op = *p;
ceph_osd_op& op = osd_op.op;

ctx->can_recover_partial = false;
// TODO: check endianness (__le32 vs uint32_t, etc.)
// The fields in ceph_osd_op are little-endian (according to the definition in rados.h),
// but the code in this function seems to treat them as native-endian. What should the
@@ -4783,7 +4783,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
case CEPH_OSD_OP_WRITE:
++ctx->num_write;
{ // write
__u32 seq = oi.truncate_seq;
__u32 seq = oi.truncate_seq;
ctx->can_recover_partial = true;
tracepoint(osd, do_osd_op_pre_write, soid.oid.name.c_str(), soid.snap.val, oi.size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
if (op.extent.length != osd_op.indata.length()) {
result = -EINVAL;
@@ -4830,6 +4831,7 @@
if (obs.exists && !oi.is_whiteout()) {
dout(10) << " truncate_seq " << op.extent.truncate_seq << " > current " << seq
<< ", truncating to " << op.extent.truncate_size << dendl;
ctx->can_recover_partial &= false;
t->truncate(soid, op.extent.truncate_size);
oi.truncate_seq = op.extent.truncate_seq;
oi.truncate_size = op.extent.truncate_size;
@@ -4849,6 +4851,7 @@
if (result < 0)
break;
if (pool.info.require_rollback()) {
ctx->can_recover_partial &= false;
t->append(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
} else {
t->write(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
@@ -6600,6 +6603,10 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
ctx->obs->oi.version,
ctx->user_at_version, ctx->reqid,
ctx->mtime));
ctx->log.back().can_recover_partial = ctx->can_recover_partial;
if (ctx->can_recover_partial)
ctx->log.back().dirty_data_interval.insert(ctx->modified_ranges);

if (soid.snap < CEPH_NOSNAP) {
switch (log_op_type) {
case pg_log_entry_t::MODIFY:
3 changes: 2 additions & 1 deletion src/osd/ReplicatedPG.h
@@ -493,6 +493,7 @@ class ReplicatedPG : public PG, public PGBackend::Listener {
bool cache_evict; ///< true if this is a cache eviction
bool ignore_cache; ///< true if IGNORE_CACHE flag is set
bool ignore_log_op_stats; // don't log op stats
bool can_recover_partial; // only recover modified content

// side effects
list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
@@ -609,7 +610,7 @@ class ReplicatedPG : public PG, public PGBackend::Listener {
snapset(0),
new_obs(obs->oi, obs->exists),
modify(false), user_modify(false), undirty(false), cache_evict(false),
ignore_cache(false), ignore_log_op_stats(false),
ignore_cache(false), ignore_log_op_stats(false), can_recover_partial(false),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
op_t(NULL),
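In do_osd_ops(), ctx->can_recover_partial is reset to false for each op, set to true only in the plain CEPH_OSD_OP_WRITE case, and cleared again when the write also advances truncate_seq or when the pool requires rollback (the append path); finish_ctx() then copies the final flag and the modified ranges onto the new log entry. A minimal sketch of that gating, using simplified stand-in types rather than Ceph's OpContext and pg_log_entry_t:

```cpp
// Illustrative only: SimpleOpContext / SimpleLogEntry are stand-ins, not
// Ceph types. The flag survives only a plain overwrite; any truncation or
// rollback/append path forces full-object recovery.
#include <cstdint>
#include <utility>
#include <vector>

struct SimpleLogEntry {
  bool can_recover_partial = false;
  std::vector<std::pair<uint64_t, uint64_t>> dirty_data_interval;  // (off, len)
};

struct SimpleOpContext {
  bool can_recover_partial = false;
  std::vector<std::pair<uint64_t, uint64_t>> modified_ranges;
};

SimpleLogEntry finish_write(SimpleOpContext& ctx, uint64_t off, uint64_t len,
                            bool advances_truncate_seq, bool require_rollback) {
  ctx.can_recover_partial = true;          // plain write: partial recovery OK
  if (advances_truncate_seq)
    ctx.can_recover_partial = false;       // truncation invalidates old extents
  if (require_rollback)
    ctx.can_recover_partial = false;       // rollback/append path: pull whole object
  ctx.modified_ranges.emplace_back(off, len);

  SimpleLogEntry e;                        // mirrors finish_ctx() stamping the entry
  e.can_recover_partial = ctx.can_recover_partial;
  if (e.can_recover_partial)
    e.dirty_data_interval = ctx.modified_ranges;
  return e;
}
```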
31 changes: 26 additions & 5 deletions src/osd/osd_types.cc
@@ -3866,14 +3866,25 @@ eversion_t pg_missing_t::have_old(const hobject_t& oid) const
*/
void pg_missing_t::add_next_event(const pg_log_entry_t& e)
{

if (e.is_update()) {
map<hobject_t, item, hobject_t::ComparatorWithDefault>::iterator missing_it;
map<hobject_t, pair<bool, interval_set<uint64_t> >, hobject_t::ComparatorWithDefault>::iterator missing_range_it;
missing_it = missing.find(e.soid);
missing_range_it = missing_range.find(e.soid);
bool is_missing_divergent_item = missing_it != missing.end();
bool is_missing_range_divergent_item = missing_range_it != missing_range.end();
assert(is_missing_divergent_item == is_missing_range_divergent_item);
if (e.prior_version == eversion_t() || e.is_clone()) {
// new object.
missing_range[e.soid].first = e.can_recover_partial;
missing_range[e.soid].second.clear();
if (is_missing_range_divergent_item) {
missing_range_it->second.first &= e.can_recover_partial;
} else {
missing_range[e.soid].first = e.can_recover_partial;
missing_range_it = missing_range.find(e.soid);
}
missing_range_it->second.second.clear();
missing_range_it->second.second.union_of(e.dirty_data_interval);
if (is_missing_divergent_item) { // use iterator
rmissing.erase((missing_it->second).need.version);
missing_it->second = item(e.version, eversion_t()); // .have = nil
@@ -3882,15 +3893,22 @@ void pg_missing_t::add_next_event(const pg_log_entry_t& e)
} else if (is_missing_divergent_item) {
// already missing (prior).
rmissing.erase((missing_it->second).need.version);
if (is_missing_range_divergent_item) {
missing_range_it->second.first &= e.can_recover_partial;
} else {
missing_range[e.soid].first = e.can_recover_partial;
missing_range_it = missing_range.find(e.soid);
}
(missing_it->second).need = e.version; // leave .have unchanged.
missing_range_it->second.second.union_of(e.dirty_data_interval);
} else if (e.is_backlog()) {
// May not have prior version
assert(0 == "these don't exist anymore");
} else {
// not missing, we must have prior_version (if any)
assert(!is_missing_divergent_item);
missing[e.soid] = item(e.version, e.prior_version);
missing_range[e.soid].first &= e.can_recover_partial;
missing_range[e.soid].first = e.can_recover_partial;
missing_range[e.soid].second.union_of(e.dirty_data_interval);
}
rmissing[e.version.version] = e.soid;
@@ -4951,28 +4969,31 @@ void ObjectRecoveryProgress::dump(Formatter *f) const

void ObjectRecoveryInfo::encode(bufferlist &bl) const
{
ENCODE_START(2, 1, bl);
ENCODE_START(3, 1, bl);
::encode(soid, bl);
::encode(version, bl);
::encode(size, bl);
::encode(oi, bl);
::encode(ss, bl);
::encode(copy_subset, bl);
::encode(clone_subset, bl);
::encode(can_recover_partial, bl);
ENCODE_FINISH(bl);
}

void ObjectRecoveryInfo::decode(bufferlist::iterator &bl,
int64_t pool)
{
DECODE_START(2, bl);
DECODE_START(3, bl);
::decode(soid, bl);
::decode(version, bl);
::decode(size, bl);
::decode(oi, bl);
::decode(ss, bl);
::decode(copy_subset, bl);
::decode(clone_subset, bl);
if (struct_v >2)
::decode(can_recover_partial, bl);
DECODE_FINISH(bl);

if (struct_v < 2) {
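ObjectRecoveryInfo picks up the new field by bumping the encoded struct version from 2 to 3 and decoding can_recover_partial only when struct_v > 2, so a peer that sends the older encoding still decodes cleanly and the field keeps its default of false (full-object recovery). A self-contained sketch of that version-guarded pattern, using a plain byte vector instead of Ceph's bufferlist and ENCODE/DECODE macros (names illustrative only):

```cpp
// Version-guarded decode: new fields are appended at the end of the encoding
// and read only if the sender's struct version says they exist. This sketch
// uses host-endian raw bytes; Ceph's real encoding is little-endian.
#include <cstdint>
#include <cstring>
#include <vector>

struct RecoveryInfoV3 {
  uint64_t size = 0;
  bool can_recover_partial = false;   // new in v3
};

void encode(const RecoveryInfoV3& in, std::vector<uint8_t>& bl) {
  uint8_t struct_v = 3;               // a v2 encoder would simply omit the last byte
  bl.push_back(struct_v);
  const uint8_t* p = reinterpret_cast<const uint8_t*>(&in.size);
  bl.insert(bl.end(), p, p + sizeof(in.size));
  bl.push_back(in.can_recover_partial ? 1 : 0);
}

RecoveryInfoV3 decode(const std::vector<uint8_t>& bl) {
  RecoveryInfoV3 out;
  size_t off = 0;
  uint8_t struct_v = bl[off++];
  std::memcpy(&out.size, bl.data() + off, sizeof(out.size));
  off += sizeof(out.size);
  if (struct_v > 2)                   // mirrors "if (struct_v > 2) ::decode(...)"
    out.can_recover_partial = bl[off++] != 0;
  return out;                         // older encodings leave the default: false
}
```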
3 changes: 2 additions & 1 deletion src/osd/osd_types.h
@@ -3738,8 +3738,9 @@ struct ObjectRecoveryInfo {
SnapSet ss;
interval_set<uint64_t> copy_subset;
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator> clone_subset;
bool can_recover_partial;

ObjectRecoveryInfo() : size(0) { }
ObjectRecoveryInfo() : size(0), can_recover_partial(false) { }

static void generate_test_instances(list<ObjectRecoveryInfo*>& o);
void encode(bufferlist &bl) const;
