Skip to content

Commit

Permalink
rgw/notifications: fix tag based filtering to works on all ops
Browse files Browse the repository at this point in the history
fixes: https://tracker.ceph.com/issues/48321

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
  • Loading branch information
yuvalif committed Nov 23, 2020
1 parent d3cf17d commit f8099cd
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 52 deletions.
98 changes: 89 additions & 9 deletions src/rgw/rgw_notify.cc
Expand Up @@ -597,9 +597,59 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
return s_manager->remove_persistent_topic(topic_name, y);
}

rgw::sal::RGWObject* get_object_with_atttributes(const req_state* s, rgw::sal::RGWObject* obj) {
// in case of copy obj, the tags and metadata are taken from source
const auto src_obj = s->src_object ? s->src_object.get() : obj;
if (src_obj->get_attrs().empty()) {
if (!src_obj->get_bucket()) {
src_obj->set_bucket(s->bucket.get());
}
if (src_obj->get_obj_attrs(s->obj_ctx, s->yield) < 0) {
return nullptr;
}
}
return src_obj;
}

void metadata_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValueList& metadata) {
const auto src_obj = get_object_with_atttributes(s, obj);
if (!src_obj) {
return;
}
for (auto& attr : src_obj->get_attrs()) {
if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
std::string_view key(attr.first);
key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
// we want to pass a null terminated version
// of the bufferlist, hence "to_str().c_str()"
metadata.emplace(key, attr.second.to_str().c_str());
}
}
}

void tags_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValueList& tags) {
const auto src_obj = get_object_with_atttributes(s, obj);
if (!src_obj) {
return;
}
const auto& attrs = src_obj->get_attrs();
const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
if (attr_iter != attrs.end()) {
auto bliter = attr_iter->second.cbegin();
RGWObjTags obj_tags;
try {
::decode(obj_tags, bliter);
} catch(buffer::error&) {
// not able to decode tags
return;
}
tags = std::move(obj_tags.get_tags());
}
}

// populate record from request
void populate_record_from_request(const req_state *s,
const rgw::sal::RGWObject* obj,
rgw::sal::RGWObject* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
Expand All @@ -625,24 +675,54 @@ void populate_record_from_request(const req_state *s,
set_event_id(record.id, etag, ts);
record.bucket_id = s->bucket->get_bucket_id();
// pass meta data
record.x_meta_map = s->info.x_meta_map;
if (s->info.x_meta_map.empty()) {
// try to fetch the metadata from the attributes
metadata_from_attributes(s, obj, record.x_meta_map);
} else {
record.x_meta_map = s->info.x_meta_map;
}
// pass tags
record.tags = s->tagset.get_tags();
if (s->tagset.get_tags().empty()) {
// try to fetch the tags from the attributes
tags_from_attributes(s, obj, record.tags);
} else {
record.tags = s->tagset.get_tags();
}
// opaque data will be filled from topic configuration
}

bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, const rgw::sal::RGWObject* obj, EventType event) {
bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj, EventType event) {
if (!::match(filter.events, event)) {
return false;
}
if (!::match(filter.s3_filter.key_filter, obj->get_name())) {
return false;
}
if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
return false;
if (!filter.s3_filter.metadata_filter.kvl.empty()) {
// metadata filter exists
if (s->info.x_meta_map.empty()) {
// try to fetch the metadata from the attributes
KeyValueList metadata;
metadata_from_attributes(s, obj, metadata);
if (!::match(filter.s3_filter.metadata_filter, metadata)) {
return false;
}
} else if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
return false;
}
}
if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
return false;
if (!filter.s3_filter.tag_filter.kvl.empty()) {
// tag filter exists
if(s->tagset.get_tags().empty()) {
// try to fetch tags from the attributes
KeyValueList tags;
tags_from_attributes(s, obj, tags);
if (!::match(filter.s3_filter.tag_filter, tags)) {
return false;
}
} else if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
return false;
}
}
return true;
}
Expand Down Expand Up @@ -700,7 +780,7 @@ int publish_reserve(EventType event_type,
return 0;
}

