Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rgw/pubsub: fix doc on updates. fix multi-notifications #27931

Merged
merged 4 commits into from
May 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/radosgw/pubsub-module.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ This will create a new topic. Topic creation is needed both for both flavors of
Optionally the topic could be provided with push endpoint parameters that would be used later
when an S3-compatible notification is created.
Upon successful request, the response will include the topic ARN that could be later used to reference this topic in an S3-compatible notification request.
To update a topic, use the same command used for topic creation, with the topic name of an existing topic and different endpoint values.

.. tip:: Any S3-compatible notification already associated with the topic needs to be re-created for the topic update to take effect

::

Expand Down
16 changes: 11 additions & 5 deletions src/rgw/rgw_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,28 +247,34 @@ int RGWUserPubSub::Bucket::create_notification(const string& topic_name, const E

int ret = ps->get_topic(topic_name, &user_topic_info);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topic info: ret=" << ret << dendl;
ldout(store->ctx(), 1) << "ERROR: failed to read topic '" << topic_name << "' info: ret=" << ret << dendl;
return ret;
}
ldout(store->ctx(), 20) << "successfully read topic '" << topic_name << "' info" << dendl;

RGWObjVersionTracker objv_tracker;
rgw_pubsub_bucket_topics bucket_topics;

ret = read_topics(&bucket_topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read topics from bucket '" <<
bucket.name << "': ret=" << ret << dendl;
return ret;
}
ldout(store->ctx(), 20) << "successfully read " << bucket_topics.topics.size() << " topics from bucket '" <<
bucket.name << "'" << dendl;

auto& topic_filter = bucket_topics.topics[topic_name];
topic_filter.topic = user_topic_info.topic;
topic_filter.events = events;

ret = write_topics(bucket_topics, &objv_tracker);
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to write topics info: ret=" << ret << dendl;
ldout(store->ctx(), 1) << "ERROR: failed to write topics to bucket '" << bucket.name << "': ret=" << ret << dendl;
return ret;
}

ldout(store->ctx(), 20) << "successfully wrote " << bucket_topics.topics.size() << " topics to bucket '" << bucket.name << "'" << dendl;

return 0;
}
Expand All @@ -288,7 +294,7 @@ int RGWUserPubSub::Bucket::remove_notification(const string& topic_name)
rgw_pubsub_bucket_topics bucket_topics;

ret = read_topics(&bucket_topics, &objv_tracker);
if (ret < 0 && ret != -ENOENT) {
if (ret < 0) {
ldout(store->ctx(), 1) << "ERROR: failed to read bucket topics info: ret=" << ret << dendl;
return ret;
}
Expand Down
31 changes: 31 additions & 0 deletions src/rgw/rgw_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,15 +457,31 @@ class RGWUserPubSub
rgw_bucket bucket;
rgw_raw_obj bucket_meta_obj;

// read the list of topics associated with a bucket and populate into result
// use version tacker to enforce atomicity between read/write
// return 0 on success or if no topic was associated with the bucket, error code otherwise
int read_topics(rgw_pubsub_bucket_topics *result, RGWObjVersionTracker *objv_tracker);
// set the list of topics associated with a bucket
// use version tacker to enforce atomicity between read/write
// return 0 on success, error code otherwise
int write_topics(const rgw_pubsub_bucket_topics& topics, RGWObjVersionTracker *objv_tracker);
public:
Bucket(RGWUserPubSub *_ps, const rgw_bucket& _bucket) : ps(_ps), bucket(_bucket) {
ps->get_bucket_meta_obj(bucket, &bucket_meta_obj);
}

// read the list of topics associated with a bucket and populate into result
// return 0 on success or if no topic was associated with the bucket, error code otherwise
int get_topics(rgw_pubsub_bucket_topics *result);
// adds a topic + filter (event list) to a bucket
// if the topic already exist on the bucket, the filter event list may be updated
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
int create_notification(const string& topic_name, const EventTypeList& events);
// remove a topic and filter from bucket
// if the topic does not exists on the bucket it is a no-op (considered success)
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
int remove_notification(const string& topic_name);
};

Expand Down Expand Up @@ -554,10 +570,24 @@ class RGWUserPubSub
*obj = rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, sub_meta_oid(name));
}

// get all topics defined for the user and populate them into "result"
// return 0 on success or if no topics exist, error code otherwise
int get_user_topics(rgw_pubsub_user_topics *result);
// get a topic by its name and populate it into "result"
// return -ENOENT if the topic does not exists
// return 0 on success, error code otherwise
int get_topic(const string& name, rgw_pubsub_topic_subs *result);
// create a topic with a name only
// if the topic already exists it is a no-op (considered success)
// return 0 on success, error code otherwise
int create_topic(const string& name);
// create a topic with push destination information and ARN
// if the topic already exists the destination and ARN values may be updated (considered succsess)
// return 0 on success, error code otherwise
int create_topic(const string& name, const rgw_pubsub_sub_dest& dest, const std::string& arn);
// remove a topic according to its name
// if the topic does not exists it is a no-op (considered success)
// return 0 on success, error code otherwise
int remove_topic(const string& name);
};

