-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rgw/notification: Fix the caching issues of notification brokers, where the cache was not invalidated if topic attributes were changed #57537
base: main
Are you sure you want to change the base?
Conversation
02558db
to
a4307f1
Compare
src/rgw/driver/rados/rgw_notify.cc
Outdated
EntryProcessingResult process_entry(const ConfigProxy &conf, | ||
persistency_tracker &entry_persistency_tracker, | ||
const cls_queue_entry &entry, | ||
RGWPubSubEndpoint *push_endpoint, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: const RGWPubSubEndpoint* push_endpoint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
src/rgw/driver/rados/rgw_notify.cc
Outdated
int create_push_endpoint(const std::string &queue_name, | ||
const std::vector<cls_queue_entry> &entries, | ||
spawn::yield_context yield, | ||
RGWPubSubEndpoint::Ptr *push_endpoint) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why passing a pointer to a unique_ptr?
you can just pass the bare pointer from the unique_ptr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or even better. since the caller ignore the return value from this function, you can return the unique_ptr.
you can return a unique_ptr initialized with nullptr in case of error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the caller does honor the return value, we do not process the entries if we cannot create the push_endpoint.
and i am passing the unique_ptr by ref and its created inside the function.
i do not want to verify based on whether the function returned nullptr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. so please pass by ref, not a pointer. IMO, pointer to a unique_ptr is too confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the code to pass unique_ptr
src/rgw/driver/rados/rgw_notify.cc
Outdated
RGWPubSubEndpoint::Ptr push_endpoint; | ||
if (create_push_endpoint(queue_name, entries, yield, &push_endpoint) | ||
< 0) { | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this means that you stop processing of the queue. i think we should continue queue processing unless we lose ownership of queue is removed.
this will alllow users to fix conf errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i mean how can we continue processing the queue (entries) as we were not able to create the endpoint.
since no entries are processed, we just exit from the function ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you exit from the function, nothing will reschedule the processing of the queue and the rgw will release ownership.
handling should be similar to the other error cases (="continue"), and wait for the problem to be resolved.
note that we will not be scheduling the coroutine too often in this case, with the timer we have in line 426.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/rgw/rgw_amqp.cc
Outdated
@@ -53,14 +53,29 @@ static const int RGW_AMQP_NO_REPLY_CODE = 0x0; | |||
|
|||
// the amqp_connection_info struct does not hold any memory and just points to the URL string | |||
// so, strings are copied into connection_id_t | |||
connection_id_t::connection_id_t(const amqp_connection_info& info, const std::string& _exchange) | |||
: host(info.host), port(info.port), vhost(info.vhost), exchange(_exchange), ssl(info.ssl) {} | |||
connection_id_t::connection_id_t( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why changing the amqp code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
coz amqp too faces same issue, if for some reason there is typo in the username and password, coz the connection is pooled, the new connection never uses the new userid and passwd
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
amqp code needs a major rewrite (there are several issues fixed for kafka that still exist in amqp).
also, we would need to test that.
would recommend splitting amqp work to another PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am afraid this will be forgotten while we refactor the amqp
since we are doing this for kafka, why not just do it for amqp as well, coz amqp also faces the same issue ? and its similar code changes so its logical to be part of same commit ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree that this is needed there. but there are many other (critical) fixes we did on the kafka code that are needed there as well.
main reason that i want it in a separate PR is testing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, removed amqp code.
please add tests for both fixes:
|
This pull request can no longer be automatically merged: a rebase is needed and changes have to be manually resolved |
@@ -342,7 +338,7 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { | |||
|
|||
std::string to_str() const override { | |||
std::string str("Kafka Endpoint"); | |||
str += "\nBroker: " + conn_name; | |||
str += "\nBroker: " + conn_id.broker; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
str += "\nBroker: " + conn_id.broker; | |
str += "\nBroker: " + to_string(conn_id); |
please rebase and resolve conflicts only after: #57630 |
…ate Pushendpoint and stop calling RGWPubSubEndpoint::create for every event. Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
a4307f1
to
fbb6ad5
Compare
… key along with endpoint for connection pooling. For kafka, currently connection pooling is done based on endpoint, so all the events with same endpoint share the same connection. but there are issues when userid & password is created/changed for the endpoint, coz the old connection is cached in broker manager and when new event with updated/new userid-password is sent, broker still uses the old connection that was created with old/no userid/password as currently only the `endpoint` is the key to connection pool. To fix this, use all the topic attributes that are part of connection as the key to connection pool and if any of the attribute changes create new kafka connection. Attibutes include userid, password, ssl, ca_laction. Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
fbb6ad5
to
abaf8b8
Compare
added both test cases. |
rebased and updated the commits. |
Fixes https://tracker.ceph.com/issues/66036
Checklist
Show available Jenkins commands
jenkins retest this please
jenkins test classic perf
jenkins test crimson perf
jenkins test signed
jenkins test make check
jenkins test make check arm64
jenkins test submodules
jenkins test dashboard
jenkins test dashboard cephadm
jenkins test api
jenkins test docs
jenkins render docs
jenkins test ceph-volume all
jenkins test ceph-volume tox
jenkins test windows
jenkins test rook e2e