int publish_commit(const rgw::sal::RGWObject* obj,
int publish_commit(rgw::sal::RGWObject* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
Expand Down
6 changes: 3 additions & 3 deletions src/rgw/rgw_notify.h
Expand Up @@ -56,9 +56,9 @@ struct reservation_t {
rgw::sal::RGWRadosStore* const store;
const req_state* const s;
size_t size;
const rgw::sal::RGWObject* const object;
rgw::sal::RGWObject* const object;

reservation_t(rgw::sal::RGWRadosStore* _store, const req_state* _s, const rgw::sal::RGWObject* _object) :
reservation_t(rgw::sal::RGWRadosStore* _store, const req_state* _s, rgw::sal::RGWObject* _object) :
store(_store), s(_s), object(_object) {}

// dtor doing resource leak guarding
Expand All @@ -71,7 +71,7 @@ int publish_reserve(EventType event_type,
reservation_t& reservation);

// commit the reservation to the queue
int publish_commit(const rgw::sal::RGWObject* obj,
int publish_commit(rgw::sal::RGWObject* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
Expand Down
40 changes: 0 additions & 40 deletions src/rgw/rgw_op.cc
Expand Up @@ -926,29 +926,6 @@ int RGWGetObj::verify_permission(optional_yield y)
return 0;
}

// cache the objects tags into the requests
// use inside try/catch as "decode()" may throw
void populate_tags_in_request(req_state* s, const rgw::sal::RGWAttrs& attrs) {
const auto attr_iter = attrs.find(RGW_ATTR_TAGS);
if (attr_iter != attrs.end()) {
auto bliter = attr_iter->second.cbegin();
decode(s->tagset, bliter);
}
}

// cache the objects metadata into the request
void populate_metadata_in_request(req_state* s, const rgw::sal::RGWAttrs& attrs) {
for (auto& attr : attrs) {
if (boost::algorithm::starts_with(attr.first, RGW_ATTR_META_PREFIX)) {
std::string_view key(attr.first);
key.remove_prefix(sizeof(RGW_ATTR_PREFIX)-1);
// we want to pass a null terminated version
// of the bufferlist, hence "to_str().c_str()"
s->info.x_meta_map.emplace(key, attr.second.to_str().c_str());
}
}
}

int RGWOp::verify_op_mask()
{
uint32_t required_mask = op_mask();
Expand Down Expand Up @@ -4812,15 +4789,6 @@ void RGWDeleteObj::execute(optional_yield y)
return;
}

// cache the objects tags and metadata into the requests
// so it could be used in the notification mechanism
try {
populate_tags_in_request(s, attrs);
} catch (buffer::error& err) {
ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl;
}
populate_metadata_in_request(s, attrs);

// make reservation for notification if needed
rgw::notify::reservation_t res(store, s, s->object.get());
const auto versioned_object = s->bucket->versioning_enabled();
Expand Down Expand Up @@ -4872,14 +4840,6 @@ void RGWDeleteObj::execute(optional_yield y)
op_ret = 0;
}

// cache the objects tags and metadata into the requests
// so it could be used in the notification mechanism
try {
populate_tags_in_request(s, attrs);
} catch (buffer::error& err) {
ldpp_dout(this, 5) << "WARNING: failed to populate delete request with object tags: " << err.what() << dendl;
}
populate_metadata_in_request(s, attrs);
const auto obj_state = obj_ctx->get_state(s->object->get_obj());

// send request to notification manager
Expand Down
1 change: 1 addition & 0 deletions src/rgw/rgw_sal.h
Expand Up @@ -478,6 +478,7 @@ class RGWObject {
virtual bool placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) = 0;

RGWAttrs& get_attrs(void) { return attrs; }
const RGWAttrs& get_attrs(void) const { return attrs; }
ceph::real_time get_mtime(void) const { return mtime; }
uint64_t get_obj_size(void) const { return obj_size; }
RGWBucket* get_bucket(void) const { return bucket; }
Expand Down

0 comments on commit f8099cd

Please sign in to comment.