Expand Down Expand Up @@ -598,6 +628,7 @@ int RGWUserPubSub::write(const rgw_raw_obj& obj, const T& info, RGWObjVersionTra
return ret;
}

obj_ctx.invalidate(obj);
return 0;
}

Expand Down
34 changes: 23 additions & 11 deletions src/rgw/rgw_sync_module_pubsub_rest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ void RGWPSCreateTopicOp::execute()
ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
op_ret = ups->create_topic(topic_name, dest, topic_arn);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create topic, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully created topic '" << topic_name << "'" << dendl;
}

// command: PUT /topics/<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]
Expand Down Expand Up @@ -121,6 +122,7 @@ void RGWPSListTopicsOp::execute()
ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully got topics" << dendl;
}

// command: GET /topics
Expand Down Expand Up @@ -177,6 +179,7 @@ void RGWPSGetTopicOp::execute()
ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 1) << "successfully got topic '" << topic_name << "'" << dendl;
}

// command: GET /topics/<topic-name>
Expand Down Expand Up @@ -235,9 +238,10 @@ void RGWPSDeleteTopicOp::execute()
ups = std::make_unique<RGWUserPubSub>(store, s->owner.get_id());
op_ret = ups->remove_topic(topic_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove topic, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
}

// command: DELETE /topics/<topic-name>
Expand Down Expand Up @@ -324,9 +328,10 @@ void RGWPSCreateSubOp::execute()
auto sub = ups->get_sub(sub_name);
op_ret = sub->subscribe(topic_name, dest);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create subscription, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
}

// command: PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>[&<arg1>=<value1>]]...
Expand Down Expand Up @@ -387,9 +392,10 @@ void RGWPSGetSubOp::execute()
auto sub = ups->get_sub(sub_name);
op_ret = sub->get_conf(&result);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
}

// command: GET /subscriptions/<sub-name>
Expand Down Expand Up @@ -448,9 +454,10 @@ void RGWPSDeleteSubOp::execute()
auto sub = ups->get_sub(sub_name);
op_ret = sub->unsubscribe(topic_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove subscription, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
}

// command: DELETE /subscriptions/<sub-name>
Expand Down Expand Up @@ -498,9 +505,10 @@ void RGWPSAckSubEventOp::execute()
auto sub = ups->get_sub_with_events(sub_name);
op_ret = sub->remove_event(event_id);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to ack event, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
}

// command: POST /subscriptions/<sub-name>?ack&event-id=<event-id>
Expand Down Expand Up @@ -561,14 +569,15 @@ void RGWPSPullSubEventsOp::execute()
sub = ups->get_sub_with_events(sub_name);
if (!sub) {
op_ret = -ENOENT;
ldout(s->cct, 1) << "failed to get subscription, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
return;
}
op_ret = sub->list_events(marker, max_entries);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to get subscription events, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
}

// command: GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
Expand Down Expand Up @@ -746,9 +755,10 @@ void RGWPSCreateNotif_ObjStore_Ceph::execute()
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->create_notification(topic_name, events);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to create notification, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to create notification for topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully created notification for topic '" << topic_name << "'" << dendl;
}

namespace {
Expand Down Expand Up @@ -923,7 +933,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
dest.bucket_name = string(conf["data_bucket_prefix"]) + s->owner.get_id().to_str() + "-" + unique_topic_name;
dest.oid_prefix = string(conf["data_oid_prefix"]) + sub_name + "/";
auto sub = ups->get_sub(sub_name);
op_ret = sub->subscribe(unique_topic_name, dest, c.id);
op_ret = sub->subscribe(unique_topic_name, dest, sub_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to auto-generate subscription '" << sub_name << "', ret=" << op_ret << dendl;
// rollback generated notification (ignore return value)
Expand All @@ -932,6 +942,7 @@ void RGWPSCreateNotif_ObjStore_S3::execute() {
ups->remove_topic(unique_topic_name);
return;
}
ldout(s->cct, 20) << "successfully auto-generated subscription '" << sub_name << "'" << dendl;
}
}

Expand Down Expand Up @@ -1001,9 +1012,10 @@ void RGWPSDeleteNotif_ObjStore_Ceph::execute() {
auto b = ups->get_bucket(bucket_info.bucket);
op_ret = b->remove_notification(topic_name);
if (op_ret < 0) {
ldout(s->cct, 1) << "failed to remove notification, ret=" << op_ret << dendl;
ldout(s->cct, 1) << "failed to remove notification from topic '" << topic_name << "', ret=" << op_ret << dendl;
return;
}
ldout(s->cct, 20) << "successfully removed notification from topic '" << topic_name << "'" << dendl;
}

// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/services/svc_sys_obj_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class RGWSysObjectCtxBase {
assert (!obj.empty());
objs_state[obj].prefetch_data = true;
}
void invalidate(rgw_raw_obj& obj) {
void invalidate(const rgw_raw_obj& obj) {
RWLock::WLocker wl(lock);
auto iter = objs_state.find(obj);
if (iter == objs_state.end()) {
Expand Down