Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,6 @@ else(WIN32)
DESTINATION
"/usr/local/include")
install(TARGETS ds3 DESTINATION lib)
set(CMAKE_BUILD_TYPE Release)
#set(CMAKE_BUILD_TYPE Debug)
endif(WIN32)
28 changes: 26 additions & 2 deletions src/ds3.c
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,30 @@ ds3_error* ds3_create_client_from_env(ds3_client** client) {
return NULL;
}

// Allow multiple ds3_clients to share a ds3_connection_pool
ds3_client* ds3_copy_client(const ds3_client const* client) {
if (client == NULL) {
return NULL;
}

ds3_client* copied_client = g_new0(ds3_client, 1);
copied_client->endpoint = ds3_str_dup(client->endpoint);
if (client->proxy) {
copied_client->proxy = ds3_str_dup(client->proxy);
}
copied_client->num_redirects = client->num_redirects;
copied_client->creds = ds3_create_creds(client->creds->access_id->value, client->creds->secret_key->value);

ds3_client_register_net( copied_client, net_process_request );
ds3_client_register_logging(copied_client, client->log->log_lvl, client->log->log_callback, NULL);

copied_client->connection_pool = client->connection_pool;
ds3_connection_pool_inc_ref(copied_client->connection_pool);
return copied_client;
}

void ds3_client_proxy(ds3_client* client, const char* proxy) {
ds3_str_free(client->proxy);
client->proxy = ds3_str_init(proxy);
}

Expand All @@ -1080,11 +1103,12 @@ void ds3_client_free(ds3_client* client) {
return;
}

// free client->connection_pool only if there are no remaining references
ds3_connection_pool_dec_ref(client->connection_pool);

ds3_str_free(client->endpoint);
ds3_str_free(client->proxy);
g_free(client->log);
ds3_connection_pool_clear(client->connection_pool);
g_free(client->connection_pool);
g_free(client);
}

Expand Down
1 change: 1 addition & 0 deletions src/ds3.h
Original file line number Diff line number Diff line change
Expand Up @@ -2329,6 +2329,7 @@ LIBRARY_API void ds3_client_free(ds3_client* client);
LIBRARY_API ds3_creds* ds3_create_creds(const char *const access_id, const char *const secret_key);
LIBRARY_API ds3_client* ds3_create_client(const char *const endpoint, ds3_creds* creds);
LIBRARY_API ds3_error* ds3_create_client_from_env(ds3_client** client);
LIBRARY_API ds3_client* ds3_copy_client(const ds3_client* client);
LIBRARY_API void ds3_client_register_logging(ds3_client* client, ds3_log_lvl log_lvl, void (* log_callback)(const char* log_message, void* user_data), void* user_data);
LIBRARY_API void ds3_client_register_net(ds3_client* client, ds3_error* (* net_callback)(const ds3_client* client,
const ds3_request* _request,
Expand Down
47 changes: 37 additions & 10 deletions src/ds3_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,52 @@
#include <string.h>
#include <curl/curl.h>
#include <glib.h>
#include <inttypes.h>
#include "ds3_net.h"

ds3_connection_pool* ds3_connection_pool_init(void) {
return ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE);
}

ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) {
ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1);
pool->connections = g_new0(ds3_connection*, pool_size);
pool->num_connections = pool_size;
g_mutex_init(&pool->mutex);
g_cond_init(&pool->available_connections);
pool->ref_count = 1;
return pool;
}

void ds3_connection_pool_clear(ds3_connection_pool* pool) {
void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked) {
int index;

if (pool == NULL) {
return;
}

g_mutex_lock(&pool->mutex);
if (already_locked == False) {
g_mutex_lock(&pool->mutex);
}

for (index = 0; index < CONNECTION_POOL_SIZE; index++) {
for (index = 0; index < pool->num_connections; index++) {
if (pool->connections[index] != NULL) {
curl_easy_cleanup(pool->connections[index]);
}
}

g_free(pool->connections);
g_mutex_unlock(&pool->mutex);
g_mutex_clear(&pool->mutex);
g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined
g_cond_clear(&pool->available_connections);
}

static int _pool_inc(ds3_connection_pool* pool, int index) {
return (index+1) % CONNECTION_POOL_SIZE;
static int _pool_inc(int index, uint16_t num_connections) {
return (index+1) % num_connections;
}

static int _pool_full(ds3_connection_pool* pool) {
return (_pool_inc(pool, pool->tail) == pool->head);
return (_pool_inc(pool->head, pool->num_connections) == pool->tail);
}


Expand All @@ -73,7 +84,7 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) {
} else {
connection = pool->connections[pool->head];
}
pool->head = _pool_inc(pool, pool->head);
pool->head = _pool_inc(pool->head, pool->num_connections);

g_mutex_unlock(&pool->mutex);

Expand All @@ -82,11 +93,27 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) {

void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) {
g_mutex_lock(&pool->mutex);

curl_easy_reset(connection);
pool->tail = _pool_inc(pool, pool->tail);
pool->tail = _pool_inc(pool->tail, pool->num_connections);

g_mutex_unlock(&pool->mutex);
g_cond_signal(&pool->available_connections);
}

