Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ Debug/
*.ilk
*.pdb
ds3.dll
/win32/output/bin
/win32/output/bin
.idea/
cmake-build-debug/
*.dylib
1 change: 1 addition & 0 deletions src/ds3.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "ds3.h"
#include "ds3_net.h"
#include "ds3_connection.h"
#include "ds3_request.h"
#include "ds3_string_multimap_impl.h"
#include "ds3_utils.h"
Expand Down
59 changes: 41 additions & 18 deletions src/ds3_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,35 @@
#include <curl/curl.h>
#include <glib.h>
#include <inttypes.h>
#include "ds3_net.h"

#include "ds3_connection.h"

//-- Opaque struct
struct _ds3_connection_pool{
ds3_connection** connections;
uint16_t num_connections; // the number of connections created
ds3_connection** connection_queue;
uint16_t max_connections; // max number of possible connections, which the connections and queue arrays will be initialized to
int queue_head;
int queue_tail;
ds3_mutex mutex;
ds3_condition available_connection_notifier;
uint16_t ref_count;
};

ds3_connection_pool* ds3_connection_pool_init(void) {
return ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE);
return ds3_connection_pool_init_with_size(DEFAULT_CONNECTION_POOL_SIZE);
}

ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) {
ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1);

pool->connections = g_new0(ds3_connection*, pool_size);
pool->num_connections = pool_size;
pool->connection_queue = g_new0(ds3_connection*, pool_size);

pool->max_connections = pool_size;

g_mutex_init(&pool->mutex);
g_cond_init(&pool->available_connections);
g_cond_init(&pool->available_connection_notifier);
pool->ref_count = 1;
return pool;
}
Expand All @@ -55,35 +71,39 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke
}

g_free(pool->connections);
g_free(pool->connection_queue);
g_mutex_unlock(&pool->mutex);
g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined
g_cond_clear(&pool->available_connections);
g_cond_clear(&pool->available_connection_notifier);
}

