Skip to content

Commit

Permalink
Merge pull request #34376 from yuvalif/fix_amqp_routable_option
Browse files Browse the repository at this point in the history
rgw/amqp: fix the "routable" delivery mode

Reviewed-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
cbodley committed May 1, 2020
2 parents fcf222a + 73db460 commit e68c60a
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 105 deletions.
5 changes: 3 additions & 2 deletions doc/radosgw/notifications.rst
Expand Up @@ -71,7 +71,7 @@ To update a topic, use the same command used for topic creation, with the topic
Action=CreateTopic
&Name=<topic-name>
[&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
[&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker]
[&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker|routable]
[&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
[&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
[&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
Expand All @@ -98,10 +98,11 @@ Request parameters:
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:

- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
- "routable": message is considered "delivered" if broker can route to a consumer

- Kafka endpoint

Expand Down
10 changes: 6 additions & 4 deletions doc/radosgw/pubsub-module.rst
Expand Up @@ -150,7 +150,7 @@ To update a topic, use the same command used for topic creation, with the topic

::

PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]
PUT /topics/<topic-name>[?OpaqueData=<opaque data>][&push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=<file path>]]

Request parameters:

Expand All @@ -173,10 +173,11 @@ The endpoint URI may include parameters depending with the type of endpoint:
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:

- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
- "routable": message is considered "delivered" if broker can route to a consumer

- Kafka endpoint

Expand Down Expand Up @@ -348,7 +349,7 @@ Creates a new subscription.

::

PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]
PUT /subscriptions/<sub-name>?topic=<topic-name>[?push-endpoint=<endpoint>[&amqp-exchange=<exchange>][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=<file path>]]

Request parameters:

Expand All @@ -370,10 +371,11 @@ The endpoint URI may include parameters depending with the type of endpoint:
- port defaults to: 5672
- vhost defaults to: "/"
- amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1)
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist:
- amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist:

- "none": message is considered "delivered" if sent to broker
- "broker": message is considered "delivered" if acked by broker (default)
- "routable": message is considered "delivered" if broker can route to a consumer

- Kafka endpoint

Expand Down
109 changes: 70 additions & 39 deletions src/rgw/rgw_amqp.cc
Expand Up @@ -72,7 +72,7 @@ struct connection_id_t {
};

std::string to_string(const connection_id_t& id) {
return id.host+":"+std::to_string(id.port)+"/"+id.vhost;
return id.host+":"+std::to_string(id.port)+id.vhost;
}

