Permalink
Browse files

[fix] working stream

  • Loading branch information...
1 parent b1e49f4 commit bf0c9c9cb420d816d549ea3c1c153504f09b81c7 @bmeck committed Mar 17, 2013
Showing with 102 additions and 110 deletions.
  1. +86 −75 test.c
  2. +16 −35 uv_queue.c
View
161 test.c
@@ -5,43 +5,17 @@
#include "curl/include/curl/curl.h"
#include "uv_queue.c"
-CURL* curl_handle;
uv_loop_t *loop;
+CURLM* curl_handle;
//
// Lesson: curl is terrible
//
-size_t read_from_queue(char *ptr, size_t size, size_t nmemb, void *userdata) {
- printf("CURL DEMANDS MORE DATA!\n");
- uv_queue_t* queue = (uv_queue_t*) userdata;
- int to_write = size * nmemb;
- int offset = 0;
- while (to_write > 0) {
- uv_buf_t head = queue->buffers[0];
- if (!queue->length) {
- break;
- }
- int remaining = head.len - to_write;
- int writing;
- if (remaining >= 0) {
- writing = head.len;
- uv_queue_shift(queue, NULL);
- }
- else{
- writing = to_write;
- queue->buffers[0] = uv_buf_init(head.base, remaining);
- free(head.base);
- }
- memcpy(head.base, ptr + offset, writing);
- offset += writing;
- }
- if (!offset) {
- return CURL_READFUNC_PAUSE;
- }
- return offset;
-}
-void curl_attach_upload(uv_poll_t *req, int status, int events) {
+
+
+void curl_perform(uv_poll_t *req, int status, int events) {
+ printf("CURL DEMANDS PERFORMING STUFF!\n");
int running_handles;
int flags = 0;
if (events & UV_READABLE) flags |= CURL_CSELECT_IN;
@@ -57,34 +31,20 @@ void curl_attach_upload(uv_poll_t *req, int status, int events) {
switch (message->msg) {
case CURLMSG_DONE:
curl_easy_getinfo(message->easy_handle, CURLINFO_EFFECTIVE_URL, &done_url);
+ printf("%s DONE\n", done_url);
curl_multi_remove_handle(curl_handle, message->easy_handle);
curl_easy_cleanup(message->easy_handle);
break;
default:
+ fprintf(stderr, "CURLMSG default\n");
abort();
}
}
}
-
-uv_pipe_t* add_upload(CURLM *curl_handle, const char *url) {
- CURL *handle = curl_easy_init();
- uv_queue_t *queue = (uv_queue_t*) malloc(sizeof(uv_queue_t));
- uv_pipe_t *pipe = (uv_pipe_t*) malloc(sizeof(uv_pipe_t));
- uv_queue_init(queue, NULL);
- pipe->data = queue;
- //
- // Just staple the curl and pipe together, lol
- //
- curl_easy_setopt(handle, CURLOPT_READDATA, queue);
- curl_easy_setopt(handle, CURLOPT_READFUNCTION, read_from_queue);
- curl_easy_setopt(handle, CURLOPT_URL, url);
- curl_multi_add_handle(curl_handle, handle);
- return pipe;
-}
-
int handle_socket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) {
+ printf("CURL DEMANDS SOCKET STUFF!\n");
uv_poll_t *poll_fd;
if (action == CURL_POLL_IN || action == CURL_POLL_OUT) {
if (socketp) {
@@ -99,9 +59,10 @@ int handle_socket(CURL *easy, curl_socket_t s, int action, void *userp, void *so
switch (action) {
case CURL_POLL_IN:
+ uv_poll_start(poll_fd, UV_READABLE, curl_perform);
break;
case CURL_POLL_OUT:
- uv_poll_start(poll_fd, UV_WRITABLE, curl_attach_upload);
+ uv_poll_start(poll_fd, UV_WRITABLE, curl_perform);
break;
case CURL_POLL_REMOVE:
if (socketp) {
@@ -118,46 +79,96 @@ int handle_socket(CURL *easy, curl_socket_t s, int action, void *userp, void *so
}
-char* msg = "test\n";
+size_t read_from_queue(char *ptr, size_t size, size_t nmemb, void *userdata) {
+ printf("CURL DEMANDS MORE DATA!\n");
+ uv_queue_t* queue = (uv_queue_t*) userdata;
+ int to_write = size * nmemb;
+ int offset = 0;
+ while (to_write - offset > 0) {
+ if (!queue->length) {
+ break;
+ }
+ uv_buf_t head = queue->buffers[0];
+ int remaining_in_buf = head.len - to_write;
+ int writing;
+ if (remaining_in_buf >= 0) {
+ writing = to_write;
+ queue->buffers[0] = uv_buf_init(head.base, remaining_in_buf);
+ memcpy(head.base, head.base + writing, remaining_in_buf);
+ head.base = realloc(head.base, remaining_in_buf);
+ head.len = remaining_in_buf;
+ }
+ else{
+ writing = head.len;
+ uv_queue_shift(queue, NULL);
+ free(head.base);
+ }
+ memcpy(ptr + offset, head.base, writing);
+ offset += writing;
+ }
+ if (!offset) {
+ return CURL_READFUNC_PAUSE;
+ }
+ return offset;
+}
+
+uv_pipe_t* add_upload(CURLM *curl_handle, const char *url, uv_pipe_t* pipe) {
+ CURL *handle = curl_easy_init();
+ printf("SENDING TO %s\n", url);
+ curl_easy_setopt(handle, CURLOPT_READFUNCTION, read_from_queue);
+ curl_easy_setopt(handle, CURLOPT_UPLOAD, 1L);
+ curl_easy_setopt(handle, CURLOPT_READDATA, pipe->data);
+ curl_easy_setopt(handle, CURLOPT_URL, url);
+ return handle;
+}
+
+const char* msg = "test\n";
+uv_buf_t alloc_buffer(size_t suggested_size) {
+ return uv_buf_init((char*) malloc(suggested_size), suggested_size);
+}
void enqueue_msg(uv_timer_t* timer, int status) {
+ uv_buf_t buf = alloc_buffer(strlen(msg));
+ memcpy(buf.base, msg, strlen(msg));
uv_queue_t* queue = (uv_queue_t*) timer->data;
- uv_buf_t buf = uv_buf_init(msg,5);
- printf("%s READY\n",msg);
uv_queue_push(queue, buf);
- printf("%s ADDED\n",msg);
+ printf("ADDED MSG with base: %p\n", buf.base);
+ int count = 1;
+ curl_multi_perform( curl_handle, &count );
}
uv_timer_t* timer;
void setup_interval(uv_queue_t* queue) {
timer = (uv_timer_t*) malloc(sizeof(uv_timer_t));
uv_timer_init(loop, timer);
timer->data = queue;
- uv_timer_start(timer, enqueue_msg, 0, 1e4);
+ uv_timer_start(timer, enqueue_msg, 0, 1e3);
}
int main(int argc, char **argv) {
- loop = uv_default_loop();
-
- printf("ARGC TEST\n");
- if (argc <= 1)
- return 0;
-
- if (curl_global_init(CURL_GLOBAL_ALL)) {
- fprintf(stderr, "Could not init cURL\n");
- return 1;
- }
+ loop = uv_default_loop();
+
+ if (argc <= 1)
+ return 0;
- printf("CURL SETUP STARTING\n");
- curl_handle = curl_multi_init();
- curl_multi_setopt(curl_handle, CURLMOPT_SOCKETFUNCTION, handle_socket);
+ if (curl_global_init(CURL_GLOBAL_ALL)) {
+ fprintf(stderr, "Could not init cURL\n");
+ return 1;
+ }
- printf("PIPE SETUP STARTING\n");
- uv_pipe_t* pipe = add_upload(curl_handle, argv[0]);
- printf("PIPE SETUP DONE\n");
- setup_interval((uv_queue_t*)pipe->data);
- printf("INTERVAL SETUP DONE\n");
- uv_run(loop, UV_RUN_DEFAULT);
- curl_multi_cleanup(curl_handle);
- return 0;
+ uv_queue_t *queue = (uv_queue_t*) malloc(sizeof(uv_queue_t));
+ uv_queue_init(queue, NULL);
+ uv_pipe_t *pipe = (uv_pipe_t*) malloc(sizeof(uv_pipe_t));
+ uv_pipe_init(loop, pipe, 0);
+ pipe->data = queue;
+ curl_handle = curl_multi_init();
+ curl_multi_setopt(curl_handle, CURLMOPT_SOCKETFUNCTION, handle_socket);
+ curl_multi_add_handle(curl_handle, add_upload(curl_handle, argv[1], pipe));
+ setup_interval((uv_queue_t*)pipe->data);
+ int count = 1;
+ curl_multi_perform( curl_handle, &count );
+
+ uv_run(loop, UV_RUN_DEFAULT);
+ curl_multi_cleanup(curl_handle);
+ return 0;
}
View
@@ -6,9 +6,9 @@
typedef void (*uv_queue_change_cb)(int index, int change);
struct uv_queue_s {
int length;
- uv_buf_t* buffers;
uv_mutex_t* mutex;
uv_queue_change_cb onchange;
+ uv_buf_t* buffers;
};
//
// index - where data changed
@@ -24,60 +24,41 @@ int uv_queue_init (uv_queue_t* queue, uv_queue_change_cb* change_cb) {
queue->buffers = (uv_buf_t*) malloc(0);
queue->mutex = (uv_mutex_t*) malloc(sizeof(uv_mutex_t));
queue->onchange = NULL;
+ queue->length = 0;
if (!uv_mutex_init(queue->mutex)) {
return -1;
}
return 0;
}
-void uv_queue_free (uv_queue_t* queue) {
- //
- // Wait on other threads
- //
- while (uv_mutex_trylock(queue->mutex));
- int i = 0;
- for (i = 0; i < queue->length; i++) {
- free(queue->buffers[i].base);
- }
- free(queue->buffers);
- uv_mutex_destroy(queue->mutex);
-}
-
int uv_queue_push(uv_queue_t* queue, uv_buf_t buffer) {
while (uv_mutex_trylock(queue->mutex));
uv_buf_t* buffers = (uv_buf_t*) realloc(queue->buffers, (queue->length + 1) * sizeof(uv_buf_t));
if (!buffers) {
return -1;
}
+ queue->buffers = buffers;
queue->buffers[queue->length] = buffer;
++queue->length;
uv_mutex_unlock(queue->mutex);
return 0;
}
-int uv_queue_unshift(uv_queue_t* queue, uv_buf_t* buffer) {
- if (!queue->length) {
- return -1;
- }
+void uv_queue_free (uv_queue_t* queue) {
+ //
+ // Wait on other threads
+ //
while (uv_mutex_trylock(queue->mutex));
- uv_buf_t* old_buffers = queue->buffers;
- int new_length = queue->length + 1;
- int new_size = new_length * sizeof(uv_buf_t);
- uv_buf_t* buffers = (uv_buf_t*) malloc(new_size);
- if (!buffers) {
- uv_mutex_unlock(queue->mutex);
- return -1;
+ int i = 0;
+ for (i = 0; i < queue->length; i++) {
+ free(queue->buffers[i].base);
}
- memcpy(old_buffers, buffers + sizeof(uv_buf_t), new_size);
- buffers[0] = *buffer;
- queue->buffers = buffers;
- queue->length = new_length;
- uv_mutex_unlock(queue->mutex);
- free(old_buffers);
- return 0;
+ free(queue->buffers);
+ uv_mutex_destroy(queue->mutex);
}
int uv_queue_shift(uv_queue_t* queue, uv_buf_t* destination) {
+ printf("SHIFT!!!!!!!\n");
if (!queue->length) {
return -1;
}
@@ -93,15 +74,15 @@ int uv_queue_shift(uv_queue_t* queue, uv_buf_t* destination) {
uv_mutex_unlock(queue->mutex);
return -1;
}
- memcpy(old_buffers + sizeof(uv_buf_t*), buffers, new_size);
+ memcpy(buffers, old_buffers + sizeof(uv_buf_t*), new_size);
queue->buffers = buffers;
queue->length = new_length;
+ free(old_buffers);
uv_mutex_unlock(queue->mutex);
- free(old_buffers);
return 0;
}
void uv_queue_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
uv_queue_t* queue = stream->data;
- uv_queue_push(queue, &buf);
+ uv_queue_push(queue, buf);
}

0 comments on commit bf0c9c9

Please sign in to comment.