rgw/notification: start/stop endpoint managers in notification manager
Fixes: https://tracker.ceph.com/issues/65337

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
yuvalif committed Apr 18, 2024
1 parent 926eb10 commit c4d2dcf
Showing 10 changed files with 240 additions and 73 deletions.
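In short: endpoint-manager startup and teardown move out of rgw_appmain.cc and into the notification manager, and the manager's long-running coroutines now exit on shutdown instead of looping forever. A minimal sketch of the resulting call order, using only names from this diff (error handling elided):

    // startup, driven from RGWRados::init_complete():
    rgw::notify::init(dpp, store, site);
    //   -> RGWPubSubEndpoint::init_all(cct)        // amqp, kafka, http managers
    //   -> s_manager = std::make_unique<Manager>(...)
    //   -> s_manager->init()                       // spawn process_queues(), start workers

    // teardown, driven from rgw::notify::shutdown():
    RGWPubSubEndpoint::shutdown_all();              // amqp, kafka, http managers
    s_manager->stop();                              // raise shutdown flag, join workers
    s_manager.reset();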
111 changes: 74 additions & 37 deletions src/rgw/driver/rados/rgw_notify.cc
@@ -117,6 +117,7 @@ void publish_commit_completion(rados_completion_t completion, void *arg) {
};

class Manager : public DoutPrefixProvider {
bool shutdown = false;
const size_t max_queue_size;
const uint32_t queues_update_period_ms;
const uint32_t queues_update_retry_ms;
@@ -305,7 +306,7 @@ class Manager : public DoutPrefixProvider {

// clean stale reservation from queue
void cleanup_queue(const std::string& queue_name, spawn::yield_context yield) {
while (true) {
while (!shutdown) {
ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
const auto now = ceph::coarse_real_time::clock::now();
const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
@@ -340,6 +341,23 @@ class Manager : public DoutPrefixProvider {
boost::system::error_code ec;
timer.async_wait(yield[ec]);
}
ldpp_dout(this, 5) << "INFO: manager stopped. done cleanup for queue: " << queue_name << dendl;
}

// unlock (lose ownership) queue
int unlock_queue(const std::string& queue_name, librados::IoCtx& ioctx) {
const auto ret = rados::cls::lock::unlock(&ioctx, queue_name, queue_name+"_lock", lock_cookie);
if (ret == -ENOENT) {
ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< ". was removed. nothing to unlock" << dendl;
return 0;
}
if (ret == -EBUSY) {
ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< ". already owned by another RGW. no need to unlock" << dendl;
return 0;
}
return ret;
}
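For context, this unlock releases the exclusive cls lock that process_queues() takes and renews to claim queue ownership; -ENOENT and -EBUSY are deliberately treated as success, since a deleted queue, or one already claimed by another RGW, needs no unlock here. A sketch of the pairing, assuming the helper signatures in cls/lock/cls_lock_client.h (not the verbatim code from process_queues()):

    #include "cls/lock/cls_lock_client.h"

    // acquire renewable ownership (as process_queues() does each cycle)
    librados::ObjectWriteOperation op;
    rados::cls::lock::lock(&op, queue_name + "_lock", ClsLockType::EXCLUSIVE,
                           lock_cookie, "" /*tag*/, "" /*description*/,
                           failover_time, LOCK_FLAG_MAY_RENEW);
    // ... rgw_rados_operate(this, rados_ioctx, queue_name, &op, y) ...

    // release ownership on shutdown; -ENOENT/-EBUSY handled as above
    rados::cls::lock::unlock(&ioctx, queue_name, queue_name + "_lock", lock_cookie);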

// processing of a specific queue
@@ -355,7 +373,7 @@

CountersManager queue_counters_container(queue_name, this->get_cct());

while (true) {
while (!shutdown) {
// if queue was empty the last time, sleep for idle timeout
if (is_idle) {
Timer timer(io_context);
@@ -582,6 +600,7 @@ class Manager : public DoutPrefixProvider {
queue_counters_container.set(l_rgw_persistent_topic_size, entries_size);
}
}
ldpp_dout(this, 5) << "INFO: manager stopped. done processing for queue: " << queue_name << dendl;
}

// list of owned queues
@@ -603,7 +622,8 @@

std::vector<std::string> queue_gc;
std::mutex queue_gc_lock;
while (true) {
auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
while (!shutdown) {
Timer timer(io_context);
const auto duration = (has_error ?
std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
@@ -634,7 +654,7 @@ class Manager : public DoutPrefixProvider {
failover_time,
LOCK_FLAG_MAY_RENEW);

ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -EBUSY) {
// lock is already taken by another RGW
ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
@@ -679,16 +699,50 @@ class Manager : public DoutPrefixProvider {
queue_gc.clear();
}
}
for (const auto& queue_name : owned_queues) {
const auto ret = unlock_queue(queue_name, rados_ioctx);
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: failed to unlock queue: " << queue_name << ". ownership would still move if not renewed" << dendl;
}
}
ldpp_dout(this, 5) << "INFO: manager stopped. done processing for queues" << dendl;
}

public:

~Manager() {
work_guard.reset();
}

void stop() {
shutdown = true;
io_context.stop();
work_guard.reset();
std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
}
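The ordering in stop() is deliberate: it first raises the shutdown flag that the coroutine loops (cleanup_queue, process_queue, process_queues) check, then halts the io_context so io_context.run() returns in each worker even if coroutines are still parked on timers, releases the work guard, and only then joins the worker threads.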

void init() {
spawn::spawn(io_context, [this](spawn::yield_context yield) {
process_queues(yield);
}, make_stack_allocator());

// start the worker threads to do the actual queue processing
const std::string WORKER_THREAD_NAME = "notif-worker";
for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
workers.emplace_back([this]() {
try {
io_context.run();
} catch (const std::exception& err) {
ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
throw err;
}
});
const auto rc = ceph_pthread_setname(workers.back().native_handle(),
(WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
ceph_assert(rc == 0);
}
ldpp_dout(this, 10) << "INfO: started notification manager with: " << worker_count << " workers" << dendl;
}
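Note that init() is essentially the former constructor body (removed below). Splitting startup out of the constructor means the worker threads only start after s_manager has been fully assigned, so they can no longer observe a half-assigned singleton; this is presumably what lets the commit also drop the old TODO about making the pointer atomic.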

// ctor: initialize members; worker threads are started in init()
Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
@@ -708,28 +762,7 @@
reservations_cleanup_period_s(_reservations_cleanup_period_s),
site(site),
rados_store(*store)
{
spawn::spawn(io_context, [this](spawn::yield_context yield) {
process_queues(yield);
}, make_stack_allocator());

// start the worker threads to do the actual queue processing
const std::string WORKER_THREAD_NAME = "notif-worker";
for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
workers.emplace_back([this]() {
try {
io_context.run();
} catch (const std::exception& err) {
ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
throw(err);
}
});
const auto rc = ceph_pthread_setname(workers.back().native_handle(),
(WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
ceph_assert(rc == 0);
}
ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
}
{}

int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (topic_queue == Q_LIST_OBJECT_NAME) {
@@ -765,10 +798,7 @@ class Manager : public DoutPrefixProvider {
}
};

// singleton manager
// note that the manager itself is not a singleton, and multiple instances may co-exist
// TODO make the pointer atomic in allocation and deallocation to avoid race conditions
static Manager* s_manager = nullptr;
std::unique_ptr<Manager> s_manager;

constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30 seconds
@@ -779,24 +809,31 @@ constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds

bool init(CephContext* cct, rgw::sal::RadosStore* store,
const SiteConfig& site, const DoutPrefixProvider *dpp) {
bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
const SiteConfig& site) {
if (s_manager) {
ldpp_dout(dpp, 1) << "ERROR: failed to init notification manager: already exists" << dendl;
return false;
}
if (!RGWPubSubEndpoint::init_all(dpp->get_cct())) {
return false;
}
// TODO: take conf from CephContext
s_manager = new Manager(cct, MAX_QUEUE_SIZE,
s_manager = std::make_unique<Manager>(dpp->get_cct(), MAX_QUEUE_SIZE,
Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
WORKER_COUNT,
store, site);
s_manager->init();
return true;
}

void shutdown() {
delete s_manager;
s_manager = nullptr;
if (!s_manager) return;
RGWPubSubEndpoint::shutdown_all();
s_manager->stop();
s_manager.reset();
}
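Teardown order matters here: endpoint managers go down before the notification manager stops, so a queue coroutine that is still draining entries fails fast in send() (the HTTP endpoint guard added in rgw_pubsub_push.cc below returns -ESRCH) rather than blocking on an endpoint that is being torn down.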

int add_persistent_topic(const std::string& topic_name, optional_yield y) {
Expand Down Expand Up @@ -836,7 +873,7 @@ int remove_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (!s_manager) {
return -EAGAIN;
}
return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
return remove_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
}

rgw::sal::Object* get_object_with_attributes(
4 changes: 2 additions & 2 deletions src/rgw/driver/rados/rgw_notify.h
@@ -26,8 +26,8 @@ namespace rgw::notify {
// initialize the notification manager
// notification manager is dequeuing the 2-phase-commit queues
// and send the notifications to the endpoints
bool init(CephContext* cct, rgw::sal::RadosStore* store,
const rgw::SiteConfig& site, const DoutPrefixProvider *dpp);
bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
const rgw::SiteConfig& site);

// shutdown the notification manager
void shutdown();
58 changes: 57 additions & 1 deletion src/rgw/driver/rados/rgw_pubsub_push.cc
@@ -5,6 +5,7 @@
#include <string>
#include <sstream>
#include <algorithm>
#include <curl/curl.h>
#include "common/Formatter.h"
#include "common/iso_8601.h"
#include "common/async/completion.h"
@@ -23,6 +24,8 @@
#include <functional>
#include "rgw_perf_counters.h"

#define dout_subsys ceph_subsys_rgw_notification

using namespace rgw;

template<typename EventType>
@@ -52,6 +55,8 @@ bool get_bool(const RGWHTTPArgs& args, const std::string& name, bool default_val
return value;
}

static std::unique_ptr<RGWHTTPManager> s_http_manager;

class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
private:
CephContext* const cct;
@@ -83,6 +88,10 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
}

int send(const rgw_pubsub_s3_event& event, optional_yield y) override {
if (!s_http_manager) {
ldout(cct, 1) << "ERROR: send failed. http endpoint manager not running" << dendl;
return -ESRCH;
}
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
const auto post_data = json_format_pubsub_event(event);
@@ -101,7 +110,10 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
request.set_send_length(post_data.length());
request.append_header("Content-Type", "application/json");
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
const auto rc = RGWHTTP::process(&request, y);
auto rc = s_http_manager->add_request(&request);
if (rc == 0) {
rc = request.wait(y);
}
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
// TODO: use read_bl to process return code and handle according to ack level
return rc;
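The change above replaces the blocking RGWHTTP::process(&request, y) call with add_request() on the dedicated s_http_manager followed by request.wait(y); pushes are thus multiplexed on the manager started in init_http_manager() below, and its explicit stop() in shutdown_http_manager() can cut them off cleanly.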
@@ -393,3 +405,47 @@ RGWPubSubEndpoint::Ptr RGWPubSubEndpoint::create(const std::string& endpoint,
return nullptr;
}

bool init_http_manager(CephContext* cct) {
if (s_http_manager) return false;
curl_global_init(CURL_GLOBAL_ALL);
s_http_manager = std::make_unique<RGWHTTPManager>(cct);
return (s_http_manager->start() == 0);
}

void shutdown_http_manager() {
if (s_http_manager) {
s_http_manager->stop();
s_http_manager.reset();
}
}

bool RGWPubSubEndpoint::init_all(CephContext* cct) {
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
if (!amqp::init(cct)) {
ldout(cct, 1) << "ERROR: failed to init amqp endpoint manager" << dendl;
return false;
}
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
if (!kafka::init(cct)) {
ldout(cct, 1) << "ERROR: failed to init kafka endpoint manager" << dendl;
return false;
}
#endif
if (!init_http_manager(cct)) {
ldout(cct, 1) << "ERROR: failed to init http endpoint manager" << dendl;
return false;
}
return true;
}

void RGWPubSubEndpoint::shutdown_all() {
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
amqp::shutdown();
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
kafka::shutdown();
#endif
shutdown_http_manager();
}
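Note that init_all() does not roll back managers that started before a later one failed; cleanup relies on shutdown_all(), which rgw::notify::shutdown() only reaches once s_manager was successfully created.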

6 changes: 6 additions & 0 deletions src/rgw/driver/rados/rgw_pubsub_push.h
@@ -40,5 +40,11 @@ class RGWPubSubEndpoint {
configuration_error(const std::string& what_arg) :
std::logic_error("pubsub endpoint configuration error: " + what_arg) {}
};

// init all supported endpoints
static bool init_all(CephContext* cct);
// shutdown all supported endpoints
static void shutdown_all();

};

6 changes: 2 additions & 4 deletions src/rgw/driver/rados/rgw_rados.cc
@@ -1356,10 +1356,8 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
index_completion_manager = new RGWIndexCompletionManager(this);

if (run_notification_thread) {
ret = rgw::notify::init(cct, driver, *svc.site, dpp);
if (ret < 0 ) {
ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;
return ret;
if (!rgw::notify::init(dpp, driver, *svc.site)) {
ldpp_dout(dpp, 0) << "ERROR: failed to initialize notification manager" << dendl;
}

using namespace rgw;
Expand Down
26 changes: 0 additions & 26 deletions src/rgw/rgw_appmain.cc
@@ -59,12 +59,6 @@
#include "rgw_kmip_client_impl.h"
#include "rgw_perf_counters.h"
#include "rgw_signal.h"
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
#include "rgw_amqp.h"
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
#include "rgw_kafka.h"
#endif
#ifdef WITH_ARROW_FLIGHT
#include "rgw_flight_frontend.h"
#endif
@@ -555,20 +549,6 @@ void rgw::AppMain::init_tracepoints()
tracing::rgw::tracer.init(dpp->get_cct(), "rgw");
} /* init_tracepoints() */

void rgw::AppMain::init_notification_endpoints()
{
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
if (!rgw::amqp::init(dpp->get_cct())) {
derr << "ERROR: failed to initialize AMQP manager" << dendl;
}
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
if (!rgw::kafka::init(dpp->get_cct())) {
derr << "ERROR: failed to initialize Kafka manager" << dendl;
}
#endif
} /* init_notification_endpoints */

void rgw::AppMain::init_lua()
{
rgw::sal::Driver* driver = env.driver;
@@ -645,12 +625,6 @@ void rgw::AppMain::shutdown(std::function<void(void)> finalize_async_signals)
rgw::curl::cleanup_curl();
g_conf().remove_observer(implicit_tenant_context.get());
implicit_tenant_context.reset(); // deletes
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
rgw::amqp::shutdown();
#endif
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
rgw::kafka::shutdown();
#endif
rgw_perf_stop(g_ceph_context);
ratelimiter.reset(); // deletes--ensure this happens before we destruct
} /* AppMain::shutdown */
