Skip to content

Commit

Permalink
RGW: add admin interfaces to get and delete notifications from bucket
Browse files Browse the repository at this point in the history
Signed-off-by: Ali Masarwa <ali.saed.masarwa@gmail.com>
  • Loading branch information
AliMasarweh committed May 22, 2023
1 parent 8a94605 commit d071bbf
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 35 deletions.
142 changes: 114 additions & 28 deletions src/rgw/rgw_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ void usage()
cout << " script-package add add a lua package to the scripts allowlist\n";
cout << " script-package rm remove a lua package from the scripts allowlist\n";
cout << " script-package list get the lua packages allowlist\n";
cout << " notification list list bucket notifications configuration\n";
cout << " notification get get a bucket notifications configuration\n";
cout << " notification rm remove a bucket notifications configuration\n";
cout << "options:\n";
cout << " --tenant=<tenant> tenant name\n";
cout << " --user_ns=<namespace> namespace of user (oidc in case of users authenticated with oidc provider)\n";
Expand Down Expand Up @@ -483,6 +486,7 @@ void usage()
cout << " --totp-pin the valid value of a TOTP token at a certain time\n";
cout << "\nBucket notifications options:\n";
cout << " --topic bucket notifications topic name\n";
cout << " --notification-id bucket notifications id\n";
cout << "\nScript options:\n";
cout << " --context context in which the script runs. one of: "+LUA_CONTEXT_LIST+"\n";
cout << " --package name of the lua package that should be added/removed to/from the allowlist\n";
Expand Down Expand Up @@ -830,9 +834,12 @@ enum class OPT {
MFA_RESYNC,
RESHARD_STALE_INSTANCES_LIST,
RESHARD_STALE_INSTANCES_DELETE,
PUBSUB_TOPICS_LIST,
PUBSUB_TOPIC_LIST,
PUBSUB_TOPIC_GET,
PUBSUB_TOPIC_RM,
PUBSUB_NOTIFICATION_LIST,
PUBSUB_NOTIFICATION_GET,
PUBSUB_NOTIFICATION_RM,
SCRIPT_PUT,
SCRIPT_GET,
SCRIPT_RM,
Expand Down Expand Up @@ -1061,9 +1068,12 @@ static SimpleCmd::Commands all_cmds = {
{ "reshard stale list", OPT::RESHARD_STALE_INSTANCES_LIST },
{ "reshard stale-instances delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
{ "reshard stale delete", OPT::RESHARD_STALE_INSTANCES_DELETE },
{ "topic list", OPT::PUBSUB_TOPICS_LIST },
{ "topic list", OPT::PUBSUB_TOPIC_LIST },
{ "topic get", OPT::PUBSUB_TOPIC_GET },
{ "topic rm", OPT::PUBSUB_TOPIC_RM },
{ "notification list", OPT::PUBSUB_NOTIFICATION_LIST },
{ "notification get", OPT::PUBSUB_NOTIFICATION_GET },
{ "notification rm", OPT::PUBSUB_NOTIFICATION_RM },
{ "script put", OPT::SCRIPT_PUT },
{ "script get", OPT::SCRIPT_GET },
{ "script rm", OPT::SCRIPT_RM },
Expand Down Expand Up @@ -3463,6 +3473,7 @@ int main(int argc, const char **argv)
int trim_delay_ms = 0;

string topic_name;
string notification_id;
string sub_name;
string event_id;

Expand Down Expand Up @@ -3936,6 +3947,8 @@ int main(int argc, const char **argv)
trim_delay_ms = atoi(val.c_str());
} else if (ceph_argparse_witharg(args, i, &val, "--topic", (char*)NULL)) {
topic_name = val;
} else if (ceph_argparse_witharg(args, i, &val, "--notification-id", (char*)NULL)) {
notification_id = val;
} else if (ceph_argparse_witharg(args, i, &val, "--subscription", (char*)NULL)) {
sub_name = val;
} else if (ceph_argparse_witharg(args, i, &val, "--event-id", (char*)NULL)) {
Expand Down Expand Up @@ -4178,8 +4191,10 @@ int main(int argc, const char **argv)
OPT::ROLE_POLICY_GET,
OPT::RESHARD_LIST,
OPT::RESHARD_STATUS,
OPT::PUBSUB_TOPICS_LIST,
OPT::PUBSUB_TOPIC_LIST,
OPT::PUBSUB_NOTIFICATION_LIST,
OPT::PUBSUB_TOPIC_GET,
OPT::PUBSUB_NOTIFICATION_GET,
OPT::SCRIPT_GET,
};

Expand Down Expand Up @@ -4260,9 +4275,12 @@ int main(int argc, const char **argv)
&& opt_cmd != OPT::RESHARD_ADD
&& opt_cmd != OPT::RESHARD_CANCEL
&& opt_cmd != OPT::RESHARD_STATUS
&& opt_cmd != OPT::PUBSUB_TOPICS_LIST
&& opt_cmd != OPT::PUBSUB_TOPIC_LIST
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_LIST
&& opt_cmd != OPT::PUBSUB_TOPIC_GET
&& opt_cmd != OPT::PUBSUB_TOPIC_RM) {
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_GET
&& opt_cmd != OPT::PUBSUB_TOPIC_RM
&& opt_cmd != OPT::PUBSUB_NOTIFICATION_RM) {
cerr << "ERROR: --tenant is set, but there's no user ID" << std::endl;
return EINVAL;
}
Expand Down Expand Up @@ -10391,34 +10409,41 @@ int main(int argc, const char **argv)
}
}

