diff --git a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h index c8f72936c8e65..5eaa03dae6834 100644 --- a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h +++ b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h @@ -743,6 +743,13 @@ namespace ignite * @return Number of logical processors. */ IGNITE_IMPORT_EXPORT uint32_t GetNumberOfProcessors(); + + /** + * Get current processor thread count. + * + * @return Current processor thread count. + */ + IGNITE_IMPORT_EXPORT int32_t GetThreadsCount(); } } } diff --git a/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp index 0370b57fc06d7..f781c2a9b0bb1 100644 --- a/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp +++ b/modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp @@ -16,6 +16,12 @@ */ #include +#include +#include +#include + +#include +#include #include "ignite/common/concurrent_os.h" @@ -245,6 +251,33 @@ namespace ignite return static_cast(res < 0 ? 0 : res); } + + int32_t GetThreadsCount() + { + DIR *proc_dir; + { + char dirname[100]; + snprintf(dirname, sizeof dirname, "/proc/%d/task", getpid()); + proc_dir = opendir(dirname); + } + + if (!proc_dir) + return -1; + + int32_t threadsCnt = 0; + struct dirent *entry; + while ((entry = readdir(proc_dir)) != NULL) + { + if(entry->d_name[0] == '.') + continue; + + ++threadsCnt; + } + + closedir(proc_dir); + + return threadsCnt; + } } } } diff --git a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h index 25fa5c22cbe9f..54f66fd9fee5c 100644 --- a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h +++ b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h @@ -24,6 +24,7 @@ #include #include +#include #include "ignite/common/common.h" namespace ignite @@ -652,6 +653,13 @@ namespace ignite * @return Number of logical processors. */ IGNITE_IMPORT_EXPORT uint32_t GetNumberOfProcessors(); + + /** + * Get current processor thread count. + * + * @return Current processor thread count. + */ + IGNITE_IMPORT_EXPORT int32_t GetThreadsCount(); } } } diff --git a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp index 02b6834dcc4cb..19a0d9e0216f5 100644 --- a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp +++ b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp @@ -248,6 +248,24 @@ namespace ignite return static_cast(info.dwNumberOfProcessors < 0 ? 0 : info.dwNumberOfProcessors); } + + int32_t GetThreadsCount() + { + DWORD id = GetCurrentProcessId(); + HANDLE snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPALL, 0); + + PROCESSENTRY32 entry; + memset(&entry, 0, sizeof(entry)); + entry.dwSize = sizeof(entry); + + BOOL ret = Process32First(snapshot, &entry); + + while (ret && entry.th32ProcessID != id) + ret = Process32Next(snapshot, &entry); + + CloseHandle(snapshot); + return static_cast(ret ? entry.cntThreads : -1); + } } } } diff --git a/modules/platforms/cpp/thin-client-test/CMakeLists.txt b/modules/platforms/cpp/thin-client-test/CMakeLists.txt index 8867f2c569090..215871f955420 100644 --- a/modules/platforms/cpp/thin-client-test/CMakeLists.txt +++ b/modules/platforms/cpp/thin-client-test/CMakeLists.txt @@ -23,7 +23,7 @@ if (WIN32) set(Boost_USE_STATIC_LIBS ON) endif() -find_package(Boost 1.53 REQUIRED COMPONENTS unit_test_framework chrono thread system) +find_package(Boost 1.53 REQUIRED COMPONENTS unit_test_framework chrono thread system regex) include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${JNI_INCLUDE_DIRS}) include_directories(include) @@ -34,6 +34,7 @@ set(SOURCES src/cache_client_test.cpp src/compute_client_test.cpp src/continuous_query_test.cpp + src/test_server.cpp src/test_utils.cpp src/ignite_client_test.cpp src/interop_test.cpp diff --git a/modules/platforms/cpp/thin-client-test/include/test_server.h b/modules/platforms/cpp/thin-client-test/include/test_server.h new file mode 100644 index 0000000000000..5b0752e7f096e --- /dev/null +++ b/modules/platforms/cpp/thin-client-test/include/test_server.h @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_THIN_CLIENT_TEST_TEST_SERVER +#define _IGNITE_THIN_CLIENT_TEST_TEST_SERVER + +#include + +#include + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0601 +#endif // _WIN32_WINNT + +#include + +#include +#include + +namespace ignite +{ + +/** + * Test Server Session. + */ +class TestServerSession +{ +public: + /** + * Construct new instance of class. + * @param service Asio service. + * @param responses Responses to provide to requests. + */ + TestServerSession(boost::asio::io_service& service, const std::vector< std::vector >& responses); + + /** + * Get socket. + */ + boost::asio::ip::tcp::socket& GetSocket() + { + return socket; + } + + /** + * Start session. + */ + void Start(); + + /** + * Get response at index. + * @param idx Index. + * @return Response. + */ + const std::vector& GetResponse(size_t idx) const + { + return responses.at(idx); + } + +private: + /** + * Receive next request. + */ + void ReadNextRequest(); + + /** + * Handle received request size. + * @param error Error. + * @param bytesTransferred Bytes transferred. + */ + void HandleRequestSizeReceived(const boost::system::error_code& error, size_t bytesTransferred); + + /** + * Handle received request. + * @param error Error. + * @param bytesTransferred Bytes transferred. + */ + void HandleRequestReceived(const boost::system::error_code& error, size_t bytesTransferred); + + /** + * Handle received request. + * @param error Error. + * @param bytesTransferred Bytes transferred. + */ + void HandleResponseSent(const boost::system::error_code& error, size_t bytesTransferred); + + // The socket used to communicate with the client. + boost::asio::ip::tcp::socket socket; + + // Received requests. + std::vector< std::vector > requests; + + // Responses to provide. + const std::vector< std::vector > responses; + + // Number of requests answered. + size_t requestsResponded; +}; + +/** + * Test Server. + */ +class TestServer +{ +public: + /** + * Constructor. + * @param port TCP port to listen. + */ + TestServer(uint16_t port = 11110); + + /** + * Destructor. + */ + ~TestServer(); + + /** + * Push new handshake response to send. + * @param accept Accept or reject response. + */ + void PushHandshakeResponse(bool accept) + { + std::vector rsp(4 + 1 + 4 + 1 + 4 + 17); + // Size + rsp[0] = 1 + 4 + 1 + 4 + 17; + + // Accept flag + rsp[4] = accept ? 1 : 0; + + // Bitmask (array header) + rsp[5] = impl::binary::IGNITE_TYPE_ARRAY_BYTE; + + // Guid (just random data so it won't be all zeroes) + rsp[10] = impl::binary::IGNITE_TYPE_UUID; + rsp[12] = 123; + + PushResponse(rsp); + } + + /** + * Push new response to send. + * @param resp Response to push. + */ + void PushResponse(const std::vector& resp) + { + responses.push_back(resp); + } + + /** + * Get specified session. + * @param idx Index. + * @return Specified session. + */ + TestServerSession& GetSession(size_t idx = 0) + { + return *sessions.at(idx); + } + + /** + * Start server. + */ + void Start(); + + /** + * Stop server. + */ + void Stop(); + +private: + /** + * Start accepting connections. + */ + void StartAccept(); + + /** + * Handle accepted connection. + * @param session Accepted session. + * @param error Error. + */ + void HandleAccept(boost::shared_ptr session, const boost::system::error_code& error); + + // Service. + boost::asio::io_service service; + + // Acceptor. + boost::asio::ip::tcp::acceptor acceptor; + + // Responses. + std::vector< std::vector > responses; + + // Sessions. + std::vector< boost::shared_ptr > sessions; + + // Server Thread. + boost::shared_ptr serverThread; +}; + +} // namespace ignite + +#endif //_IGNITE_THIN_CLIENT_TEST_TEST_SERVER \ No newline at end of file diff --git a/modules/platforms/cpp/thin-client-test/src/ignite_client_test.cpp b/modules/platforms/cpp/thin-client-test/src/ignite_client_test.cpp index 786b883bd2f47..0e15983b14dad 100644 --- a/modules/platforms/cpp/thin-client-test/src/ignite_client_test.cpp +++ b/modules/platforms/cpp/thin-client-test/src/ignite_client_test.cpp @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include #include #include @@ -45,10 +46,10 @@ class IgniteClientTestSuiteFixture * Wait for connections. * @return True if condition was met, false if timeout has been reached. */ - bool WaitForConnections(size_t expected, int32_t timeout = 5000) + static bool WaitForConnections(size_t expected, int32_t timeout = 5000) { return ignite_test::WaitForCondition( - boost::bind(&IgniteClientTestSuiteFixture::CheckActiveConnections, this, expected), + boost::bind(&IgniteClientTestSuiteFixture::CheckActiveConnections, expected), timeout); } @@ -71,6 +72,44 @@ class IgniteClientTestSuiteFixture BOOST_CHECK_EQUAL(GetActiveConnections(), expect); } + /** + * Check that client started with specified size of user thread pool started exactly the specified number of threads + * in thread pool. + * + * @param cfg Client configuration. + * @param num Expected thread number + */ + static void CheckThreadsNum(IgniteClientConfiguration &cfg, uint32_t num) + { + ignite::TestServer server; + server.PushHandshakeResponse(true); + server.Start(); + + int32_t threadsBefore = ignite::common::concurrent::GetThreadsCount(); + int32_t netThreads = 1; + +#ifdef _WIN32 + // In Windows there is one additional thread for connecting. + netThreads += 1; +#endif + int32_t threadsExpected = static_cast(num) + netThreads; + + cfg.SetUserThreadPoolSize(num); + { + IgniteClient client = IgniteClient::Start(cfg); + + int32_t threadsActual = ignite::common::concurrent::GetThreadsCount() - threadsBefore; + + BOOST_CHECK_EQUAL(threadsExpected, threadsActual); + } + + int32_t threadsAfter = ignite::common::concurrent::GetThreadsCount(); + + BOOST_CHECK_EQUAL(threadsBefore, threadsAfter); + BOOST_CHECK_EQUAL(num, cfg.GetUserThreadPoolSize()); + + server.Stop(); + } /** * Check number of active connections. @@ -78,7 +117,7 @@ class IgniteClientTestSuiteFixture * @param expect connections to expect. * @return @c true on success. */ - bool CheckActiveConnections(size_t expect) + static bool CheckActiveConnections(size_t expect) { return GetActiveConnections() == expect; } @@ -193,4 +232,21 @@ BOOST_AUTO_TEST_CASE(IgniteClientReconnect) BOOST_REQUIRE_THROW((client.GetOrCreateCache("test")), ignite::IgniteError); } +BOOST_AUTO_TEST_CASE(IgniteClientUserThreadPoolSize) +{ + IgniteClientConfiguration cfg; + + BOOST_CHECK_EQUAL(0, cfg.GetUserThreadPoolSize()); + + cfg.SetEndPoints("127.0.0.1:11110"); + + CheckThreadsNum(cfg, 1); + CheckThreadsNum(cfg, 2); + CheckThreadsNum(cfg, 3); + CheckThreadsNum(cfg, 4); + CheckThreadsNum(cfg, 8); + CheckThreadsNum(cfg, 16); + CheckThreadsNum(cfg, 128); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/modules/platforms/cpp/thin-client-test/src/test_server.cpp b/modules/platforms/cpp/thin-client-test/src/test_server.cpp new file mode 100644 index 0000000000000..2327bd0b9b6a8 --- /dev/null +++ b/modules/platforms/cpp/thin-client-test/src/test_server.cpp @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#ifdef _MSC_VER +# pragma warning(push) +# pragma warning(disable : 4355) +#endif //_MSC_VER + +#ifndef _WIN32_WINNT +#define _WIN32_WINNT 0x0601 +#endif // _WIN32_WINNT + +#include +#include +#include + +#ifdef _MSC_VER +# pragma warning(pop) +#endif //_MSC_VER + +#include +#include + +#include "test_server.h" + +namespace ignite +{ + +TestServerSession::TestServerSession(boost::asio::io_service& service, const std::vector< std::vector >& responses) : + socket(service), + responses(responses), + requestsResponded(0) +{ + // No-op. +} + +void TestServerSession::Start() +{ + ReadNextRequest(); +} + +void TestServerSession::ReadNextRequest() +{ + requests.push_back(std::vector()); + + std::vector& newRequest = requests.back(); + newRequest.resize(4); + + async_read(socket, boost::asio::buffer(newRequest.data(), newRequest.size()), + boost::bind(&TestServerSession::HandleRequestSizeReceived, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); +} + +void TestServerSession::HandleRequestSizeReceived(const boost::system::error_code& error, size_t bytesTransferred) +{ + if (error || bytesTransferred != 4) + { + socket.close(); + + return; + } + + std::vector& newRequest = requests.back(); + impl::interop::InteropUnpooledMemory mem(4); + mem.Length(4); + + memcpy(mem.Data(), newRequest.data(), newRequest.size()); + int32_t size = impl::binary::BinaryUtils::ReadInt32(mem, 0); + + newRequest.resize(4 + size); + + async_read(socket, boost::asio::buffer(newRequest.data() + 4, size), + boost::bind(&TestServerSession::HandleRequestReceived, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); +} + +void TestServerSession::HandleRequestReceived(const boost::system::error_code& error, size_t bytesTransferred) +{ + if (error || !bytesTransferred || requestsResponded == responses.size()) + { + std::cout << requestsResponded << std::endl; + std::cout << responses.size() << std::endl; + + socket.close(); + + return; + } + + const std::vector& response = responses.at(requestsResponded); + + async_write(socket, boost::asio::buffer(response.data(), response.size()), + boost::bind(&TestServerSession::HandleResponseSent, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + + ++requestsResponded; +} + +void TestServerSession::HandleResponseSent(const boost::system::error_code& error, size_t bytesTransferred) +{ + if (error || !bytesTransferred) + { + socket.close(); + + return; + } + + ReadNextRequest(); +} + + +TestServer::TestServer(uint16_t port) : + acceptor(service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) +{ + // No-op. +} + +TestServer::~TestServer() +{ + Stop(); +} + +void TestServer::Start() +{ + if (!serverThread) + { + StartAccept(); + serverThread.reset(new boost::thread(boost::bind(&boost::asio::io_service::run, &service))); + } +} + +void TestServer::Stop() +{ + if (serverThread) + { + service.stop(); + serverThread->join(); + serverThread.reset(); + } +} + +void TestServer::StartAccept() +{ + using namespace boost::asio; + + boost::shared_ptr newSession; + newSession.reset(new TestServerSession(service, responses)); + + acceptor.async_accept(newSession->GetSocket(), + boost::bind(&TestServer::HandleAccept, this, newSession, placeholders::error)); +} + +void TestServer::HandleAccept(boost::shared_ptr session, const boost::system::error_code& error) +{ + if (!error) + { + session->Start(); + + sessions.push_back(session); + } + + StartAccept(); +} + +} // namespace ignite diff --git a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client_configuration.h b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client_configuration.h index 68750a23855f9..01bc407263eb6 100644 --- a/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client_configuration.h +++ b/modules/platforms/cpp/thin-client/include/ignite/thin/ignite_client_configuration.h @@ -51,7 +51,8 @@ namespace ignite sslMode(SslMode::DISABLE), partitionAwareness(true), connectionsLimit(0), - connectionTimeout(DEFAULT_CONNECTION_TIMEOUT) + connectionTimeout(DEFAULT_CONNECTION_TIMEOUT), + userThreadPoolSize(0) { // No-op. } @@ -294,6 +295,26 @@ namespace ignite connectionTimeout = timeout; } + /** + * Set thread pool size. + * + * @param size Desired number of threads in user thread pool. Zero means to use number of available core. + */ + void SetUserThreadPoolSize(uint32_t size) + { + userThreadPoolSize = size; + } + + /** + * Get thread pool size. + * + * @return Number of threads in user thread pool. Zero means to use number of available core. + */ + uint32_t GetUserThreadPoolSize() const + { + return userThreadPoolSize; + } + private: /** Connection end points */ std::string endPoints; @@ -324,6 +345,9 @@ namespace ignite /** Connection timeout in milliseconds. */ int32_t connectionTimeout; + + /** User thread pool size. */ + uint32_t userThreadPoolSize; }; } } diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp index 78580caceb76b..d23109044091a 100644 --- a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp +++ b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp @@ -42,7 +42,7 @@ namespace ignite { DataRouter::DataRouter(const ignite::thin::IgniteClientConfiguration& cfg) : config(cfg), - userThreadPool(0) + userThreadPool(cfg.GetUserThreadPoolSize()) { srand(common::GetRandSeed());