Skip to content

Commit

Permalink
Merge pull request #10998 from ceph/wip-16695
Browse files Browse the repository at this point in the history
rgw: work around curl_multi_wait bug with non-blocking reads

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
  • Loading branch information
cbodley committed Sep 14, 2016
2 parents 0f9c434 + 0359be6 commit c2ac199
Showing 1 changed file with 101 additions and 12 deletions.
113 changes: 101 additions & 12 deletions src/rgw/rgw_http_client.cc
Expand Up @@ -418,8 +418,83 @@ int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
}


static int clear_signal(int fd)
{
// since we're in non-blocking mode, we can try to read a lot more than
// one signal from signal_thread() to avoid later wakeups. non-blocking reads
// are also required to support the curl_multi_wait bug workaround
std::array<char, 256> buf;
int ret = ::read(fd, (void *)buf.data(), buf.size());
if (ret < 0) {
ret = -errno;
return ret == -EAGAIN ? 0 : ret; // clear EAGAIN
}
return 0;
}

#if HAVE_CURL_MULTI_WAIT

static std::once_flag detect_flag;
static bool curl_multi_wait_bug_present = false;

static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
int write_fd, int read_fd)
{
int ret = 0;

// write to write_fd so that read_fd becomes readable
uint32_t buf = 0;
ret = ::write(write_fd, &buf, sizeof(buf));
if (ret < 0) {
ret = -errno;
ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
return ret;
}

// pass read_fd in extra_fds for curl_multi_wait()
int num_fds;
struct curl_waitfd wait_fd;

wait_fd.fd = read_fd;
wait_fd.events = CURL_WAIT_POLLIN;
wait_fd.revents = 0;

ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
if (ret != CURLM_OK) {
ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
return -EIO;
}

// curl_multi_wait should flag revents when extra_fd is readable. if it
// doesn't, the bug is present and we can't rely on revents
if (wait_fd.revents == 0) {
curl_multi_wait_bug_present = true;
ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
"bug in curl_multi_wait(). enabling a workaround that may degrade "
"performance slightly. please upgrade to a more recent version of "
"libcurl." << dendl;
}

return clear_signal(read_fd);
}

static bool is_signaled(const curl_waitfd& wait_fd)
{
if (wait_fd.fd < 0) {
// no fd to signal
return false;
}

if (curl_multi_wait_bug_present) {
// we can't rely on revents, so we always return true if a wait_fd is given.
// this means we'll be trying a non-blocking read on this fd every time that
// curl_multi_wait() wakes up
return true;
}

return wait_fd.revents > 0;
}

static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
{
int num_fds;
Expand All @@ -431,16 +506,14 @@ static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)

int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
if (ret) {
dout(0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
return -EIO;
}

if (wait_fd.revents > 0) {
uint32_t buf;
ret = read(signal_fd, (void *)&buf, sizeof(buf));
if (is_signaled(wait_fd)) {
ret = clear_signal(signal_fd);
if (ret < 0) {
ret = -errno;
dout(0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
return ret;
}
}
Expand All @@ -463,7 +536,7 @@ static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
/* get file descriptors from the transfers */
int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
if (ret) {
generic_dout(0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
return -EIO;
}

Expand All @@ -486,16 +559,14 @@ static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
if (ret < 0) {
ret = -errno;
dout(0) << "ERROR: select returned " << ret << dendl;
ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
return ret;
}

if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
uint32_t buf;
ret = read(signal_fd, (void *)&buf, sizeof(buf));
ret = clear_signal(signal_fd);
if (ret < 0) {
ret = -errno;
dout(0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
return ret;
}
}
Expand Down Expand Up @@ -791,6 +862,24 @@ int RGWHTTPManager::set_threaded()
return r;
}

// enable non-blocking reads
r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK);
if (r < 0) {
r = -errno;
ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl;
TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
return r;
}

#ifdef HAVE_CURL_MULTI_WAIT
// on first initialization, use this pipe to detect whether we're using a
// buggy version of libcurl
std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
static_cast<CURLM*>(multi_handle),
thread_pipe[1], thread_pipe[0]);
#endif

is_threaded = true;
reqs_thread = new ReqsThread(this);
reqs_thread->create("http_manager");
Expand Down

0 comments on commit c2ac199

Please sign in to comment.