// connection_t state cleaner
Expand Down Expand Up @@ -124,6 +124,8 @@ struct connection_t {
mutable std::atomic<int> ref_count;
CephContext* cct;
CallbackList callbacks;
ceph::coarse_real_clock::time_point next_reconnect;
bool mandatory;

// default ctor
connection_t() :
Expand All @@ -135,7 +137,10 @@ struct connection_t {
reply_type(AMQP_RESPONSE_NORMAL),
reply_code(RGW_AMQP_NO_REPLY_CODE),
ref_count(0),
cct(nullptr) {}
cct(nullptr),
next_reconnect(ceph::coarse_real_clock::now()),
mandatory(false)
{}

// cleanup of all internal connection resource
// the object can still remain, and internal connection
Expand Down Expand Up @@ -489,12 +494,13 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio

// utility function to create a new connection
connection_ptr_t create_new_connection(const amqp_connection_info& info,
const std::string& exchange, CephContext* cct) {
const std::string& exchange, bool mandatory_delivery, CephContext* cct) {
// create connection state
connection_ptr_t conn = new connection_t;
conn->exchange = exchange;
conn->user.assign(info.user);
conn->password.assign(info.password);
conn->mandatory = mandatory_delivery;
conn->cct = cct;
return create_connection(conn, info);
}
Expand Down Expand Up @@ -542,6 +548,8 @@ class Manager {
CephContext* const cct;
mutable std::mutex connections_lock;
std::thread runner;
const ceph::coarse_real_clock::duration idle_time;
const ceph::coarse_real_clock::duration reconnect_time;

void publish_internal(message_wrapper_t* message) {
const std::unique_ptr<message_wrapper_t> msg_owner(message);
Expand All @@ -563,9 +571,9 @@ class Manager {
CHANNEL_ID,
amqp_cstring_bytes(conn->exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
1, // mandatory, TODO: take from conf
0, // does not have to be routable
0, // not immediate
nullptr,
nullptr, // no properties needed
amqp_cstring_bytes(message->message.c_str()));
if (rc == AMQP_STATUS_OK) {
ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl;
Expand All @@ -589,7 +597,7 @@ class Manager {
CONFIRMING_CHANNEL_ID,
amqp_cstring_bytes(conn->exchange.c_str()),
amqp_cstring_bytes(message->topic.c_str()),
1, // mandatory, TODO: take from conf
conn->mandatory,
0, // not immediate
&props,
amqp_cstring_bytes(message->message.c_str()));
Expand Down Expand Up @@ -653,21 +661,26 @@ class Manager {

// try to reconnect the connection if it has an error
if (!conn->is_ok()) {
// pointers are used temporarily inside the amqp_connection_info object
// as read-only values, hence the assignment, and const_cast are safe here
amqp_connection_info info;
info.host = const_cast<char*>(conn_it->first.host.c_str());
info.port = conn_it->first.port;
info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
info.user = const_cast<char*>(conn->user.c_str());
info.password = const_cast<char*>(conn->password.c_str());
ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
if (create_connection(conn, info)->is_ok() == false) {
ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
} else {
ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
const auto now = ceph::coarse_real_clock::now();
if (now >= conn->next_reconnect) {
// pointers are used temporarily inside the amqp_connection_info object
// as read-only values, hence the assignment, and const_cast are safe here
amqp_connection_info info;
info.host = const_cast<char*>(conn_it->first.host.c_str());
info.port = conn_it->first.port;
info.vhost = const_cast<char*>(conn_it->first.vhost.c_str());
info.user = const_cast<char*>(conn->user.c_str());
info.password = const_cast<char*>(conn->password.c_str());
ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl;
if (create_connection(conn, info)->is_ok() == false) {
ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " <<
status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl;
// TODO: add error counter for failed retries
// TODO: add exponential backoff for retries
conn->next_reconnect = now + reconnect_time;
} else {
ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl;
}
}
INCREMENT_AND_CONTINUE(conn_it);
}
Expand All @@ -693,9 +706,9 @@ class Manager {
}

if (frame.frame_type != AMQP_FRAME_METHOD) {
ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl;
ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: "
<< unsigned(frame.frame_type) << dendl;
// handler is for publish confirmation only - handle only method frames
// TODO: add a counter
INCREMENT_AND_CONTINUE(conn_it);
}

Expand All @@ -722,6 +735,14 @@ class Manager {
multiple = nack->multiple;
break;
}
case AMQP_BASIC_REJECT_METHOD:
{
result = RGW_AMQP_STATUS_BROKER_NACK;
const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded;
tag = reject->delivery_tag;
multiple = false;
break;
}
case AMQP_CONNECTION_CLOSE_METHOD:
// TODO on channel close, no need to reopen the connection
case AMQP_CHANNEL_CLOSE_METHOD:
Expand All @@ -733,13 +754,11 @@ class Manager {
}
case AMQP_BASIC_RETURN_METHOD:
// message was not delivered, returned to sender
// TODO: add a counter
ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl;
ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
break;
default:
// unexpected method
// TODO: add a counter
ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl;
INCREMENT_AND_CONTINUE(conn_it);
}
Expand All @@ -764,15 +783,14 @@ class Manager {
conn->callbacks.erase(tag_it);
}
} else {
// TODO add counter for acks with no callback
ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl;
}
// just increment the iterator
++conn_it;
}
// if no messages were received or published, sleep for 100ms
if (count == 0 && !incoming_message) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(idle_time);
}
}
}
Expand All @@ -787,6 +805,8 @@ class Manager {
size_t _max_inflight,
size_t _max_queue,
long _usec_timeout,
unsigned reconnect_time_ms,
unsigned idle_time_ms,
CephContext* _cct) :
max_connections(_max_connections),
max_inflight(_max_inflight),
Expand All @@ -799,7 +819,9 @@ class Manager {
queued(0),
dequeued(0),
cct(_cct),
runner(&Manager::run, this) {
runner(&Manager::run, this),
idle_time(std::chrono::milliseconds(idle_time_ms)),
reconnect_time(std::chrono::milliseconds(reconnect_time_ms)) {
// The hashmap has "max connections" as the initial number of buckets,
// and allows for 10 collisions per bucket before rehash.
// This is to prevent rehashing so that iterators are not invalidated
Expand Down Expand Up @@ -829,9 +851,8 @@ class Manager {
}

// connect to a broker, or reuse an existing connection if already connected
connection_ptr_t connect(const std::string& url, const std::string& exchange) {
connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
if (stopped) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl;
return nullptr;
}
Expand All @@ -840,7 +861,6 @@ class Manager {
// cache the URL so that parsing could happen in-place
std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1);
if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl;
return nullptr;
}
Expand All @@ -850,11 +870,9 @@ class Manager {
const auto it = connections.find(id);
if (it != connections.end()) {
if (it->second->marked_for_deletion) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl;
return nullptr;
} else if (it->second->exchange != exchange) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl;
return nullptr;
}
Expand All @@ -865,11 +883,14 @@ class Manager {

// connection not found, creating a new one
if (connection_count >= max_connections) {
// TODO: increment counter
ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl;
return nullptr;
}
const auto conn = create_new_connection(info, exchange, cct);
const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct);
if (!conn->is_ok()) {
ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" <<
status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl;
}
// create_new_connection must always return a connection object
// even if error occurred during creation.
// in such a case the creation will be retried in the main thread
Expand All @@ -885,15 +906,18 @@ class Manager {
const std::string& topic,
const std::string& message) {
if (stopped) {
ldout(cct, 1) << "AMQP publish: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
if (!conn || !conn->is_ok()) {
ldout(cct, 1) << "AMQP publish: no connection" << dendl;
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) {
++queued;
return AMQP_STATUS_OK;
}
ldout(cct, 1) << "AMQP publish: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}

Expand All @@ -902,15 +926,18 @@ class Manager {
const std::string& message,
reply_callback_t cb) {
if (stopped) {
ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl;
return RGW_AMQP_STATUS_MANAGER_STOPPED;
}
if (!conn || !conn->is_ok()) {
ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl;
return RGW_AMQP_STATUS_CONNECTION_CLOSED;
}
if (messages.push(new message_wrapper_t(conn, topic, message, cb))) {
++queued;
return AMQP_STATUS_OK;
}
ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl;
return RGW_AMQP_STATUS_QUEUE_FULL;
}

Expand Down Expand Up @@ -956,13 +983,17 @@ static Manager* s_manager = nullptr;
static const size_t MAX_CONNECTIONS_DEFAULT = 256;
static const size_t MAX_INFLIGHT_DEFAULT = 8192;
static const size_t MAX_QUEUE_DEFAULT = 8192;
static const long READ_TIMEOUT_USEC = 100;
static const unsigned IDLE_TIME_MS = 100;
static const unsigned RECONNECT_TIME_MS = 100;

bool init(CephContext* cct) {
if (s_manager) {
return false;
}
// TODO: take conf from CephContext
s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct);
s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT,
READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct);
return true;
}

Expand All @@ -971,9 +1002,9 @@ void shutdown() {
s_manager = nullptr;
}

connection_ptr_t connect(const std::string& url, const std::string& exchange) {
connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) {
if (!s_manager) return nullptr;
return s_manager->connect(url, exchange);
return s_manager->connect(url, exchange, mandatory_delivery);
}

int publish(connection_ptr_t& conn,
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_amqp.h
Expand Up @@ -30,7 +30,7 @@ bool init(CephContext* cct);
void shutdown();

// connect to an amqp endpoint
connection_ptr_t connect(const std::string& url, const std::string& exchange);
connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery);

// publish a message over a connection that was already created
int publish(connection_ptr_t& conn,
Expand Down

0 comments on commit e68c60a

Please sign in to comment.