Skip to content

Commit

Permalink
rgw/pubsub: rgw_pubsub_dest stores persistent queue oid
Browse files Browse the repository at this point in the history
Signed-off-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
cbodley committed Apr 10, 2024
1 parent ba2566a commit 3ef1ab3
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/rgw/rgw_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ void rgw_pubsub_dest::dump(Formatter *f) const
encode_json("push_endpoint_topic", arn_topic, f);
encode_json("stored_secret", stored_secret, f);
encode_json("persistent", persistent, f);
encode_json("persistent_queue", persistent_queue, f);
encode_json("time_to_live", time_to_live!=DEFAULT_GLOBAL_VALUE? std::to_string(time_to_live): DEFAULT_CONFIG, f);
encode_json("max_retries", max_retries!=DEFAULT_GLOBAL_VALUE? std::to_string(max_retries): DEFAULT_CONFIG, f);
encode_json("retry_sleep_duration", retry_sleep_duration!=DEFAULT_GLOBAL_VALUE? std::to_string(retry_sleep_duration): DEFAULT_CONFIG, f);
Expand Down Expand Up @@ -525,6 +526,7 @@ void rgw_pubsub_dest::decode_json(JSONObj* f) {
JSONDecoder::decode_json("push_endpoint_topic", arn_topic, f);
JSONDecoder::decode_json("stored_secret", stored_secret, f);
JSONDecoder::decode_json("persistent", persistent, f);
JSONDecoder::decode_json("persistent_queue", persistent_queue, f);
std::string ttl;
JSONDecoder::decode_json("time_to_live", ttl, f);
time_to_live = ttl == DEFAULT_CONFIG ? DEFAULT_GLOBAL_VALUE : std::stoul(ttl);
Expand Down
12 changes: 11 additions & 1 deletion src/rgw/rgw_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,14 @@ struct rgw_pubsub_dest {
std::string arn_topic;
bool stored_secret = false;
bool persistent = false;
// rados object name of the persistent queue in the 'notif' pool
std::string persistent_queue;
uint32_t time_to_live;
uint32_t max_retries;
uint32_t retry_sleep_duration;

void encode(bufferlist& bl) const {
ENCODE_START(6, 1, bl);
ENCODE_START(7, 1, bl);
encode("", bl);
encode("", bl);
encode(push_endpoint, bl);
Expand All @@ -358,6 +360,7 @@ struct rgw_pubsub_dest {
encode(time_to_live, bl);
encode(max_retries, bl);
encode(retry_sleep_duration, bl);
encode(persistent_queue, bl);
ENCODE_FINISH(bl);
}

Expand All @@ -384,6 +387,13 @@ struct rgw_pubsub_dest {
decode(max_retries, bl);
decode(retry_sleep_duration, bl);
}
if (struct_v >= 7) {
decode(persistent_queue, bl);
} else if (persistent) {
// persistent topics created before v7 did not support tenant namespacing.
// continue to use 'arn_topic' alone as the queue's rados object name
persistent_queue = arn_topic;
}
DECODE_FINISH(bl);
}

Expand Down
6 changes: 6 additions & 0 deletions src/rgw/rgw_rest_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,12 @@ class RGWPSCreateTopicOp : public RGWOp {
ret = ps.get_topic(this, topic_name, result, y, nullptr);
if (ret == -ENOENT) {
// topic not present

// initialize the persistent queue's location. this cannot change for
// existing topics. use ':' as the namespace delimiter because its
// inclusion in a TopicName would break ARNs
dest.persistent_queue = string_cat_reserve(
get_account_or_tenant(s->owner.id), ":", topic_name);
} else if (ret < 0) {
ldpp_dout(this, 1) << "failed to read topic '" << topic_name
<< "', with error:" << ret << dendl;
Expand Down

0 comments on commit 3ef1ab3

Please sign in to comment.