moving more s3-centric curl

  • Loading branch information...
commit 65ab9c8a75fe76b2f726b24a9dc50fe4bc8868ac (1 parent: 72406ab), @benlemasurier committed Feb 7, 2012
Showing with 158 additions and 146 deletions.
  1. +8 −138 src/curl.c
  2. +1 −1 src/curl.h
  3. +147 −6 src/s3-curl.c
  4. +1 −0 src/s3-curl.h
  5. +1 −1 src/s3.c
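
In short: the multi-handle HEAD logic (stormfs_curl_head_multi) moves out of the generic curl layer into the S3 layer as s3_curl_head_multi, a new get_multi_handle() accessor exposes the shared CURLM * to code outside curl.c, the S3 URL builders switch from stormfs->url to stormfs->virtual_url, and create_pooled_handle() becomes static.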

src/curl.c
@@ -32,7 +32,6 @@
#include "s3-curl.h" // fixme: temp.
#define CURL_RETRIES 3
-#define MAX_REQUESTS 100
#define POOL_SIZE 100
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
@@ -262,6 +261,12 @@ rfc2822_timestamp(void)
return date;
}
+CURLM *
+get_multi_handle(void)
+{
+ return curl.multi;
+}
+
static int
http_response_errno(CURLcode response_code, CURL *handle)
{
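
Since curl.multi lives in curl.c's file-scoped state, code moving into s3-curl.c needs a way to reach it; that is what the new accessor is for. A minimal sketch of a caller (the easy-handle plumbing below is illustrative, not stormfs code):

#include <curl/curl.h>
#include "curl.h"

/* Drive one easy handle through the shared multi handle.
 * get_multi_handle() is the accessor added above; the rest of
 * this function is an illustrative stand-in. */
static void multi_example(CURL *easy)
{
  CURLM *multi = get_multi_handle();
  int running = 0;

  curl_multi_add_handle(multi, easy);
  do {
    curl_multi_perform(multi, &running);
  } while(running > 0); /* busy loop; the real code select()s, see s3_curl_head_multi below */
  curl_multi_remove_handle(multi, easy);
}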
@@ -353,7 +358,7 @@ set_curl_defaults(CURL *c)
curl_easy_setopt(c, CURLOPT_SHARE, curl.share);
// curl_easy_setopt(c, CURLOPT_TCP_NODELAY, 1);
- // curl_easy_setopt(c, CURLOPT_VERBOSE, 1L);
+ curl_easy_setopt(c, CURLOPT_VERBOSE, 1L);
// curl_easy_setopt(c, CURLOPT_FORBID_REUSE, 1);
return 0;
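
Note that CURLOPT_VERBOSE goes from commented-out to enabled here, which makes libcurl trace every request to stderr; together with the printf block added to sign_request() below and the "// fixme: temp." include above, this looks like temporary debugging left in the commit.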
@@ -415,7 +420,7 @@ destroy_curl_handle(CURL *c)
return 0;
}
-CURL_HANDLE *
+static CURL_HANDLE *
create_pooled_handle(const char *url)
{
CURL_HANDLE *ch = g_new0(CURL_HANDLE, 1);
@@ -620,141 +625,6 @@ stormfs_curl_head(const char *path, GList **headers)
}
int
-stormfs_curl_head_multi(const char *path, GList *files)
-{
- int running_handles;
- size_t i, n_running, last_req_idx = 0;
- size_t n_files = g_list_length(files);
- HTTP_REQUEST *requests = g_new0(HTTP_REQUEST, n_files);
- GList *head = NULL, *next = NULL;
-
- i = 0;
- n_running = 0;
- head = g_list_first(files);
- while(head != NULL) {
- next = head->next;
- struct file *f = head->data;
-
- CURLMcode err;
- requests[i].headers = NULL;
- requests[i].response.memory = g_malloc0(1);
- requests[i].response.size = 0;
- requests[i].path = get_path(path, f->name);
- requests[i].done = false;
-
- if(n_running < MAX_REQUESTS && n_running < n_files) {
- char *url = get_url(requests[i].path);
- requests[i].c = get_pooled_handle(url);
- sign_request("HEAD", &requests[i].headers, requests[i].path);
- curl_easy_setopt(requests[i].c, CURLOPT_NOBODY, 1L); // HEAD
- curl_easy_setopt(requests[i].c, CURLOPT_FILETIME, 1L); // Last-Modified
- curl_easy_setopt(requests[i].c, CURLOPT_HTTPHEADER, requests[i].headers);
- curl_easy_setopt(requests[i].c, CURLOPT_HEADERDATA, (void *) &requests[i].response);
- curl_easy_setopt(requests[i].c, CURLOPT_HEADERFUNCTION, write_memory_cb);
- g_free(url);
-
- if((err = curl_multi_add_handle(curl.multi, requests[i].c)) != CURLM_OK)
- return -EIO;
-
- n_running++;
- last_req_idx = i;
- }
-
- i++;
- head = next;
- }
-
- curl_multi_perform(curl.multi, &running_handles);
- while(running_handles) {
- if(running_handles) {
- int max_fd = -1;
- long curl_timeout = -1;
- struct timeval timeout;
- CURLMcode err;
-
- fd_set fd_r;
- fd_set fd_w;
- fd_set fd_e;
- FD_ZERO(&fd_r);
- FD_ZERO(&fd_w);
- FD_ZERO(&fd_e);
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
-
- curl_multi_timeout(curl.multi, &curl_timeout);
- if(curl_timeout >= 0) {
- timeout.tv_sec = curl_timeout / 1000;
- if(timeout.tv_sec > 1)
- timeout.tv_sec = 1;
- else
- timeout.tv_usec = (curl_timeout % 1000) * 1000;
- }
-
- err = curl_multi_fdset(curl.multi, &fd_r, &fd_w, &fd_e, &max_fd);
- if(err != CURLM_OK)
- return -EIO;
-
- if(select(max_fd + 1, &fd_r, &fd_w, &fd_e, &timeout) == -1)
- return -errno;
- }
-
- curl_multi_perform(curl.multi, &running_handles);
-
- CURLMsg *msg;
- int remaining;
- while((msg = curl_multi_info_read(curl.multi, &remaining))) {
- if(msg->msg != CURLMSG_DONE)
- continue;
-
- for(i = 0; i < n_files; i++) {
- // requests *might* share the same handle out of the pool,
- // make sure the request hasn't also been marked as completed
- if(msg->easy_handle == requests[i].c && !requests[i].done)
- break;
- }
-
- struct file *f = g_list_nth_data(files, i);
- extract_meta(requests[i].response.memory, &(f->headers));
- g_free(requests[i].response.memory);
- curl_slist_free_all(requests[i].headers);
- curl_multi_remove_handle(curl.multi, requests[i].c);
- release_pooled_handle(requests[i].c);
- requests[i].done = true;
- n_running--;
-
- if(n_running < MAX_REQUESTS && last_req_idx < (n_files - 1)) {
- CURLMcode err;
- last_req_idx++;;
-
- char *url = get_url(requests[last_req_idx].path);
- requests[last_req_idx].c = get_pooled_handle(url);
- sign_request("HEAD", &requests[last_req_idx].headers, requests[last_req_idx].path);
- curl_easy_setopt(requests[last_req_idx].c, CURLOPT_NOBODY, 1L); // HEAD
- curl_easy_setopt(requests[last_req_idx].c, CURLOPT_FILETIME, 1L); // Last-Modified
- curl_easy_setopt(requests[last_req_idx].c, CURLOPT_HTTPHEADER, requests[last_req_idx].headers);
- curl_easy_setopt(requests[last_req_idx].c, CURLOPT_HEADERDATA, (void *) &requests[last_req_idx].response);
- curl_easy_setopt(requests[last_req_idx].c, CURLOPT_HEADERFUNCTION, write_memory_cb);
- g_free(url);
-
- if((err = curl_multi_add_handle(curl.multi, requests[last_req_idx].c)) != CURLM_OK)
- return -EIO;
-
- n_running++;
- }
- }
- }
-
- for(i = 0; i < n_files; i++) {
- if(requests[i].c != NULL)
- release_pooled_handle(requests[i].c);
- g_free(requests[i].path);
- }
- g_free(requests);
-
- return 0;
-}
-
-int
stormfs_curl_upload(const char *path, GList *headers, int fd)
{
FILE *f;
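
That is the whole removal from curl.c: stormfs_curl_head_multi() is excised along with MAX_REQUESTS, and both reappear essentially verbatim in src/s3-curl.c below as s3_curl_head_multi(), with direct curl.multi access swapped for the get_multi_handle() accessor. The lone call site in s3.c follows suit at the end of the diff.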

src/curl.h
@@ -59,6 +59,7 @@ char *uid_to_s(uid_t uid);
char *url_encode(char *s);
HTTP_REQUEST *new_request(const char *path);
+CURLM *get_multi_handle(void);
GList *add_header(GList *headers, HTTP_HEADER *h);
GList *strip_header(GList *headers, const char *key);
@@ -78,7 +79,6 @@ void stormfs_curl_destroy();
int stormfs_curl_get(const char *path, char **data);
int stormfs_curl_get_file(const char *path, FILE *f);
int stormfs_curl_head(const char *path, GList **meta);
-int stormfs_curl_head_multi(const char *path, GList *files);
int stormfs_curl_init(struct stormfs *stormfs);
int stormfs_curl_put(const char *path, GList *headers);
int stormfs_curl_rename(const char *from, const char *to);

src/s3-curl.c
@@ -27,6 +27,7 @@
#define SHA1_BLOCK_SIZE 64
#define SHA1_LENGTH 20
+#define MAX_REQUESTS 100
struct s3_curl {
struct stormfs *stormfs;
@@ -359,7 +360,7 @@ get_url(const char *path)
char *url;
char *encoded_path = url_encode((char *) path);
- if(asprintf(&url, "%s%s?delimiter=/", s3_curl.stormfs->url, encoded_path) == -1) {
+ if(asprintf(&url, "%s%s?delimiter=/", s3_curl.stormfs->virtual_url, encoded_path) == -1) {
fprintf(stderr, "unable to allocate memory\n");
exit(EXIT_FAILURE);
}
@@ -374,7 +375,7 @@ get_multipart_url(const char *path)
char *url;
char *encoded_path = url_encode((char *) path);
- if(asprintf(&url, "%s%s?uploads", s3_curl.stormfs->url, encoded_path) == -1) {
+ if(asprintf(&url, "%s%s?uploads", s3_curl.stormfs->virtual_url, encoded_path) == -1) {
fprintf(stderr, "unable to allocate memory\n");
exit(EXIT_FAILURE);
}
@@ -390,7 +391,7 @@ get_upload_part_url(const char *path, FILE_PART *fp)
char *encoded_path = url_encode((char *) path);
if(asprintf(&url, "%s%s?partNumber=%d&uploadId=%s",
- s3_curl.stormfs->url, encoded_path, fp->part_num, fp->upload_id) == -1) {
+ s3_curl.stormfs->virtual_url, encoded_path, fp->part_num, fp->upload_id) == -1) {
fprintf(stderr, "unable to allocate memory\n");
exit(EXIT_FAILURE);
}
@@ -406,7 +407,7 @@ get_complete_multipart_url(const char *path, char *upload_id)
char *encoded_path = url_encode((char *) path);
if(asprintf(&url, "%s%s?uploadId=%s",
- s3_curl.stormfs->url, encoded_path, upload_id) == -1) {
+ s3_curl.stormfs->virtual_url, encoded_path, upload_id) == -1) {
fprintf(stderr, "unable to allocate memory\n");
exit(EXIT_FAILURE);
}
@@ -424,10 +425,10 @@ get_list_bucket_url(const char *path, const char *next_marker)
if(strlen(path) > 1)
result = asprintf(&url, "%s?delimiter=/&marker=%s&prefix=%s/",
- s3_curl.stormfs->url, next_marker, encoded_path + 1);
+ s3_curl.stormfs->virtual_url, next_marker, encoded_path + 1);
else
result = asprintf(&url, "%s?delimiter=/&marker=%s&prefix=",
- s3_curl.stormfs->url, next_marker);
+ s3_curl.stormfs->virtual_url, next_marker);
if(result == -1) {
fprintf(stderr, "unable to allocate memory\n");
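
Every URL builder in s3-curl.c now reads stormfs->virtual_url instead of stormfs->url, which suggests a move to virtual-hosted-style S3 addressing (bucket in the hostname rather than the path). A minimal sketch of the difference; the field values are assumptions for illustration, not taken from stormfs:

#include <stdio.h>

int main(void)
{
  /* Illustrative values only; how stormfs fills these fields is
   * not shown in this diff. */
  const char *url         = "http://s3.amazonaws.com/mybucket";  /* path-style */
  const char *virtual_url = "http://mybucket.s3.amazonaws.com";  /* virtual-hosted */
  char buf[256];

  snprintf(buf, sizeof buf, "%s%s?delimiter=/", virtual_url, "/dir/file");
  printf("%s\n", buf); /* http://mybucket.s3.amazonaws.com/dir/file?delimiter=/ */

  (void) url;
  return 0;
}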
@@ -626,6 +627,10 @@ sign_request(const char *method,
exit(EXIT_FAILURE);
}
+ printf("----------------------------------TO SIGN:\n");
+ printf("%s\n", to_sign);
+ printf("----------------------------------\n");
+
signature = hmac_sha1(s3_curl.stormfs->secret_key, to_sign);
if(asprintf(&authorization, "Authorization: AWS %s:%s",
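
The new printf block traces the string-to-sign on every request; more throwaway debugging. For context, the surrounding hmac_sha1(secret_key, to_sign) plus the "Authorization: AWS key:signature" header is AWS Signature Version 2, whose string-to-sign has roughly this shape (a sketch; where sign_request() actually sources each field is an assumption here):

/* AWS signature v2 string-to-sign, per the S3 documentation.
 * Any x-amz-* headers would be canonicalized and folded in just
 * before the resource. */
char *to_sign = NULL;
if(asprintf(&to_sign, "%s\n%s\n%s\n%s\n%s",
    method,          /* e.g. "HEAD" */
    content_md5,     /* usually empty */
    content_type,    /* usually empty */
    date,            /* RFC 2822 timestamp, cf. rfc2822_timestamp() */
    resource) == -1) /* e.g. "/bucket/dir/file" */
  exit(EXIT_FAILURE);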
@@ -1155,3 +1160,139 @@ free_parts(GList *parts)
g_list_foreach(parts, (GFunc) free_part, NULL);
g_list_free(parts);
}
+
+int
+s3_curl_head_multi(const char *path, GList *files)
+{
+ int running_handles;
+ size_t i, n_running, last_req_idx = 0;
+ size_t n_files = g_list_length(files);
+ HTTP_REQUEST *requests = g_new0(HTTP_REQUEST, n_files);
+ GList *head = NULL, *next = NULL;
+ CURLM *multi = get_multi_handle();
+
+ i = 0;
+ n_running = 0;
+ head = g_list_first(files);
+ while(head != NULL) {
+ next = head->next;
+ struct file *f = head->data;
+
+ CURLMcode err;
+ requests[i].headers = NULL;
+ requests[i].response.memory = g_malloc0(1);
+ requests[i].response.size = 0;
+ requests[i].path = get_path(path, f->name);
+ requests[i].done = false;
+
+ if(n_running < MAX_REQUESTS && n_running < n_files) {
+ char *url = get_url(requests[i].path);
+ requests[i].c = get_pooled_handle(url);
+ sign_request("HEAD", &requests[i].headers, requests[i].path);
+ curl_easy_setopt(requests[i].c, CURLOPT_NOBODY, 1L); // HEAD
+ curl_easy_setopt(requests[i].c, CURLOPT_FILETIME, 1L); // Last-Modified
+ curl_easy_setopt(requests[i].c, CURLOPT_HTTPHEADER, requests[i].headers);
+ curl_easy_setopt(requests[i].c, CURLOPT_HEADERDATA, (void *) &requests[i].response);
+ curl_easy_setopt(requests[i].c, CURLOPT_HEADERFUNCTION, write_memory_cb);
+ g_free(url);
+
+ if((err = curl_multi_add_handle(multi, requests[i].c)) != CURLM_OK)
+ return -EIO;
+
+ n_running++;
+ last_req_idx = i;
+ }
+
+ i++;
+ head = next;
+ }
+
+ curl_multi_perform(multi, &running_handles);
+ while(running_handles) {
+ if(running_handles) {
+ int max_fd = -1;
+ long curl_timeout = -1;
+ struct timeval timeout;
+ CURLMcode err;
+
+ fd_set fd_r;
+ fd_set fd_w;
+ fd_set fd_e;
+ FD_ZERO(&fd_r);
+ FD_ZERO(&fd_w);
+ FD_ZERO(&fd_e);
+ timeout.tv_sec = 1;
+ timeout.tv_usec = 0;
+
+ curl_multi_timeout(multi, &curl_timeout);
+ if(curl_timeout >= 0) {
+ timeout.tv_sec = curl_timeout / 1000;
+ if(timeout.tv_sec > 1)
+ timeout.tv_sec = 1;
+ else
+ timeout.tv_usec = (curl_timeout % 1000) * 1000;
+ }
+
+ err = curl_multi_fdset(multi, &fd_r, &fd_w, &fd_e, &max_fd);
+ if(err != CURLM_OK)
+ return -EIO;
+
+ if(select(max_fd + 1, &fd_r, &fd_w, &fd_e, &timeout) == -1)
+ return -errno;
+ }
+
+ curl_multi_perform(multi, &running_handles);
+
+ CURLMsg *msg;
+ int remaining;
+ while((msg = curl_multi_info_read(multi, &remaining))) {
+ if(msg->msg != CURLMSG_DONE)
+ continue;
+
+ for(i = 0; i < n_files; i++) {
+ // requests *might* share the same handle out of the pool,
+ // make sure the request hasn't also been marked as completed
+ if(msg->easy_handle == requests[i].c && !requests[i].done)
+ break;
+ }
+
+ struct file *f = g_list_nth_data(files, i);
+ extract_meta(requests[i].response.memory, &(f->headers));
+ g_free(requests[i].response.memory);
+ curl_slist_free_all(requests[i].headers);
+ curl_multi_remove_handle(multi, requests[i].c);
+ release_pooled_handle(requests[i].c);
+ requests[i].done = true;
+ n_running--;
+
+ if(n_running < MAX_REQUESTS && last_req_idx < (n_files - 1)) {
+ CURLMcode err;
+ last_req_idx++;;
+
+ char *url = get_url(requests[last_req_idx].path);
+ requests[last_req_idx].c = get_pooled_handle(url);
+ sign_request("HEAD", &requests[last_req_idx].headers, requests[last_req_idx].path);
+ curl_easy_setopt(requests[last_req_idx].c, CURLOPT_NOBODY, 1L); // HEAD
+ curl_easy_setopt(requests[last_req_idx].c, CURLOPT_FILETIME, 1L); // Last-Modified
+ curl_easy_setopt(requests[last_req_idx].c, CURLOPT_HTTPHEADER, requests[last_req_idx].headers);
+ curl_easy_setopt(requests[last_req_idx].c, CURLOPT_HEADERDATA, (void *) &requests[last_req_idx].response);
+ curl_easy_setopt(requests[last_req_idx].c, CURLOPT_HEADERFUNCTION, write_memory_cb);
+ g_free(url);
+
+ if((err = curl_multi_add_handle(multi, requests[last_req_idx].c)) != CURLM_OK)
+ return -EIO;
+
+ n_running++;
+ }
+ }
+ }
+
+ for(i = 0; i < n_files; i++) {
+ if(requests[i].c != NULL)
+ release_pooled_handle(requests[i].c);
+ g_free(requests[i].path);
+ }
+ g_free(requests);
+
+ return 0;
+}
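
One subtlety worth flagging in the relocated function: the message-matching loop breaks on the first un-done request whose handle matches msg->easy_handle, but if nothing matches, i falls through equal to n_files and the requests[i] accesses that follow run one past the array. Every CURLMSG_DONE message should match a live request in practice, so this is defensive-coding territory; a guarded variant would look like this (a sketch, not part of this commit):

for(i = 0; i < n_files; i++)
  if(msg->easy_handle == requests[i].c && !requests[i].done)
    break;
if(i == n_files)
  continue; /* no matching request: skip rather than index past the array */

The stray second semicolon in last_req_idx++;; is harmless (an empty statement) but worth cleaning up while the code is being moved.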

src/s3-curl.h
@@ -72,5 +72,6 @@ int headers_to_stat(GList *headers, struct stat *stbuf);
void s3_curl_destroy(void);
int s3_curl_init(struct stormfs *stormfs);
int s3_curl_list_bucket(const char *path, char **xml);
+int s3_curl_head_multi(const char *path, GList *files);
#endif // s3_curl_H

src/s3.c
@@ -118,7 +118,7 @@ s3_getattr_multi(const char *path, GList *files)
{
int result;
GList *head = NULL, *next = NULL;
- result = stormfs_curl_head_multi(path, files);
+ result = s3_curl_head_multi(path, files);
head = g_list_first(files);
while(head != NULL) {
