From 4e10182ab4afb1116ef68d7f5e1fe5f7c9494747 Mon Sep 17 00:00:00 2001 From: DenverM80 Date: Thu, 3 Aug 2017 11:17:52 -0600 Subject: [PATCH] Fix issue in connection pool handle queue behavior; add tests around concurrent transfers of files of various sizes --- .gitignore | 5 +- src/ds3.c | 2 + src/ds3_connection.c | 59 ++++++++++++++++------- src/ds3_connection.h | 14 ++---- src/ds3_net.c | 2 - src/ds3_net.h | 1 - test/CMakeLists.txt | 1 + test/connection_tests.cpp | 28 ++++++----- test/put_directory.cpp | 98 +++++++++++++++++++++++++++++++++++++++ test/test.cpp | 48 +++++++++++++++++-- test/test.h | 2 + 11 files changed, 210 insertions(+), 50 deletions(-) create mode 100644 test/put_directory.cpp diff --git a/.gitignore b/.gitignore index c5623f58..4944f8c2 100644 --- a/.gitignore +++ b/.gitignore @@ -43,4 +43,7 @@ Debug/ *.ilk *.pdb ds3.dll -/win32/output/bin \ No newline at end of file +/win32/output/bin +.idea/ +cmake-build-debug/ +*.dylib diff --git a/src/ds3.c b/src/ds3.c index 8f93b1fd..caff29a3 100644 --- a/src/ds3.c +++ b/src/ds3.c @@ -29,6 +29,8 @@ #include #include "ds3.h" +#include "ds3_net.h" +#include "ds3_connection.h" #include "ds3_request.h" #include "ds3_string_multimap.h" #include "ds3_string_multimap_impl.h" diff --git a/src/ds3_connection.c b/src/ds3_connection.c index 42a7be61..4f6f57e2 100644 --- a/src/ds3_connection.c +++ b/src/ds3_connection.c @@ -21,18 +21,35 @@ #include #include #include -#include "ds3_net.h" +#include "ds3_connection.h" + +//-- Opaque struct +struct _ds3_connection_pool{ + ds3_connection** connections; + uint16_t num_connections; // the number of connections created + ds3_connection** connection_queue; + uint16_t max_connections; // max number of possible connections, which the connections and queue arrays will be initialized to + int queue_head; + int queue_tail; + ds3_mutex mutex; + ds3_condition available_connection_notifier; + uint16_t ref_count; +}; ds3_connection_pool* ds3_connection_pool_init(void) { - return 
ds3_connection_pool_init_with_size(CONNECTION_POOL_SIZE); + return ds3_connection_pool_init_with_size(DEFAULT_CONNECTION_POOL_SIZE); } ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size) { ds3_connection_pool* pool = g_new0(ds3_connection_pool, 1); + pool->connections = g_new0(ds3_connection*, pool_size); - pool->num_connections = pool_size; + pool->connection_queue = g_new0(ds3_connection*, pool_size); + + pool->max_connections = pool_size; + g_mutex_init(&pool->mutex); - g_cond_init(&pool->available_connections); + g_cond_init(&pool->available_connection_notifier); pool->ref_count = 1; return pool; } @@ -55,17 +72,19 @@ void ds3_connection_pool_clear(ds3_connection_pool* pool, ds3_bool already_locke } g_free(pool->connections); + g_free(pool->connection_queue); g_mutex_unlock(&pool->mutex); g_mutex_clear(&pool->mutex); // an attempt to clear a locked mutex is undefined - g_cond_clear(&pool->available_connections); + g_cond_clear(&pool->available_connection_notifier); } -static int _pool_inc(int index, uint16_t num_connections) { - return (index+1) % num_connections; +static int _queue_inc(int index, uint16_t size) { + return (index+1) % size; } -static int _pool_full(ds3_connection_pool* pool) { - return (_pool_inc(pool->head, pool->num_connections) == pool->tail); +static int _queue_is_empty(ds3_connection_pool* pool) { + int queue_head = pool->queue_head; + return pool->queue_tail == queue_head && pool->connection_queue[queue_head] == NULL; } @@ -73,18 +92,20 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { ds3_connection* connection = NULL; g_mutex_lock(&pool->mutex); - while (_pool_full(pool)) { - g_cond_wait(&pool->available_connections, &pool->mutex); + while (_queue_is_empty(pool) && pool->num_connections >= pool->max_connections) { + g_cond_wait(&pool->available_connection_notifier, &pool->mutex); } - if (pool->connections[pool->head] == NULL) { + if (_queue_is_empty(pool)) { connection = curl_easy_init(); - 
pool->connections[pool->head] = connection; + pool->connections[pool->num_connections] = connection; + pool->num_connections++; } else { - connection = pool->connections[pool->head]; + connection = pool->connection_queue[pool->queue_tail]; + pool->connection_queue[pool->queue_tail] = NULL; + pool->queue_tail = _queue_inc(pool->queue_tail, pool->max_connections); } - pool->head = _pool_inc(pool->head, pool->num_connections); g_mutex_unlock(&pool->mutex); @@ -93,11 +114,15 @@ ds3_connection* ds3_connection_acquire(ds3_connection_pool* pool) { void ds3_connection_release(ds3_connection_pool* pool, ds3_connection* connection) { g_mutex_lock(&pool->mutex); + curl_easy_reset(connection); - pool->tail = _pool_inc(pool->tail, pool->num_connections); + pool->connection_queue[pool->queue_head] = connection; + + pool->queue_head = _queue_inc(pool->queue_head, pool->max_connections); + + g_cond_signal(&pool->available_connection_notifier); g_mutex_unlock(&pool->mutex); - g_cond_signal(&pool->available_connections); } void ds3_connection_pool_inc_ref(ds3_connection_pool* pool) { diff --git a/src/ds3_connection.h b/src/ds3_connection.h index 5374bddd..154d96fa 100644 --- a/src/ds3_connection.h +++ b/src/ds3_connection.h @@ -26,24 +26,16 @@ extern "C" { #include #include +#include "ds3.h" -#define CONNECTION_POOL_SIZE 10 +#define DEFAULT_CONNECTION_POOL_SIZE 10 typedef GMutex ds3_mutex; typedef GCond ds3_condition; typedef CURL ds3_connection; -//-- Opaque struct -struct _ds3_connection_pool{ - ds3_connection** connections; - uint16_t num_connections; - int head; - int tail; - ds3_mutex mutex; - ds3_condition available_connections; - uint16_t ref_count; -}; +typedef struct _ds3_connection_pool ds3_connection_pool; ds3_connection_pool* ds3_connection_pool_init(void); ds3_connection_pool* ds3_connection_pool_init_with_size(uint16_t pool_size); diff --git a/src/ds3_net.c b/src/ds3_net.c index 8d64b263..e06df88d 100644 --- a/src/ds3_net.c +++ b/src/ds3_net.c @@ -20,10 +20,8 @@ 
#include #include "ds3_request.h" -#include "ds3.h" #include "ds3_net.h" #include "ds3_utils.h" -#include "ds3_string_multimap.h" #include "ds3_string_multimap_impl.h" #include "ds3_connection.h" diff --git a/src/ds3_net.h b/src/ds3_net.h index 5e9ad115..1bca07e4 100644 --- a/src/ds3_net.h +++ b/src/ds3_net.h @@ -26,7 +26,6 @@ extern "C" { #include "ds3.h" #include "ds3_string_multimap.h" -#include "ds3_connection.h" char* escape_url(const char* url); char* escape_url_extended(const char* url, const char** delimiters, uint32_t num_delimiters); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 7d303973..33cedd9c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -77,6 +77,7 @@ add_executable(ds3_c_tests search_tests.cpp service_tests.cpp connection_tests.cpp + put_directory.cpp test.cpp) add_test(regression_tests ds3_c_tests) diff --git a/test/connection_tests.cpp b/test/connection_tests.cpp index fe8ccb3b..1bf7d357 100644 --- a/test/connection_tests.cpp +++ b/test/connection_tests.cpp @@ -25,7 +25,7 @@ BOOST_AUTO_TEST_CASE( ds3_client_create_free ) { printf("-----Testing ds3_client create and free-------\n"); ds3_client* client = get_client(); - BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1); + BOOST_CHECK(client->connection_pool != NULL); ds3_creds_free(client->creds); ds3_client_free(client); } @@ -34,10 +34,8 @@ BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) { printf("-----Testing ds3_copy_client-------\n"); ds3_client* client = get_client(); - BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 1); ds3_client* client_copy = ds3_copy_client(client); - BOOST_CHECK_EQUAL(client->connection_pool->ref_count, 2); BOOST_CHECK_EQUAL(client->endpoint->value, client_copy->endpoint->value); if (client->proxy) { BOOST_CHECK_EQUAL(client->proxy->value, client_copy->proxy->value); @@ -53,7 +51,7 @@ BOOST_AUTO_TEST_CASE( ds3_connection_pool_copy ) { ds3_creds_free(client->creds); ds3_client_free(client); - 
BOOST_CHECK_EQUAL(client_copy->connection_pool->ref_count, 1); + BOOST_CHECK(client_copy->connection_pool != NULL); ds3_creds_free(client_copy->creds); ds3_client_free(client_copy); } @@ -63,10 +61,10 @@ BOOST_AUTO_TEST_CASE( create_bucket_with_copied_client ) { ds3_client* client = get_client(); ds3_connection_pool* cp = client->connection_pool; - BOOST_CHECK_EQUAL(cp->ref_count, 1); + BOOST_CHECK(cp != NULL); ds3_client* client_copy = ds3_copy_client(client); - BOOST_CHECK_EQUAL(cp->ref_count, 2); + BOOST_CHECK_EQUAL(cp, client_copy->connection_pool); const char* client_bucket_name = "create_bucket_from_original_client"; ds3_error* error = create_bucket_with_data_policy(client, client_bucket_name, ids.data_policy_id->value); @@ -74,7 +72,7 @@ BOOST_AUTO_TEST_CASE( create_bucket_with_copied_client ) { clear_bucket(client, client_bucket_name); ds3_creds_free(client->creds); ds3_client_free(client); - BOOST_CHECK_EQUAL(cp->ref_count, 1); + BOOST_CHECK(client_copy->connection_pool != NULL); const char* copied_client_bucket_name = "create_bucket_from_copied_client"; error = create_bucket_with_data_policy(client_copy, copied_client_bucket_name, ids.data_policy_id->value); @@ -162,7 +160,7 @@ BOOST_AUTO_TEST_CASE( bulk_put_200_very_small_files_multithreaded ) { ds3_master_object_list_response* chunk_response = ensure_available_chunks(client, bulk_response->job_id); - GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, bucket_name, bulk_response, chunk_response, num_threads, False); + GPtrArray* put_objs_args_array = new_put_chunks_threads_args(client, object_name, NULL, bucket_name, bulk_response, chunk_response, num_threads, False); GThread* chunks_thread_0 = g_thread_new("objects_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 0)); GThread* chunks_thread_1 = g_thread_new("objects_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_objs_args_array, 1)); @@ -204,7 +202,7 @@ BOOST_AUTO_TEST_CASE( 
sequential_vs_parallel_xfer ) { ds3_master_object_list_response* sequential_chunks = ensure_available_chunks(client, mol->job_id); - GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, sequential_bucket_name, mol, sequential_chunks, 1, False); + GPtrArray* put_sequential_objs_threads_array = new_put_chunks_threads_args(client, obj_name, NULL, sequential_bucket_name, mol, sequential_chunks, 1, False); // capture sequential test start time clock_gettime(CLOCK_MONOTONIC, &start_time_t); @@ -238,7 +236,7 @@ BOOST_AUTO_TEST_CASE( sequential_vs_parallel_xfer ) { ds3_master_object_list_response* parallel_chunks = ensure_available_chunks(client, mol->job_id); - GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, parallel_bucket_name, mol, parallel_chunks, 4, False); + GPtrArray* put_parallel_objs_threads_array = new_put_chunks_threads_args(client, obj_name, NULL, parallel_bucket_name, mol, parallel_chunks, 4, False); // capture sequential test start time clock_gettime(CLOCK_MONOTONIC, &start_time_t); @@ -315,8 +313,8 @@ BOOST_AUTO_TEST_CASE( multiple_client_xfer ) { ds3_master_object_list_response* client1_chunks = ensure_available_chunks(client1, mol1->job_id); ds3_master_object_list_response* client2_chunks = ensure_available_chunks(client2, mol2->job_id); - GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, client1_bucket_name, mol1, client1_chunks, 1, True); - GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, client2_bucket_name, mol2, client2_chunks, 1, True); + GPtrArray* client1_put_objs_args = new_put_chunks_threads_args(client1, obj_name, NULL, client1_bucket_name, mol1, client1_chunks, 1, True); + GPtrArray* client2_put_objs_args = new_put_chunks_threads_args(client2, obj_name, NULL, client2_bucket_name, mol2, client2_chunks, 1, True); // capture sequential test start time clock_gettime(CLOCK_MONOTONIC, &start_time_t); @@ 
-407,9 +405,9 @@ BOOST_AUTO_TEST_CASE( performance_bulk_put ) { ds3_master_object_list_response* chunks_response2 = ensure_available_chunks(client2, bulk_response2->job_id); ds3_master_object_list_response* chunks_response3 = ensure_available_chunks(client3, bulk_response3->job_id); - GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, bucket_name1, bulk_response1, chunks_response1, 1, True); - GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, bucket_name2, bulk_response2, chunks_response2, 1, True); - GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, bucket_name3, bulk_response3, chunks_response3, 1, True); + GPtrArray* put_perf_objs_threads_array1 = new_put_chunks_threads_args(client1, obj_prefix, NULL, bucket_name1, bulk_response1, chunks_response1, 1, True); + GPtrArray* put_perf_objs_threads_array2 = new_put_chunks_threads_args(client2, obj_prefix, NULL, bucket_name2, bulk_response2, chunks_response2, 1, True); + GPtrArray* put_perf_objs_threads_array3 = new_put_chunks_threads_args(client3, obj_prefix, NULL, bucket_name3, bulk_response3, chunks_response3, 1, True); // capture sequential test start time struct timespec start_time_t, end_time_t; diff --git a/test/put_directory.cpp b/test/put_directory.cpp new file mode 100644 index 00000000..b14e3227 --- /dev/null +++ b/test/put_directory.cpp @@ -0,0 +1,98 @@ +/* + * ****************************************************************************** + * Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved. + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use + * this file except in compliance with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. 
+ * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * **************************************************************************** + */ + +#include +#include +#include +#include +#include +#include +#include +#include "ds3.h" +#include "ds3_net.h" +#include "ds3_utils.h" +#include "test.h" + +BOOST_AUTO_TEST_CASE( put_directory_4_threads) { + printf("-----Testing PUT all objects in a directory with 4 threads-------\n"); + + const char* dir_path = getenv("DS3_TEST_DIRECTORY"); + if (dir_path == NULL) { + printf("ENV[DS3_TEST_DIRECTORY] unset - Skipping put_directory_4_threads test.\n"); + return; + } + + const char* bucket_name = "test_bulk_put_directory"; + printf(" Putting all files in [%s] to bucket [%s]\n", dir_path, bucket_name); + + ds3_client* client = get_client_at_loglvl(DS3_DEBUG); + int client_thread=1; + ds3_client_register_logging(client, DS3_DEBUG, test_log, (void*)&client_thread); // Use DEBUG level logging + + ds3_error* error = create_bucket_with_data_policy(client, bucket_name, ids.data_policy_id->value); + + char* objects_list[100]; + uint64_t num_objs = 0; + GDir* dir_info = g_dir_open(dir_path, 0, NULL); + for (char* current_obj = (char*)g_dir_read_name(dir_info); current_obj != NULL; current_obj = (char*)g_dir_read_name(dir_info)) { + objects_list[num_objs++] = current_obj; + printf(" obj[%" PRIu64 "][%s]\n", num_objs, objects_list[num_objs-1]); + } + + ds3_bulk_object_list_response* bulk_object_list = ds3_convert_file_list_with_basepath((const char**)objects_list, num_objs, dir_path); + + ds3_request* request = ds3_init_put_bulk_job_spectra_s3_request(bucket_name, bulk_object_list); + ds3_master_object_list_response* mol; + error = ds3_put_bulk_job_spectra_s3_request(client, request, &mol); + ds3_request_free(request); + 
ds3_bulk_object_list_response_free(bulk_object_list); + handle_error(error); + + // Allocate cache + ds3_master_object_list_response* chunks_list = ensure_available_chunks(client, mol->job_id); + + // Use helper functions from test.cpp + const uint8_t num_threads = 4; + GPtrArray* put_dir_args = new_put_chunks_threads_args(client, NULL, dir_path, bucket_name, mol, chunks_list, num_threads, True); // Last param indicates verbose logging in the spawned thread + + + // capture test start time + struct timespec start_time_t, end_time_t; + double elapsed_t; + clock_gettime(CLOCK_MONOTONIC, &start_time_t); + + GThread* put_dir_xfer_thread_0 = g_thread_new("put_dir_xfer_thread_0", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 0)); + GThread* put_dir_xfer_thread_1 = g_thread_new("put_dir_xfer_thread_1", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 1)); + GThread* put_dir_xfer_thread_2 = g_thread_new("put_dir_xfer_thread_2", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 2)); + GThread* put_dir_xfer_thread_3 = g_thread_new("put_dir_xfer_thread_3", (GThreadFunc)put_chunks_from_file, g_ptr_array_index(put_dir_args, 3)); + + // Block and cleanup GThread(s) + g_thread_join(put_dir_xfer_thread_0); + g_thread_join(put_dir_xfer_thread_1); + g_thread_join(put_dir_xfer_thread_2); + g_thread_join(put_dir_xfer_thread_3); + + // find elapsed CPU and real time + clock_gettime(CLOCK_MONOTONIC, &end_time_t); + elapsed_t = timespec_to_seconds(&end_time_t) - timespec_to_seconds(&start_time_t); + ds3_log_message(client->log, DS3_INFO, " Elapsed time[%f]", elapsed_t); + + g_dir_close(dir_info); + ds3_master_object_list_response_free(chunks_list); + ds3_master_object_list_response_free(mol); + put_chunks_threads_args_free(put_dir_args); + clear_bucket(client, bucket_name); + free_client(client); +} diff --git a/test/test.cpp b/test/test.cpp index 1e27dd64..b6f67cf2 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -45,12 +45,34 @@ 
struct BoostTestFixture { BOOST_GLOBAL_FIXTURE( BoostTestFixture ); +static void _log_timestamp(char* string_buff, long buff_size) +{ + time_t ltime; + struct tm result; + struct timeval tv; + char usec_buff[8]; + int millisec; + + gettimeofday(&tv, NULL); + millisec = lrint(tv.tv_usec/1000.0); // Round to nearest millisec + + ltime = time(NULL); + localtime_r(&ltime, &result); + + strftime(string_buff, buff_size, "%Y:%m:%dT%H:%M:%S", &result); + strcat(string_buff, "."); + sprintf(usec_buff,"%03d", millisec); + strcat(string_buff, usec_buff); +} + void test_log(const char* message, void* user_data) { + char timebuffer[32]; + _log_timestamp(timebuffer, 32); if (user_data) { int client_num = *((int*)user_data); - fprintf(stderr, "ClientNum[%d], Log Message: %s\n", client_num, message); + fprintf(stderr, "%s Client[%d] %s\n", timebuffer, client_num, message); } else { - fprintf(stderr, "Log Message: %s\n", message); + fprintf(stderr, "%s %s\n", timebuffer, message); } } @@ -469,13 +491,22 @@ ds3_bulk_object_list_response* create_bulk_object_list_from_prefix_with_size(con return obj_list; } +/* + * Provide *EITHER* src_obj_name *OR* src_dir, not both + */ GPtrArray* new_put_chunks_threads_args(ds3_client* client, const char* src_obj_name, + const char* src_dir, const char* dest_bucket_name, const ds3_master_object_list_response* bulk_response, ds3_master_object_list_response* available_chunks, const uint8_t num_threads, const ds3_bool verbose) { + if (src_obj_name && src_dir) { + printf("Error: provide new_put_chunks_threads_with_args() with either src_object_name or src_dir, not both\n"); + return NULL; + } + GPtrArray* put_chunks_args_array = g_ptr_array_new(); for (uint8_t thread_index = 0; thread_index < num_threads; thread_index++) { @@ -483,6 +514,7 @@ GPtrArray* new_put_chunks_threads_args(ds3_client* client, put_objects_args->client = client; put_objects_args->job_id = bulk_response->job_id->value; put_objects_args->src_object_name = (char*)src_obj_name; + 
put_objects_args->src_dir = (char*)src_dir; put_objects_args->bucket_name = (char*)dest_bucket_name; put_objects_args->chunks_list = available_chunks; put_objects_args->thread_num = thread_index; @@ -525,7 +557,17 @@ void put_chunks_from_file(void* args) { if (_args->verbose) { ds3_log_message(_args->client->log, DS3_INFO, " GlibThread[%d] BEGIN xfer File[%s] Chunk[%lu]", _args->thread_num, object->name->value, _args->chunks_list->num_objects); } - file = fopen(_args->src_object_name, "r"); + if (_args->src_object_name) { + file = fopen(_args->src_object_name, "r"); + } else { + char* file_with_path = g_strconcat(_args->src_dir, object->name->value, (char*)NULL); + printf(" opening file[%s]\n", file_with_path); + file = fopen(file_with_path, "r"); + if (file == NULL) { + printf(" ***Unable to open file[%s]!!!\n", file_with_path); + } + g_free(file_with_path); + } if (object->offset != 0) { fseek(file, object->offset, SEEK_SET); } diff --git a/test/test.h b/test/test.h index 580d5640..12930664 100644 --- a/test/test.h +++ b/test/test.h @@ -93,6 +93,7 @@ typedef struct { ds3_client* client; char* job_id; char* src_object_name; + char* src_dir; char* bucket_name; ds3_master_object_list_response* chunks_list; ds3_bool verbose; @@ -107,6 +108,7 @@ void test_log(const char* message, void* user_data); */ GPtrArray* new_put_chunks_threads_args(ds3_client* client, const char* src_obj_name, + const char* src_dir, const char* dest_bucket_name, const ds3_master_object_list_response* bulk_response, ds3_master_object_list_response* available_chunks,