From 2371e4428e1e038759d3b9413f0e5d442f94fbd1 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Thu, 18 May 2017 14:21:16 -0400 Subject: [PATCH 1/4] MINIFI-320: Move instantiation of OpenSSL to a singleton that's lazily created. Added test. In order to make the test work as expected I had to update thread pool to use a non blocking queue. --- libminifi/include/io/tls/TLSSocket.h | 31 ++++++++++++++++ libminifi/include/utils/ThreadPool.h | 46 +++++++++++++----------- libminifi/src/io/tls/TLSSocket.cpp | 41 ++++++++++++--------- libminifi/test/unit/SocketTests.cpp | 53 ++++++++++++++++++++++++---- 4 files changed, 127 insertions(+), 44 deletions(-) diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h index c14170b7ad..11850450e7 100644 --- a/libminifi/include/io/tls/TLSSocket.h +++ b/libminifi/include/io/tls/TLSSocket.h @@ -20,6 +20,7 @@ #include #include +#include #include #include "../ClientSocket.h" @@ -37,6 +38,36 @@ namespace io { #define TLS_ERROR_KEY_ERROR 4 #define TLS_ERROR_CERT_ERROR 5 +class OpenSSLInitializer +{ + public: + static OpenSSLInitializer *getInstance() { + OpenSSLInitializer* atomic_context = context_instance.load( + std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_acquire); + if (atomic_context == nullptr) { + std::lock_guard lock(context_mutex); + atomic_context = context_instance.load(std::memory_order_relaxed); + if (atomic_context == nullptr) { + atomic_context = new OpenSSLInitializer(); + std::atomic_thread_fence(std::memory_order_release); + context_instance.store(atomic_context, std::memory_order_relaxed); + } + } + return atomic_context; + } + + OpenSSLInitializer() + { + SSL_library_init(); + OpenSSL_add_all_algorithms(); + SSL_load_error_strings(); + } + private: + static std::atomic context_instance; + static std::mutex context_mutex; +}; + class TLSContext: public SocketContext { public: diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index e3c15d87a9..00b888e0ff 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -17,6 +17,7 @@ #ifndef LIBMINIFI_INCLUDE_THREAD_POOL_H #define LIBMINIFI_INCLUDE_THREAD_POOL_H +#include #include #include #include @@ -24,6 +25,7 @@ #include #include #include +#include "concurrentqueue.h" namespace org { namespace apache { namespace nifi { @@ -43,6 +45,9 @@ class Worker { promise = std::make_shared>(); } + explicit Worker() { + } + /** * Move constructor for worker tasks */ @@ -160,6 +165,7 @@ class ThreadPool { thread_queue_ = std::move(other.thread_queue_); worker_queue_ = std::move(other.worker_queue_); + if (!running_) { start(); } @@ -189,7 +195,8 @@ class ThreadPool { // atomic running boolean std::atomic running_; // worker queue of worker objects - std::queue> worker_queue_; + //std::queue> worker_queue_; + moodycamel::ConcurrentQueue> worker_queue_; // notification for available work std::condition_variable tasks_available_; // manager mutex @@ -206,17 +213,15 @@ class ThreadPool { * Runs worker tasks */ void run_tasks(); -} -; +}; template std::future ThreadPool::execute(Worker &&task) { - std::unique_lock lock(worker_queue_mutex_); - bool wasEmpty = worker_queue_.empty(); + bool wasEmpty = worker_queue_.size_approx() == 0; std::future future = task.getPromise()->get_future(); - worker_queue_.push(std::move(task)); - if (wasEmpty) { + worker_queue_.enqueue(std::move(task)); + if (wasEmpty && running_) { tasks_available_.notify_one(); } return future; @@ -241,20 +246,15 @@ void ThreadPool::startWorkers() { } template void ThreadPool::run_tasks() { + auto waitperiod = std::chrono::milliseconds(1) * 100; while (running_.load()) { - std::unique_lock lock(worker_queue_mutex_); - if (worker_queue_.empty()) { - - tasks_available_.wait(lock); - } - - if (!running_.load()) - break; - if (worker_queue_.empty()) + Worker task; + if (!worker_queue_.try_dequeue(task)) { + std::unique_lock lock(worker_queue_mutex_); + tasks_available_.wait_for(lock, waitperiod); continue; - Worker task = std::move(worker_queue_.front()); - worker_queue_.pop(); + } task.run(); } current_workers_--; @@ -266,7 +266,9 @@ void ThreadPool::start() { if (!running_) { running_ = true; manager_thread_ = std::thread(&ThreadPool::startWorkers, this); - + if (worker_queue_.size_approx() > 0) { + tasks_available_.notify_all(); + } } } @@ -285,8 +287,10 @@ void ThreadPool::shutdown() { std::unique_lock lock(worker_queue_mutex_); thread_queue_.clear(); current_workers_ = 0; - while (!worker_queue_.empty()) - worker_queue_.pop(); + while (worker_queue_.size_approx() > 0) { + Worker task; + worker_queue_.try_dequeue(task); + } } } } diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp index f938e0a34e..4c3bf51e72 100644 --- a/libminifi/src/io/tls/TLSSocket.cpp +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -32,8 +32,12 @@ namespace nifi { namespace minifi { namespace io { +std::atomic OpenSSLInitializer::context_instance; +std::mutex OpenSSLInitializer::context_mutex; + TLSContext::TLSContext(const std::shared_ptr &configure) - : SocketContext(configure), error_value(0), + : SocketContext(configure), + error_value(0), ctx(0), logger_(logging::Logger::getLogger()), configure_(configure) { @@ -45,20 +49,20 @@ int16_t TLSContext::initialize() { if (ctx != 0) { return error_value; } + + if (nullptr == OpenSSLInitializer::getInstance()) { + return error_value; + } + std::string clientAuthStr; bool needClientCert = true; - if (!(configure_->get(Configure::nifi_security_need_ClientAuth, - clientAuthStr) + if (!(configure_->get(Configure::nifi_security_need_ClientAuth, clientAuthStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool( clientAuthStr, needClientCert))) { needClientCert = true; } - SSL_library_init(); const SSL_METHOD *method; - - OpenSSL_add_all_algorithms(); - SSL_load_error_strings(); method = TLSv1_2_client_method(); ctx = SSL_CTX_new(method); if (ctx == NULL) { @@ -74,9 +78,9 @@ int16_t TLSContext::initialize() { std::string caCertificate; if (!(configure_->get(Configure::nifi_security_client_certificate, - certificate) + certificate) && configure_->get(Configure::nifi_security_client_private_key, - privatekey))) { + privatekey))) { logger_->log_error( "Certificate and Private Key PEM file not configured, error: %s.", std::strerror(errno)); @@ -92,10 +96,11 @@ int16_t TLSContext::initialize() { return error_value; } if (configure_->get(Configure::nifi_security_client_pass_phrase, - passphrase)) { + passphrase)) { // if the private key has passphase SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); - SSL_CTX_set_default_passwd_cb_userdata(ctx, static_cast(configure_.get())); + SSL_CTX_set_default_passwd_cb_userdata( + ctx, static_cast(configure_.get())); } int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), @@ -117,7 +122,7 @@ int16_t TLSContext::initialize() { } // load CA certificates if (configure_->get(Configure::nifi_security_client_ca_certificate, - caCertificate)) { + caCertificate)) { retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0); if (retp == 0) { logger_->log_error("Can not load CA certificate, Exiting, error : %s", @@ -143,23 +148,25 @@ TLSSocket::~TLSSocket() { * @param port connecting port * @param listeners number of listeners in the queue */ -TLSSocket::TLSSocket(const std::shared_ptr &context, const std::string &hostname, const uint16_t port, +TLSSocket::TLSSocket(const std::shared_ptr &context, + const std::string &hostname, const uint16_t port, const uint16_t listeners) : Socket(context, hostname, port, listeners), ssl(0) { - context_ = context; + context_ = context; } -TLSSocket::TLSSocket(const std::shared_ptr &context, const std::string &hostname, const uint16_t port) +TLSSocket::TLSSocket(const std::shared_ptr &context, + const std::string &hostname, const uint16_t port) : Socket(context, hostname, port, 0), ssl(0) { - context_ = context; + context_ = context; } TLSSocket::TLSSocket(const TLSSocket &&d) : Socket(std::move(d)), ssl(0) { - context_ = d.context_; + context_ = d.context_; } int16_t TLSSocket::initialize() { diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index 5a53b4778d..e55f4b7c57 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -17,11 +17,16 @@ */ #define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file - -#include "../TestBase.h" -#include +#include +#include +#include #include +#include +#include +#include "../TestBase.h" #include "io/ClientSocket.h" +#include "io/tls/TLSSocket.h" +#include "utils/ThreadPool.h" TEST_CASE("TestSocket", "[TestSocket1]") { org::apache::nifi::minifi::io::Socket socket( @@ -53,7 +58,6 @@ TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") { TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") { std::vector buffer; buffer.push_back('a'); - std::shared_ptr socket_context = std::make_shared( std::make_shared()); @@ -89,7 +93,6 @@ TEST_CASE("TestGetHostName", "[TestSocket4]") { TEST_CASE("TestWriteEndian64", "[TestSocket4]") { std::vector buffer; buffer.push_back('a'); - std::shared_ptr socket_context = std::make_shared( std::make_shared()); @@ -127,7 +130,6 @@ TEST_CASE("TestWriteEndian32", "[TestSocket5]") { org::apache::nifi::minifi::io::Socket server(socket_context, "localhost", 9183, 1); - REQUIRE(-1 != server.initialize()); org::apache::nifi::minifi::io::Socket client(socket_context, "localhost", @@ -190,3 +192,42 @@ TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") { server.closeStream(); } + +std::atomic counter; +std::mt19937_64 seed { std::random_device { }() }; +bool createSocket() { + int mine = counter++; + std::shared_ptr configuration = std::make_shared< + minifi::Configure>(); + + std::uniform_int_distribution<> distribution { 10, 100 }; + std::this_thread::sleep_for(std::chrono::milliseconds { distribution(seed) }); + + for (int i = 0; i < 50; i++) { + std::shared_ptr socketA = + std::make_shared( + configuration); + socketA->initialize(); + } + + return true; +} + +TEST_CASE("TestTLSContextCreation", "[TestSocket6]") { + utils::ThreadPool pool(20, true); + + std::vector> futures; + for (int i = 0; i < 20; i++) { + std::function f_ex = createSocket; + utils::Worker functor(f_ex); + std::future fut = pool.execute(std::move(functor)); + futures.push_back(std::move(fut)); + } + pool.start(); + for (auto &&future : futures) { + future.wait(); + } + + REQUIRE(20 == counter.load()); +} + From 443a8d6c5e216134807ee81f2c1c279315f04e21 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Thu, 18 May 2017 21:07:28 -0400 Subject: [PATCH 2/4] MINIFI-320: Address review comments --- libminifi/include/utils/ThreadPool.h | 1 - libminifi/test/unit/SocketTests.cpp | 6 +++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 00b888e0ff..9c51c0f5f8 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -195,7 +195,6 @@ class ThreadPool { // atomic running boolean std::atomic running_; // worker queue of worker objects - //std::queue> worker_queue_; moodycamel::ConcurrentQueue> worker_queue_; // notification for available work std::condition_variable tasks_available_; diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index e55f4b7c57..ae45553452 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -212,7 +212,11 @@ bool createSocket() { return true; } - +/** + * MINIFI-320 was created to address reallocations within TLSContext + * This test will create 20 threads that attempt to create contexts + * to ensure we no longer see the segfaults. + */ TEST_CASE("TestTLSContextCreation", "[TestSocket6]") { utils::ThreadPool pool(20, true); From 1d75863a152bacc1ba01d19bb8fe0bcc6ac194e0 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Fri, 19 May 2017 08:27:15 -0400 Subject: [PATCH 3/4] MINIFI-320: Address possible concerns with condition notification --- libminifi/include/utils/ThreadPool.h | 20 +++++++++----------- libminifi/src/SchedulingAgent.cpp | 6 ++++-- libminifi/test/unit/SocketTests.cpp | 3 ++- libminifi/test/unit/ThreadPoolTests.cpp | 3 ++- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 9c51c0f5f8..4c399a7ea9 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -121,9 +121,10 @@ class ThreadPool { * a future * @param task this thread pool will subsume ownership of * the worker task - * @return future with the impending result. + * @param future future to move new promise to + * @return true if future can be created and thread pool is in a running state. */ - std::future execute(Worker &&task); + bool execute(Worker &&task, std::future &future); /** * Starts the Thread Pool */ @@ -215,15 +216,14 @@ class ThreadPool { }; template -std::future ThreadPool::execute(Worker &&task) { +bool ThreadPool::execute(Worker &&task, std::future &future) { - bool wasEmpty = worker_queue_.size_approx() == 0; - std::future future = task.getPromise()->get_future(); - worker_queue_.enqueue(std::move(task)); - if (wasEmpty && running_) { + future = std::move(task.getPromise()->get_future()); + bool enqueued = worker_queue_.enqueue(std::move(task)); + if (running_) { tasks_available_.notify_one(); } - return future; + return enqueued; } template @@ -273,10 +273,8 @@ void ThreadPool::start() { template void ThreadPool::shutdown() { - - std::lock_guard lock(manager_mutex_); if (running_.load()) { - + std::lock_guard lock(manager_mutex_); running_.store(false); drain(); diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index fc979fde13..c582c525e4 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -53,7 +53,8 @@ void SchedulingAgent::enableControllerService( utils::Worker functor(f_ex); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. - component_lifecycle_thread_pool_.execute(std::move(functor)); + std::future future; + component_lifecycle_thread_pool_.execute(std::move(functor), future); } void SchedulingAgent::disableControllerService( @@ -67,7 +68,8 @@ void SchedulingAgent::disableControllerService( utils::Worker functor(f_ex); // move the functor into the thread pool. While a future is returned // we aren't terribly concerned with the result. - component_lifecycle_thread_pool_.execute(std::move(functor)); + std::future future; + component_lifecycle_thread_pool_.execute(std::move(functor), future); } bool SchedulingAgent::hasTooMuchOutGoing( diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index ae45553452..059e3c082a 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -224,7 +224,8 @@ TEST_CASE("TestTLSContextCreation", "[TestSocket6]") { for (int i = 0; i < 20; i++) { std::function f_ex = createSocket; utils::Worker functor(f_ex); - std::future fut = pool.execute(std::move(functor)); + std::future fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); futures.push_back(std::move(fut)); } pool.start(); diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp index 0fa75e650d..0bba76794a 100644 --- a/libminifi/test/unit/ThreadPoolTests.cpp +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -31,7 +31,8 @@ TEST_CASE("ThreadPoolTest1", "[TPT1]") { std::function f_ex = function; utils::Worker functor(f_ex); pool.start(); - std::future fut = pool.execute(std::move(functor)); + std::future fut; + REQUIRE(true == pool.execute(std::move(functor), fut)); fut.wait(); REQUIRE(true == fut.get()); } From e46f4b16910d065db122615dd282152fcc26e9a1 Mon Sep 17 00:00:00 2001 From: Marc Parisi Date: Fri, 19 May 2017 14:30:00 -0400 Subject: [PATCH 4/4] MINIFI-320: Reduce chance of timing issues with test --- .../ControllerServiceIntegrationTests.cpp | 45 ++++++++++--------- libminifi/test/unit/MockClasses.h | 3 +- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp index 9bfee2bc27..4668bb96db 100644 --- a/libminifi/test/integration/ControllerServiceIntegrationTests.cpp +++ b/libminifi/test/integration/ControllerServiceIntegrationTests.cpp @@ -61,6 +61,7 @@ void waitToVerifyProcessor() { std::this_thread::sleep_for(std::chrono::seconds(2)); } + int main(int argc, char **argv) { std::string test_file_location; std::string key_dir; @@ -82,12 +83,6 @@ int main(int argc, char **argv) { configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); - /* - * nifi.security.client.certificate=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.crt.pem - nifi.security.client.private.key=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.ckey.pem - nifi.security.client.pass.phrase=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/cn.pass - nifi.security.client.ca.certificate=/Users/mparisi/Downloads/nifi-toolkit-1.1.1/bin/nifi-cert.pem - */ std::string client_cert = "cn.crt.pem"; std::string priv_key_file = "cn.ckey.pem"; std::string passphrase = "cn.pass"; @@ -147,7 +142,6 @@ int main(int argc, char **argv) { "STARTING FLOW CONTROLLER INTEGRATION TEST"); controller->load(); controller->start(); - waitToVerifyProcessor(); std::shared_ptr ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest"); ssl_client_cont->enable(); @@ -163,23 +157,34 @@ int main(int argc, char **argv) { std::shared_ptr cs_id = controller ->getControllerServiceNode("ID"); assert(cs_id != nullptr); - controller->disableControllerService(cs_id); - disabled = true; - waitToVerifyProcessor(); - controller->enableControllerService(cs_id); - disabled = false; - waitToVerifyProcessor(); + { + std::lock_guard lock(control_mutex); + controller->disableControllerService(cs_id); + disabled = true; + waitToVerifyProcessor(); + } + { + std::lock_guard lock(control_mutex); + controller->enableControllerService(cs_id); + disabled = false; + waitToVerifyProcessor(); + } std::shared_ptr mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); assert(cs_id->enabled()); - - controller->disableReferencingServices(mock_cont); - disabled = true; - waitToVerifyProcessor(); + { + std::lock_guard lock(control_mutex); + controller->disableReferencingServices(mock_cont); + disabled = true; + waitToVerifyProcessor(); + } assert(cs_id->enabled() == false); - controller->enableReferencingServices(mock_cont); - disabled = false; - waitToVerifyProcessor(); + { + std::lock_guard lock(control_mutex); + controller->enableReferencingServices(mock_cont); + disabled = false; + waitToVerifyProcessor(); + } assert(cs_id->enabled() == true); controller->waitUnload(60000); diff --git a/libminifi/test/unit/MockClasses.h b/libminifi/test/unit/MockClasses.h index d32184bd82..0aa4235382 100644 --- a/libminifi/test/unit/MockClasses.h +++ b/libminifi/test/unit/MockClasses.h @@ -24,6 +24,7 @@ #include "core/ProcessSession.h" std::atomic disabled; +std::mutex control_mutex; class MockControllerService : public core::controller::ControllerService { public: @@ -112,7 +113,7 @@ class MockProcessor : public core::Processor { std::shared_ptr service = context ->getControllerService(linked_service); - + std::lock_guard lock(control_mutex); if (!disabled.load()) { assert(true == context->isControllerServiceEnabled(linked_service)); assert(nullptr != service);