rgw/amqp: fix the "routable" delivery mode
this option was not exposed through the configuration API;
however, it was still set as a hardcoded value in the code
(details: https://www.rabbitmq.com/confirms.html#publisher-confirms)

Fixes: https://tracker.ceph.com/issues/44915

Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
yuvalif committed Apr 2, 2020
1 parent a01b4df commit c26074b
Showing 7 changed files with 140 additions and 110 deletions.
5 changes: 3 additions & 2 deletions doc/radosgw/notifications.rst
@@ -67,7 +67,7 @@ To update a topic, use the same command used for topic creation, with the topic
&Name=<topic-name>
&push-endpoint=<endpoint>
[&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
[&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
[&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker|routable]
[&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
[&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
[&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
@@ -93,10 +93,11 @@ Request parameters:
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Three ack methods exist:

- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
- "routable": message is considered "delivered" if broker can route to a consumer

- Kafka endpoint
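
The three documented ack levels can be read as a mapping from the configured string to two publisher-side choices: whether to wait for a broker confirmation, and whether to set the AMQP `mandatory` flag. A minimal sketch in C++, assuming hypothetical names (`ack_level_t`, `publish_options_t`, `parse_ack_level`, `to_publish_options`) that are not taken from the Ceph source. The mapping of "routable" to "confirm plus mandatory" is inferred from this diff, not from a documented API:

```cpp
#include <string>

// Hypothetical types, for illustration only; not the names used in Ceph.
enum class ack_level_t { None, Broker, Routable };

struct publish_options_t {
  bool wait_for_confirm;  // wait for the broker's ack/nack before reporting delivery
  bool mandatory;         // ask the broker to return messages it cannot route
};

// Map the documented "amqp-ack-level" values to an enum.
// An empty value falls back to "broker", the documented default.
// Returns false for an unknown value and leaves `level` untouched.
bool parse_ack_level(const std::string& value, ack_level_t& level) {
  if (value.empty() || value == "broker") { level = ack_level_t::Broker; return true; }
  if (value == "none") { level = ack_level_t::None; return true; }
  if (value == "routable") { level = ack_level_t::Routable; return true; }
  return false;
}

// Assumed translation of each level into publish-time behavior.
publish_options_t to_publish_options(ack_level_t level) {
  switch (level) {
    case ack_level_t::None:     return {false, false};
    case ack_level_t::Broker:   return {true, false};
    case ack_level_t::Routable: return {true, true};
  }
  return {true, false};  // not reached for valid enum values
}
```

Under these assumptions, only "routable" would set the flag that is passed to `connect()` as `mandatory_delivery`.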

10 changes: 6 additions & 4 deletions doc/radosgw/pubsub-module.rst
@@ -150,7 +150,7 @@ To update a topic, use the same command used for topic creation, with the topic

::

PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]

Request parameters:

@@ -173,10 +173,11 @@ The endpoint URI may include parameters depending with the type of endpoint:
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Three ack methods exist:

- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
- "routable": message is considered "delivered" if broker can route to a consumer

- Kafka endpoint

@@ -348,7 +349,7 @@ Creates a new subscription.

::

PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]

Request parameters:

@@ -370,10 +371,11 @@ The endpoint URI may include parameters depending with the type of endpoint:
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Two ack methods exist:
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before being delivered to their final destination. Three ack methods exist:

- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
- "routable": message is considered "delivered" if broker can route to a consumer

- Kafka endpoint

107 changes: 69 additions & 38 deletions src/rgw/rgw_amqp.cc
@@ -72,7 +72,7 @@ struct connection_id_t {
};

std::string to_string(const connection_id_t& id) {
return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
return id.host+":"+std::to_string(id.port)+id.vhost;
}

// connection_t state cleaner
@@ -124,6 +124,8 @@ struct connection_t {
mutable std::atomic<int> ref_count;
CephContext* cct;
CallbackList callbacks;
std::chrono::system_clock::time_point next_reconnect;
bool mandatory;

// default ctor
connection_t() :
@@ -135,7 +137,10 @@ struct connection_t {
reply_type(AMQP_RESPONSE_NORMAL),
reply_code(RGW_AMQP_NO_REPLY_CODE),
ref_count(0),
cct(nullptr) {}
cct(nullptr),
next_reconnect(std::chrono::system_clock::now()),
mandatory(false)
{}

// cleanup of all internal connection resource
// the object can still remain, and internal connection
@@ -489,12 +494,13 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio

// utility function to create a new connection
connection_ptr_t create_new_connection(const amqp_connection_info& info,
const std::string& exchange, CephContext* cct) {
const std::string& exchange, bool mandatory_delivery, CephContext* cct) {
// create connection state
connection_ptr_t conn = new connection_t;
conn->exchange = exchange;
conn->user.assign(info.user);
conn->password.assign(info.password);
conn->mandatory = mandatory_delivery;
conn->cct = cct;
return create_connection(conn, info);
}
@@ -542,6 +548,8 @@ class Manager {
CephContext* const cct;
mutable std::mutex connections_lock;
std::thread runner;
const std::chrono::system_clock::duration idle_time;
const std::chrono::system_clock::duration reconnect_time;

void publish_internal(message_wrapper_t* message) {
const std::unique_ptr<message_wrapper_t> msg_owner(message);
@@ -563,9 +571,9 @@ class Manager {
CHANNEL_ID,
amqp_cstring_bytes(conn->exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
1, // mandatory, TODO: take from conf
0, // does not have to be routable
0, // not immediate
nullptr,
nullptr, // no properties needed
amqp_cstring_bytes(message->message.c_str()));
if (rc == AMQP_STATUS_OK) {
ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
@@ -589,7 +597,7 @@ class Manager {
CONFIRMING_CHANNEL_ID,
amqp_cstring_bytes(conn->exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
1, // mandatory, TODO: take from conf
conn->mandatory,
0, // not immediate
&props,
amqp_cstring_bytes(message->message.c_str()));
@@ -653,21 +661,26 @@ class Manager {

// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
// pointers are used temporarily inside the amqp_connection_info object
// as read-only values, hence the assignment, and const_cast are safe here
amqp_connection_info info;
info.host = const_cast<char*>(conn_it->first.host.c_str());
info.port = conn_it->first.port;
info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
info.user = const_cast<char*>(conn->user.c_str());
info.password = const_cast<char*>(conn->password.c_str());
ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
if (create_connection(conn, info)->is_ok() == false) {
ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
} else {
ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successful" << dendl;
const auto now = std::chrono::system_clock::now();
if (now >= conn->next_reconnect) {
// pointers are used temporarily inside the amqp_connection_info object
// as read-only values, hence the assignment, and const_cast are safe here
amqp_connection_info info;
info.host = const_cast<char*>(conn_it->first.host.c_str());
info.port = conn_it->first.port;
info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
info.user = const_cast<char*>(conn->user.c_str());
info.password = const_cast<char*>(conn->password.c_str());
ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
if (create_connection(conn, info)->is_ok() == false) {
ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " <<
status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
conn->next_reconnect = now + reconnect_time;
} else {
ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successful" << dendl;
}
}
INCREMENT_AND_CONTINUE(conn_it);
}
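
The reconnect change in this hunk gates each retry on a stored time point, so a failing broker is retried at most once per `reconnect_time` instead of on every loop iteration. A minimal standalone sketch of that gate, assuming a hypothetical `retry_gate_t` type (the commit itself keeps `next_reconnect` on `connection_t` and `reconnect_time` on `Manager`):

```cpp
#include <chrono>

using clock_type = std::chrono::system_clock;

// Hypothetical stand-in for the connection state; only the retry gate is modelled.
struct retry_gate_t {
  clock_type::time_point next_reconnect;  // earliest time the next attempt is allowed
  clock_type::duration reconnect_time;    // delay imposed after a failed attempt

  // True when a reconnect attempt may be made at time `now`.
  bool may_retry(clock_type::time_point now) const {
    return now >= next_reconnect;
  }

  // Record the outcome of an attempt made at time `now`.
  // Only a failure pushes the next allowed attempt into the future.
  void on_result(bool ok, clock_type::time_point now) {
    if (!ok) {
      next_reconnect = now + reconnect_time;
    }
  }
};
```

Passing `now` in explicitly, rather than reading the clock inside the gate, keeps the logic testable with fixed time points.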
@@ -693,9 +706,9 @@ class Manager {
}

if (frame.frame_type != AMQP_FRAME_METHOD) {
ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
<< unsigned(frame.frame_type) << dendl;
// handler is for publish confirmation only - handle only method frames
// TODO: add a counter
INCREMENT_AND_CONTINUE(conn_it);
}

@@ -722,6 +735,14 @@ class Manager {
multiple = nack->multiple;
break;
}
case AMQP_BASIC_REJECT_METHOD:
{
result = RGW_AMQP_STATUS_BROKER_NACK;
const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
tag = reject->delivery_tag;
multiple = false;
break;
}
case AMQP_CONNECTION_CLOSE_METHOD:
// TODO on channel close, no need to reopen the connection
case AMQP_CHANNEL_CLOSE_METHOD:
@@ -733,13 +754,11 @@ class Manager {
}
case AMQP_BASIC_RETURN_METHOD:
// message was not delivered, returned to sender
// TODO: add a counter
ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
break;
default:
// unexpected method
// TODO: add a counter
ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
}
@@ -764,7 +783,6 @@ class Manager {
conn->callbacks.erase(tag_it);
}
} else {
// TODO add counter for acks with no callback
ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
}
// just increment the iterator
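
The confirmation handling above looks up a callback by delivery tag and erases it once the broker has answered. In AMQP publisher confirms, an ack or nack with `multiple` set covers every outstanding tag up to and including the given one. A rough sketch of that bookkeeping, using a hypothetical `pending_confirms_t` built on `std::map` rather than the `CallbackList` type from the Ceph source:

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

// Hypothetical callback type: receives true for an ack, false for a nack/reject.
using confirm_cb_t = std::function<void(bool)>;

class pending_confirms_t {
  std::map<std::uint64_t, confirm_cb_t> pending;  // ordered by delivery tag

public:
  void add(std::uint64_t tag, confirm_cb_t cb) { pending[tag] = std::move(cb); }

  std::size_t size() const { return pending.size(); }

  // Invoke and drop the callbacks covered by a broker reply.
  // Returns how many callbacks were resolved; 0 means the reply was unsolicited.
  std::size_t resolve(std::uint64_t tag, bool multiple, bool acked) {
    const auto first = multiple ? pending.begin() : pending.find(tag);
    if (first == pending.end()) {
      return 0;
    }
    const auto last = pending.upper_bound(tag);  // one past the last covered tag
    std::size_t count = 0;
    for (auto it = first; it != last; ++it) {
      if (it->second) {
        it->second(acked);
      }
      ++count;
    }
    pending.erase(first, last);
    return count;
  }
};
```

A `basic.reject` carries no `multiple` field, which is consistent with the new case in this commit setting `multiple = false`.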
@@ -787,6 +805,8 @@ class Manager {
size_t _max_inflight,
size_t _max_queue,
long _usec_timeout,
unsigned reconnect_time_ms,
unsigned idle_time_ms,
CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
@@ -799,7 +819,9 @@ class Manager {
queued(0),
dequeued(0),
cct(_cct),
runner(&Manager::run, this) {
runner(&Manager::run, this),
idle_time(std::chrono::milliseconds(idle_time_ms)),
reconnect_time(std::chrono::milliseconds(reconnect_time_ms)) {
// The hashmap has "max connections" as the initial number of buckets,
// and allows for 10 collisions per bucket before rehash.
// This is to prevent rehashing so that iterators are not invalidated
@@ -829,9 +851,8 @@ class Manager {
}

// connect to a broker, or reuse an existing connection if already connected
connection_ptr_t connect(const std::string& url, const std::string& exchange) {
connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
if (stopped) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
return nullptr;
}
@@ -840,7 +861,6 @@ class Manager {
// cache the URL so that parsing could happen in-place
std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
return nullptr;
}
@@ -850,11 +870,9 @@ class Manager {
const auto it = connections.find(id);
if (it != connections.end()) {
if (it->second->marked_for_deletion) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
return nullptr;
} else if (it->second->exchange != exchange) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
return nullptr;
}
@@ -865,11 +883,14 @@ class Manager {

// connection not found, creating a new one
if (connection_count >= max_connections) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
return nullptr;
}
const auto conn = create_new_connection(info, exchange, cct);
const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct);
if (!conn->is_ok()) {
ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error: " <<
status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
}
// create_new_connection must always return a connection object
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
@@ -885,15 +906,18 @@ class Manager {
const std::string& topic,
const std::string& message) {
if (stopped) {
ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
if (!conn || !conn->is_ok()) {
ldout(cct, 1) << "AMQP publish: no connection" << dendl;
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
++queued;
return AMQP_STATUS_OK;
}
ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}

@@ -902,15 +926,18 @@ class Manager {
const std::string& message,
reply_callback_t cb) {
if (stopped) {
ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
if (!conn || !conn->is_ok()) {
ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
++queued;
return AMQP_STATUS_OK;
}
ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}

@@ -956,13 +983,17 @@ static Manager* s_manager = nullptr;
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
static const long READ_TIMEOUT_USEC = 100;
static const unsigned IDLE_TIME_MS = 100;
static const unsigned RECONNECT_TIME_MS = 100;

bool init(CephContext* cct) {
if (s_manager) {
return false;
}
// TODO: take conf from CephContext
s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
READ_TIMEOUT_USEC, RECONNECT_TIME_MS, IDLE_TIME_MS, cct);
return true;
}
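
The `Manager` constructor receives its intervals as plain `unsigned` millisecond counts and stores them as `std::chrono::system_clock::duration`. A count wrapped in `std::chrono::milliseconds` keeps its unit through the conversion, whereas a bare integer passed to a duration constructor is read in the clock's native tick, whose length is implementation-defined. A small sketch of the unit-preserving conversion, using a hypothetical helper `from_ms` that is not part of the commit:

```cpp
#include <chrono>

using sys_duration = std::chrono::system_clock::duration;

// Hypothetical helper: turn a millisecond count into the clock's duration type.
// The implicit conversion is allowed because it cannot lose precision on
// implementations whose system_clock tick is no coarser than a millisecond.
inline sys_duration from_ms(unsigned ms) {
  return std::chrono::milliseconds(ms);
}
```

With this, `from_ms(100)` compares equal to `std::chrono::milliseconds(100)` regardless of the clock's tick period.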

@@ -971,9 +1002,9 @@ void shutdown() {
s_manager = nullptr;
}

connection_ptr_t connect(const std::string& url, const std::string& exchange) {
connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
if (!s_manager) return nullptr;
return s_manager->connect(url, exchange);
return s_manager->connect(url, exchange, mandatory_delivery);
}

int publish(connection_ptr_t& conn,
2 changes: 1 addition & 1 deletion src/rgw/rgw_amqp.h
@@ -30,7 +30,7 @@ bool init(CephContext* cct);
void shutdown();

// connect to an amqp endpoint
connection_ptr_t connect(const std::string& url, const std::string& exchange);
connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery);

// publish a message over a connection that was already created
int publish(connection_ptr_t& conn,