if (opt_cmd == OPT::PUBSUB_TOPICS_LIST) {
if (opt_cmd == OPT::PUBSUB_NOTIFICATION_LIST) {
if (bucket_name.empty()) {
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
return EINVAL;
}

RGWPubSub ps(driver, tenant);

if (!bucket_name.empty()) {
rgw_pubsub_bucket_topics result;
int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
rgw_pubsub_bucket_topics result;
int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}

const RGWPubSub::Bucket b(ps, bucket.get());
ret = b.get_topics(dpp(), result, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
encode_json("result", result, formatter.get());
} else {
rgw_pubsub_topics result;
int ret = ps.get_topics(dpp(), result, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
encode_json("result", result, formatter.get());
const RGWPubSub::Bucket b(ps, bucket.get());
ret = b.get_topics(dpp(), result, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
encode_json("result", result, formatter.get());
formatter->flush(cout);
}

if (opt_cmd == OPT::PUBSUB_TOPIC_LIST) {
RGWPubSub ps(driver, tenant);

rgw_pubsub_topics result;
int ret = ps.get_topics(dpp(), result, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get topics: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
encode_json("result", result, formatter.get());
formatter->flush(cout);
}

Expand All @@ -10440,6 +10465,42 @@ int main(int argc, const char **argv)
formatter->flush(cout);
}

if (opt_cmd == OPT::PUBSUB_NOTIFICATION_GET) {
if (notification_id.empty()) {
cerr << "ERROR: notification-id was not provided (via --notification-id)" << std::endl;
return EINVAL;
}
if (bucket_name.empty()) {
cerr << "ERROR: bucket name was not provided (via --bucket)" << std::endl;
return EINVAL;
}

int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}

RGWPubSub ps(driver, tenant);

rgw_pubsub_bucket_topics bucket_topics;
const RGWPubSub::Bucket b(ps, bucket.get());
ret = b.get_topics(dpp(), bucket_topics, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
return -ret;
}

rgw_pubsub_topic_filter bucket_topic;
ret = b.get_notification_by_id(dpp(), notification_id, bucket_topic, null_yield);
if (ret < 0) {
cerr << "ERROR: could not get notification: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
encode_json("notification", bucket_topic, formatter.get());
formatter->flush(cout);
}

if (opt_cmd == OPT::PUBSUB_TOPIC_RM) {
if (topic_name.empty()) {
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
Expand All @@ -10455,6 +10516,31 @@ int main(int argc, const char **argv)
}
}

if (opt_cmd == OPT::PUBSUB_NOTIFICATION_RM) {
int ret = init_bucket(user.get(), tenant, bucket_name, bucket_id, &bucket);
if (ret < 0) {
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}

RGWPubSub ps(driver, tenant);

rgw_pubsub_bucket_topics bucket_topics;
const RGWPubSub::Bucket b(ps, bucket.get());
ret = b.get_topics(dpp(), bucket_topics, null_yield);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: could not get bucket notifications: " << cpp_strerror(-ret) << std::endl;
return -ret;
}

rgw_pubsub_topic_filter bucket_topic;
if(notification_id.empty()) {
ret = b.remove_notifications(dpp(), null_yield);
} else {
ret = b.remove_notification_by_id(dpp(), notification_id, null_yield);
}
}

if (opt_cmd == OPT::SCRIPT_PUT) {
if (!str_script_ctx) {
cerr << "ERROR: context was not provided (via --context)" << std::endl;
Expand Down
97 changes: 93 additions & 4 deletions src/rgw/rgw_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
}
}

void rgw_s3_key_filter::dump(Formatter *f) const {
if (!prefix_rule.empty()) {
f->open_object_section("FilterRule");
::encode_json("Name", "prefix", f);
::encode_json("Value", prefix_rule, f);
f->close_section();
}
if (!suffix_rule.empty()) {
f->open_object_section("FilterRule");
::encode_json("Name", "suffix", f);
::encode_json("Value", suffix_rule, f);
f->close_section();
}
if (!regex_rule.empty()) {
f->open_object_section("FilterRule");
::encode_json("Name", "regex", f);
::encode_json("Value", regex_rule, f);
f->close_section();
}
}

bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
XMLObjIter iter = obj->find("FilterRule");
XMLObj *o;
Expand Down Expand Up @@ -75,6 +96,15 @@ bool rgw_s3_key_filter::has_content() const {
return !(prefix_rule.empty() && suffix_rule.empty() && regex_rule.empty());
}

void rgw_s3_key_value_filter::dump(Formatter *f) const {
for (const auto& key_value : kv) {
f->open_object_section("FilterRule");
::encode_json("Name", key_value.first, f);
::encode_json("Value", key_value.second, f);
f->close_section();
}
}

bool rgw_s3_key_value_filter::decode_xml(XMLObj* obj) {
kv.clear();
XMLObjIter iter = obj->find("FilterRule");
Expand Down Expand Up @@ -106,6 +136,12 @@ bool rgw_s3_key_value_filter::has_content() const {
return !kv.empty();
}

void rgw_s3_filter::dump(Formatter *f) const {
encode_json("S3Key", key_filter, f);
encode_json("S3Metadata", metadata_filter, f);
encode_json("S3Tags", tag_filter, f);
}

bool rgw_s3_filter::decode_xml(XMLObj* obj) {
RGWXMLDecoder::decode_xml("S3Key", key_filter, obj);
RGWXMLDecoder::decode_xml("S3Metadata", metadata_filter, obj);
Expand Down Expand Up @@ -343,13 +379,15 @@ void encode_json(const char *name, const rgw::notify::EventTypeList& l, Formatte

void rgw_pubsub_topic_filter::dump(Formatter *f) const
{
encode_json("topic", topic, f);
encode_json("events", events, f);
encode_json("TopicArn", topic.arn, f);
encode_json("Id", s3_id, f);
encode_json("Events", events, f);
encode_json("Filter", s3_filter, f);
}

void rgw_pubsub_bucket_topics::dump(Formatter *f) const
{
Formatter::ArraySection s(*f, "topics");
Formatter::ArraySection s(*f, "notifications");
for (auto& t : topics) {
encode_json(t.first.c_str(), t.second, f);
}
Expand Down Expand Up @@ -475,6 +513,35 @@ int RGWPubSub::get_topic(const DoutPrefixProvider *dpp, const std::string& name,
return 0;
}

// from list of bucket topics, find the one that was auto-generated by a notification
auto find_unique_topic(const rgw_pubsub_bucket_topics &bucket_topics, const std::string &notification_id) {
auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(),
[&](const auto& val) { return notification_id == val.second.s3_id; });
return it != bucket_topics.topics.end() ?
std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
std::nullopt;
}

int RGWPubSub::Bucket::get_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notification_id,
rgw_pubsub_topic_filter& result, optional_yield y) const {
rgw_pubsub_bucket_topics bucket_topics;
const int ret = read_topics(dpp, bucket_topics, nullptr, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read bucket_topics info: ret=" << ret << dendl;
return ret;
}

auto iter = find_unique_topic(bucket_topics, notification_id);
if (!iter) {
ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
return -ENOENT;
}

result = iter->get();
return 0;
}


int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const std::string& topic_name,
const rgw::notify::EventTypeList& events, optional_yield y) const {
return create_notification(dpp, topic_name, events, std::nullopt, "", y);
Expand Down Expand Up @@ -523,6 +590,12 @@ int RGWPubSub::Bucket::create_notification(const DoutPrefixProvider *dpp, const
}

int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const std::string& topic_name, optional_yield y) const
{
return remove_notification_inner(dpp, topic_name, false, y);
}

int RGWPubSub::Bucket::remove_notification_inner(const DoutPrefixProvider *dpp, const std::string& notification_id,
bool is_notification_id, optional_yield y) const
{
RGWObjVersionTracker objv_tracker;
rgw_pubsub_bucket_topics bucket_topics;
Expand All @@ -533,7 +606,18 @@ int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const
return ret;
}

if (bucket_topics.topics.erase(topic_name) == 0) {

std::unique_ptr<std::string> topic_name = std::make_unique<std::string>(notification_id);
if(is_notification_id) {
auto iter = find_unique_topic(bucket_topics, notification_id);
if (!iter) {
ldpp_dout(dpp, 1) << "ERROR: notification was not found" << dendl;
return -ENOENT;
}
topic_name = std::make_unique<std::string>(iter->get().topic.name);
}

if (bucket_topics.topics.erase(*topic_name) == 0) {
ldpp_dout(dpp, 1) << "INFO: no need to remove, topic does not exist" << dendl;
return 0;
}
Expand All @@ -558,6 +642,11 @@ int RGWPubSub::Bucket::remove_notification(const DoutPrefixProvider *dpp, const
return 0;
}

int RGWPubSub::Bucket::remove_notification_by_id(const DoutPrefixProvider *dpp, const std::string& notif_id, optional_yield y) const
{
return remove_notification_inner(dpp, notif_id, true, y);
}

int RGWPubSub::Bucket::remove_notifications(const DoutPrefixProvider *dpp, optional_yield y) const
{
// get all topics on a bucket
Expand Down

0 comments on commit d071bbf

Please sign in to comment.