Skip to content

Commit

Permalink
rgw/pubsub: use rgw_pubsub_dest::persistent_queue for queue oid
Browse files Browse the repository at this point in the history
Signed-off-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
cbodley committed Apr 10, 2024
1 parent 4c50ad6 commit 4bac81a
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 34 deletions.
9 changes: 5 additions & 4 deletions src/rgw/driver/rados/rgw_notify.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1241,18 +1241,19 @@ int publish_abort(reservation_t& res) {
return 0;
}

int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
const std::string &topic_name, rgw_topic_stats &stats, optional_yield y)
int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
const std::string &queue_name, rgw_topic_stats &stats, optional_yield y)
{
// TODO: use optional_yield instead calling rados_ioctx.operate() synchronously
cls_2pc_reservations reservations;
auto ret = cls_2pc_queue_list_reservations(rados_ioctx, topic_name, reservations);
auto ret = cls_2pc_queue_list_reservations(rados_ioctx, queue_name, reservations);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to read queue list reservation: " << ret << dendl;
return ret;
}
stats.queue_reservations = reservations.size();

ret = cls_2pc_queue_get_topic_stats(rados_ioctx, topic_name, stats.queue_entries, stats.queue_size);
ret = cls_2pc_queue_get_topic_stats(rados_ioctx, queue_name, stats.queue_entries, stats.queue_size);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to get the queue size or the number of entries: " << ret << dendl;
return ret;
Expand Down
4 changes: 2 additions & 2 deletions src/rgw/driver/rados/rgw_notify.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ int publish_commit(rgw::sal::Object* obj,
// cancel the reservation
int publish_abort(reservation_t& reservation);

int get_persistent_queue_stats_by_topic_name(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
const std::string &topic_name, rgw_topic_stats &stats, optional_yield y);
int get_persistent_queue_stats(const DoutPrefixProvider *dpp, librados::IoCtx &rados_ioctx,
const std::string &queue_name, rgw_topic_stats &stats, optional_yield y);

}

34 changes: 23 additions & 11 deletions src/rgw/driver/rados/topic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,12 @@ class MetadataHandler : public RGWMetadataHandler {
if (r < 0) {
return r;
}
if (!info.dest.push_endpoint.empty() && info.dest.persistent) {
r = rgw::notify::add_persistent_topic(info.name, y);
if (!info.dest.push_endpoint.empty() && info.dest.persistent &&
!info.dest.persistent_queue.empty()) {
r = rgw::notify::add_persistent_topic(info.dest.persistent_queue, y);
if (r < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to create queue for persistent topic "
<< info.name << " with: " << cpp_strerror(r) << dendl;
<< info.dest.persistent_queue << " with: " << cpp_strerror(r) << dendl;
return r;
}
}
Expand All @@ -370,18 +371,29 @@ class MetadataHandler : public RGWMetadataHandler {
std::string tenant;
parse_topic_metadata_key(entry, tenant, name);

int r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone,
tenant, name, objv_tracker);
rgw_pubsub_topic info;
int r = read(dpp, y, sysobj, cache_svc, zone, entry,
info, cache, nullptr, &objv_tracker);
if (r < 0) {
return r;
}

// delete persistent topic queue. expect ENOENT for non-persistent topics
r = rgw::notify::remove_persistent_topic(name, y);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
<< name << " with error: " << r << dendl;
} // not fatal
r = topic::remove(dpp, y, sysobj, &mdlog, rados, zone,
tenant, name, objv_tracker);
if (r < 0) {
return r;
}

const rgw_pubsub_dest& dest = info.dest;
if (!dest.push_endpoint.empty() && dest.persistent &&
!dest.persistent_queue.empty()) {
// delete persistent topic queue
r = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, 1) << "Failed to delete queue for persistent topic: "
<< name << " with error: " << r << dendl;
} // not fatal
}
return 0;
}

Expand Down
20 changes: 17 additions & 3 deletions src/rgw/rgw_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11245,11 +11245,25 @@ int main(int argc, const char **argv)
cerr << "ERROR: topic name was not provided (via --topic)" << std::endl;
return EINVAL;
}
const std::string& account = !account_id.empty() ? account_id : tenant;
RGWPubSub ps(driver, account, *site);

rgw_pubsub_topic topic;
ret = ps.get_topic(dpp(), topic_name, topic, null_yield, nullptr);
if (ret < 0) {
cerr << "ERROR: could not get topic: " << cpp_strerror(-ret) << std::endl;
return -ret;
}

if (topic.dest.persistent_queue.empty()) {
cerr << "This topic does not have a persistent queue." << std::endl;
return ENOENT;
}

