Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-16755 C++ Thin: Add user threadpool size to public configuration #9960

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,13 @@ namespace ignite
* @return Number of logical processors.
*/
IGNITE_IMPORT_EXPORT uint32_t GetNumberOfProcessors();

/**
* Get current processor thread count.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Get current processor thread count.
* Get current process thread count.

*
* @return Current processor thread count.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @return Current processor thread count.
* @return Current process thread count.

*/
IGNITE_IMPORT_EXPORT int32_t GetThreadsCount();
}
}
}
Expand Down
33 changes: 33 additions & 0 deletions modules/platforms/cpp/common/os/linux/src/common/concurrent_os.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/

#include <sys/sysinfo.h>
#include <sys/types.h>
#include <dirent.h>
#include <unistd.h>

#include <stdlib.h>
#include <stdio.h>

#include "ignite/common/concurrent_os.h"

Expand Down Expand Up @@ -245,6 +251,33 @@ namespace ignite

return static_cast<uint32_t>(res < 0 ? 0 : res);
}

int32_t GetThreadsCount()
{
DIR *proc_dir;
{
char dirname[100];
snprintf(dirname, sizeof dirname, "/proc/%d/task", getpid());
proc_dir = opendir(dirname);
}

if (!proc_dir)
return -1;

int32_t threadsCnt = 0;
struct dirent *entry;
while ((entry = readdir(proc_dir)) != NULL)
{
if(entry->d_name[0] == '.')
continue;

++threadsCnt;
}

closedir(proc_dir);

return threadsCnt;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <map>

#include <windows.h>
#include <tlhelp32.h>

#include "ignite/common/common.h"
namespace ignite
Expand Down Expand Up @@ -652,6 +653,13 @@ namespace ignite
* @return Number of logical processors.
*/
IGNITE_IMPORT_EXPORT uint32_t GetNumberOfProcessors();

/**
* Get current processor thread count.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Get current processor thread count.
* Get current process thread count.

*
* @return Current processor thread count.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @return Current processor thread count.
* @return Current process thread count.

*/
IGNITE_IMPORT_EXPORT int32_t GetThreadsCount();
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,24 @@ namespace ignite

return static_cast<uint32_t>(info.dwNumberOfProcessors < 0 ? 0 : info.dwNumberOfProcessors);
}

int32_t GetThreadsCount()
{
DWORD id = GetCurrentProcessId();
HANDLE snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPALL, 0);

PROCESSENTRY32 entry;
memset(&entry, 0, sizeof(entry));
entry.dwSize = sizeof(entry);

BOOL ret = Process32First(snapshot, &entry);

while (ret && entry.th32ProcessID != id)
ret = Process32Next(snapshot, &entry);

CloseHandle(snapshot);
return static_cast<int32_t>(ret ? entry.cntThreads : -1);
}
}
}
}
3 changes: 2 additions & 1 deletion modules/platforms/cpp/thin-client-test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ if (WIN32)
set(Boost_USE_STATIC_LIBS ON)
endif()

find_package(Boost 1.53 REQUIRED COMPONENTS unit_test_framework chrono thread system)
find_package(Boost 1.53 REQUIRED COMPONENTS unit_test_framework chrono thread system regex)

include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${JNI_INCLUDE_DIRS})
include_directories(include)
Expand All @@ -34,6 +34,7 @@ set(SOURCES
src/cache_client_test.cpp
src/compute_client_test.cpp
src/continuous_query_test.cpp
src/test_server.cpp
src/test_utils.cpp
src/ignite_client_test.cpp
src/interop_test.cpp
Expand Down
213 changes: 213 additions & 0 deletions modules/platforms/cpp/thin-client-test/include/test_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef _IGNITE_THIN_CLIENT_TEST_TEST_SERVER
#define _IGNITE_THIN_CLIENT_TEST_TEST_SERVER

#include <stdint.h>

#include <vector>

#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0601
#endif // _WIN32_WINNT

#include <ignite/impl/binary/binary_common.h>

#include <boost/asio.hpp>
#include <boost/thread.hpp>

namespace ignite
{

/**
* Test Server Session.
*/
class TestServerSession
{
public:
/**
* Construct new instance of class.
* @param service Asio service.
* @param responses Responses to provide to requests.
*/
TestServerSession(boost::asio::io_service& service, const std::vector< std::vector<int8_t> >& responses);

/**
* Get socket.
*/
boost::asio::ip::tcp::socket& GetSocket()
{
return socket;
}

/**
* Start session.
*/
void Start();

/**
* Get response at index.
* @param idx Index.
* @return Response.
*/
const std::vector<int8_t>& GetResponse(size_t idx) const
{
return responses.at(idx);
}

private:
/**
* Receive next request.
*/
void ReadNextRequest();

/**
* Handle received request size.
* @param error Error.
* @param bytesTransferred Bytes transferred.
*/
void HandleRequestSizeReceived(const boost::system::error_code& error, size_t bytesTransferred);

/**
* Handle received request.
* @param error Error.
* @param bytesTransferred Bytes transferred.
*/
void HandleRequestReceived(const boost::system::error_code& error, size_t bytesTransferred);

/**
* Handle received request.
* @param error Error.
* @param bytesTransferred Bytes transferred.
*/
void HandleResponseSent(const boost::system::error_code& error, size_t bytesTransferred);

// The socket used to communicate with the client.
boost::asio::ip::tcp::socket socket;

// Received requests.
std::vector< std::vector<int8_t> > requests;

// Responses to provide.
const std::vector< std::vector<int8_t> > responses;

// Number of requests answered.
size_t requestsResponded;
};

/**
* Test Server.
*/
class TestServer
{
public:
/**
* Constructor.
* @param port TCP port to listen.
*/
TestServer(uint16_t port = 11110);

/**
* Destructor.
*/
~TestServer();

/**
* Push new handshake response to send.
* @param accept Accept or reject response.
*/
void PushHandshakeResponse(bool accept)
{
std::vector<int8_t> rsp(4 + 1 + 4 + 1 + 4 + 17);
// Size
rsp[0] = 1 + 4 + 1 + 4 + 17;

// Accept flag
rsp[4] = accept ? 1 : 0;

// Bitmask (array header)
rsp[5] = impl::binary::IGNITE_TYPE_ARRAY_BYTE;

// Guid (just random data so it won't be all zeroes)
rsp[10] = impl::binary::IGNITE_TYPE_UUID;
rsp[12] = 123;

PushResponse(rsp);
}

/**
* Push new response to send.
* @param resp Response to push.
*/
void PushResponse(const std::vector<int8_t>& resp)
{
responses.push_back(resp);
}

/**
* Get specified session.
* @param idx Index.
* @return Specified session.
*/
TestServerSession& GetSession(size_t idx = 0)
{
return *sessions.at(idx);
}

/**
* Start server.
*/
void Start();

/**
* Stop server.
*/
void Stop();

private:
/**
* Start accepting connections.
*/
void StartAccept();

/**
* Handle accepted connection.
* @param session Accepted session.
* @param error Error.
*/
void HandleAccept(boost::shared_ptr<TestServerSession> session, const boost::system::error_code& error);

// Service.
boost::asio::io_service service;

// Acceptor.
boost::asio::ip::tcp::acceptor acceptor;

// Responses.
std::vector< std::vector<int8_t> > responses;

// Sessions.
std::vector< boost::shared_ptr<TestServerSession> > sessions;

// Server Thread.
boost::shared_ptr<boost::thread> serverThread;
};

} // namespace ignite

#endif //_IGNITE_THIN_CLIENT_TEST_TEST_SERVER