Skip to content

Commit

Permalink
rgw: let newer RGWs work with older OSDs re: cls-side filtering
Browse files Browse the repository at this point in the history
Previous commits moved bucket list filtering when a delimiter was
specified to the osd/cls layer. However, since rgw's are often
upgraded before osd's are, until we reach ceph version 16, an
rgw cannot assume that the osd/cls did the filtering. This is
addressed in the following ways:

First rgw_cls_list_ret now indicates whether filtering was done on the
osd/cls side.

And second, the old filtering code in the rgw is maintained in
RGWRados::Bucket::List::list_objects_ordered, so it can still be
triggered when not all osd's are doing the filtering.

Once we reach ceph version 16, and there is no chance that the rgw is
working with an osd running "young" version 14 code, we can remove the
backward compatibility code in
RGWRados::Bucket::List::list_objects_ordered.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
  • Loading branch information
ivancich committed Oct 2, 2019
1 parent 4a532aa commit 0ddabd8
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 39 deletions.
17 changes: 13 additions & 4 deletions src/cls/rgw/cls_rgw_ops.h
Expand Up @@ -385,8 +385,8 @@ struct rgw_cls_list_op
cls_rgw_obj_key start_obj;
uint32_t num_entries;
string filter_prefix;
string delimiter;
bool list_versions;
string delimiter;

rgw_cls_list_op() : num_entries(0), list_versions(false) {}