rgw::notify::rgw_topic_stats stats;
ret = rgw::notify::get_persistent_queue_stats_by_topic_name(
dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(), topic_name,
stats, null_yield);
ret = rgw::notify::get_persistent_queue_stats(
dpp(), static_cast<rgw::sal::RadosStore *>(driver)->getRados()->get_notif_pool_ctx(),
topic.dest.persistent_queue, stats, null_yield);
if (ret < 0) {
cerr << "ERROR: could not get persistent queue: " << cpp_strerror(-ret) << std::endl;
return -ret;
Expand Down
4 changes: 2 additions & 2 deletions src/rgw/rgw_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ int RGWPubSub::remove_topic_v2(const DoutPrefixProvider* dpp,
const rgw_pubsub_dest& dest = topic.dest;
if (!dest.push_endpoint.empty() && dest.persistent &&
!dest.persistent_queue.empty()) {
ret = rgw::notify::remove_persistent_topic(topic.name, y);
ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for "
"persistent topic: " << cpp_strerror(ret) << dendl;
Expand Down Expand Up @@ -1138,7 +1138,7 @@ int RGWPubSub::remove_topic(const DoutPrefixProvider *dpp, const std::string& na

if (!dest.push_endpoint.empty() && dest.persistent &&
!dest.persistent_queue.empty()) {
ret = rgw::notify::remove_persistent_topic(name, y);
ret = rgw::notify::remove_persistent_topic(dest.persistent_queue, y);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 1) << "WARNING: failed to remove queue for "
"persistent topic: " << cpp_strerror(ret) << dendl;
Expand Down
39 changes: 27 additions & 12 deletions src/rgw/rgw_rest_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
return false;
}

static bool topic_needs_queue(const rgw_pubsub_dest& dest)
{
return !dest.push_endpoint.empty() && dest.persistent;
}

auto get_policy_from_text(req_state* const s, const std::string& policy_text)
-> boost::optional<rgw::IAM::Policy>
{
Expand Down Expand Up @@ -306,12 +311,6 @@ class RGWPSCreateTopicOp : public RGWOp {
ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret == -ENOENT) {
// topic not present

// initialize the persistent queue's location. this cannot change for
// existing topics. use ':' as the namespace delimiter because its
// inclusion in a TopicName would break ARNs
dest.persistent_queue = string_cat_reserve(
get_account_or_tenant(s->owner.id), ":", topic_name);
} else if (ret < 0) {
ldpp_dout(this, 1) << "failed to read topic '" << topic_name
<< "', with error:" << ret << dendl;
Expand Down Expand Up @@ -387,8 +386,16 @@ void RGWPSCreateTopicOp::execute(optional_yield y) {
return;
}
}
if (!dest.push_endpoint.empty() && dest.persistent) {
op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);

// don't add a persistent queue if we already have one
const bool already_persistent = topic && topic_needs_queue(topic->dest);
if (!already_persistent && topic_needs_queue(dest)) {
// initialize the persistent queue's location, using ':' as the namespace
// delimiter because its inclusion in a TopicName would break ARNs
dest.persistent_queue = string_cat_reserve(
get_account_or_tenant(s->owner.id), ":", topic_name);

op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield);
if (op_ret < 0) {
ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for "
"persistent topics. error:"
Expand Down Expand Up @@ -838,17 +845,25 @@ void RGWPSSetTopicAttributesOp::execute(optional_yield y) {
return;
}
}
if (!dest.push_endpoint.empty() && dest.persistent) {
op_ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
// don't add a persistent queue if we already have one
const bool already_persistent = topic_needs_queue(result.dest);
if (!already_persistent && topic_needs_queue(dest)) {
// initialize the persistent queue's location, using ':' as the namespace
// delimiter because its inclusion in a TopicName would break ARNs
dest.persistent_queue = string_cat_reserve(
get_account_or_tenant(s->owner.id), ":", topic_name);

op_ret = rgw::notify::add_persistent_topic(dest.persistent_queue, s->yield);
if (op_ret < 0) {
ldpp_dout(this, 4)
<< "SetTopicAttributes Action failed to create queue for "
"persistent topics. error:"
<< op_ret << dendl;
return;
}
} else { // changing the persistent topic to non-persistent.
op_ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
} else if (already_persistent) {
// changing the persistent topic to non-persistent.
op_ret = rgw::notify::remove_persistent_topic(result.dest.persistent_queue, s->yield);
if (op_ret != -ENOENT && op_ret < 0) {
ldpp_dout(this, 4) << "SetTopicAttributes Action failed to remove queue "
"for persistent topics. error:"
Expand Down

0 comments on commit 4bac81a

Please sign in to comment.