void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) {
g_mutex_lock(&pool->mutex);
pool->ref_count++;
g_mutex_unlock(&pool->mutex);
}

void ds3_connection_pool_dec_ref(ds3_connection_pool* pool) {
g_mutex_lock(&pool->mutex);
pool->ref_count--;

if (pool->ref_count == 0) {
ds3_connection_pool_clear(pool, True);
g_free(pool);
} else {
g_mutex_unlock(&pool->mutex);
}
}
20 changes: 13 additions & 7 deletions src/ds3_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extern "C" {
#include <curl/curl.h>
#include <glib.h>

#define CONNECTION_POOL_SIZE 100
#define CONNECTION_POOL_SIZE 10

typedef GMutex ds3_mutex;
typedef GCond ds3_condition;
Expand All @@ -36,19 +36,25 @@ typedef CURL ds3_connection;

//-- Opaque struct
struct _ds3_connection_pool{
ds3_connection* connections[CONNECTION_POOL_SIZE];
int head;
int tail;
ds3_mutex mutex;
ds3_condition available_connections;
ds3_connection** connections;
uint16_t num_connections;
int head;
int tail;
ds3_mutex mutex;
ds3_condition available_connections;
uint16_t ref_count;
};

ds3_connection_pool* ds3_connection_pool_init(void);
void ds3_connection_pool_clear(ds3_connection_pool* pool);
ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size);
void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked);

ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool);
void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* handle);

void ds3_connection_pool_inc_ref(ds3_connection_pool* pool);
void ds3_connection_pool_dec_ref(ds3_connection_pool* pool);

#ifdef __cplusplus
}
#endif
Expand Down
9 changes: 8 additions & 1 deletion src/ds3_net.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,12 @@ ds3_error* net_process_request(const ds3_client* client,
url = g_strconcat(client->endpoint->value, request->path->value,"?",query_params, NULL);
g_free(query_params);
}
ds3_log_message(client->log, DS3_DEBUG, "URL[%s]", url);

while (retry_count < client->num_redirects) {
handle = (CURL*)ds3_connection_acquire(client->connection_pool);
ds3_log_message(client->log, DS3_DEBUG, "Acquiring connection...");
handle = ds3_connection_acquire(client->connection_pool);
ds3_log_message(client->log, DS3_DEBUG, "Connection acquired.");

if (handle) {
char* amz_headers;
Expand Down Expand Up @@ -509,14 +512,18 @@ ds3_error* net_process_request(const ds3_client* client,

curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);

ds3_log_message(client->log, DS3_DEBUG, "Attempt curl_easy_perform...");
res = curl_easy_perform(handle);
ds3_log_message(client->log, DS3_DEBUG, "curl_easy_perform done.");

g_free(date);
g_free(date_header);
g_free(signature);
g_free(auth_header);
curl_slist_free_all(headers);
ds3_log_message(client->log, DS3_DEBUG, "Releasing connection...");
ds3_connection_release(client->connection_pool, handle);
ds3_log_message(client->log, DS3_DEBUG, "Connection released.");

//process the response
if (res != CURLE_OK) {
Expand Down
3 changes: 3 additions & 0 deletions src/ds3_string.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ ds3_str* ds3_str_init_with_size(const char* string, size_t size) {
}

ds3_str* ds3_str_dup(const ds3_str* string) {
if (string == NULL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for null check

return NULL;
}
ds3_str* str = g_new0(ds3_str, 1);
str->value = g_strndup(string->value, string->size);
str->size = string->size;
Expand Down
3 changes: 3 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ else(WIN32) # POSIX
link_directories("${PROJECT_SOURCE_DIR}/../install/lib")
include_directories("${PROJECT_SOURCE_DIR}/../install/include")

set(CMAKE_BUILD_TYPE Release)
#set(CMAKE_BUILD_TYPE Debug)
endif(WIN32)

add_executable(ds3_c_tests
Expand All @@ -74,6 +76,7 @@ add_executable(ds3_c_tests
negative_tests.cpp
search_tests.cpp
service_tests.cpp
connection_tests.cpp
test.cpp)

add_test(regression_tests ds3_c_tests)
Expand Down
Loading