Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Transparent curl_multi_* support. More WIP.

  • Loading branch information...
commit bfbf67e5a38634b69b0f64c91faa5faba72418f7 1 parent 4803bf3
@bnoordhuis authored
Showing with 148 additions and 30 deletions.
  1. +139 −21 curl.cc
  2. +9 −9 test.js
View
160 curl.cc
@@ -1,6 +1,7 @@
-#include <stdlib.h>
-#include <string.h>
-#include <assert.h>
+#include <cstdlib>
+#include <cstring>
+#include <cassert>
+#include <map>
#include <curl/curl.h>
@@ -67,15 +68,26 @@ class MultiHandle {
static bool Initialize();
static MultiHandle& Singleton();
Handle<Value> Add(EasyHandle& ch);
- Handle<Value> Remove(EasyHandle& ch);
private:
- static MultiHandle* singleton_;
+ typedef std::map<curl_socket_t, ev_io> SockFDs;
+
unsigned num_handles_;
+ SockFDs sockfds_;
CURLM* const mh_;
+ ev_timer timer_;
MultiHandle();
~MultiHandle();
+ bool ProcessEvents();
+
+ static int TimerFunction(CURLM* mh, long timeout, void* userp);
+ static int SocketFunction(
+ CURLM* mh, curl_socket_t sockfd, int events, void* userp, void* socketp);
+ static void IOEventFunction(ev_io* w, int events);
+ static void TimerEventFunction(ev_timer* w, int events);
+
+ static MultiHandle* singleton_;
};
//
@@ -121,7 +133,6 @@ EasyHandle::EasyHandle(): ch_(curl_easy_init()) {
EasyHandle::~EasyHandle() {
read_callback_.Dispose();
write_callback_.Dispose();
- MultiHandle::Singleton().Remove(*this);
curl_easy_cleanup(ch_);
}
@@ -150,10 +161,19 @@ Handle<Value> EasyHandle::InvokeWriteCallback(Buffer* data) {
//
MultiHandle* MultiHandle::singleton_;
-MultiHandle::MultiHandle(): mh_(curl_multi_init()) {
+MultiHandle::MultiHandle(): num_handles_(0), mh_(curl_multi_init()) {
if (mh_ == 0) {
Error("curl_multi_init() returned NULL!");
}
+ else {
+ curl_multi_setopt(mh_, CURLMOPT_SOCKETFUNCTION, SocketFunction);
+ curl_multi_setopt(mh_, CURLMOPT_SOCKETDATA, this);
+
+ curl_multi_setopt(mh_, CURLMOPT_TIMERFUNCTION, TimerFunction);
+ curl_multi_setopt(mh_, CURLMOPT_TIMERDATA, this);
+
+ ev_init(&timer_, TimerEventFunction);
+ }
}
MultiHandle::~MultiHandle() {
@@ -171,33 +191,131 @@ MultiHandle& MultiHandle::Singleton() {
return *singleton_;
}
-Handle<Value> MultiHandle::Add(EasyHandle& ch) {
- const CURLMcode status = curl_multi_add_handle(mh_, ch);
+bool MultiHandle::ProcessEvents() {
+ int running_handles;
+ CURLMcode status;
+
+ running_handles = 0;
+ do {
+ status = curl_multi_socket_all(mh_, &running_handles);
+ }
+ while (status == CURLM_CALL_MULTI_PERFORM);
+
if (status != CURLM_OK) {
- return CurlError(status);
+ CurlError(status); // safe to call, this code runs in the same thread as V8
}
- if (++num_handles_ == 1) {
- // start the event loop
- ev_ref();
+ if (running_handles == 0) {
+ ev_timer_stop(&timer_);
+ ev_unref();
}
- return Undefined();
+ int msgs_in_queue;
+ CURLMsg *msg;
+
+ msgs_in_queue = 0;
+ while ((msg = curl_multi_info_read(mh_, &msgs_in_queue))) {
+ if (msg->msg == CURLMSG_DONE) {
+ curl_multi_remove_handle(mh_, msg->easy_handle);
+ }
+ }
+ assert(msgs_in_queue == 0);
+
+ return status == CURLM_OK;
}
-Handle<Value> MultiHandle::Remove(EasyHandle& ch) {
- assert(num_handles_ > 0);
+void MultiHandle::TimerEventFunction(ev_timer* w, int events) {
+ MultiHandle& self = *reinterpret_cast<MultiHandle*>(w->data);
+
+ fprintf(stderr, "%s: events=%d\n", __func__, events);
+ self.ProcessEvents();
+}
+
+int MultiHandle::TimerFunction(CURLM* /*handle*/, long timeout, void* userp) {
+ MultiHandle& self = *reinterpret_cast<MultiHandle*>(userp);
+
+ if (timeout > 1000) timeout = 1000;
+
+ fprintf(stderr, "%s: timeout=%ld\n", __func__, timeout);
+ ev_timer_stop(&self.timer_);
+ ev_timer_set(&self.timer_, timeout / 1000., timeout / 1000.);
+ ev_timer_start(&self.timer_);
+ self.timer_.data = reinterpret_cast<void*>(&self);
+
+ return CURLM_OK;
+}
- const CURLMcode status = curl_multi_remove_handle(mh_, ch);
+int curl2ev(int events) {
+ if (events == CURL_POLL_IN) {
+ return EV_READ;
+ }
+ if (events == CURL_POLL_OUT) {
+ return EV_WRITE;
+ }
+ if (events == CURL_POLL_INOUT) {
+ return EV_READ | EV_WRITE;
+ }
+ return 0;
+}
+
+int MultiHandle::SocketFunction(
+ CURLM* /*handle*/, curl_socket_t sockfd, int events, void* userp, void* /*socketp*/)
+{
+ MultiHandle& self = *reinterpret_cast<MultiHandle*>(userp);
+
+ fprintf(stderr, "%s: sockfd=%d, events=%d\n", __func__, sockfd, events);
+
+ // translate curl flags to libev flags
+ events = curl2ev(events);
+
+ SockFDs::iterator it = self.sockfds_.find(sockfd);
+ if (it == self.sockfds_.end()) {
+ if (events) {
+ // create I/O watcher and add it to the list
+ ev_io& w = self.sockfds_.insert(SockFDs::value_type(sockfd, ev_io())).first->second;
+ ev_io_init(&w, IOEventFunction, sockfd, events);
+ ev_io_start(&w);
+ w.data = reinterpret_cast<void*>(&self);
+ }
+ else {
+ assert(0 && "CURL_POLL_NONE or CURL_POLL_REMOVE for bad socket");
+ }
+ }
+ else {
+ ev_io& w = it->second;
+ if (events) {
+ // update the event flags
+ ev_io_set(&w, sockfd, events);
+ }
+ else {
+ // disarm and dispose fd watcher
+ ev_io_stop(&w);
+ self.sockfds_.erase(it);
+ }
+ }
+
+ return CURLM_OK;
+}
+
+void MultiHandle::IOEventFunction(ev_io* w, int events) {
+ MultiHandle& self = *reinterpret_cast<MultiHandle*>(w->data);
+
+ fprintf(stderr, "%s: sockfd=%d, events=%d\n", __func__, w->fd, events);
+ self.ProcessEvents();
+}
+
+Handle<Value> MultiHandle::Add(EasyHandle& ch) {
+ CURLMcode status = curl_multi_add_handle(mh_, ch);
if (status != CURLM_OK) {
return CurlError(status);
}
- if (--num_handles_ == 0) {
- // stop the event loop
- ev_unref();
+ if (++num_handles_ == 1) {
+ ev_ref();
}
+ ProcessEvents();
+
return Undefined();
}
@@ -222,7 +340,7 @@ size_t WriteFunction(char* data, size_t size, size_t nmemb, void* arg) {
//
// bindings (glue)
//
-Handle<Value> curl_easy_init_g(const Arguments& args) {
+Handle<Value> curl_easy_init_g(const Arguments& /*args*/) {
return EasyHandle::New();
}
View
18 test.js
@@ -7,18 +7,18 @@ for (var k in curl) {
}
ch = curl_easy_init();
-curl_easy_setopt(ch, CURLOPT_URL, 'https://encrypted.google.com/');
-curl_easy_setopt(ch, CURLOPT_CERTINFO, 1);
+//curl_easy_setopt(ch, CURLOPT_URL, 'https://encrypted.google.com/');
+//curl_easy_setopt(ch, CURLOPT_CERTINFO, 1);
+//curl_easy_setopt(ch, CURLOPT_URL, 'http://127.0.0.1/~bnoordhuis/');
+curl_easy_setopt(ch, CURLOPT_URL, 'http://127.0.0.1:4242/');
curl_easy_setopt(ch, CURLOPT_WRITEFUNCTION, function(data) {
console.error(data.toString());
});
curl_easy_perform(ch, function(ex) {
- // TODO
-});
-
-for (var k in global) {
- if (0 == k.indexOf('CURLINFO_')) {
- console.error(k, curl_easy_getinfo(ch, global[k]));
+ for (var k in global) {
+ if (0 == k.indexOf('CURLINFO_')) {
+ console.error(k, curl_easy_getinfo(ch, global[k]));
+ }
}
-}
+});
Please sign in to comment.
Something went wrong with that request. Please try again.