osd: EC Partial Stripe Reads (Retry of #23138)
This is a re-implementation of PR #23138, rebased on main, with a couple of nitpicky changes to make the code a little clearer (to me at least). Credit goes to Xiaofei Cui <cuixiaofei@sangfor.com.cn> for the original implementation.

Looking at the original PR's review, it does not appear that we can use the same technique as in 468ad4b. We don't have the ReadOp yet. I'm not sure whether @gregsfortytwo's idea to query the plugin works, but it's clear we are not doing the efficient thing from the get-go here.

The performance and efficiency benefits for small random reads appear to be quite substantial, especially for large stripe widths.
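
For a back-of-envelope illustration of where the win comes from (the profile values below are hypothetical, not taken from this PR): with k = 8 data chunks and a 4 KiB chunk size, the stripe width is 32 KiB, so a stripe-aligned read fetches 8 chunks from 8 OSDs even when the client asked for 4 KiB. A chunk-aligned partial read only touches the chunks the request overlaps:

// Back-of-envelope sketch; k and chunk_size are hypothetical profile values.
#include <cstdint>
#include <iostream>

int main() {
  const uint64_t chunk_size = 4096;       // hypothetical 4 KiB chunks
  const uint64_t k = 8;                   // hypothetical data chunk count
  const uint64_t off = 6144, len = 4096;  // a small, unaligned client read

  uint64_t first_chunk = off / chunk_size;
  uint64_t last_chunk = (off + len - 1) / chunk_size;
  uint64_t chunks = last_chunk - first_chunk + 1;

  std::cout << "stripe-aligned read: " << k << " chunks, "
            << k * chunk_size << " bytes\n";       // 8 chunks, 32768 bytes
  std::cout << "partial read:        " << chunks << " chunks, "
            << chunks * chunk_size << " bytes\n";  // 2 chunks, 8192 bytes
}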

Signed-off-by: Mark Nelson <mark.nelson@clyso.com>
Mark Nelson authored and Mark Nelson committed Aug 2, 2023
1 parent 86c9a7a commit 5e4fb61
Showing 8 changed files with 232 additions and 47 deletions.
25 changes: 24 additions & 1 deletion src/erasure-code/ErasureCode.cc
@@ -332,17 +332,40 @@ int ErasureCode::decode_concat(const map<int, bufferlist> &chunks,
bufferlist *decoded)
{
set<int> want_to_read;
set<int> decode_chunks;
bool need_decode = false;

for (unsigned int i = 0; i < get_data_chunk_count(); i++) {
want_to_read.insert(chunk_index(i));
}
if (chunks.size() < get_data_chunk_count()) {
// for partial_read
for (map<int, bufferlist>::const_iterator i = chunks.begin();
i != chunks.end();
++i) {
if (want_to_read.find(i->first) == want_to_read.end()) {
need_decode = true;
break;
}
decode_chunks.insert(i->first);
}
if (!need_decode) {
want_to_read.swap(decode_chunks);
}
}
map<int, bufferlist> decoded_map;
int r = _decode(want_to_read, chunks, &decoded_map);
if (r == 0) {
for (unsigned int i = 0; i < get_data_chunk_count(); i++) {
- decoded->claim_append(decoded_map[chunk_index(i)]);
+ if (decoded_map.find(chunk_index(i)) != decoded_map.end()) {
+ decoded->claim_append(decoded_map[chunk_index(i)]);
+ }
}
}
return r;
}

bool ErasureCode::is_systematic() {
return true;
}
}
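
As a reading aid, here is a standalone sketch of the fast-path decision above, with simplified types (std::string standing in for bufferlist); it illustrates the rule, it is not the class itself:

// Standalone sketch of the decode_concat() partial-read fast path.
#include <map>
#include <set>
#include <string>

// Returns the chunk ids to hand to _decode(): the full data set normally, or
// only the supplied chunks when every one of them is a wanted data chunk.
std::set<int> chunks_to_decode(const std::set<int> &want_to_read,
                               const std::map<int, std::string> &chunks,
                               unsigned data_chunk_count) {
  if (chunks.size() >= data_chunk_count)
    return want_to_read;      // full stripe available, decode as before
  std::set<int> decode_chunks;
  for (const auto &[id, chunk] : chunks) {
    if (!want_to_read.count(id))
      return want_to_read;    // a parity chunk is present: real decode needed
    decode_chunks.insert(id);
  }
  return decode_chunks;       // all present chunks are data chunks
}
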
2 changes: 2 additions & 0 deletions src/erasure-code/ErasureCode.h
@@ -113,6 +113,8 @@ namespace ceph {
int decode_concat(const std::map<int, bufferlist> &chunks,
bufferlist *decoded) override;

virtual bool is_systematic();

protected:
int parse(const ErasureCodeProfile &profile,
std::ostream *ss);
5 changes: 5 additions & 0 deletions src/erasure-code/ErasureCodeInterface.h
@@ -459,6 +459,11 @@ namespace ceph {
*/
virtual int decode_concat(const std::map<int, bufferlist> &chunks,
bufferlist *decoded) = 0;

/**
* @return **true** if the EC plugin's data placement is systematic.
*/
virtual bool is_systematic() = 0;
};

typedef std::shared_ptr<ErasureCodeInterface> ErasureCodeInterfaceRef;
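
A non-systematic plugin would be expected to override the new hook so the OSD keeps issuing full-stripe reads. A hypothetical, minimal illustration of that contract (these are not the Ceph classes):

#include <iostream>

struct ErasureCodeLike {
  virtual ~ErasureCodeLike() = default;
  virtual bool is_systematic() { return true; }  // the base-class default
};

struct NonSystematicPlugin : ErasureCodeLike {
  // Data chunks are transformed, so raw object bytes cannot be read
  // straight off a single shard; partial reads must stay disabled.
  bool is_systematic() override { return false; }
};

int main() {
  NonSystematicPlugin p;
  std::cout << (p.is_systematic() ? "partial reads allowed"
                                  : "full-stripe reads only") << "\n";
}
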
172 changes: 141 additions & 31 deletions src/osd/ECBackend.cc
@@ -1223,10 +1223,10 @@ void ECBackend::handle_sub_read_reply(
++j, ++req_iter, ++riter) {
ceph_assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
ceph_assert(riter != rop.complete[i->first].returned.end());
- pair<uint64_t, uint64_t> adjusted =
+ pair<uint64_t, uint64_t> aligned =
sinfo.aligned_offset_len_to_chunk(
make_pair(req_iter->get<0>(), req_iter->get<1>()));
- ceph_assert(adjusted.first == j->first);
+ ceph_assert(aligned.first == j->first);
riter->get<2>()[from] = std::move(j->second);
}
}
@@ -1686,6 +1686,29 @@ int ECBackend::get_min_avail_to_read_shards(
return 0;
}

void ECBackend::get_min_want_to_read_shards(
pair<uint64_t, uint64_t> off_len,
set<int> *want_to_read) {
uint64_t off = off_len.first;
uint64_t len = off_len.second;
uint64_t chunk_size = sinfo.get_chunk_size();
int data_chunk_count = sinfo.get_stripe_width() / sinfo.get_chunk_size();
const vector<int> &chunk_mapping = ec_impl->get_chunk_mapping();

int total_chunks = (chunk_size - 1 + len) / chunk_size;
int first_chunk = (off / chunk_size) % data_chunk_count;

if (total_chunks > data_chunk_count) {
total_chunks = data_chunk_count;
}

for (int i = 0; i < total_chunks; i++) {
int j = (first_chunk + i) % data_chunk_count;
int chunk = (int)chunk_mapping.size() > j ? chunk_mapping[j] : j;
want_to_read->insert(chunk);
}
}
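
A worked example of the shard math above, with assumed geometry (chunk_size = 4096, stripe_width = 16384, identity chunk mapping): a chunk-aligned read at off = 20480, len = 8192 starts at chunk (20480 / 4096) % 4 = 1 and spans 2 chunks, so want_to_read becomes {1, 2} rather than all four data shards.

// Worked example; the geometry is assumed, the mapping is the identity.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <set>

int main() {
  const uint64_t chunk_size = 4096;
  const int data_chunk_count = 4;          // stripe_width 16384 / chunk_size
  const uint64_t off = 20480, len = 8192;  // chunk-aligned, as the caller ensures

  int total_chunks = std::min<int>((chunk_size - 1 + len) / chunk_size,
                                   data_chunk_count);
  int first_chunk = (off / chunk_size) % data_chunk_count;

  std::set<int> want_to_read;
  for (int i = 0; i < total_chunks; i++)
    want_to_read.insert((first_chunk + i) % data_chunk_count);

  assert((want_to_read == std::set<int>{1, 2}));  // 2 shards instead of 4
}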

int ECBackend::get_remaining_shards(
const hobject_t &hoid,
const set<int> &avail,
@@ -2243,8 +2266,7 @@ void ECBackend::objects_read_async(
i != to_read.end();
++i) {
pair<uint64_t, uint64_t> tmp =
- sinfo.offset_len_to_stripe_bounds(
- make_pair(i->first.get<0>(), i->first.get<1>()));
+ make_pair(i->first.get<0>(), i->first.get<1>());

es.union_insert(tmp.first, tmp.second);
flags |= i->first.get<2>();
@@ -2364,34 +2386,35 @@ struct CallClientContexts :
ceph_assert(res.returned.size() == to_read.size());
ceph_assert(res.errors.empty());
for (auto &&read: to_read) {
- pair<uint64_t, uint64_t> adjusted =
- ec->sinfo.offset_len_to_stripe_bounds(
- make_pair(read.get<0>(), read.get<1>()));
- ceph_assert(res.returned.front().get<0>() == adjusted.first);
- ceph_assert(res.returned.front().get<1>() == adjusted.second);
+ auto bounds = make_pair(read.get<0>(), read.get<1>());
+ auto aligned = ec->sinfo.offset_len_to_stripe_bounds(bounds);
+ if (aligned.first != read.get<0>() || aligned.second != read.get<1>()) {
+ aligned = ec->sinfo.offset_len_to_chunk_bounds(bounds);
+ }
+ ceph_assert(res.returned.front().get<0>() == aligned.first);
+ ceph_assert(res.returned.front().get<1>() == aligned.second);
map<int, bufferlist> to_decode;
bufferlist bl;

for (map<pg_shard_t, bufferlist>::iterator j =
res.returned.front().get<2>().begin();
j != res.returned.front().get<2>().end();
++j) {
to_decode[j->first.shard] = std::move(j->second);
}
- int r = ECUtil::decode(
- ec->sinfo,
- ec->ec_impl,
- to_decode,
- &bl);

+ int r = ECUtil::decode(ec->sinfo, ec->ec_impl, to_decode, &bl);
if (r < 0) {
res.r = r;
goto out;
}

bufferlist trimmed;
- trimmed.substr_of(
- bl,
- read.get<0>() - adjusted.first,
- std::min(read.get<1>(),
- bl.length() - (read.get<0>() - adjusted.first)));
+ auto off = read.get<0>() - aligned.first;
+ auto len =
+ std::min(read.get<1>(), bl.length() - (read.get<0>() - aligned.first));
+ trimmed.substr_of(bl, off, len);

result.insert(
read.get<0>(), trimmed.length(), std::move(trimmed));
res.returned.pop_front();
@@ -2403,11 +2426,9 @@ struct CallClientContexts :
};

void ECBackend::objects_read_and_reconstruct(
- const map<hobject_t,
- std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
- > &reads,
+ const std::map<hobject_t, std::list<ec_align_t> > &reads,
bool fast_read,
- GenContextURef<map<hobject_t,pair<int, extent_map> > &&> &&func)
+ GenContextURef<ec_extents_t &&> &&func)
{
in_progress_client_reads.emplace_back(
reads.size(), std::move(func));
@@ -2417,11 +2438,47 @@ void ECBackend::objects_read_and_reconstruct(
}

map<hobject_t, set<int>> obj_want_to_read;
- set<int> want_to_read;
- get_want_to_read_shards(&want_to_read);

map<hobject_t, read_request_t> for_read_op;
for (auto &&to_read: reads) {
+ set<int> want_to_read;
+ get_want_to_read_shards(&want_to_read);

std::list<ec_align_t> align_offsets;
bool partial_read = should_partial_read(
to_read.first,
to_read.second,
want_to_read,
fast_read,
false);

// Our extent set and flags
extent_set es;
uint32_t flags = 0;

for (auto read : to_read.second) {
auto bounds = make_pair(read.get<0>(), read.get<1>());

// By default, align to the stripe
auto aligned = sinfo.offset_len_to_stripe_bounds(bounds);
if (partial_read) {
// align to the chunk instead
aligned = sinfo.offset_len_to_chunk_bounds(bounds);
set<int> new_want_to_read;
get_min_want_to_read_shards(aligned, &new_want_to_read);
want_to_read = new_want_to_read;
}
// Add the new extents/flags
extent_set new_es;
new_es.insert(aligned.first, aligned.second);
es.union_of(new_es);
flags |= read.get<2>();
}
if (!es.empty()) {
for (auto e = es.begin(); e != es.end(); ++e) {
align_offsets.push_back(
boost::make_tuple(e.get_start(), e.get_len(), flags));
}
}
map<pg_shard_t, vector<pair<int, int>>> shards;
int r = get_min_avail_to_read_shards(
to_read.first,
@@ -2435,12 +2492,12 @@ void ECBackend::objects_read_and_reconstruct(
to_read.first,
this,
&(in_progress_client_reads.back()),
- to_read.second);
+ align_offsets);
for_read_op.insert(
make_pair(
to_read.first,
read_request_t(
- to_read.second,
+ align_offsets,
shards,
false,
c)));
@@ -2456,6 +2513,44 @@
return;
}

bool ECBackend::should_partial_read(
const hobject_t &hoid,
std::list<ec_align_t> to_read,
const std::set<int> &want,
bool fast_read,
bool for_recovery)
{
// Don't partial read if we are doing a fast_read
if (fast_read) {
return false;
}
// Don't partial read if the EC isn't systematic
if (!ec_impl->is_systematic()) {
return false;
}
// Don't partial read if we have multiple stripes
if (to_read.size() != 1) {
return false;
}
// Only partial read if the length is inside the stripe boundary
auto read = to_read.front();
auto bounds = make_pair(read.get<0>(), read.get<1>());
auto aligned = sinfo.offset_len_to_stripe_bounds(bounds);
if (sinfo.get_stripe_width() != aligned.second) {
return false;
}

set<int> have;
map<shard_id_t, pg_shard_t> shards;
set<pg_shard_t> error_shards;
get_all_avail_shards(hoid, error_shards, have, shards, for_recovery);

set<int> data_shards;
get_want_to_read_shards(&data_shards);

return includes(data_shards.begin(), data_shards.end(), want.begin(), want.end())
&& includes(have.begin(), have.end(), want.begin(), want.end());
}
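
To summarize the gate: a partial read is attempted only when every check passes. A condensed restatement as a pure function (illustrative only, not new logic):

#include <cstddef>

bool partial_read_allowed(bool fast_read, bool systematic,
                          std::size_t num_extents, bool fits_one_stripe,
                          bool want_is_data_only, bool have_all_wanted) {
  return !fast_read            // fast_read already fans out to all shards
      && systematic            // raw data is readable without decode
      && num_extents == 1      // exactly one extent in the request
      && fits_one_stripe       // stripe-aligned length is one stripe width
      && want_is_data_only     // every wanted shard is a data shard
      && have_all_wanted;      // every wanted shard is currently available
}

int main() {
  // A 4 KiB read within one stripe, all data shards healthy:
  return partial_read_allowed(false, true, 1, true, true, true) ? 0 : 1;
}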

int ECBackend::send_all_remaining_reads(
const hobject_t &hoid,
@@ -2472,8 +2567,7 @@
if (r)
return r;

- list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
- rop.to_read.find(hoid)->second.to_read;
+ list<ec_align_t> to_read = rop.to_read.find(hoid)->second.to_read;
GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
rop.to_read.find(hoid)->second.cb;

@@ -2485,14 +2579,30 @@
dout(10) << __func__ << " want attrs again" << dendl;
}

bool partial_read = rop.to_read.find(hoid)->second.partial_read;
// realign the offset and len to make partial reads normal reads.
if (partial_read) {
list<ec_align_t> new_to_read;
for (auto read : to_read) {
auto bounds = make_pair(read.get<0>(), read.get<1>());
auto aligned = sinfo.offset_len_to_stripe_bounds(bounds);
new_to_read.push_back(
boost::make_tuple(aligned.first, aligned.second, read.get<2>()));
}
to_read = new_to_read;
}

rop.to_read.erase(hoid);
rop.to_read.insert(make_pair(
hoid,
read_request_t(
- offsets,
+ to_read,
shards,
want_attrs,
c)));
if (partial_read) {
rop.refresh_complete(hoid);
}
return 0;
}
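
A worked example of the realignment, under assumed geometry (stripe_width = 16384): a failed partial read issued for chunk bounds (8192, 4096) is widened back to stripe bounds (0, 16384) so the retry can reconstruct from any k of the surviving shards.

// Sketch of stripe realignment with assumed geometry; this is not the
// ECUtil::stripe_info_t implementation itself.
#include <cassert>
#include <cstdint>
#include <utility>

std::pair<uint64_t, uint64_t> stripe_bounds(uint64_t off, uint64_t len,
                                            uint64_t stripe_width) {
  uint64_t start = off - off % stripe_width;  // round the offset down
  uint64_t end = off + len + stripe_width - 1;
  end -= end % stripe_width;                  // round the end up
  return {start, end - start};
}

int main() {
  auto b = stripe_bounds(8192, 4096, 16384);  // a chunk-aligned partial read
  assert(b.first == 0 && b.second == 16384);  // full stripe for the retry
}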
