Skip to content

Commit

Permalink
osd: recover partial extents
Browse files Browse the repository at this point in the history
basic logic for partial recovery
upgrade compatibility
read_log support

Signed-off-by: Ning Yao <yaoning@unitedstack.com>
  • Loading branch information
Ning Yao committed Apr 19, 2016
1 parent d95f2ca commit b63468b
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/include/interval_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ class interval_set {

private:
// data
int64_t _size;
uint64_t _size;
std::map<T,T> m; // map start -> len
};

Expand Down
4 changes: 3 additions & 1 deletion src/os/filestore/FileStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ int FileStore::lfn_stat(const coll_t& cid, const ghobject_t& oid, struct stat *b
RWLock::RLocker l((index.index)->access_lock);

r = lfn_find(oid, index, &path);
if (r < 0)
if (r < 0 || !buf)
return r;
r = ::stat(path->path(), buf);
if (r < 0)
Expand Down Expand Up @@ -2942,6 +2942,8 @@ int FileStore::stat(
tracepoint(objectstore, stat_enter, _cid.c_str());
const coll_t& cid = !_need_temp_object_collection(_cid, oid) ? _cid : _cid.get_temp();
int r = lfn_stat(cid, oid, st);
if (!st)
return r;
assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
if (r < 0) {
dout(10) << "stat " << cid << "/" << oid
Expand Down
2 changes: 2 additions & 0 deletions src/osd/PGBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ struct shard_info_wrapper;

virtual hobject_t get_temp_recovery_object(eversion_t version,
snapid_t snap) = 0;
virtual hobject_t get_local_temp_recovery_object(eversion_t version,
snapid_t snap) = 0;

virtual void send_message_osd_cluster(
int peer, Message *m, epoch_t from_epoch) = 0;
Expand Down
18 changes: 16 additions & 2 deletions src/osd/PGLog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,12 @@ void PGLog::read_log(ObjectStore *store, coll_t pg_coll,
if (i->version <= info.last_complete) break;
if (cmp(i->soid, info.last_backfill, info.last_backfill_bitwise) > 0)
continue;
std::map<hobject_t, pg_missing_t::item, hobject_t::ComparatorWithDefault>::iterator miter = missing.missing.find(i->soid);
if (miter != missing.missing.end()) {
assert(did.count(i->soid));
miter->second.omap_unchanged = miter->second.omap_unchanged && i->omap_unchanged;
miter->second.dirty_extents.intersection_of(i->dirty_extents);
}
if (did.count(i->soid)) continue;
did.insert(i->soid);

Expand All @@ -1008,12 +1014,20 @@ void PGLog::read_log(ObjectStore *store, coll_t pg_coll,
object_info_t oi(bv);
if (oi.version < i->version) {
ldpp_dout(dpp, 15) << "read_log missing " << *i
<< " (have " << oi.version << ")" << dendl;
<< " (have " << oi.version << ")"
<< " omap_unchanged " << i->omap_unchanged
<< " dirty_extents " << i->dirty_extents << dendl;
missing.add(i->soid, i->version, oi.version);
missing.missing[i->soid].omap_unchanged = i->omap_unchanged;
missing.missing[i->soid].dirty_extents.insert(i->dirty_extents);
}
} else {
ldpp_dout(dpp, 15) << "read_log missing " << *i << dendl;
ldpp_dout(dpp, 15) << "read_log missing " << *i
<< " omap_unchanged " << i->omap_unchanged
<< " dirty_extents " << i->dirty_extents << dendl;
missing.add(i->soid, i->version, eversion_t());
missing.missing[i->soid].omap_unchanged = i->omap_unchanged;
missing.missing[i->soid].dirty_extents.insert(i->dirty_extents);
}
}
for (map<eversion_t, hobject_t>::reverse_iterator i =
Expand Down
98 changes: 70 additions & 28 deletions src/osd/ReplicatedBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,21 @@ void ReplicatedBackend::calc_head_subsets(
uint64_t size = obc->obs.oi.size;
if (size)
data_subset.insert(0, size);
else
return;
if (parent->min_peer_features() & CEPH_OSD_PARTIAL_RECOVERY) {
map<hobject_t, pg_missing_t::item, hobject_t::ComparatorWithDefault>::const_iterator it
= missing.missing.find(head);
assert(it != missing.missing.end());
interval_set<uint64_t> unmodified;
unmodified.insert(0, size);
unmodified.intersection_of(it->second.dirty_extents);
data_subset.subtract(unmodified);
dout(10) << "calc_head_subsets " << head
<< " unmodified " << unmodified
<< " data_subset " << data_subset << dendl;
return;
}

if (get_parent()->get_pool().allow_incomplete_clones()) {
dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
Expand Down Expand Up @@ -1408,9 +1423,10 @@ void ReplicatedBackend::prepare_pull(
ObjectContextRef headctx,
RPGHandle *h)
{
assert(get_parent()->get_local_missing().missing.count(soid));
eversion_t _v = get_parent()->get_local_missing().missing.find(
soid)->second.need;
map<hobject_t, pg_missing_t::item, hobject_t::ComparatorWithDefault>::const_iterator missing_iter =
get_parent()->get_local_missing().missing.find(soid);
assert(missing_iter != get_parent()->get_local_missing().missing.end());
eversion_t _v = missing_iter->second.need;
assert(_v == v);
const map<hobject_t, set<pg_shard_t>, hobject_t::BitwiseComparator> &missing_loc(
get_parent()->get_missing_loc_shards());
Expand Down Expand Up @@ -1474,6 +1490,9 @@ void ReplicatedBackend::prepare_pull(
// pulling head or unversioned object.
// always pull the whole thing.
recovery_info.copy_subset.insert(0, (uint64_t)-1);
if (!missing_iter->second.dirty_extents.empty() &&
(parent->min_peer_features() & CEPH_OSD_PARTIAL_RECOVERY))
recovery_info.copy_subset.subtract(missing_iter->second.dirty_extents);
recovery_info.size = ((uint64_t)-1);
}

Expand All @@ -1485,7 +1504,8 @@ void ReplicatedBackend::prepare_pull(
op.recovery_info.soid = soid;
op.recovery_info.version = v;
op.recovery_progress.data_complete = false;
op.recovery_progress.omap_complete = false;
op.recovery_progress.omap_complete = missing_iter->second.omap_unchanged &&
(parent->min_peer_features() & CEPH_OSD_PARTIAL_RECOVERY);
op.recovery_progress.data_recovered_to = 0;
op.recovery_progress.first = true;

Expand Down Expand Up @@ -1587,6 +1607,11 @@ void ReplicatedBackend::prep_push(
bool cache_dont_need)
{
get_parent()->begin_peer_recover(peer, soid);
map<pg_shard_t, pg_missing_t>::const_iterator pmissing_iter =
get_parent()->get_shard_missing().find(peer);
map<hobject_t, pg_missing_t::item, hobject_t::ComparatorWithDefault>::const_iterator missing_iter =
pmissing_iter->second.missing.find(soid);
assert(missing_iter != pmissing_iter->second.missing.end());
// take note.
PushInfo &pi = pushing[soid][peer];
pi.obc = obc;
Expand All @@ -1599,7 +1624,8 @@ void ReplicatedBackend::prep_push(
pi.recovery_progress.first = true;
pi.recovery_progress.data_recovered_to = 0;
pi.recovery_progress.data_complete = 0;
pi.recovery_progress.omap_complete = 0;
pi.recovery_progress.omap_complete = missing_iter->second.omap_unchanged &&
(parent->min_peer_features() & CEPH_OSD_PARTIAL_RECOVERY);

ObjectRecoveryProgress new_progress;
int r = build_push_op(pi.recovery_info,
Expand Down Expand Up @@ -1658,24 +1684,38 @@ void ReplicatedBackend::submit_push_data(
map<string, bufferlist> &omap_entries,
ObjectStore::Transaction *t)
{
hobject_t target_oid;
if (first && complete) {
target_oid = recovery_info.soid;
} else {
target_oid = get_parent()->get_temp_recovery_object(recovery_info.version,
hobject_t target_oid = get_parent()->get_temp_recovery_object(recovery_info.version,
recovery_info.soid.snap);
if (first) {
dout(10) << __func__ << ": Adding oid "
<< target_oid << " in the temp collection" << dendl;
add_temp_obj(target_oid);
}
}

hobject_t local_oid = get_parent()->get_local_temp_recovery_object(recovery_info.version,
recovery_info.soid.snap);

if (first) {
dout(10) << __func__ << ": Adding oid "
<< target_oid << " in the temp collection" << dendl;
int local_exist = store->stat(coll, ghobject_t(recovery_info.soid), NULL);
add_temp_obj(target_oid);
if (local_exist != -ENOENT)
t->collection_move_rename(coll, ghobject_t(recovery_info.soid),
coll, ghobject_t(local_oid));
t->remove(coll, ghobject_t(target_oid));
t->touch(coll, ghobject_t(target_oid));
t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
t->omap_setheader(coll, ghobject_t(target_oid), omap_header);

//clone overlap content in local object
interval_set<uint64_t> local_intervals_included;
if (recovery_info.size && local_exist != -ENOENT) {
local_intervals_included.insert(0, recovery_info.size);
local_intervals_included.subtract(recovery_info.copy_subset);
}
for (interval_set<uint64_t>::const_iterator q = local_intervals_included.begin();
q != local_intervals_included.end();
++q) {
dout(15) << " clone_range " << recovery_info.soid << " "
<< q.get_start() << "~" << q.get_len() << dendl;
t->clone_range(coll, ghobject_t(local_oid), ghobject_t(target_oid),
q.get_start(), q.get_len(), q.get_start());
}
}
uint64_t off = 0;
uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
Expand All @@ -1697,15 +1737,12 @@ void ReplicatedBackend::submit_push_data(
t->setattrs(coll, ghobject_t(target_oid), attrs);

if (complete) {
if (!first) {
dout(10) << __func__ << ": Removing oid "
<< target_oid << " from the temp collection" << dendl;
clear_temp_obj(target_oid);
t->remove(coll, ghobject_t(recovery_info.soid));
t->collection_move_rename(coll, ghobject_t(target_oid),
dout(10) << __func__ << ": Removing oid "
<< target_oid << " from the temp collection" << dendl;
clear_temp_obj(target_oid);
t->collection_move_rename(coll, ghobject_t(target_oid),
coll, ghobject_t(recovery_info.soid));
}

t->remove(coll, ghobject_t(local_oid));
submit_push_complete(recovery_info, t);
}
}
Expand Down Expand Up @@ -2016,6 +2053,7 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
}
out_op->data_included.span_of(copy_subset, progress.data_recovered_to,
available);

if (out_op->data_included.empty()) // zero filled section, skip to end!
new_progress.data_recovered_to = recovery_info.copy_subset.range_end();
else
Expand Down Expand Up @@ -2232,9 +2270,13 @@ void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
if (progress.first && recovery_info.size == ((uint64_t)-1)) {
// Adjust size and copy_subset
recovery_info.size = st.st_size;
recovery_info.copy_subset.clear();
if (st.st_size)
recovery_info.copy_subset.insert(0, st.st_size);
if (st.st_size) {
interval_set<uint64_t> object_range;
object_range.insert(0, st.st_size);
recovery_info.copy_subset.intersection_of(object_range);
} else {
recovery_info.copy_subset.clear();
}
assert(recovery_info.clone_subset.empty());
}

Expand Down
2 changes: 1 addition & 1 deletion src/osd/ReplicatedBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class ReplicatedBackend : public PGBackend {
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
PushOp *op,
bool cache = false);
bool cache = false);
void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
const pg_missing_t& missing,
const hobject_t &last_backfill,
Expand Down
19 changes: 18 additions & 1 deletion src/osd/ReplicatedPG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4944,7 +4944,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
case CEPH_OSD_OP_WRITE:
++ctx->num_write;
{ // write
__u32 seq = oi.truncate_seq;
__u32 seq = oi.truncate_seq;
tracepoint(osd, do_osd_op_pre_write, soid.oid.name.c_str(), soid.snap.val, oi.size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
if (op.extent.length != osd_op.indata.length()) {
result = -EINVAL;
Expand Down Expand Up @@ -5784,6 +5784,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
}
}
t->omap_setkeys(soid, to_set_bl);
ctx->omap_unchanged = false;
ctx->delta_stats.num_wr++;
}
obs.oi.set_flag(object_info_t::FLAG_OMAP);
Expand Down Expand Up @@ -6563,6 +6564,19 @@ hobject_t ReplicatedPG::get_temp_recovery_object(eversion_t version, snapid_t sn
return hoid;
}

hobject_t ReplicatedPG::get_local_temp_recovery_object(eversion_t version, snapid_t snap)
{
  // Name a temp object for stashing the local (pre-recovery) copy of an
  // object while a partial push is assembled. pgid (which already embeds
  // the shard id) + version + interval + snapid is unique and short.
  ostringstream name;
  name << "local_temp_recovering_" << info.pgid
       << "_" << version
       << "_" << info.history.same_interval_since
       << "_" << snap;
  hobject_t hoid = info.pgid.make_temp_object(name.str());
  dout(20) << __func__ << " " << hoid << dendl;
  return hoid;
}

int ReplicatedPG::prepare_transaction(OpContext *ctx)
{
assert(!ctx->ops.empty());
Expand Down Expand Up @@ -6780,6 +6794,9 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
ctx->obs->oi.version,
ctx->user_at_version, ctx->reqid,
ctx->mtime));
ctx->log.back().omap_unchanged = ctx->omap_unchanged;
ctx->log.back().dirty_extents.subtract(ctx->modified_ranges);

if (soid.snap < CEPH_NOSNAP) {
switch (log_op_type) {
case pg_log_entry_t::MODIFY:
Expand Down
1 change: 1 addition & 0 deletions src/osd/ReplicatedPG.h
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,7 @@ class ReplicatedPG : public PG, public PGBackend::Listener {
hobject_t generate_temp_object(); ///< generate a new temp object name
/// generate a new temp object name (for recovery)
hobject_t get_temp_recovery_object(eversion_t version, snapid_t snap);
hobject_t get_local_temp_recovery_object(eversion_t version, snapid_t snap);
int get_recovery_op_priority() const {
int pri = 0;
pool.info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
Expand Down
4 changes: 2 additions & 2 deletions src/osd/osd_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2575,14 +2575,14 @@ struct pg_log_entry_t {

pg_log_entry_t()
: user_version(0), op(0),
invalid_hash(false), invalid_pool(false), omap_unchanged(false)
invalid_hash(false), invalid_pool(false), omap_unchanged(true)
{ dirty_extents.insert(0, (uint64_t) - 1); }
pg_log_entry_t(int _op, const hobject_t& _soid,
const eversion_t& v, const eversion_t& pv,
version_t uv,
const osd_reqid_t& rid, const utime_t& mt)
: soid(_soid), reqid(rid), version(v), prior_version(pv), user_version(uv),
mtime(mt), op(_op), invalid_hash(false), invalid_pool(false), omap_unchanged(false)
mtime(mt), op(_op), invalid_hash(false), invalid_pool(false), omap_unchanged(true)
{ dirty_extents.insert(0, (uint64_t) - 1); }

bool is_clone() const { return op == CLONE; }
Expand Down

0 comments on commit b63468b

Please sign in to comment.