static int _pool_inc(int index, uint16_t num_connections) {
return (index+1) % num_connections;
static int _queue_inc(int index, uint16_t size) {
return (index+1) % size;
}

static int _pool_full(ds3_connection_pool* pool) {
return (_pool_inc(pool->head, pool->num_connections) == pool->tail);
static int _queue_is_empty(ds3_connection_pool* pool) {
int queue_head = pool->queue_head;
return pool->queue_tail == queue_head && pool->connection_queue[queue_head] == NULL;
}

ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) {
ds3_connection* connection = NULL;

g_mutex_lock(&pool->mutex);
while (_pool_full(pool)) {
g_cond_wait(&pool->available_connections, &pool->mutex);
while (_queue_is_empty(pool) && pool->num_connections >= pool->max_connections) {
g_cond_wait(&pool->available_connection_notifier, &pool->mutex);
}

if (pool->connections[pool->head] == NULL) {
if (_queue_is_empty(pool)) {
connection = curl_easy_init();

pool->connections[pool->head] = connection;
pool->connections[pool->num_connections] = connection;
pool->num_connections++;
} else {
connection = pool->connections[pool->head];
connection = pool->connection_queue[pool->queue_tail];
pool->connection_queue[pool->queue_tail] = NULL;
pool->queue_tail = _queue_inc(pool->queue_tail, pool->max_connections);
}
pool->head = _pool_inc(pool->head, pool->num_connections);

g_mutex_unlock(&pool->mutex);

Expand All @@ -94,10 +114,13 @@ void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connectio
g_mutex_lock(&pool->mutex);

curl_easy_reset(connection);
pool->tail = _pool_inc(pool->tail, pool->num_connections);

pool->connection_queue[pool->queue_head] = connection;

pool->queue_head = _queue_inc(pool->queue_head, pool->max_connections);

g_cond_signal(&pool->available_connection_notifier);
g_mutex_unlock(&pool->mutex);
g_cond_signal(&pool->available_connections);
}

void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) {
Expand Down
14 changes: 3 additions & 11 deletions src/ds3_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,16 @@ extern "C" {

#include <curl/curl.h>
#include <glib.h>
#include "ds3.h"

#define CONNECTION_POOL_SIZE 10
#define DEFAULT_CONNECTION_POOL_SIZE 10

typedef GMutex ds3_mutex;
typedef GCond ds3_condition;

typedef CURL ds3_connection;

//-- Opaque struct
struct _ds3_connection_pool{
ds3_connection** connections;
uint16_t num_connections;
int head;
int tail;
ds3_mutex mutex;
ds3_condition available_connections;
uint16_t ref_count;
};
typedef struct _ds3_connection_pool ds3_connection_pool;

ds3_connection_pool* ds3_connection_pool_init(void);
ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size);
Expand Down
2 changes: 0 additions & 2 deletions src/ds3_net.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
#include <curl/curl.h>

#include "ds3_request.h"
#include "ds3.h"
#include "ds3_net.h"
#include "ds3_utils.h"
#include "ds3_string_multimap.h"
#include "ds3_string_multimap_impl.h"
#include "ds3_connection.h"

Expand Down
1 change: 0 additions & 1 deletion src/ds3_net.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ extern "C" {

#include "ds3.h"
#include "ds3_string_multimap.h"
#include "ds3_connection.h"

char* escape_url(const char* url);
char* escape_url_extended(const char* url, const char** delimiters, uint32_t num_delimiters);
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ add_executable(ds3_c_tests
search_tests.cpp
service_tests.cpp
connection_tests.cpp
put_directory.cpp
test.cpp)

add_test(regression_tests ds3_c_tests)
Expand Down
28 changes: 13 additions & 15 deletions test/connection_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ BOOST_AUTO_TEST_CASE( ds3_client_create_free ) {
printf("-----Testing ds3_client create and free-------\n");

ds3_client* client = get_client();
BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1);
BOOST_CHECK(client->connection_pool != NULL);
ds3_creds_free(client->creds);
ds3_client_free(client);
}
Expand All @@ -34,10 +34,8 @@ BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) {
printf("-----Testing ds3_copy_client-------\n");

ds3_client* client = get_client();
BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1);

ds3_client* client_copy = ds3_copy_client(client);
BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 2);
BOOST_CHECK_EQUAL(client->endpoint->value, client_copy->endpoint->value);
if (client->proxy) {
BOOST_CHECK_EQUAL(client->proxy->value, client_copy->proxy->value);
Expand All @@ -53,7 +51,7 @@ BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) {
ds3_creds_free(client->creds);
ds3_client_free(client);

BOOST_CHECK_EQUAL(client_copy->connection_pool->ref_count, 1);
BOOST_CHECK(client_copy->connection_pool != NULL);
ds3_creds_free(client_copy->creds);
ds3_client_free(client_copy);
}
Expand All @@ -63,18 +61,18 @@ BOOST_AUTO_TEST_CASE( create_bucket_with_copied_client ) {

ds3_client* client = get_client();
ds3_connection_pool* cp = client->connection_pool;
BOOST_CHECK_EQUAL(cp->ref_count, 1);
BOOST_CHECK(cp != NULL);

ds3_client* client_copy = ds3_copy_client(client);
BOOST_CHECK_EQUAL(cp->ref_count, 2);
BOOST_CHECK_EQUAL(cp, client_copy->connection_pool);

const char* client_bucket_name = "create_bucket_from_original_client";
ds3_error* error = create_bucket_with_data_policy(client, client_bucket_name, ids.data_policy_id->value);
handle_error(error);
clear_bucket(client, client_bucket_name);
ds3_creds_free(client->creds);
ds3_client_free(client);
BOOST_CHECK_EQUAL(cp->ref_count, 1);
BOOST_CHECK(client_copy->connection_pool != NULL);

const char* copied_client_bucket_name = "create_bucket_from_copied_client";
error = create_bucket_with_data_policy(client_copy, copied_client_bucket_name, ids.data_policy_id->value);
Expand Down Expand Up @@ -162,7 +160,7 @@ BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) {

ds3_master_object_list_response* chunk_response = ensure_available_chunks(client, bulk_response->job_id);

GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, bucket_name, bulk_response, chunk_response, num_threads, False);
GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, NULL, bucket_name, bulk_response, chunk_response, num_threads, False);

GThread* chunks_thread_0 = g_thread_new("objects_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 0));
GThread* chunks_thread_1 = g_thread_new("objects_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 1));
Expand Down Expand Up @@ -204,7 +202,7 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) {

ds3_master_object_list_response* sequential_chunks = ensure_available_chunks(client, mol->job_id);

GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, sequential_bucket_name, mol, sequential_chunks, 1, False);
GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, NULL, sequential_bucket_name, mol, sequential_chunks, 1, False);

// capture sequential test start time
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
Expand Down Expand Up @@ -238,7 +236,7 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) {

ds3_master_object_list_response* parallel_chunks = ensure_available_chunks(client, mol->job_id);

GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, parallel_bucket_name, mol, parallel_chunks, 4, False);
GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, NULL, parallel_bucket_name, mol, parallel_chunks, 4, False);

// capture sequential test start time
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
Expand Down Expand Up @@ -315,8 +313,8 @@ BOOST_AUTO_TEST_CASE( multiple_client_xfer ) {
ds3_master_object_list_response* client1_chunks = ensure_available_chunks(client1, mol1->job_id);
ds3_master_object_list_response* client2_chunks = ensure_available_chunks(client2, mol2->job_id);

GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, client1_bucket_name, mol1, client1_chunks, 1, True);
GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, client2_bucket_name, mol2, client2_chunks, 1, True);
GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, NULL, client1_bucket_name, mol1, client1_chunks, 1, True);
GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, NULL, client2_bucket_name, mol2, client2_chunks, 1, True);

