diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c9a71e58..52baed81 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -82,4 +82,6 @@ else(WIN32) DESTINATION "/usr/local/include") install(TARGETS ds3 DESTINATION lib) + set(CMAKE_BUILD_TYPE Release) + #set(CMAKE_BUILD_TYPE Debug) endif(WIN32) diff --git a/src/ds3.c b/src/ds3.c index a89ab95e..2bca9492 100644 --- a/src/ds3.c +++ b/src/ds3.c @@ -1061,7 +1061,30 @@ ds3_error* ds3_create_client_from_env(ds3_client** client) { return NULL; } +// Allow multiple ds3_clients to share a ds3_connection_pool +ds3_client* ds3_copy_client(const ds3_client const* client) { + if (client == NULL) { + return NULL; + } + + ds3_client* copied_client = g_new0(ds3_client, 1); + copied_client->endpoint = ds3_str_dup(client->endpoint); + if (client->proxy) { + copied_client->proxy = ds3_str_dup(client->proxy); + } + copied_client->num_redirects = client->num_redirects; + copied_client->creds = ds3_create_creds(client->creds->access_id->value, client->creds->secret_key->value); + + ds3_client_register_net( copied_client, net_process_request ); + ds3_client_register_logging(copied_client, client->log->log_lvl, client->log->log_callback, NULL); + + copied_client->connection_pool = client->connection_pool; + ds3_connection_pool_inc_ref(copied_client->connection_pool); + return copied_client; +} + void ds3_client_proxy(ds3_client* client, const char* proxy) { + ds3_str_free(client->proxy); client->proxy = ds3_str_init(proxy); } @@ -1080,11 +1103,12 @@ void ds3_client_free(ds3_client* client) { return; } + // free client->connection_pool only if there are no remaining references + ds3_connection_pool_dec_ref(client->connection_pool); + ds3_str_free(client->endpoint); ds3_str_free(client->proxy); g_free(client->log); - ds3_connection_pool_clear(client->connection_pool); - g_free(client->connection_pool); g_free(client); } diff --git a/src/ds3.h b/src/ds3.h index 8150fcdc..85134628 100644 --- a/src/ds3.h +++ b/src/ds3.h @@ -2329,6 +2329,7 @@ LIBRARY_API void ds3_client_free(ds3_client* client); LIBRARY_API ds3_creds* ds3_create_creds(const char *const access_id, const char *const secret_key); LIBRARY_API ds3_client* ds3_create_client(const char *const endpoint, ds3_creds* creds); LIBRARY_API ds3_error* ds3_create_client_from_env(ds3_client** client); +LIBRARY_API ds3_client* ds3_copy_client(const ds3_client* client); LIBRARY_API void ds3_client_register_logging(ds3_client* client, ds3_log_lvl log_lvl, void (* log_callback)(const char* log_message, void* user_data), void* user_data); LIBRARY_API void ds3_client_register_net(ds3_client* client, ds3_error* (* net_callback)(const ds3_client* client, const ds3_request* _request, diff --git a/src/ds3_connection.c b/src/ds3_connection.c index 7472e89b..42a7be61 100644 --- a/src/ds3_connection.c +++ b/src/ds3_connection.c @@ -20,41 +20,52 @@ #include #include #include +#include #include "ds3_net.h" ds3_connection_pool* ds3_connection_pool_init(void) { + return ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE); +} + +ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) { ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1); + pool->connections = g_new0(ds3_connection*, pool_size); + pool->num_connections = pool_size; g_mutex_init(&pool->mutex); g_cond_init(&pool->available_connections); + pool->ref_count = 1; return pool; } -void ds3_connection_pool_clear(ds3_connection_pool* pool) { +void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked) { int index; if (pool == NULL) { return; } - g_mutex_lock(&pool->mutex); + if (already_locked == False) { + g_mutex_lock(&pool->mutex); + } - for (index = 0; index < CONNECTION_POOL_SIZE; index++) { + for (index = 0; index < pool->num_connections; index++) { if (pool->connections[index] != NULL) { curl_easy_cleanup(pool->connections[index]); } } + g_free(pool->connections); g_mutex_unlock(&pool->mutex); - g_mutex_clear(&pool->mutex); + g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined g_cond_clear(&pool->available_connections); } -static int _pool_inc(ds3_connection_pool* pool, int index) { - return (index+1) % CONNECTION_POOL_SIZE; +static int _pool_inc(int index, uint16_t num_connections) { + return (index+1) % num_connections; } static int _pool_full(ds3_connection_pool* pool) { - return (_pool_inc(pool, pool->tail) == pool->head); + return (_pool_inc(pool->head, pool->num_connections) == pool->tail); } @@ -73,7 +84,7 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { } else { connection = pool->connections[pool->head]; } - pool->head = _pool_inc(pool, pool->head); + pool->head = _pool_inc(pool->head, pool->num_connections); g_mutex_unlock(&pool->mutex); @@ -82,11 +93,27 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) { g_mutex_lock(&pool->mutex); - curl_easy_reset(connection); - pool->tail = _pool_inc(pool, pool->tail); + pool->tail = _pool_inc(pool->tail, pool->num_connections); g_mutex_unlock(&pool->mutex); g_cond_signal(&pool->available_connections); } +void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) { + g_mutex_lock(&pool->mutex); + pool->ref_count++; + g_mutex_unlock(&pool->mutex); +} + +void ds3_connection_pool_dec_ref(ds3_connection_pool* pool) { + g_mutex_lock(&pool->mutex); + pool->ref_count--; + + if (pool->ref_count == 0) { + ds3_connection_pool_clear(pool, True); + g_free(pool); + } else { + g_mutex_unlock(&pool->mutex); + } +} diff --git a/src/ds3_connection.h b/src/ds3_connection.h index 448dbe4f..5374bddd 100644 --- a/src/ds3_connection.h +++ b/src/ds3_connection.h @@ -27,7 +27,7 @@ extern "C" { #include #include -#define CONNECTION_POOL_SIZE 100 +#define CONNECTION_POOL_SIZE 10 typedef GMutex ds3_mutex; typedef GCond ds3_condition; @@ -36,19 +36,25 @@ typedef CURL ds3_connection; //-- Opaque struct struct _ds3_connection_pool{ - ds3_connection* connections[CONNECTION_POOL_SIZE]; - int head; - int tail; - ds3_mutex mutex; - ds3_condition available_connections; + ds3_connection** connections; + uint16_t num_connections; + int head; + int tail; + ds3_mutex mutex; + ds3_condition available_connections; + uint16_t ref_count; }; ds3_connection_pool* ds3_connection_pool_init(void); -void ds3_connection_pool_clear(ds3_connection_pool* pool); +ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size); +void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locked); ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool); void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* handle); +void ds3_connection_pool_inc_ref(ds3_connection_pool* pool); +void ds3_connection_pool_dec_ref(ds3_connection_pool* pool); + #ifdef __cplusplus } #endif diff --git a/src/ds3_net.c b/src/ds3_net.c index 45d67834..8d64b263 100644 --- a/src/ds3_net.c +++ b/src/ds3_net.c @@ -398,9 +398,12 @@ ds3_error* net_process_request(const ds3_client* client, url = g_strconcat(client->endpoint->value, request->path->value,"?",query_params, NULL); g_free(query_params); } + ds3_log_message(client->log, DS3_DEBUG, "URL[%s]", url); while (retry_count < client->num_redirects) { - handle = (CURL*)ds3_connection_acquire(client->connection_pool); + ds3_log_message(client->log, DS3_DEBUG, "Acquiring connection..."); + handle = ds3_connection_acquire(client->connection_pool); + ds3_log_message(client->log, DS3_DEBUG, "Connection acquired."); if (handle) { char* amz_headers; @@ -509,14 +512,18 @@ ds3_error* net_process_request(const ds3_client* client, curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers); + ds3_log_message(client->log, DS3_DEBUG, "Attempt curl_easy_perform..."); res = curl_easy_perform(handle); + ds3_log_message(client->log, DS3_DEBUG, "curl_easy_perform done."); g_free(date); g_free(date_header); g_free(signature); g_free(auth_header); curl_slist_free_all(headers); + ds3_log_message(client->log, DS3_DEBUG, "Releasing connection..."); ds3_connection_release(client->connection_pool, handle); + ds3_log_message(client->log, DS3_DEBUG, "Connection released."); //process the response if (res != CURLE_OK) { diff --git a/src/ds3_string.c b/src/ds3_string.c index 2750f324..8b81f777 100644 --- a/src/ds3_string.c +++ b/src/ds3_string.c @@ -30,6 +30,9 @@ ds3_str* ds3_str_init_with_size(const char* string, size_t size) { } ds3_str* ds3_str_dup(const ds3_str* string) { + if (string == NULL) { + return NULL; + } ds3_str* str = g_new0(ds3_str, 1); str->value = g_strndup(string->value, string->size); str->size = string->size; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 80edbd32..7d303973 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -59,6 +59,8 @@ else(WIN32) # POSIX link_directories("${PROJECT_SOURCE_DIR}/../install/lib") include_directories("${PROJECT_SOURCE_DIR}/../install/include") + set(CMAKE_BUILD_TYPE Release) + #set(CMAKE_BUILD_TYPE Debug) endif(WIN32) add_executable(ds3_c_tests @@ -74,6 +76,7 @@ add_executable(ds3_c_tests negative_tests.cpp search_tests.cpp service_tests.cpp + connection_tests.cpp test.cpp) add_test(regression_tests ds3_c_tests) diff --git a/test/bulk_put.cpp b/test/bulk_put.cpp index 14c60d07..91eff97b 100644 --- a/test/bulk_put.cpp +++ b/test/bulk_put.cpp @@ -20,180 +20,11 @@ #include #include #include +#include #include "ds3.h" #include "ds3_net.h" #include "test.h" -#define BUFF_SIZE 16 - -/** - * Create a ds3_bulk_object_list_response with the same name many times, append a number - */ -ds3_bulk_object_list_response* create_bulk_object_list_single_file(const char* file_name, size_t num_files) { - char put_filename[BUFF_SIZE]; - - struct stat file_info; - memset(&file_info, 0, sizeof(struct stat)); - stat(file_name, &file_info); - - ds3_bulk_object_list_response* obj_list = ds3_init_bulk_object_list(); - - GPtrArray* ds3_bulk_object_response_array = g_ptr_array_new(); - for (size_t index = 0; index < num_files; index++) { - g_snprintf(put_filename, BUFF_SIZE, "file_%05lu", index); - - ds3_bulk_object_response* obj = g_new0(ds3_bulk_object_response, 1); - obj->name = ds3_str_init(put_filename); - obj->length = file_info.st_size; - g_ptr_array_add(ds3_bulk_object_response_array, obj); - } - - obj_list->objects = (ds3_bulk_object_response**)ds3_bulk_object_response_array->pdata; - obj_list->num_objects = ds3_bulk_object_response_array->len; - g_ptr_array_free(ds3_bulk_object_response_array, FALSE); - - return obj_list; -} - -/** - * g_thread_new only takes a single parameter to pass to the spawned thread, so its necessary - * to wrap multiple parameters in a struct to be passed along. - */ -typedef struct { - uint8_t num_threads; - uint8_t thread_num; - ds3_client* client; - char* job_id; - char* src_object_name; - char* bucket_name; - ds3_master_object_list_response* chunks_list; -} test_put_chunks_args; - -void put_chunks(void* args) { - test_put_chunks_args* put_chunks_args = (test_put_chunks_args*)args; - ds3_objects_response* chunk_object_list = NULL; - - for (size_t chunk_index = 0; chunk_index < put_chunks_args->chunks_list->num_objects; chunk_index++) { - chunk_object_list = put_chunks_args->chunks_list->objects[chunk_index]; - for (size_t object_index = 0; object_index < chunk_object_list->num_objects; object_index++) { - - // Work distribution - if (object_index % put_chunks_args->num_threads == put_chunks_args->thread_num) { - ds3_bulk_object_response* object = chunk_object_list->objects[object_index]; - // Send the same file every time, give it a different destination name - FILE* file = fopen(put_chunks_args->src_object_name, "r"); - if (file == NULL) { - printf("Unable to open %s for read (FILE NULL), skipping put to bucket %s!\n", put_chunks_args->src_object_name, put_chunks_args->bucket_name); - return; - } - - ds3_request* request = ds3_init_put_object_request(put_chunks_args->bucket_name, object->name->value, object->length); - ds3_request_set_job(request, put_chunks_args->job_id); - if (object->offset > 0) { - fseek(file, object->offset, SEEK_SET); - } - ds3_error* error = ds3_put_object_request(put_chunks_args->client, request, file, ds3_read_from_file); - ds3_request_free(request); - - fclose(file); - handle_error(error); - } - } - } -} - -BOOST_AUTO_TEST_CASE( bulk_put_10k_very_small_files ) { - printf("-----Testing Bulk PUT of 10k very small files-------\n"); - ds3_request* request = NULL; - const char* bucket_name = "test_bulk_put_10k_very_small_files_bucket"; - const char* object_name = "resources/very_small_file.txt"; - ds3_master_object_list_response* bulk_response = NULL; - ds3_bulk_object_list_response* object_list = create_bulk_object_list_single_file(object_name, 10000); - ds3_client* client = get_client(); - - ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); - - request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, object_list); - error = ds3_put_bulk_job_spectra_s3_request(client, request, &bulk_response); - ds3_request_free(request); - ds3_bulk_object_list_response_free(object_list); - handle_error(error); - - test_put_chunks_args* put_chunks_args = g_new0(test_put_chunks_args, 1); - put_chunks_args->client = client; - put_chunks_args->num_threads = 1; - put_chunks_args->thread_num = 0; - put_chunks_args->job_id = bulk_response->job_id->value; - put_chunks_args->src_object_name = (char*)object_name; - put_chunks_args->bucket_name = (char*)bucket_name; - put_chunks_args->chunks_list = ensure_available_chunks(client, bulk_response->job_id); - - put_chunks(put_chunks_args); - - ds3_master_object_list_response_free(put_chunks_args->chunks_list); - ds3_master_object_list_response_free(bulk_response); - g_free(put_chunks_args); - - clear_bucket(client, bucket_name); - free_client(client); -} - - -BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) { - printf("-----Testing Bulk PUT of 200 very small files multithreaded-------\n"); - const char* bucket_name = "test_bulk_put_200_very_small_files_multithreaded"; - const char* object_name = "resources/very_small_file.txt"; - ds3_request* request = NULL; - ds3_master_object_list_response* bulk_response = NULL; - ds3_bulk_object_list_response* object_list = create_bulk_object_list_single_file(object_name, 200); - ds3_client* client = get_client(); - - ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); - - request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, object_list); - - error = ds3_put_bulk_job_spectra_s3_request(client, request, &bulk_response); - ds3_request_free(request); - ds3_bulk_object_list_response_free(object_list); - handle_error(error); - - ds3_master_object_list_response* chunk_response = ensure_available_chunks(client, bulk_response->job_id); - - // send to child thread 1 - test_put_chunks_args* put_odd_objects_args = g_new0(test_put_chunks_args, 1); - put_odd_objects_args->client = client; - put_odd_objects_args->job_id = bulk_response->job_id->value; - put_odd_objects_args->src_object_name = (char*)object_name; - put_odd_objects_args->bucket_name = (char*)bucket_name; - put_odd_objects_args->chunks_list = chunk_response; - put_odd_objects_args->thread_num = 0; - put_odd_objects_args->num_threads = 2; - - // send to child thread 2 - test_put_chunks_args* put_even_objects_args = g_new0(test_put_chunks_args, 1); - put_even_objects_args->client = client; - put_even_objects_args->job_id = bulk_response->job_id->value; - put_even_objects_args->src_object_name = (char*)object_name; - put_even_objects_args->bucket_name = (char*)bucket_name; - put_even_objects_args->chunks_list = chunk_response; - put_even_objects_args->thread_num = 1; - put_even_objects_args->num_threads = 2; - - GThread* even_chunks_thread = g_thread_new("even_objects", (GThreadFunc)put_chunks, put_even_objects_args); - GThread* odd_chunks_thread = g_thread_new("odd_objects", (GThreadFunc)put_chunks, put_odd_objects_args); - - // Block and cleanup GThreads - g_thread_join(even_chunks_thread); - g_thread_join(odd_chunks_thread); - - ds3_master_object_list_response_free(chunk_response); - ds3_master_object_list_response_free(bulk_response); - g_free(put_odd_objects_args); - g_free(put_even_objects_args); - - clear_bucket(client, bucket_name); - free_client(client); -} BOOST_AUTO_TEST_CASE( put_utf_object_name ) { printf("-----Testing PUT object with UTF Characters in name-------\n"); diff --git a/test/connection_tests.cpp b/test/connection_tests.cpp new file mode 100644 index 00000000..fe8ccb3b --- /dev/null +++ b/test/connection_tests.cpp @@ -0,0 +1,454 @@ +/* + * ****************************************************************************** + * Copyright 2014-2017 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +#include +#include "ds3.h" +#include "ds3_net.h" +#include "ds3_utils.h" +#include "test.h" +#include +#include + +BOOST_AUTO_TEST_CASE( ds3_client_create_free ) { + printf("-----Testing ds3_client create and free-------\n"); + + ds3_client* client = get_client(); + BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1); + ds3_creds_free(client->creds); + ds3_client_free(client); +} + +BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) { + printf("-----Testing ds3_copy_client-------\n"); + + ds3_client* client = get_client(); + BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1); + + ds3_client* client_copy = ds3_copy_client(client); + BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 2); + BOOST_CHECK_EQUAL(client->endpoint->value, client_copy->endpoint->value); + if (client->proxy) { + BOOST_CHECK_EQUAL(client->proxy->value, client_copy->proxy->value); + } + BOOST_CHECK_EQUAL(client->num_redirects, client_copy->num_redirects); + if (client->creds) { + BOOST_CHECK_EQUAL(client->creds->access_id->value, client_copy->creds->access_id->value); + BOOST_CHECK_EQUAL(client->creds->secret_key->value, client_copy->creds->secret_key->value); + } + BOOST_CHECK_EQUAL(client->net_callback, client_copy->net_callback); + BOOST_CHECK_EQUAL(client->connection_pool, client_copy->connection_pool); + + ds3_creds_free(client->creds); + ds3_client_free(client); + + BOOST_CHECK_EQUAL(client_copy->connection_pool->ref_count, 1); + ds3_creds_free(client_copy->creds); + ds3_client_free(client_copy); +} + +BOOST_AUTO_TEST_CASE( create_bucket_with_copied_client ) { + printf("-----Testing create bucket with copied client-------\n"); + + ds3_client* client = get_client(); + ds3_connection_pool* cp = client->connection_pool; + BOOST_CHECK_EQUAL(cp->ref_count, 1); + + ds3_client* client_copy = ds3_copy_client(client); + BOOST_CHECK_EQUAL(cp->ref_count, 2); + + const char* client_bucket_name = "create_bucket_from_original_client"; + ds3_error* error = create_bucket_with_data_policy(client, client_bucket_name, ids.data_policy_id->value); + handle_error(error); + clear_bucket(client, client_bucket_name); + ds3_creds_free(client->creds); + ds3_client_free(client); + BOOST_CHECK_EQUAL(cp->ref_count, 1); + + const char* copied_client_bucket_name = "create_bucket_from_copied_client"; + error = create_bucket_with_data_policy(client_copy, copied_client_bucket_name, ids.data_policy_id->value); + handle_error(error); + + ds3_request* request = ds3_init_get_service_request(); + ds3_list_all_my_buckets_result_response* bucket_list = NULL; + error = ds3_get_service_request(client_copy, request, &bucket_list); + ds3_request_free(request); + handle_error(error); + BOOST_CHECK(bucket_list != NULL); + BOOST_CHECK_EQUAL(bucket_list->num_buckets, 1); + BOOST_CHECK_EQUAL(bucket_list->buckets[0]->name->value, copied_client_bucket_name); + ds3_list_all_my_buckets_result_response_free(bucket_list); + + request = ds3_init_get_bucket_request(copied_client_bucket_name); + ds3_list_bucket_result_response* bucket_info = NULL; + error = ds3_get_bucket_request(client_copy, request, &bucket_info); + ds3_request_free(request); + handle_error(error); + BOOST_CHECK(bucket_info != NULL); + BOOST_CHECK_EQUAL(bucket_info->num_objects, 0); + BOOST_CHECK_EQUAL(bucket_info->name->value, copied_client_bucket_name); + ds3_list_bucket_result_response_free(bucket_info); + clear_bucket(client_copy, copied_client_bucket_name); + + ds3_creds_free(client_copy->creds); + ds3_client_free(client_copy); +} + +BOOST_AUTO_TEST_CASE( bulk_put_10k_very_small_files ) { + printf("-----Testing Bulk PUT of 10k very small files-------\n"); + ds3_request* request = NULL; + const char* bucket_name = "test_bulk_put_10k_very_small_files_bucket"; + const char* object_name = "resources/very_small_file.txt"; + ds3_master_object_list_response* bulk_response = NULL; + ds3_bulk_object_list_response* object_list = create_bulk_object_list_single_file(object_name, 10000); + ds3_client* client = get_client_at_loglvl(DS3_INFO); + ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); + + request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, object_list); + error = ds3_put_bulk_job_spectra_s3_request(client, request, &bulk_response); + ds3_request_free(request); + ds3_bulk_object_list_response_free(object_list); + handle_error(error); + + put_chunks_args* put_chunks_args_single_thread = g_new0(put_chunks_args, 1); + put_chunks_args_single_thread->client = client; + put_chunks_args_single_thread->num_threads = 1; + put_chunks_args_single_thread->thread_num = 0; + put_chunks_args_single_thread->job_id = bulk_response->job_id->value; + put_chunks_args_single_thread->src_object_name = (char*)object_name; + put_chunks_args_single_thread->bucket_name = (char*)bucket_name; + put_chunks_args_single_thread->chunks_list = ensure_available_chunks(client, bulk_response->job_id); + put_chunks_args_single_thread->verbose = False; + + put_chunks_from_file(put_chunks_args_single_thread); + + ds3_master_object_list_response_free(put_chunks_args_single_thread->chunks_list); + ds3_master_object_list_response_free(bulk_response); + g_free(put_chunks_args_single_thread); + + clear_bucket(client, bucket_name); + free_client(client); +} + +BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) { + printf("-----Testing Bulk PUT of 200 very small files multithreaded-------\n"); + const char* bucket_name = "test_bulk_put_200_very_small_files_multithreaded"; + const uint8_t num_threads = 2; + const char* object_name = "resources/very_small_file.txt"; + ds3_request* request = NULL; + ds3_master_object_list_response* bulk_response = NULL; + ds3_bulk_object_list_response* object_list = create_bulk_object_list_single_file(object_name, 200); + ds3_client* client = get_client(); + + ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); + + request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, object_list); + + error = ds3_put_bulk_job_spectra_s3_request(client, request, &bulk_response); + ds3_request_free(request); + ds3_bulk_object_list_response_free(object_list); + handle_error(error); + + ds3_master_object_list_response* chunk_response = ensure_available_chunks(client, bulk_response->job_id); + + GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, bucket_name, bulk_response, chunk_response, num_threads, False); + + GThread* chunks_thread_0 = g_thread_new("objects_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 0)); + GThread* chunks_thread_1 = g_thread_new("objects_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 1)); + + // Block and cleanup GThreads + g_thread_join(chunks_thread_0); + g_thread_join(chunks_thread_1); + + ds3_master_object_list_response_free(chunk_response); + ds3_master_object_list_response_free(bulk_response); + put_chunks_threads_args_free(put_objs_args_array); + + clear_bucket(client, bucket_name); + free_client(client); +} + +BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) { + printf("-----Testing BULK_PUT of objects in parallel (4 threads) vs sequentially (1 thread)-------\n"); + + const char* sequential_bucket_name = "test_bulk_put_sequential"; + const char* obj_name = "resources/ulysses_46mb.txt"; + + struct timespec start_time_t, end_time_t; + double elapsed_sequential_t, elapsed_parallel_t; + + ds3_bulk_object_list_response* obj_list = create_bulk_object_list_single_file(obj_name, 100); + ds3_client* client = get_client(); + ds3_master_object_list_response* mol = NULL; + ds3_request* request = NULL; + + + // *** Start Sequential test config *** + ds3_error* error = create_bucket_with_data_policy(client, sequential_bucket_name, ids.data_policy_id->value); + + request = ds3_init_put_bulk_job_spectra_s3_request(sequential_bucket_name, obj_list); + error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol); + ds3_request_free(request); + handle_error(error); + + ds3_master_object_list_response* sequential_chunks = ensure_available_chunks(client, mol->job_id); + + GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, sequential_bucket_name, mol, sequential_chunks, 1, False); + + // capture sequential test start time + clock_gettime(CLOCK_MONOTONIC, &start_time_t); + + GThread* xfer_sequential_thread = g_thread_new("sequential_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_sequential_objs_threads_array, 0)); + + // Block and cleanup GThreads + g_thread_join(xfer_sequential_thread); + + // find elapsed CPU and real time + clock_gettime(CLOCK_MONOTONIC, &end_time_t); + elapsed_sequential_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); + printf(" Sequential elapsed time[%f]\n", elapsed_sequential_t); + + ds3_master_object_list_response_free(sequential_chunks); + ds3_master_object_list_response_free(mol); + put_chunks_threads_args_free(put_sequential_objs_threads_array); + + clear_bucket(client, sequential_bucket_name); + + + // *** Start Parallel test config *** + const char* parallel_bucket_name = "test_bulk_put_parallel"; + + error = create_bucket_with_data_policy(client, parallel_bucket_name, ids.data_policy_id->value); + + request = ds3_init_put_bulk_job_spectra_s3_request(parallel_bucket_name, obj_list); + error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol); + ds3_request_free(request); + handle_error(error); + + ds3_master_object_list_response* parallel_chunks = ensure_available_chunks(client, mol->job_id); + + GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, parallel_bucket_name, mol, parallel_chunks, 4, False); + + // capture sequential test start time + clock_gettime(CLOCK_MONOTONIC, &start_time_t); + + GThread* xfer_parallel_thread_0 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_parallel_objs_threads_array, 0)); + GThread* xfer_parallel_thread_1 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_parallel_objs_threads_array, 1)); + GThread* xfer_parallel_thread_2 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_parallel_objs_threads_array, 2)); + GThread* xfer_parallel_thread_3 = g_thread_new("parallel_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_parallel_objs_threads_array, 3)); + + // Block and cleanup GThreads + g_thread_join(xfer_parallel_thread_0); + g_thread_join(xfer_parallel_thread_1); + g_thread_join(xfer_parallel_thread_2); + g_thread_join(xfer_parallel_thread_3); + + // find elapsed CPU and real time + clock_gettime(CLOCK_MONOTONIC, &end_time_t); + elapsed_parallel_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); + printf(" Parallel elapsed time[%f]\n", elapsed_parallel_t); + + ds3_master_object_list_response_free(parallel_chunks); + ds3_master_object_list_response_free(mol); + put_chunks_threads_args_free(put_parallel_objs_threads_array); + + clear_bucket(client, parallel_bucket_name); + + ds3_bulk_object_list_response_free(obj_list); + free_client(client); +} + +BOOST_AUTO_TEST_CASE( multiple_client_xfer ) { + printf("-----Testing BULK_PUT of objects from 2 clients in parallel-------\n"); + + const char* client1_bucket_name = "test_bulk_put_client1"; + const char* client2_bucket_name = "test_bulk_put_client2"; + const char* obj_name = "resources/ulysses_46mb.txt"; + + struct timespec start_time_t, end_time_t; + double elapsed_t; + + ds3_bulk_object_list_response* obj_list = create_bulk_object_list_single_file(obj_name, 100); + ds3_client* client1 = get_client(); + ds3_client* client2 = ds3_copy_client(client1); // share the connection pool + + int client1_thread=1, client2_thread=2; + ds3_client_register_logging(client1, DS3_INFO, test_log, (void*)&client1_thread); + ds3_client_register_logging(client2, DS3_INFO, test_log, (void*)&client2_thread); + + ds3_master_object_list_response* mol1 = NULL; + ds3_master_object_list_response* mol2 = NULL; + ds3_request* request = NULL; + + ds3_log_message(client1->log, DS3_INFO, "Create bucket1"); + ds3_error* error = create_bucket_with_data_policy(client1, client1_bucket_name, ids.data_policy_id->value); + handle_error(error); + ds3_log_message(client2->log, DS3_INFO, "Create bucket2"); + error = create_bucket_with_data_policy(client2, client2_bucket_name, ids.data_policy_id->value); + handle_error(error); + + ds3_log_message(client1->log, DS3_INFO, "init put_bulk bucket1"); + request = ds3_init_put_bulk_job_spectra_s3_request(client1_bucket_name, obj_list); + ds3_log_message(client1->log, DS3_INFO, "put_bulk bucket1"); + error = ds3_put_bulk_job_spectra_s3_request(client1, request, &mol1); + ds3_request_free(request); + handle_error(error); + + ds3_log_message(client2->log, DS3_INFO, "init put_bulk bucket2"); + request = ds3_init_put_bulk_job_spectra_s3_request(client2_bucket_name, obj_list); + ds3_log_message(client2->log, DS3_INFO, "put_bulk bucket2"); + error = ds3_put_bulk_job_spectra_s3_request(client2, request, &mol2); + ds3_request_free(request); + handle_error(error); + + ds3_master_object_list_response* client1_chunks = ensure_available_chunks(client1, mol1->job_id); + ds3_master_object_list_response* client2_chunks = ensure_available_chunks(client2, mol2->job_id); + + GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, client1_bucket_name, mol1, client1_chunks, 1, True); + GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, client2_bucket_name, mol2, client2_chunks, 1, True); + + // capture sequential test start time + clock_gettime(CLOCK_MONOTONIC, &start_time_t); + + GThread* client1_xfer_thread = g_thread_new("client1_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(client1_put_objs_args, 0)); + GThread* client2_xfer_thread = g_thread_new("client2_objs_xfer", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(client2_put_objs_args, 0)); + + // Block and cleanup GThreads + g_thread_join(client1_xfer_thread); + g_thread_join(client2_xfer_thread); + + // find elapsed CPU and real time + clock_gettime(CLOCK_MONOTONIC, &end_time_t); + elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); + ds3_log_message(client1->log, DS3_INFO, " Elapsed time[%f]", elapsed_t); + + ds3_master_object_list_response_free(client1_chunks); + ds3_master_object_list_response_free(mol1); + put_chunks_threads_args_free(client1_put_objs_args); + clear_bucket(client1, client1_bucket_name); + free_client(client1); + + ds3_master_object_list_response_free(client2_chunks); + ds3_master_object_list_response_free(mol2); + put_chunks_threads_args_free(client2_put_objs_args); + clear_bucket(client2, client2_bucket_name); + free_client(client2); + + ds3_bulk_object_list_response_free(obj_list); +} + + +/* + * Transfer from a memory buffer rather than a file on disk to eliminate any Disk Read bottleneck. + */ +BOOST_AUTO_TEST_CASE( performance_bulk_put ) { + printf("-----Testing BULK_PUT performance-------\n"); + + const char* bucket_name1 = "bulk_put_performance_bucket1"; + const char* bucket_name2 = "bulk_put_performance_bucket2"; + const char* bucket_name3 = "bulk_put_performance_bucket3"; + ds3_request* request = NULL; + ds3_master_object_list_response* bulk_response1 = NULL; + ds3_master_object_list_response* bulk_response2 = NULL; + ds3_master_object_list_response* bulk_response3 = NULL; + + ds3_client* client1 = get_client(); + ds3_client* client2 = ds3_copy_client(client1); // share the connection pool + ds3_client* client3 = ds3_copy_client(client1); // share the connection pool + // Log per thread + int client1_thread=1, client2_thread=2, client3_thread=3; + ds3_client_register_logging(client1, DS3_INFO, test_log, (void*)&client1_thread); + ds3_client_register_logging(client2, DS3_INFO, test_log, (void*)&client2_thread); + ds3_client_register_logging(client3, DS3_INFO, test_log, (void*)&client3_thread); + + + ds3_error* error = create_bucket_with_data_policy(client1, bucket_name1, ids.data_policy_id->value); + handle_error(error); + error = create_bucket_with_data_policy(client1, bucket_name2, ids.data_policy_id->value); + handle_error(error); + error = create_bucket_with_data_policy(client1, bucket_name3, ids.data_policy_id->value); + handle_error(error); + + // Create the list of fake files to transfer + size_t obj_size = 512 * 1024 * 1024; // 512MB + const char* obj_prefix = "perf_obj"; + size_t num_files = 10; + ds3_bulk_object_list_response* obj_list = create_bulk_object_list_from_prefix_with_size(obj_prefix, num_files, obj_size); + + // Create the BULK_PUT jobs + request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name1, obj_list); + error = ds3_put_bulk_job_spectra_s3_request(client1, request, &bulk_response1); + handle_error(error); + ds3_request_free(request); + + request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name2, obj_list); + error = ds3_put_bulk_job_spectra_s3_request(client1, request, &bulk_response2); + handle_error(error); + ds3_request_free(request); + + request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name3, obj_list); + error = ds3_put_bulk_job_spectra_s3_request(client1, request, &bulk_response3); + handle_error(error); + ds3_request_free(request); + + // Ensure cache space for the jobs + ds3_master_object_list_response* chunks_response1 = ensure_available_chunks(client1, bulk_response1->job_id); + ds3_master_object_list_response* chunks_response2 = ensure_available_chunks(client2, bulk_response2->job_id); + ds3_master_object_list_response* chunks_response3 = ensure_available_chunks(client3, bulk_response3->job_id); + + GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, bucket_name1, bulk_response1, chunks_response1, 1, True); + GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, bucket_name2, bulk_response2, chunks_response2, 1, True); + GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, bucket_name3, bulk_response3, chunks_response3, 1, True); + + // capture sequential test start time + struct timespec start_time_t, end_time_t; + double elapsed_t; + clock_gettime(CLOCK_MONOTONIC, &start_time_t); + + // Spawn threads + GThread* xfer_thread_1 = g_thread_new("performance_objs_xfer_1", (GThreadFunc)put_chunks_from_mem, g_ptr_array_index(put_perf_objs_threads_array1, 0)); + GThread* xfer_thread_2 = g_thread_new("performance_objs_xfer_2", (GThreadFunc)put_chunks_from_mem, g_ptr_array_index(put_perf_objs_threads_array2, 0)); + GThread* xfer_thread_3 = g_thread_new("performance_objs_xfer_3", (GThreadFunc)put_chunks_from_mem, g_ptr_array_index(put_perf_objs_threads_array3, 0)); + + // Block and cleanup GThreads + g_thread_join(xfer_thread_1); + g_thread_join(xfer_thread_2); + g_thread_join(xfer_thread_3); + + // find elapsed CPU and real time + clock_gettime(CLOCK_MONOTONIC, &end_time_t); + elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); + printf(" Elapsed time[%f]\n", elapsed_t); + + ds3_master_object_list_response_free(bulk_response1); + ds3_master_object_list_response_free(chunks_response1); + put_chunks_threads_args_free(put_perf_objs_threads_array1); + clear_bucket(client1, bucket_name1); + free_client(client1); + + ds3_master_object_list_response_free(bulk_response2); + ds3_master_object_list_response_free(chunks_response2); + put_chunks_threads_args_free(put_perf_objs_threads_array2); + clear_bucket(client2, bucket_name2); + free_client(client2); + + ds3_master_object_list_response_free(bulk_response3); + ds3_master_object_list_response_free(chunks_response3); + put_chunks_threads_args_free(put_perf_objs_threads_array3); + clear_bucket(client3, bucket_name3); + free_client(client3); + + ds3_bulk_object_list_response_free(obj_list); +} + diff --git a/test/countdown_latch.c b/test/countdown_latch.c new file mode 100644 index 00000000..2859a51c --- /dev/null +++ b/test/countdown_latch.c @@ -0,0 +1,54 @@ + +#include +#include "countdown_latch.h" + +struct _countdown_latch { + guint initial_count; + volatile guint count; + GCond waiters; + GMutex lock; +}; + +countdown_latch* countdown_latch_new(guint count) { + countdown_latch* latch = g_new0(countdown_latch, 1); + latch->initial_count = count; + latch->count = count; + g_mutex_init(&(latch->lock)); + g_cond_init(&(latch->waiters)); + return latch; +} + +void countdownlatch_free(countdown_latch* latch) { + g_cond_clear(&(latch->waiters)); + g_mutex_clear(&(latch->lock)); + g_free(latch); +} + +void countdownlatch_wait(countdown_latch* latch) { + g_mutex_lock(&(latch->lock)); + while(latch->count > 0) { + g_cond_wait(&(latch->waiters), &(latch->lock)); + } + g_mutex_unlock(&(latch->lock)); +} + +void countdownlatch_count_down(countdown_latch* latch) { + g_mutex_lock(&(latch->lock)); + (latch->count)--; + if(latch->count == 0) { + g_cond_broadcast(&(latch->waiters)); + } + g_mutex_unlock(&(latch->lock)); +} + +void countdownlatch_count_down_wait(countdown_latch* latch) { + g_mutex_lock(&(latch->lock)); + (latch->count)--; + if(latch->count == 0) { + g_cond_broadcast(&(latch->waiters)); + } else { + g_cond_wait(&(latch->waiters), &(latch->lock)); + } + g_mutex_unlock(&(latch->lock)); +} + diff --git a/test/countdown_latch.h b/test/countdown_latch.h new file mode 100644 index 00000000..0761f934 --- /dev/null +++ b/test/countdown_latch.h @@ -0,0 +1,14 @@ +#ifndef COUNTDOWN_LATCH_H_ +#define COUNTDOWN_LATCH_H_ + +typedef struct _countdown_latch countdown_latch; + +countdown_latch* countdown_latch_new(guint count); +void countdown_latch_free(countdown_latch* latch); + +void countdown_latch_wait(countdown_latch* latch); +void countdown_latch_count_down(countdown_latch* latch); +void countdown_latch_count_down_and_wait(countdown_latch* latch); + +#endif /* COUNTDOWN_LATCH_H_ */ +#endif diff --git a/test/search_tests.cpp b/test/search_tests.cpp old mode 100755 new mode 100644 diff --git a/test/test.cpp b/test/test.cpp index c4761aa0..1e27dd64 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -19,10 +19,13 @@ #include #include #include "ds3.h" +#include "ds3_utils.h" #include "test.h" #include #include +#define BUFF_SIZE 64 + TempStorageIds ids; struct BoostTestFixture { @@ -43,7 +46,12 @@ struct BoostTestFixture { BOOST_GLOBAL_FIXTURE( BoostTestFixture ); void test_log(const char* message, void* user_data) { - fprintf(stderr, "Log Message: %s\n", message); + if (user_data) { + int client_num = *((int*)user_data); + fprintf(stderr, "ClientNum[%d], Log Message: %s\n", client_num, message); + } else { + fprintf(stderr, "Log Message: %s\n", message); + } } ds3_client* get_client_at_loglvl(ds3_log_lvl log_lvl) { @@ -397,7 +405,6 @@ ds3_error* get_bucket_data_policy_checksum_type(ds3_client* client, const char* // Get bucket data policy ID error = get_bucket_data_policy_id(client, bucket_name, data_policy_id); if (error != NULL) { - ds3_data_policy_response_free(data_policy_response); return error; } @@ -418,3 +425,194 @@ ds3_error* get_bucket_data_policy_checksum_type(ds3_client* client, const char* return NULL; } + + +double timespec_to_seconds(struct timespec* ts) { + return (double)ts->tv_sec + (double)ts->tv_nsec / 1000000000.0; +} + + +/** + * Find the size of a local file then create a ds3_bulk_object_list_response with the same name many times, append a number + */ +ds3_bulk_object_list_response* create_bulk_object_list_single_file(const char* file_name, size_t num_files) { + struct stat file_info; + memset(&file_info, 0, sizeof(struct stat)); + stat(file_name, &file_info); + + return create_bulk_object_list_from_prefix_with_size(file_name, num_files, file_info.st_size); + +} + +/** + * Create a ds3_bulk_object_list_response with the same name many times, append a number. + */ +ds3_bulk_object_list_response* create_bulk_object_list_from_prefix_with_size(const char* put_name_prefix, size_t num_files, size_t size) { + char put_filename[BUFF_SIZE]; + + ds3_bulk_object_list_response* obj_list = ds3_init_bulk_object_list(); + + GPtrArray* ds3_bulk_object_response_array = g_ptr_array_new(); + for (size_t index = 0; index < num_files; index++) { + g_snprintf(put_filename, BUFF_SIZE, "%s_%05lu", put_name_prefix, index); + + ds3_bulk_object_response* obj = g_new0(ds3_bulk_object_response, 1); + obj->name = ds3_str_init(put_filename); + obj->length = size; + g_ptr_array_add(ds3_bulk_object_response_array, obj); + } + + obj_list->objects = (ds3_bulk_object_response**)ds3_bulk_object_response_array->pdata; + obj_list->num_objects = ds3_bulk_object_response_array->len; + g_ptr_array_free(ds3_bulk_object_response_array, FALSE); + + return obj_list; +} + +GPtrArray* new_put_chunks_threads_args(ds3_client* client, + const char* src_obj_name, + const char* dest_bucket_name, + const ds3_master_object_list_response* bulk_response, + ds3_master_object_list_response* available_chunks, + const uint8_t num_threads, + const ds3_bool verbose) { + GPtrArray* put_chunks_args_array = g_ptr_array_new(); + + for (uint8_t thread_index = 0; thread_index < num_threads; thread_index++) { + put_chunks_args* put_objects_args = g_new0(put_chunks_args, 1); + put_objects_args->client = client; + put_objects_args->job_id = bulk_response->job_id->value; + put_objects_args->src_object_name = (char*)src_obj_name; + put_objects_args->bucket_name = (char*)dest_bucket_name; + put_objects_args->chunks_list = available_chunks; + put_objects_args->thread_num = thread_index; + put_objects_args->num_threads = num_threads; + put_objects_args->verbose = verbose; + g_ptr_array_add(put_chunks_args_array, put_objects_args); + } + + return put_chunks_args_array; +} + +void put_chunks_threads_args_free(GPtrArray* array) { + for (size_t index = 0; index < array->len; index++) { + g_free(g_ptr_array_index(array, index)); + } + + g_ptr_array_free(array, TRUE); +} + +/** + * To be passed as GThreadFunc arg to g_thread_new() along with a put_chunks_args struct + */ +void put_chunks_from_file(void* args) { + put_chunks_args* _args = (put_chunks_args*)args; + ds3_objects_response* chunk_object_list = NULL; + ds3_error* error = NULL; + + FILE* file; + + for (size_t chunk_index = 0; chunk_index < _args->chunks_list->num_objects; chunk_index++) { + chunk_object_list = _args->chunks_list->objects[chunk_index]; + for (size_t object_index = 0; object_index < chunk_object_list->num_objects; object_index++) { + + // Work distribution + if (object_index % _args->num_threads == _args->thread_num) { + ds3_bulk_object_response* object = chunk_object_list->objects[object_index]; + ds3_request* request = ds3_init_put_object_request(_args->bucket_name, object->name->value, object->length); + ds3_request_set_job(request, _args->job_id); + + if (_args->verbose) { + ds3_log_message(_args->client->log, DS3_INFO, " GlibThread[%d] BEGIN xfer File[%s] Chunk[%lu]", _args->thread_num, object->name->value, _args->chunks_list->num_objects); + } + file = fopen(_args->src_object_name, "r"); + if (object->offset != 0) { + fseek(file, object->offset, SEEK_SET); + } + error = ds3_put_object_request(_args->client, request, file, ds3_read_from_file); + fclose(file); + if (_args->verbose) { + ds3_log_message(_args->client->log, DS3_INFO, " GlibThread[%d] END xfer File[%s] Chunk[%lu]", _args->thread_num, object->name->value, _args->chunks_list->num_objects); + } + + ds3_request_free(request); + handle_error(error); + } + } + } +} + + +// Fill a 1MB buffer with a pattern of characters +static const char TEST_BUFFER_FILLER[] = "0123456789ABCDEF"; // 16 chars +static const uint8_t TEST_BUFFER_FILLER_SIZE = sizeof(TEST_BUFFER_FILLER)-1;// ignore NULL terminator + +void init_xfer_info(xfer_info* xfer_info_to_init) { + for(size_t offset = 0; offset < XFER_BUFFER_SIZE; offset += TEST_BUFFER_FILLER_SIZE) { + memcpy(xfer_info_to_init->data + offset, TEST_BUFFER_FILLER, TEST_BUFFER_FILLER_SIZE); + } + xfer_info_to_init->size = XFER_BUFFER_SIZE; + xfer_info_to_init->total_read = 0; +} + +size_t ds3_test_read_from_mem(void* buffer, size_t size, size_t nmemb, void* user_data) { + xfer_info* my_xfer_info = (struct xfer_info*) user_data; + size_t to_read = nmemb * size; + + if (my_xfer_info->size < to_read) { + to_read = my_xfer_info->size; + } + + if (my_xfer_info->args->verbose) { + ds3_log_message(my_xfer_info->args->client->log, DS3_DEBUG, "ThreadNum[%d] [%s:%s] total_read[%lu]", + my_xfer_info->args->thread_num, my_xfer_info->args->bucket_name, my_xfer_info->args->current_object->name->value, my_xfer_info->total_read); + } + memcpy(buffer, my_xfer_info->data, to_read); + my_xfer_info->total_read += to_read; + return to_read; +} + +/** + * To be passed as GThreadFunc arg to g_thread_new() along with a put_chunks_args struct + * Reads input from a memory buffer rather than a File* to eliminate any disk read bottleneck. + */ +void put_chunks_from_mem(void* args) { + put_chunks_args* _args = (put_chunks_args*)args; + ds3_objects_response* chunk_object_list = NULL; + ds3_error* error = NULL; + + struct xfer_info my_xfer_info; + init_xfer_info(&my_xfer_info); + my_xfer_info.args = _args; + + for (size_t chunk_index = 0; chunk_index < _args->chunks_list->num_objects; chunk_index++) { + chunk_object_list = _args->chunks_list->objects[chunk_index]; + for (size_t object_index = 0; object_index < chunk_object_list->num_objects; object_index++) { + + // Work distribution + if (object_index % _args->num_threads == _args->thread_num) { + + _args->current_object = chunk_object_list->objects[object_index]; + + ds3_request* request = ds3_init_put_object_request(_args->bucket_name, _args->current_object->name->value, _args->current_object->length); + ds3_request_set_job(request, _args->job_id); + + if (_args->verbose) { + ds3_log_message(_args->client->log, DS3_INFO, " GlibThread[%d] BEGIN xfer File[%s] Chunk[%lu]", _args->thread_num, _args->current_object->name->value, _args->chunks_list->num_objects); + } + + // Transfer each object for reading from the memory buffer + error = ds3_put_object_request(_args->client, request, &my_xfer_info, ds3_test_read_from_mem); + if (_args->verbose) { + ds3_log_message(_args->client->log, DS3_INFO, " GlibThread[%d] END xfer File[%s] Chunk[%lu]", _args->thread_num, _args->current_object->name->value, _args->chunks_list->num_objects); + } + + ds3_request_free(request); + handle_error(error); + } + } + + my_xfer_info.total_read = 0; + } +} + diff --git a/test/test.h b/test/test.h index 55516fbe..580d5640 100644 --- a/test/test.h +++ b/test/test.h @@ -13,14 +13,16 @@ * **************************************************************************** */ -#include "ds3.h" #ifndef __DS3_TEST__ #define __DS3_TEST__ -static const char TEST_DP_NAME[] = "test_data_policy"; -static const char TEST_SD_NAME[] = "test_storage_domain"; -static const char TEST_PP_NAME[] = "test_pool_partition"; +#include +#include "ds3.h" + +static const char TEST_DP_NAME[] = "c_sdk_test_data_policy"; +static const char TEST_SD_NAME[] = "c_sdk_test_storage_domain"; +static const char TEST_PP_NAME[] = "c_sdk_test_pool_partition"; struct TempStorageIds { ds3_str* data_policy_id; @@ -32,6 +34,7 @@ struct TempStorageIds { extern TempStorageIds ids; ds3_client* get_client(); + ds3_client* get_client_at_loglvl(ds3_log_lvl lvl); void configure_for_tests(); @@ -70,4 +73,70 @@ ds3_error* create_bucket_with_data_policy(const ds3_client* client, const char* // caller must free data_policy_id ds3_error* get_bucket_data_policy_id(const ds3_client* client, const char* bucket_name, ds3_str* data_policy_id); ds3_error* get_bucket_data_policy_checksum_type(const ds3_client* client, const char* bucket_name, ds3_checksum_type* checksum_type); + +/** + * Find the size of a local file then create a ds3_bulk_object_list_response with the same name many times, append a number + */ +ds3_bulk_object_list_response* create_bulk_object_list_single_file(const char* file_name, size_t num_files); +/** + * Create a ds3_bulk_object_list_response with the same name and size many times, append a number. + */ +ds3_bulk_object_list_response* create_bulk_object_list_from_prefix_with_size(const char* put_name_prefix, size_t num_files, size_t size); + +/** + * g_thread_new only takes a single parameter to pass to the spawned thread, so its necessary + * to wrap multiple parameters in a struct to be passed along. + */ +typedef struct { + uint8_t num_threads; + uint8_t thread_num; + ds3_client* client; + char* job_id; + char* src_object_name; + char* bucket_name; + ds3_master_object_list_response* chunks_list; + ds3_bool verbose; + ds3_bulk_object_response* current_object; +} put_chunks_args; + +double timespec_to_seconds(struct timespec* ts); +void test_log(const char* message, void* user_data); + +/* + * Returned put_chunks_threads_args* must be freed with put_chunks_threads_args_free(); + */ +GPtrArray* new_put_chunks_threads_args(ds3_client* client, + const char* src_obj_name, + const char* dest_bucket_name, + const ds3_master_object_list_response* bulk_response, + ds3_master_object_list_response* available_chunks, + const uint8_t num_threads, + const ds3_bool verbose); +void put_chunks_threads_args_free(GPtrArray* array); + +/** + * To be passed as GThreadFunc arg to g_thread_new() along with a put_chunks_args struct + */ +void put_chunks_from_file(void* args); +/** + * To be passed as GThreadFunc arg to g_thread_new() along with a put_chunks_args struct + * Reads input from a memory buffer rather than a File* + */ +void put_chunks_from_mem(void* args); + +/* + * Helper functions for a performance test to read data from memory rather than a File + */ +#define XFER_BUFFER_SIZE 16 * 1024 // 16kb + +struct xfer_info { + char data[XFER_BUFFER_SIZE]; + size_t size; + size_t total_read; + put_chunks_args* args; +}; + +void init_xfer_info(const xfer_info* xfer_info_to_init, uint16_t size_in_mb); +size_t ds3_test_read_from_mem(void* buffer, size_t size, size_t nmemb, void* user_data); + #endif