Expand Down Expand Up @@ -428,18 +428,27 @@ struct rgw_cls_list_ret {
rgw_bucket_dir dir;
bool is_truncated;

rgw_cls_list_ret() : is_truncated(false) {}
// cls_filtered is not transmitted; it is assumed true for versions
// on/after 3 and false for prior versions; this allows the rgw
// layer to know when an older osd (cls) does not do the filtering
bool cls_filtered;

rgw_cls_list_ret() :
is_truncated(false),
cls_filtered(true)
{}

void encode(bufferlist &bl) const {
ENCODE_START(2, 2, bl);
ENCODE_START(3, 2, bl);
encode(dir, bl);
encode(is_truncated, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator &bl) {
DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
decode(dir, bl);
decode(is_truncated, bl);
cls_filtered = struct_v >= 3;
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
Expand Down
3 changes: 2 additions & 1 deletion src/rgw/rgw_admin.cc
Expand Up @@ -6188,6 +6188,7 @@ int main(int argc, const char **argv)
}

bool is_truncated = true;
bool cls_filtered = true;

rgw_obj_index_key marker;
string empty_prefix;
Expand All @@ -6202,7 +6203,7 @@ int main(int argc, const char **argv)
bucket_info, RGW_NO_SHARD,
marker, empty_prefix, empty_delimiter,
1000, true,
result, &is_truncated, &marker,
result, &is_truncated, &cls_filtered, &marker,
null_yield,
rgw_bucket_object_check_filter);
if (r < 0 && r != -ENOENT) {
Expand Down
5 changes: 3 additions & 2 deletions src/rgw/rgw_bucket.cc
Expand Up @@ -1020,6 +1020,7 @@ int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
string empty_delimiter;
rgw_obj_index_key marker;
bool is_truncated = true;
bool cls_filtered = true;

Formatter *formatter = flusher.get_formatter();
formatter->open_object_section("objects");
Expand All @@ -1029,8 +1030,8 @@ int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,

int r = store->getRados()->cls_bucket_list_ordered(
bucket_info, RGW_NO_SHARD, marker, prefix, empty_delimiter,
listing_max_entries, true, result, &is_truncated, &marker, y,
rgw_bucket_object_check_filter);
listing_max_entries, true, result, &is_truncated, &cls_filtered,
&marker, y, rgw_bucket_object_check_filter);
if (r == -ENOENT) {
break;
} else if (r < 0 && r != -ENOENT) {
Expand Down
131 changes: 100 additions & 31 deletions src/rgw/rgw_rados.cc
Expand Up @@ -1741,6 +1741,7 @@ int RGWRados::Bucket::List::list_objects_ordered(

int count = 0;
bool truncated = true;
bool cls_filtered = false;
const int64_t max = // protect against memory issues and negative vals
std::min(bucket_list_objects_absolute_max, std::max(int64_t(0), max_p));
int read_ahead = std::max(cct->_conf->rgw_list_bucket_min_readahead, max);
Expand Down Expand Up @@ -1789,6 +1790,7 @@ int RGWRados::Bucket::List::list_objects_ordered(
params.list_versions,
ent_map,
&truncated,
&cls_filtered,
&cur_marker,
y);
if (r < 0) {
Expand Down Expand Up @@ -1851,26 +1853,61 @@ int RGWRados::Bucket::List::list_objects_ordered(
}

if (!params.delim.empty()) {
int delim_pos = obj.name.find(params.delim, params.prefix.size());

if (delim_pos >= 0) {
// should only find one delimiter at the end if it finds any
// after the prefix
ceph_assert(delim_pos ==
int(obj.name.length() - params.delim.length()));
if (common_prefixes) {
if (count >= max) {
truncated = true;
goto done;
}

(*common_prefixes)[obj.name] = true;
count++;
}

continue;
} // if found delimiter after prefix
} // if there is a delimiter
const int delim_pos = obj.name.find(params.delim, params.prefix.size());
if (delim_pos >= 0) {
// run either the code where delimiter filtering is done a)
// in the OSD/CLS or b) here.
if (cls_filtered) {
// NOTE: this condition is for the newer versions of the
// OSD that does filtering on the CLS side

// should only find one delimiter at the end if it finds any
// after the prefix
if (delim_pos !=
int(obj.name.length() - params.delim.length())) {
ldout(cct, 0) <<
"WARNING: found delimiter in place other than the end of "
"the prefix; obj.name=" << obj.name <<
", prefix=" << params.prefix << dendl;
}
if (common_prefixes) {
if (count >= max) {
truncated = true;
goto done;
}

(*common_prefixes)[obj.name] = true;
count++;
}

continue;
} else {
// NOTE: this condition is for older versions of the OSD
// that do not filter on the CLS side, so the following code
// must do the filtering; once we reach version 16 of ceph,
// this code can be removed along with the conditional that
// can lead this way

/* extract key -with trailing delimiter- for CommonPrefix */
string prefix_key =
obj.name.substr(0, delim_pos + params.delim.length());

if (common_prefixes &&
common_prefixes->find(prefix_key) == common_prefixes->end()) {
if (count >= max) {
truncated = true;
goto done;
}
next_marker = prefix_key;
(*common_prefixes)[prefix_key] = true;

count++;
}

continue;
} // if we're running an older OSD version
} // if a delimiter was found after prefix
} // if a delimiter was passed in

if (count >= max) {
truncated = true;
Expand All @@ -1880,6 +1917,30 @@ int RGWRados::Bucket::List::list_objects_ordered(
result->emplace_back(std::move(entry));
count++;
} // for (auto eiter = ent_map.begin(); ...

// NOTE: the following conditional is needed by older versions of
// the OSD that don't do delimiter filtering on the CLS side; once
// we reach version 16 of ceph, the following conditional and the
// code within can be removed
if (!cls_filtered && !params.delim.empty()) {
int marker_delim_pos =
cur_marker.name.find(params.delim, cur_prefix.size());
if (marker_delim_pos >= 0) {
std::string skip_after_delim =
cur_marker.name.substr(0, marker_delim_pos);
skip_after_delim.append(after_delim_s);

ldout(cct, 20) << "skip_after_delim=" << skip_after_delim << dendl;

if (skip_after_delim > cur_marker.name) {
cur_marker = skip_after_delim;
ldout(cct, 20) << "setting cur_marker="
<< cur_marker.name
<< "[" << cur_marker.instance << "]"
<< dendl;
}
}
} // if older osd didn't do delimiter filtering
} // while (truncated && count <= max)) ...

done:
Expand Down Expand Up @@ -7891,7 +7952,8 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
uint32_t num_entries,
bool list_versions,
ent_map_t& m,
bool *is_truncated,
bool* is_truncated,
bool* cls_filtered,
rgw_obj_index_key *last_entry,
optional_yield y,
check_filter_t force_check_filter)
Expand Down Expand Up @@ -7926,18 +7988,25 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
vcurrents.reserve(list_results.size());
vends.reserve(list_results.size());
vnames.reserve(list_results.size());
map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
*is_truncated = false;
for (; iter != list_results.end(); ++iter) {
vcurrents.push_back(iter->second.dir.m.begin());
vends.push_back(iter->second.dir.m.end());
vnames.push_back(oids[iter->first]);
*is_truncated = (*is_truncated || iter->second.is_truncated);
*cls_filtered = true;
for (auto& r : list_results) {
vcurrents.push_back(r.second.dir.m.begin());
vends.push_back(r.second.dir.m.end());
vnames.push_back(oids[r.first]);

// if any *one* shard's result is truncated, the entire result is
// truncated
*is_truncated = *is_truncated || r.second.is_truncated;

// unless *all* shards are cls_filtered, the entire result is
// not filtered
*cls_filtered = *cls_filtered && r.second.cls_filtered;
}

// Create a map to track the next candidate entry from each shard, if the entry
// from a specified shard is selected/erased, the next entry from that shard will
// be inserted for next round selection
// Create a map to track the next candidate entry from each shard,
// if the entry from a specified shard is selected/erased, the next
// entry from that shard will be inserted for next round selection
map<string, size_t> candidates;
for (size_t i = 0; i < vcurrents.size(); ++i) {
if (vcurrents[i] != vends[i]) {
Expand Down Expand Up @@ -7989,7 +8058,7 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
}
}

// Suggest updates if there is any
// suggest updates if there are any
map<string, bufferlist>::iterator miter = updates.begin();
for (; miter != updates.end(); ++miter) {
if (miter->second.length()) {
Expand Down
3 changes: 2 additions & 1 deletion src/rgw/rgw_rados.h
Expand Up @@ -1354,7 +1354,8 @@ class RGWRados
uint32_t num_entries,
bool list_versions,
ent_map_t& m,
bool *is_truncated,
bool* is_truncated,
bool* cls_filtered,
rgw_obj_index_key *last_entry,
optional_yield y,
check_filter_t force_check_filter = nullptr);
Expand Down

0 comments on commit 0ddabd8

Please sign in to comment.