// capture sequential test start time
clock_gettime(CLOCK_MONOTONIC, &start_time_t);
Expand Down Expand Up @@ -407,9 +405,9 @@ BOOST_AUTO_TEST_CASE( performance_bulk_put ) {
ds3_master_object_list_response* chunks_response2 = ensure_available_chunks(client2, bulk_response2->job_id);
ds3_master_object_list_response* chunks_response3 = ensure_available_chunks(client3, bulk_response3->job_id);

GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, bucket_name1, bulk_response1, chunks_response1, 1, True);
GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, bucket_name2, bulk_response2, chunks_response2, 1, True);
GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, bucket_name3, bulk_response3, chunks_response3, 1, True);
GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, NULL, bucket_name1, bulk_response1, chunks_response1, 1, True);
GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, NULL, bucket_name2, bulk_response2, chunks_response2, 1, True);
GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, NULL, bucket_name3, bulk_response3, chunks_response3, 1, True);

// capture sequential test start time
struct timespec start_time_t, end_time_t;
Expand Down
98 changes: 98 additions & 0 deletions test/put_directory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* ******************************************************************************
* Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* ****************************************************************************
*/

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <glib.h>
#include <sys/stat.h>
#include <boost/test/unit_test.hpp>
#include <inttypes.h>
#include "ds3.h"
#include "ds3_net.h"
#include "ds3_utils.h"
#include "test.h"

BOOST_AUTO_TEST_CASE( put_directory_4_threads) {
printf("-----Testing PUT all objects in a directory with 4 threads-------\n");

const char* dir_path = getenv("DS3_TEST_DIRECTORY");
if (dir_path == NULL) {
printf("ENV[DS3_TEST_DIRECTORY] unset - Skipping put_directory_4_threads test.\n");
return;
}

const char* bucket_name = "test_bulk_put_directory";
printf(" Putting all files in [%s] to bucket [%s]\n", dir_path, bucket_name);

ds3_client* client = get_client_at_loglvl(DS3_DEBUG);
int client_thread=1;
ds3_client_register_logging(client, DS3_DEBUG, test_log, (void*)&client_thread); // Use DEBUG level logging

ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value);

char* objects_list[100];
uint64_t num_objs = 0;
GDir* dir_info = g_dir_open(dir_path, 0, NULL);
for (char* current_obj = (char*)g_dir_read_name(dir_info); current_obj != NULL; current_obj = (char*)g_dir_read_name(dir_info)) {
objects_list[num_objs++] = current_obj;
printf(" obj[%" PRIu64 "][%s]\n", num_objs, objects_list[num_objs-1]);
}

ds3_bulk_object_list_response* bulk_object_list = ds3_convert_file_list_with_basepath((const char**)objects_list, num_objs, dir_path);

ds3_request* request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, bulk_object_list);
ds3_master_object_list_response* mol;
error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol);
ds3_request_free(request);
ds3_bulk_object_list_response_free(bulk_object_list);
handle_error(error);

// Allocate cache
ds3_master_object_list_response* chunks_list = ensure_available_chunks(client, mol->job_id);

// Use helper functions from test.cpp
const uint8_t num_threads = 4;
GPtrArray* put_dir_args = new_put_chunks_threads_args(client, NULL, dir_path, bucket_name, mol, chunks_list, num_threads, True); // Last param indicates verbose logging in the spawned thread


// capture test start time
struct timespec start_time_t, end_time_t;
double elapsed_t;
clock_gettime(CLOCK_MONOTONIC, &start_time_t);

GThread* put_dir_xfer_thread_0 = g_thread_new("put_dir_xfer_thread_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 0));
GThread* put_dir_xfer_thread_1 = g_thread_new("put_dir_xfer_thread_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 1));
GThread* put_dir_xfer_thread_2 = g_thread_new("put_dir_xfer_thread_2", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 2));
GThread* put_dir_xfer_thread_3 = g_thread_new("put_dir_xfer_thread_3", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 3));

// Block and cleanup GThread(s)
g_thread_join(put_dir_xfer_thread_0);
g_thread_join(put_dir_xfer_thread_1);
g_thread_join(put_dir_xfer_thread_2);
g_thread_join(put_dir_xfer_thread_3);

// find elapsed CPU and real time
clock_gettime(CLOCK_MONOTONIC, &end_time_t);
elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t);
ds3_log_message(client->log, DS3_INFO, " Elapsed time[%f]", elapsed_t);

g_dir_close(dir_info);
ds3_master_object_list_response_free(chunks_list);
ds3_master_object_list_response_free(mol);
put_chunks_threads_args_free(put_dir_args);
clear_bucket(client, bucket_name);
free_client(client);
}
Loading