diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index fefe56a0eb331..3762e89075ef4 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -71,7 +71,7 @@ To update a topic, use the same command used for topic creation, with the topic Action=CreateTopic &Name= [&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=] - [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker] + [&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker|routable] [&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false] [&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker] [&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false] @@ -98,10 +98,11 @@ Request parameters: - port defaults to: 5672 - vhost defaults to: "/" - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1) - - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist: + - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist: - "none": message is considered "delivered" if sent to broker - "broker": message is considered "delivered" if acked by broker (default) + - "routable": message is considered "delivered" if broker can route to a consumer - Kafka endpoint diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index 61cd4def207ff..fd3b9f021e64e 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -150,7 +150,7 @@ To update a topic, use the same command used for topic creation, with the topic :: - PUT /topics/[?OpaqueData=][&push-endpoint=[&amqp-exchange=][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=]] + PUT /topics/[?OpaqueData=][&push-endpoint=[&amqp-exchange=][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&use-ssl=true|false][&ca-location=]] Request parameters: @@ -173,10 +173,11 @@ The endpoint URI may include parameters depending with the type of endpoint: - port defaults to: 5672 - vhost defaults to: "/" - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1) - - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist: + - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist: - "none": message is considered "delivered" if sent to broker - "broker": message is considered "delivered" if acked by broker (default) + - "routable": message is considered "delivered" if broker can route to a consumer - Kafka endpoint @@ -348,7 +349,7 @@ Creates a new subscription. :: - PUT /subscriptions/?topic=[?push-endpoint=[&amqp-exchange=][&amqp-ack-level=none|broker][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=]] + PUT /subscriptions/?topic=[?push-endpoint=[&amqp-exchange=][&amqp-ack-level=none|broker|routable][&verify-ssl=true|false][&kafka-ack-level=none|broker][&ca-location=]] Request parameters: @@ -370,10 +371,11 @@ The endpoint URI may include parameters depending with the type of endpoint: - port defaults to: 5672 - vhost defaults to: "/" - amqp-exchange: the exchanges must exist and be able to route messages based on topics (mandatory parameter for AMQP0.9.1) - - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Two ack methods exist: + - amqp-ack-level: no end2end acking is required, as messages may persist in the broker before delivered into their final destination. Three ack methods exist: - "none": message is considered "delivered" if sent to broker - "broker": message is considered "delivered" if acked by broker (default) + - "routable": message is considered "delivered" if broker can route to a consumer - Kafka endpoint diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc index a5f6ca00d84f4..78446e88f68e6 100644 --- a/src/rgw/rgw_amqp.cc +++ b/src/rgw/rgw_amqp.cc @@ -72,7 +72,7 @@ struct connection_id_t { }; std::string to_string(const connection_id_t& id) { - return id.host+":"+std::to_string(id.port)+"/"+id.vhost; + return id.host+":"+std::to_string(id.port)+id.vhost; } // connection_t state cleaner @@ -124,6 +124,8 @@ struct connection_t { mutable std::atomic ref_count; CephContext* cct; CallbackList callbacks; + ceph::coarse_real_clock::time_point next_reconnect; + bool mandatory; // default ctor connection_t() : @@ -135,7 +137,10 @@ struct connection_t { reply_type(AMQP_RESPONSE_NORMAL), reply_code(RGW_AMQP_NO_REPLY_CODE), ref_count(0), - cct(nullptr) {} + cct(nullptr), + next_reconnect(ceph::coarse_real_clock::now()), + mandatory(false) + {} // cleanup of all internal connection resource // the object can still remain, and internal connection @@ -489,12 +494,13 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio // utility function to create a new connection connection_ptr_t create_new_connection(const amqp_connection_info& info, - const std::string& exchange, CephContext* cct) { + const std::string& exchange, bool mandatory_delivery, CephContext* cct) { // create connection state connection_ptr_t conn = new connection_t; conn->exchange = exchange; conn->user.assign(info.user); conn->password.assign(info.password); + conn->mandatory = mandatory_delivery; conn->cct = cct; return create_connection(conn, info); } @@ -542,6 +548,8 @@ class Manager { CephContext* const cct; mutable std::mutex connections_lock; std::thread runner; + const ceph::coarse_real_clock::duration idle_time; + const ceph::coarse_real_clock::duration reconnect_time; void publish_internal(message_wrapper_t* message) { const std::unique_ptr msg_owner(message); @@ -563,9 +571,9 @@ class Manager { CHANNEL_ID, amqp_cstring_bytes(conn->exchange.c_str()), amqp_cstring_bytes(message->topic.c_str()), - 1, // mandatory, TODO: take from conf + 0, // does not have to be routable 0, // not immediate - nullptr, + nullptr, // no properties needed amqp_cstring_bytes(message->message.c_str())); if (rc == AMQP_STATUS_OK) { ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl; @@ -589,7 +597,7 @@ class Manager { CONFIRMING_CHANNEL_ID, amqp_cstring_bytes(conn->exchange.c_str()), amqp_cstring_bytes(message->topic.c_str()), - 1, // mandatory, TODO: take from conf + conn->mandatory, 0, // not immediate &props, amqp_cstring_bytes(message->message.c_str())); @@ -653,21 +661,26 @@ class Manager { // try to reconnect the connection if it has an error if (!conn->is_ok()) { - // pointers are used temporarily inside the amqp_connection_info object - // as read-only values, hence the assignment, and const_cast are safe here - amqp_connection_info info; - info.host = const_cast(conn_it->first.host.c_str()); - info.port = conn_it->first.port; - info.vhost = const_cast(conn_it->first.vhost.c_str()); - info.user = const_cast(conn->user.c_str()); - info.password = const_cast(conn->password.c_str()); - ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl; - if (create_connection(conn, info)->is_ok() == false) { - ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl; - // TODO: add error counter for failed retries - // TODO: add exponential backoff for retries - } else { - ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl; + const auto now = ceph::coarse_real_clock::now(); + if (now >= conn->next_reconnect) { + // pointers are used temporarily inside the amqp_connection_info object + // as read-only values, hence the assignment, and const_cast are safe here + amqp_connection_info info; + info.host = const_cast(conn_it->first.host.c_str()); + info.port = conn_it->first.port; + info.vhost = const_cast(conn_it->first.vhost.c_str()); + info.user = const_cast(conn->user.c_str()); + info.password = const_cast(conn->password.c_str()); + ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl; + if (create_connection(conn, info)->is_ok() == false) { + ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " << + status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl; + // TODO: add error counter for failed retries + // TODO: add exponential backoff for retries + conn->next_reconnect = now + reconnect_time; + } else { + ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl; + } } INCREMENT_AND_CONTINUE(conn_it); } @@ -693,9 +706,9 @@ class Manager { } if (frame.frame_type != AMQP_FRAME_METHOD) { - ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl; + ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. frame type: " + << unsigned(frame.frame_type) << dendl; // handler is for publish confirmation only - handle only method frames - // TODO: add a counter INCREMENT_AND_CONTINUE(conn_it); } @@ -722,6 +735,14 @@ class Manager { multiple = nack->multiple; break; } + case AMQP_BASIC_REJECT_METHOD: + { + result = RGW_AMQP_STATUS_BROKER_NACK; + const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded; + tag = reject->delivery_tag; + multiple = false; + break; + } case AMQP_CONNECTION_CLOSE_METHOD: // TODO on channel close, no need to reopen the connection case AMQP_CHANNEL_CLOSE_METHOD: @@ -733,13 +754,11 @@ class Manager { } case AMQP_BASIC_RETURN_METHOD: // message was not delivered, returned to sender - // TODO: add a counter - ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl; + ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl; INCREMENT_AND_CONTINUE(conn_it); break; default: // unexpected method - // TODO: add a counter ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl; INCREMENT_AND_CONTINUE(conn_it); } @@ -764,7 +783,6 @@ class Manager { conn->callbacks.erase(tag_it); } } else { - // TODO add counter for acks with no callback ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl; } // just increment the iterator @@ -772,7 +790,7 @@ class Manager { } // if no messages were received or published, sleep for 100ms if (count == 0 && !incoming_message) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(idle_time); } } } @@ -787,6 +805,8 @@ class Manager { size_t _max_inflight, size_t _max_queue, long _usec_timeout, + unsigned reconnect_time_ms, + unsigned idle_time_ms, CephContext* _cct) : max_connections(_max_connections), max_inflight(_max_inflight), @@ -799,7 +819,9 @@ class Manager { queued(0), dequeued(0), cct(_cct), - runner(&Manager::run, this) { + runner(&Manager::run, this), + idle_time(std::chrono::milliseconds(idle_time_ms)), + reconnect_time(std::chrono::milliseconds(reconnect_time_ms)) { // The hashmap has "max connections" as the initial number of buckets, // and allows for 10 collisions per bucket before rehash. // This is to prevent rehashing so that iterators are not invalidated @@ -829,9 +851,8 @@ class Manager { } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url, const std::string& exchange) { + connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) { if (stopped) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl; return nullptr; } @@ -840,7 +861,6 @@ class Manager { // cache the URL so that parsing could happen in-place std::vector url_cache(url.c_str(), url.c_str()+url.size()+1); if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl; return nullptr; } @@ -850,11 +870,9 @@ class Manager { const auto it = connections.find(id); if (it != connections.end()) { if (it->second->marked_for_deletion) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl; return nullptr; } else if (it->second->exchange != exchange) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl; return nullptr; } @@ -865,11 +883,14 @@ class Manager { // connection not found, creating a new one if (connection_count >= max_connections) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl; return nullptr; } - const auto conn = create_new_connection(info, exchange, cct); + const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct); + if (!conn->is_ok()) { + ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. error:" << + status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl; + } // create_new_connection must always return a connection object // even if error occurred during creation. // in such a case the creation will be retried in the main thread @@ -885,15 +906,18 @@ class Manager { const std::string& topic, const std::string& message) { if (stopped) { + ldout(cct, 1) << "AMQP publish: manager is not running" << dendl; return RGW_AMQP_STATUS_MANAGER_STOPPED; } if (!conn || !conn->is_ok()) { + ldout(cct, 1) << "AMQP publish: no connection" << dendl; return RGW_AMQP_STATUS_CONNECTION_CLOSED; } if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) { ++queued; return AMQP_STATUS_OK; } + ldout(cct, 1) << "AMQP publish: queue is full" << dendl; return RGW_AMQP_STATUS_QUEUE_FULL; } @@ -902,15 +926,18 @@ class Manager { const std::string& message, reply_callback_t cb) { if (stopped) { + ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl; return RGW_AMQP_STATUS_MANAGER_STOPPED; } if (!conn || !conn->is_ok()) { + ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl; return RGW_AMQP_STATUS_CONNECTION_CLOSED; } if (messages.push(new message_wrapper_t(conn, topic, message, cb))) { ++queued; return AMQP_STATUS_OK; } + ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl; return RGW_AMQP_STATUS_QUEUE_FULL; } @@ -956,13 +983,17 @@ static Manager* s_manager = nullptr; static const size_t MAX_CONNECTIONS_DEFAULT = 256; static const size_t MAX_INFLIGHT_DEFAULT = 8192; static const size_t MAX_QUEUE_DEFAULT = 8192; +static const long READ_TIMEOUT_USEC = 100; +static const unsigned IDLE_TIME_MS = 100; +static const unsigned RECONNECT_TIME_MS = 100; bool init(CephContext* cct) { if (s_manager) { return false; } // TODO: take conf from CephContext - s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct); + s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, + READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct); return true; } @@ -971,9 +1002,9 @@ void shutdown() { s_manager = nullptr; } -connection_ptr_t connect(const std::string& url, const std::string& exchange) { +connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) { if (!s_manager) return nullptr; - return s_manager->connect(url, exchange); + return s_manager->connect(url, exchange, mandatory_delivery); } int publish(connection_ptr_t& conn, diff --git a/src/rgw/rgw_amqp.h b/src/rgw/rgw_amqp.h index bbfce2d5dcd66..eaf97ed9dc01a 100644 --- a/src/rgw/rgw_amqp.h +++ b/src/rgw/rgw_amqp.h @@ -30,7 +30,7 @@ bool init(CephContext* cct); void shutdown(); // connect to an amqp endpoint -connection_ptr_t connect(const std::string& url, const std::string& exchange); +connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery); // publish a message over a connection that was already created int publish(connection_ptr_t& conn, diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index 6230330d4a6a3..0f927e4f46fc3 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -170,11 +170,10 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { const std::string endpoint; const std::string topic; const std::string exchange; - amqp::connection_ptr_t conn; ack_level_t ack_level; - std::string str_ack_level; + amqp::connection_ptr_t conn; - static std::string get_exchange(const RGWHTTPArgs& args) { + std::string get_exchange(const RGWHTTPArgs& args) { bool exists; const auto exchange = args.get("amqp-exchange", &exists); if (!exists) { @@ -183,6 +182,22 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { return exchange; } + ack_level_t get_ack_level(const RGWHTTPArgs& args) { + bool exists; + const auto& str_ack_level = args.get("amqp-ack-level", &exists); + if (!exists || str_ack_level == "broker") { + // "broker" is default + return ack_level_t::Broker; + } + if (str_ack_level == "none") { + return ack_level_t::None; + } + if (str_ack_level == "routable") { + return ack_level_t::Routable; + } + throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level); + } + // NoAckPublishCR implements async amqp publishing via coroutine // This coroutine ends when it send the message and does not wait for an ack class NoAckPublishCR : public RGWCoroutine { @@ -220,16 +235,14 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { const std::string topic; amqp::connection_ptr_t conn; const std::string message; - [[maybe_unused]] const ack_level_t ack_level; // TODO not used for now public: AckPublishCR(CephContext* cct, const std::string& _topic, amqp::connection_ptr_t& _conn, - const std::string& _message, - ack_level_t _ack_level) : + const std::string& _message) : RGWCoroutine(cct), - topic(_topic), conn(_conn), message(_message), ack_level(_ack_level) {} + topic(_topic), conn(_conn), message(_message) {} // send message to endpoint, waiting for reply int operate() override { @@ -273,7 +286,7 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { return nullptr; } }; - + public: RGWPubSubAMQPEndpoint(const std::string& _endpoint, const std::string& _topic, @@ -283,23 +296,11 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { endpoint(_endpoint), topic(_topic), exchange(get_exchange(args)), - conn(amqp::connect(endpoint, exchange)) { + ack_level(get_ack_level(args)), + conn(amqp::connect(endpoint, exchange, (ack_level == ack_level_t::Broker))) { if (!conn) { throw configuration_error("AMQP: failed to create connection to: " + endpoint); } - bool exists; - // get ack level - str_ack_level = args.get("amqp-ack-level", &exists); - if (!exists || str_ack_level == "broker") { - // "broker" is default - ack_level = ack_level_t::Broker; - } else if (str_ack_level == "none") { - ack_level = ack_level_t::None; - } else if (str_ack_level == "routable") { - ack_level = ack_level_t::Routable; - } else { - throw configuration_error("AMQP: invalid amqp-ack-level: " + str_ack_level); - } } RGWCoroutine* send_to_completion_async(const rgw_pubsub_event& event, RGWDataSyncEnv* env) override { @@ -307,9 +308,7 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { if (ack_level == ack_level_t::None) { return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); } else { - // TODO: currently broker and routable are the same - this will require different flags - // but the same mechanism - return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event), ack_level); + return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event)); } } @@ -318,9 +317,7 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { if (ack_level == ack_level_t::None) { return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); } else { - // TODO: currently broker and routable are the same - this will require different flags - // but the same mechanism - return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record), ack_level); + return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record)); } } @@ -405,7 +402,6 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { str += "\nURI: " + endpoint; str += "\nTopic: " + topic; str += "\nExchange: " + exchange; - str += "\nAck Level: " + str_ack_level; return str; } }; @@ -428,7 +424,7 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { kafka::connection_ptr_t conn; const ack_level_t ack_level; - static bool get_verify_ssl(const RGWHTTPArgs& args) { + bool get_verify_ssl(const RGWHTTPArgs& args) { bool exists; auto str_verify_ssl = args.get("verify-ssl", &exists); if (!exists) { @@ -445,7 +441,7 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { throw configuration_error("'verify-ssl' must be true/false, not: " + str_verify_ssl); } - static bool get_use_ssl(const RGWHTTPArgs& args) { + bool get_use_ssl(const RGWHTTPArgs& args) { bool exists; auto str_use_ssl = args.get("use-ssl", &exists); if (!exists) { @@ -462,7 +458,7 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint { throw configuration_error("'use-ssl' must be true/false, not: " + str_use_ssl); } - static ack_level_t get_ack_level(const RGWHTTPArgs& args) { + ack_level_t get_ack_level(const RGWHTTPArgs& args) { bool exists; // get ack level const auto str_ack_level = args.get("kafka-ack-level", &exists); diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index 79d2ef95b7fb1..b5c3396dfc490 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -1375,7 +1375,7 @@ def test_ps_s3_notification_push_amqp_on_master(): topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args) topic_arn1 = topic_conf1.set_config() # without acks from broker - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args) topic_arn2 = topic_conf2.set_config() # create s3 notification @@ -2883,7 +2883,7 @@ def test_ps_s3_metadata_on_master(): # create s3 topic endpoint_address = 'amqp://' + hostname - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification @@ -2984,7 +2984,7 @@ def test_ps_s3_tags_on_master(): # create s3 topic endpoint_address = 'amqp://' + hostname - endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=routable' topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args) topic_arn = topic_conf.set_config() # create s3 notification diff --git a/src/test/rgw/test_rgw_amqp.cc b/src/test/rgw/test_rgw_amqp.cc index 0f11b817e8d19..13bab823f02cf 100644 --- a/src/test/rgw/test_rgw_amqp.cc +++ b/src/test/rgw/test_rgw_amqp.cc @@ -61,7 +61,7 @@ class TestAMQP : public ::testing::Test { TEST_F(TestAMQP, ConnectionOK) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost", "ex1"); + conn = amqp::connect("amqp://localhost", "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -70,10 +70,10 @@ TEST_F(TestAMQP, ConnectionOK) TEST_F(TestAMQP, ConnectionReuse) { - amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1"); + amqp::connection_ptr_t conn1 = amqp::connect("amqp://localhost", "ex1", false); EXPECT_TRUE(conn1); const auto connection_number = amqp::get_connection_count(); - amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1"); + amqp::connection_ptr_t conn2 = amqp::connect("amqp://localhost", "ex1", false); EXPECT_TRUE(conn2); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn1, "topic", "message"); @@ -83,7 +83,7 @@ TEST_F(TestAMQP, ConnectionReuse) TEST_F(TestAMQP, NameResolutionFail) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://kaboom", "ex1"); + conn = amqp::connect("amqp://kaboom", "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -93,7 +93,7 @@ TEST_F(TestAMQP, NameResolutionFail) TEST_F(TestAMQP, InvalidPort) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost:1234", "ex1"); + conn = amqp::connect("amqp://localhost:1234", "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -103,7 +103,7 @@ TEST_F(TestAMQP, InvalidPort) TEST_F(TestAMQP, InvalidHost) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://0.0.0.1", "ex1"); + conn = amqp::connect("amqp://0.0.0.1", "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -113,7 +113,7 @@ TEST_F(TestAMQP, InvalidHost) TEST_F(TestAMQP, InvalidVhost) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost/kaboom", "ex1"); + conn = amqp::connect("amqp://localhost/kaboom", "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -125,7 +125,7 @@ TEST_F(TestAMQP, UserPassword) amqp_mock::set_valid_host("127.0.0.1"); { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1"); + conn = amqp::connect("amqp://foo:bar@127.0.0.1", "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -135,7 +135,7 @@ TEST_F(TestAMQP, UserPassword) amqp_mock::set_valid_host("127.0.0.2"); { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1"); + conn = amqp::connect("amqp://guest:guest@127.0.0.2", "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -147,7 +147,7 @@ TEST_F(TestAMQP, UserPassword) TEST_F(TestAMQP, URLParseError) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("http://localhost", "ex1"); + conn = amqp::connect("http://localhost", "ex1", false); EXPECT_FALSE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn, "topic", "message"); @@ -157,7 +157,7 @@ TEST_F(TestAMQP, URLParseError) TEST_F(TestAMQP, ExchangeMismatch) { const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("http://localhost", "ex2"); + conn = amqp::connect("http://localhost", "ex2", false); EXPECT_FALSE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number); auto rc = amqp::publish(conn, "topic", "message"); @@ -172,7 +172,7 @@ TEST_F(TestAMQP, MaxConnections) while (remaining_connections > 0) { const auto host = "127.10.0." + std::to_string(remaining_connections); amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_EQ(rc, 0); @@ -184,7 +184,7 @@ TEST_F(TestAMQP, MaxConnections) { const std::string host = "toomany"; amqp_mock::set_valid_host(host); - amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1"); + amqp::connection_ptr_t conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_FALSE(conn); auto rc = amqp::publish(conn, "topic", "message"); EXPECT_LT(rc, 0); @@ -246,7 +246,7 @@ TEST_F(TestAMQP, ReceiveAck) callback_invoked = false; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_ack); EXPECT_EQ(rc, 0); @@ -260,7 +260,7 @@ TEST_F(TestAMQP, ImplicitConnectionClose) callback_invoked = false; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); const auto NUMBER_OF_CALLS = 2000; for (auto i = 0; i < NUMBER_OF_CALLS; ++i) { @@ -278,7 +278,7 @@ TEST_F(TestAMQP, ReceiveMultipleAck) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); const auto NUMBER_OF_CALLS = 100; for (auto i=0; i < NUMBER_OF_CALLS; ++i) { @@ -296,7 +296,7 @@ TEST_F(TestAMQP, ReceiveAckForMultiple) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); amqp_mock::set_multiple(59); const auto NUMBER_OF_CALLS = 100; @@ -315,7 +315,7 @@ TEST_F(TestAMQP, DynamicCallback) callbacks_invoked = 0; const std::string host("localhost1"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); amqp_mock::set_multiple(59); const auto NUMBER_OF_CALLS = 100; @@ -336,7 +336,7 @@ TEST_F(TestAMQP, ReceiveNack) amqp_mock::REPLY_ACK = false; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); @@ -353,7 +353,7 @@ TEST_F(TestAMQP, FailWrite) amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost2"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0); @@ -370,7 +370,7 @@ TEST_F(TestAMQP, ClosedConnection) const auto current_connections = amqp::get_connection_count(); const std::string host("localhost3"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), current_connections + 1); EXPECT_TRUE(amqp::disconnect(conn)); @@ -389,7 +389,7 @@ TEST_F(TestAMQP, RetryInvalidHost) { const std::string host = "192.168.0.1"; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://"+host, "ex1"); + conn = amqp::connect("amqp://"+host, "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -406,7 +406,7 @@ TEST_F(TestAMQP, RetryInvalidPort) { const int port = 9999; const auto connection_number = amqp::get_connection_count(); - conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1"); + conn = amqp::connect("amqp://localhost:" + std::to_string(port), "ex1", false); EXPECT_TRUE(conn); EXPECT_EQ(amqp::get_connection_count(), connection_number + 1); auto rc = amqp::publish(conn, "topic", "message"); @@ -425,7 +425,7 @@ TEST_F(TestAMQP, RetryFailWrite) amqp_mock::FAIL_NEXT_WRITE = true; const std::string host("localhost4"); amqp_mock::set_valid_host(host); - conn = amqp::connect("amqp://" + host, "ex1"); + conn = amqp::connect("amqp://" + host, "ex1", false); EXPECT_TRUE(conn); auto rc = publish_with_confirm(conn, "topic", "message", my_callback_expect_nack); EXPECT_EQ(rc, 0);