From 5fdca846aaea9afd16828d33ccc279c4dcdbcd4c Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 5 May 2016 10:57:05 -0700 Subject: [PATCH] rgw: RGWHTTPClient requests can unregister themselves early No need to wait for req_state to complete anymore. Signed-off-by: Yehuda Sadeh (cherry picked from commit 91f61d68bf5fc39152d75fbc633f088e17d53d9e) Conflicts: src/rgw/rgw_http_client.cc --- src/rgw/rgw_http_client.cc | 272 ++++++++++++++++++++++++++++--------- src/rgw/rgw_http_client.h | 7 +- 2 files changed, 214 insertions(+), 65 deletions(-) diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index 93edd1f4f6890..05d9acd204d2a 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -14,7 +14,62 @@ #define dout_subsys ceph_subsys_rgw -static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info) +struct rgw_http_req_data : public RefCountedObject { + CURL *easy_handle; + curl_slist *h; + uint64_t id; + int ret; + atomic_t done; + RGWHTTPClient *client; + void *user_info; + bool registered; + RGWHTTPManager *mgr; + char error_buf[CURL_ERROR_SIZE]; + + Mutex lock; + Cond cond; + + rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0), + client(nullptr), user_info(nullptr), registered(false), + mgr(NULL), lock("rgw_http_req_data::lock") { + memset(error_buf, 0, sizeof(error_buf)); + } + + int wait() { + Mutex::Locker l(lock); + cond.Wait(lock); + return ret; + } + + void finish(int r) { + Mutex::Locker l(lock); + ret = r; + if (easy_handle) + curl_easy_cleanup(easy_handle); + + if (h) + curl_slist_free_all(h); + + easy_handle = NULL; + h = NULL; + done.set(1); + cond.Signal(); + } + + bool is_done() { + return done.read() != 0; + } + + int get_retcode() { + Mutex::Locker l(lock); + return ret; + } +}; + +/* + * the simple set of callbacks will be called on RGWHTTPClient::process() + */ +static size_t simple_receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info) { RGWHTTPClient *client = static_cast(_info); size_t len = size * nmemb; @@ -26,7 +81,7 @@ static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_i return len; } -static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info) +static size_t simple_receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info) { RGWHTTPClient *client = static_cast(_info); size_t len = size * nmemb; @@ -38,7 +93,7 @@ static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_inf return len; } -static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info) +static size_t simple_send_http_data(void *ptr, size_t size, size_t nmemb, void *_info) { RGWHTTPClient *client = static_cast(_info); int ret = client->send_data(ptr, size * nmemb); @@ -49,6 +104,66 @@ static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info) return ret; } +/* + * the following set of callbacks will be called either on RGWHTTPManager::process(), + * or via the RGWHTTPManager async processing. + */ +static size_t receive_http_header(void *ptr, size_t size, size_t nmemb, void *_info) +{ + rgw_http_req_data *req_data = static_cast(_info); + size_t len = size * nmemb; + + Mutex::Locker l(req_data->lock); + + if (!req_data->registered) { + return len; + } + + int ret = req_data->client->receive_header(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl; + } + + return len; +} + +static size_t receive_http_data(void *ptr, size_t size, size_t nmemb, void *_info) +{ + rgw_http_req_data *req_data = static_cast(_info); + size_t len = size * nmemb; + + Mutex::Locker l(req_data->lock); + + if (!req_data->registered) { + return len; + } + + int ret = req_data->client->receive_data(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; + } + + return len; +} + +static size_t send_http_data(void *ptr, size_t size, size_t nmemb, void *_info) +{ + rgw_http_req_data *req_data = static_cast(_info); + + Mutex::Locker l(req_data->lock); + + if (!req_data->registered) { + return 0; + } + + int ret = req_data->client->send_data(ptr, size * nmemb); + if (ret < 0) { + dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; + } + + return ret; +} + static curl_slist *headers_to_slist(list >& headers) { curl_slist *h = NULL; @@ -79,6 +194,10 @@ static curl_slist *headers_to_slist(list >& headers) return h; } +/* + * process a single simple one off request, not going through RGWHTTPManager. Not using + * req_data. + */ int RGWHTTPClient::process(const char *method, const char *url) { int ret = 0; @@ -99,15 +218,15 @@ int RGWHTTPClient::process(const char *method, const char *url) curl_easy_setopt(curl_handle, CURLOPT_URL, url); curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L); curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L); - curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, receive_http_header); + curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header); curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this); - curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, receive_http_data); + curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data); curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this); curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf); if (h) { curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h); } - curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, send_http_data); + curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data); curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this); curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L); if (has_send_len) { @@ -131,57 +250,6 @@ int RGWHTTPClient::process(const char *method, const char *url) return ret; } -struct rgw_http_req_data : public RefCountedObject { - CURL *easy_handle; - curl_slist *h; - uint64_t id; - int ret; - atomic_t done; - RGWHTTPClient *client; - void *user_info; - RGWHTTPManager *mgr; - char error_buf[CURL_ERROR_SIZE]; - - Mutex lock; - Cond cond; - - rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0), - client(nullptr), user_info(nullptr), - mgr(NULL), lock("rgw_http_req_data::lock") { - memset(error_buf, 0, sizeof(error_buf)); - } - - int wait() { - Mutex::Locker l(lock); - cond.Wait(lock); - return ret; - } - - void finish(int r) { - Mutex::Locker l(lock); - ret = r; - if (easy_handle) - curl_easy_cleanup(easy_handle); - - if (h) - curl_slist_free_all(h); - - easy_handle = NULL; - h = NULL; - done.set(1); - cond.Signal(); - } - - bool is_done() { - return done.read() != 0; - } - - int get_retcode() { - Mutex::Locker l(lock); - return ret; - } -}; - string RGWHTTPClient::to_str() { string method_str = (last_method.empty() ? "" : last_method); @@ -198,6 +266,9 @@ int RGWHTTPClient::get_req_retcode() return req_data->get_retcode(); } +/* + * init request, will be used later with RGWHTTPManager + */ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data) { assert(!req_data); @@ -224,15 +295,15 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L); curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L); curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header); - curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)this); + curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data); curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data); - curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)this); + curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data); curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf); if (h) { curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h); } curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); - curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)this); + curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data); curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); if (has_send_len) { curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); @@ -242,6 +313,9 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re return 0; } +/* + * wait for async request to complete + */ int RGWHTTPClient::wait() { if (!req_data->is_done()) { @@ -254,7 +328,8 @@ int RGWHTTPClient::wait() RGWHTTPClient::~RGWHTTPClient() { if (req_data) { - wait(); + req_data->mgr->remove_request(this); + req_data->put(); } } @@ -350,6 +425,9 @@ void *RGWHTTPManager::ReqsThread::entry() return NULL; } +/* + * RGWHTTPManager has two modes of operation: threaded and non-threaded. + */ RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct), completion_mgr(_cm), is_threaded(false), reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0), @@ -370,11 +448,21 @@ void RGWHTTPManager::register_request(rgw_http_req_data *req_data) { RWLock::WLocker rl(reqs_lock); req_data->id = num_reqs; + req_data->registered = true; reqs[num_reqs] = req_data; num_reqs++; ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; } +void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) +{ + RWLock::WLocker rl(reqs_lock); + req_data->get(); + req_data->registered = false; + unregistered_reqs.push_back(req_data); + ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; +} + void RGWHTTPManager::complete_request(rgw_http_req_data *req_data) { RWLock::WLocker rl(reqs_lock); @@ -390,6 +478,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) if (completion_mgr) { completion_mgr->complete(NULL, req_data->user_info); } + req_data->put(); } @@ -405,6 +494,9 @@ void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret) _complete_request(req_data); } +/* + * hook request to the curl multi handle + */ int RGWHTTPManager::link_request(rgw_http_req_data *req_data) { ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; @@ -416,10 +508,30 @@ int RGWHTTPManager::link_request(rgw_http_req_data *req_data) return 0; } -void RGWHTTPManager::link_pending_requests() +/* + * unhook request from the curl multi handle, and finish request if it wasn't finished yet as + * there will be no more processing on this request + */ +void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) +{ + if (req_data->easy_handle) { + curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle); + } + if (!req_data->is_done()) { + _finish_request(req_data, -ECANCELED); + } +} + +void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data) +{ + RWLock::WLocker wl(reqs_lock); + _unlink_request(req_data); +} + +void RGWHTTPManager::manage_pending_requests() { reqs_lock.get_read(); - if (max_threaded_req == num_reqs) { + if (max_threaded_req == num_reqs && unregistered_reqs.empty()) { reqs_lock.unlock(); return; } @@ -427,6 +539,15 @@ void RGWHTTPManager::link_pending_requests() RWLock::WLocker wl(reqs_lock); + if (!unregistered_reqs.empty()) { + for (auto& r : unregistered_reqs) { + _unlink_request(r); + r->put(); + } + + unregistered_reqs.clear(); + } + map::iterator iter = reqs.find(max_threaded_req); list > remove_reqs; @@ -483,6 +604,26 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const return ret; } +int RGWHTTPManager::remove_request(RGWHTTPClient *client) +{ + rgw_http_req_data *req_data = client->get_req_data(); + + if (!is_threaded) { + unlink_request(req_data); + return 0; + } + unregister_request(req_data); + int ret = signal_thread(); + if (ret < 0) { + return ret; + } + + return 0; +} + +/* + * the synchronous, non-threaded request processing method. + */ int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) { assert(!is_threaded); @@ -537,6 +678,9 @@ int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) return 0; } +/* + * the synchronous, non-threaded request processing completion method. + */ int RGWHTTPManager::complete_requests() { bool done; @@ -605,7 +749,7 @@ void *RGWHTTPManager::reqs_thread_entry() return NULL; } - link_pending_requests(); + manage_pending_requests(); mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); switch (mstatus) { diff --git a/src/rgw/rgw_http_client.h b/src/rgw/rgw_http_client.h index c4dcd88585a03..6283d310b7b12 100644 --- a/src/rgw/rgw_http_client.h +++ b/src/rgw/rgw_http_client.h @@ -111,6 +111,7 @@ class RGWHTTPManager { RWLock reqs_lock; map reqs; + list unregistered_reqs; map complete_reqs; int64_t num_reqs; int64_t max_threaded_req; @@ -119,11 +120,14 @@ class RGWHTTPManager { void register_request(rgw_http_req_data *req_data); void complete_request(rgw_http_req_data *req_data); void _complete_request(rgw_http_req_data *req_data); + void unregister_request(rgw_http_req_data *req_data); + void _unlink_request(rgw_http_req_data *req_data); + void unlink_request(rgw_http_req_data *req_data); void finish_request(rgw_http_req_data *req_data, int r); void _finish_request(rgw_http_req_data *req_data, int r); int link_request(rgw_http_req_data *req_data); - void link_pending_requests(); + void manage_pending_requests(); class ReqsThread : public Thread { RGWHTTPManager *manager; @@ -147,6 +151,7 @@ class RGWHTTPManager { void stop(); int add_request(RGWHTTPClient *client, const char *method, const char *url); + int remove_request(RGWHTTPClient *client); /* only for non threaded case */ int process_requests(bool wait_for_data, bool *done);