Skip to content

Commit

Permalink
rgw/pubsub: add/remove_persistent_topic() takes topic queue, not name
Browse files Browse the repository at this point in the history
Signed-off-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
cbodley committed Apr 10, 2024
1 parent a75c3ac commit ba2566a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
36 changes: 18 additions & 18 deletions src/rgw/driver/rados/rgw_notify.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,36 +730,36 @@ class Manager : public DoutPrefixProvider {
ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
}

int add_persistent_topic(const std::string& topic_name, optional_yield y) {
if (topic_name == Q_LIST_OBJECT_NAME) {
int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (topic_queue == Q_LIST_OBJECT_NAME) {
ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
return -EINVAL;
}
librados::ObjectWriteOperation op;
op.create(true);
cls_2pc_queue_init(op, topic_name, max_queue_size);
cls_2pc_queue_init(op, topic_queue, max_queue_size);
auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
auto ret = rgw_rados_operate(this, rados_ioctx, topic_queue, &op, y);
if (ret == -EEXIST) {
// queue already exists - nothing to do
ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl;
return 0;
}
if (ret < 0) {
// failed to create queue
ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl;
return ret;
}

bufferlist empty_bl;
std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
std::map<std::string, bufferlist> new_topic{{topic_queue, empty_bl}};
op.omap_set(new_topic);
ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl;
return ret;
}
ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
ldpp_dout(this, 20) << "INFO: queue: " << topic_queue << " added to queue list" << dendl;
return 0;
}
};
Expand Down Expand Up @@ -805,37 +805,37 @@ int add_persistent_topic(const std::string& topic_name, optional_yield y) {
return s_manager->add_persistent_topic(topic_name, y);
}

int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y) {
int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y) {
librados::ObjectWriteOperation op;
op.remove();
auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_name, &op, y);
auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_queue, &op, y);
if (ret == -ENOENT) {
// queue already removed - nothing to do
ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_queue << " already removed. nothing to do" << dendl;
return 0;
}
if (ret < 0) {
// failed to remove queue
ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_queue << ". error: " << ret << dendl;
return ret;
}

std::set<std::string> topic_to_remove{{topic_name}};
std::set<std::string> topic_to_remove{{topic_queue}};
op.omap_rm_keys(topic_to_remove);
ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_queue << " from queue list. error: " << ret << dendl;
return ret;
}
ldpp_dout(dpp, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
ldpp_dout(dpp, 20) << "INFO: queue: " << topic_queue << " removed from queue list" << dendl;
return 0;
}

int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
int remove_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (!s_manager) {
return -EAGAIN;
}
return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_name, y);
return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
}

rgw::sal::Object* get_object_with_attributes(
Expand Down
10 changes: 5 additions & 5 deletions src/rgw/driver/rados/rgw_notify.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ bool init(CephContext* cct, rgw::sal::RadosStore* store,
void shutdown();

// create persistent delivery queue for a topic (endpoint)
// this operation also add a topic name to the common (to all RGWs) list of all topics
int add_persistent_topic(const std::string& topic_name, optional_yield y);
// this operation also add a topic queue to the common (to all RGWs) list of all topics
int add_persistent_topic(const std::string& topic_queue, optional_yield y);

// remove persistent delivery queue for a topic (endpoint)
// this operation also remove the topic name from the common (to all RGWs) list of all topics
int remove_persistent_topic(const std::string& topic_name, optional_yield y);
// this operation also remove the topic queue from the common (to all RGWs) list of all topics
int remove_persistent_topic(const std::string& topic_queue, optional_yield y);

// same as the above, expect you need to provide the IoCtx, the above uses rgw::notify::Manager::rados_ioctx
int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y);
int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y);

// struct holding reservation information
// populated in the publish_reserve call
Expand Down

0 comments on commit ba2566a

Please sign in to comment.