From 6223305514705ea77fbbe490a3dfd4d460c69e8f Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 21:16:10 +0200 Subject: [PATCH 01/13] clean win merge by unifying tcp_client and providing unified interface for io_service. Provide thread_pool to handle multithreaded io. clean duplication in tests. support concurrent callback executions --- CMakeLists.txt | 2 +- includes/cpp_redis/network/io_service.hpp | 65 ++++++ .../cpp_redis/network/redis_connection.hpp | 7 +- .../network/{unix => }/tcp_client.hpp | 20 +- .../cpp_redis/network/unix/io_service.hpp | 44 ++-- .../cpp_redis/network/windows/io_service.hpp | 68 +++--- .../cpp_redis/network/windows/tcp_client.hpp | 85 ------- includes/cpp_redis/redis_client.hpp | 3 +- includes/cpp_redis/tools/thread_pool.hpp | 71 ++++++ sources/network/io_service.cpp | 48 ++++ sources/network/redis_connection.cpp | 4 +- sources/network/{windows => }/tcp_client.cpp | 112 +++++---- sources/network/unix/io_service.cpp | 17 +- sources/network/unix/tcp_client.cpp | 212 ------------------ sources/network/windows/io_service.cpp | 67 +++--- sources/redis_client.cpp | 9 +- sources/redis_subscriber.cpp | 4 +- sources/tools/thread_pool.cpp | 78 +++++++ tests/sources/spec/redis_subscriber_spec.cpp | 104 ++++----- 19 files changed, 474 insertions(+), 546 deletions(-) create mode 100644 includes/cpp_redis/network/io_service.hpp rename includes/cpp_redis/network/{unix => }/tcp_client.hpp (85%) delete mode 100644 includes/cpp_redis/network/windows/tcp_client.hpp create mode 100644 includes/cpp_redis/tools/thread_pool.hpp create mode 100644 sources/network/io_service.cpp rename sources/network/{windows => }/tcp_client.cpp (68%) delete mode 100644 sources/network/unix/tcp_client.cpp create mode 100644 sources/tools/thread_pool.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index db8f6bc2..09484e93 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,7 +55,7 @@ link_directories(${GTEST_LIBS}) ### # sources ### -set(SRC_DIRS "sources" "sources/network" "sources/builders") +set(SRC_DIRS "sources" "sources/network" "sources/builders" "sources/tools") IF (WIN32) set(SRC_DIRS ${SRC_DIRS} "sources/network/windows") diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp new file mode 100644 index 00000000..2d99e3d6 --- /dev/null +++ b/includes/cpp_redis/network/io_service.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include +#include +#include + +#define __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS 1 + +namespace cpp_redis { + +namespace network { + +class io_service { +public: + //! get default global instance + static const std::shared_ptr& get_global_instance(void); + //! set default global instance + static void set_global_instance(const std::shared_ptr& instance); + +public: + //! ctor & dtor + io_service(size_t nb_io_service_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); + virtual ~io_service(void) = default; + + //! copy ctor & assignment operator + io_service(const io_service&) = default; + io_service& operator=(const io_service&) = default; + +public: + //! disconnection handler declaration + typedef std::function disconnection_handler_t; + + //! add or remove a given fd from the io service + //! untrack should never be called from inside a callback + virtual void track(int fd, const disconnection_handler_t& handler) = 0; + virtual void untrack(int fd) = 0; + + //! asynchronously read read_size bytes and append them to the given buffer + //! on completion, call the read_callback to notify of the success or failure of the operation + //! return false if another async_read operation is in progress or fd is not registered + typedef std::function read_callback_t; + virtual bool async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) = 0; + + //! asynchronously write write_size bytes from buffer to the specified fd + //! on completion, call the write_callback to notify of the success or failure of the operation + //! return false if another async_write operation is in progress or fd is not registered + typedef std::function write_callback_t; + virtual bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) = 0; + + size_t get_nb_workers(void) const; + +private: + //! listen for incoming events and notify + virtual void process_io(void) = 0; + +private: + size_t m_nb_workers; +}; + +//! multi-platform instance builder +std::shared_ptr create_io_service(size_t nb_io_service_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); + +} //! network + +} //! cpp_redis diff --git a/includes/cpp_redis/network/redis_connection.hpp b/includes/cpp_redis/network/redis_connection.hpp index fe6d5ec6..4b7e6743 100644 --- a/includes/cpp_redis/network/redis_connection.hpp +++ b/includes/cpp_redis/network/redis_connection.hpp @@ -5,12 +5,7 @@ #include #include -#ifdef _MSC_VER -#include -#else -#include -#endif /* _MSC_VER */ - +#include #include namespace cpp_redis { diff --git a/includes/cpp_redis/network/unix/tcp_client.hpp b/includes/cpp_redis/network/tcp_client.hpp similarity index 85% rename from includes/cpp_redis/network/unix/tcp_client.hpp rename to includes/cpp_redis/network/tcp_client.hpp index 2a564fe0..a10f37ed 100644 --- a/includes/cpp_redis/network/unix/tcp_client.hpp +++ b/includes/cpp_redis/network/tcp_client.hpp @@ -1,19 +1,30 @@ #pragma once #include +#include #include #include #include #include #include -#include +#include #include #ifndef __CPP_REDIS_READ_SIZE #define __CPP_REDIS_READ_SIZE 4096 #endif /* __CPP_REDIS_READ_SIZE */ +#ifdef _WIN32 + typedef SOCKET _sock_t; +#else + typedef int _sock_t; +#endif /* _WIN32 */ + +#ifndef INVALID_SOCKET +#define INVALID_SOCKET -1 +#endif /* INVALID_SOCKET */ + namespace cpp_redis { namespace network { @@ -56,20 +67,21 @@ class tcp_client { void reset_state(void); void clear_buffer(void); + void setup_socket(void); + private: //! io service instance std::shared_ptr m_io_service; //! socket fd - int m_fd; + _sock_t m_sock; //! is connected std::atomic_bool m_is_connected; //! buffers - static const unsigned int READ_SIZE = __CPP_REDIS_READ_SIZE; std::vector m_read_buffer; - std::vector m_write_buffer; + std::list> m_write_buffer; //! handlers receive_handler_t m_receive_handler; diff --git a/includes/cpp_redis/network/unix/io_service.hpp b/includes/cpp_redis/network/unix/io_service.hpp index 69b100e0..a11c0935 100644 --- a/includes/cpp_redis/network/unix/io_service.hpp +++ b/includes/cpp_redis/network/unix/io_service.hpp @@ -11,46 +11,30 @@ #include #include +#include + namespace cpp_redis { namespace network { -class io_service { -public: - //! instance getter (singleton pattern) - static const std::shared_ptr& get_instance(void); +namespace unix { - //! dtor +class io_service : public network::io_service { +public: + //! ctor & dtor + io_service(size_t nb_io_service_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); ~io_service(void); -private: - //! ctor - io_service(void); - //! copy ctor & assignment operator io_service(const io_service&) = delete; io_service& operator=(const io_service&) = delete; public: - //! disconnection handler declaration - typedef std::function disconnection_handler_t; - - //! add or remove a given fd from the io service - //! untrack should never be called from inside a callback - void track(int fd, const disconnection_handler_t& handler); - void untrack(int fd); - - //! asynchronously read read_size bytes and append them to the given buffer - //! on completion, call the read_callback to notify of the success or failure of the operation - //! return false if another async_read operation is in progress or fd is not registered - typedef std::function read_callback_t; - bool async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); - - //! asynchronously write write_size bytes from buffer to the specified fd - //!on completion, call the write_callback to notify of the success or failure of the operation - //! return false if another async_write operation is in progress or fd is not registered - typedef std::function write_callback_t; - bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); + void track(int fd, const disconnection_handler_t& handler) override; + void untrack(int fd) override; + + bool async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) override; + bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) override; private: //! simple struct to keep track of ongoing operations on a given fd @@ -73,7 +57,7 @@ class io_service { private: //! listen for incoming events and notify - void listen(void); + void process_io(void) override; //! notify the poll call so that it can wake up to process new events void notify_poll(void); @@ -108,6 +92,8 @@ class io_service { std::recursive_mutex m_fds_mutex; }; +} //! unix + } //! network } //! cpp_redis diff --git a/includes/cpp_redis/network/windows/io_service.hpp b/includes/cpp_redis/network/windows/io_service.hpp index fcdf6d4d..a45f2919 100644 --- a/includes/cpp_redis/network/windows/io_service.hpp +++ b/includes/cpp_redis/network/windows/io_service.hpp @@ -8,54 +8,44 @@ #include -#define MAX_BUFF_SIZE __CPP_REDIS_READ_SIZE -#define MAX_WORKER_THREADS 16 +#include namespace cpp_redis { namespace network { +namespace windows { + typedef enum _enIoOperation { - //IO_OP_ACCEPT, + IO_OP_ACCEPT, IO_OP_READ, IO_OP_WRITE } enIoOperation; - -class io_service { +class io_service : public network::io_service { public: - //! instance getter (singleton pattern) - static const std::shared_ptr& get_instance(void); - io_service(size_t max_worker_threads = MAX_WORKER_THREADS); + //! ctor & dtor + io_service(size_t nb_io_service_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); ~io_service(void); - void shutdown(); - private: //! copy ctor & assignment operator io_service(const io_service&) = delete; io_service& operator=(const io_service&) = delete; public: - //! disconnection handler declaration - typedef std::function disconnection_handler_t; - - //! add or remove a given socket from the io service - //! untrack should never be called from inside a callback - void track(SOCKET sock, const disconnection_handler_t& handler); - void untrack(SOCKET sock); - - //! asynchronously read read_size bytes and append them to the given buffer - //! on completion, call the read_callback to notify of the success or failure of the operation - //! return false if another async_read operation is in progress or socket is not registered - typedef std::function read_callback_t; - bool async_read(SOCKET socket, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); - - //! asynchronously write write_size bytes from buffer to the specified fd - //!on completion, call the write_callback to notify of the success or failure of the operation - //! return false if another async_write operation is in progress or socket is not registered - typedef std::function write_callback_t; - bool async_write(SOCKET socket, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); + void track(SOCKET sock, const disconnection_handler_t& handler) override; + void untrack(SOCKET sock) override; + + bool async_read(SOCKET socket, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) override; + bool async_write(SOCKET socket, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) override; + +private: + //! wait for incoming events and notify + int process_io(void) override; + + //! shutdown + void shutdown(void); private: struct io_context_info : OVERLAPPED { @@ -63,6 +53,7 @@ class io_service { enIoOperation eOperation; }; +private: //! simple struct to keep track of ongoing operations on a given sockeet class sock_info { public: @@ -78,12 +69,12 @@ class io_service { SOCKET hsock; std::size_t sent_bytes; - //Must protect the members of our structure from access by multiple threads during IO Completion + //! Must protect the members of our structure from access by multiple threads during IO Completion std::recursive_mutex sock_info_mutex; - //We keep a simple vector of io_context_info structs to reuse for overlapped WSARecv and WSASend operations - //Since each must have its OWN struct if we issue them at the same time. - //othewise things get tangled up and borked. + //! We keep a simple vector of io_context_info structs to reuse for overlapped WSARecv and WSASend operations + //! Since each must have its OWN struct if we issue them at the same time. + //! othewise things get tangled up and borked. std::vector io_contexts_pool; disconnection_handler_t disconnection_handler; @@ -117,14 +108,9 @@ class io_service { } }; - typedef std::function callback_t; - - //! wait for incoming events and notify - int process_io(void); - +private: + //! completion port HANDLE m_completion_port; - unsigned int m_worker_thread_pool_size; - std::vector m_worker_threads; //vector containing all the threads we start to service our i/o requests private: //! whether the worker should terminate or not @@ -142,6 +128,8 @@ class io_service { std::recursive_mutex m_socket_mutex; }; +} //! windows + } //! network } //! cpp_redis diff --git a/includes/cpp_redis/network/windows/tcp_client.hpp b/includes/cpp_redis/network/windows/tcp_client.hpp deleted file mode 100644 index 8fc9f804..00000000 --- a/includes/cpp_redis/network/windows/tcp_client.hpp +++ /dev/null @@ -1,85 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#ifndef __CPP_REDIS_READ_SIZE -#define __CPP_REDIS_READ_SIZE 4096 -#endif /* __CPP_REDIS_READ_SIZE */ - -namespace cpp_redis { - -namespace network { - -//! tcp_client -//! async tcp client based on boost asio -class tcp_client { -public: - //! ctor & dtor - tcp_client(const std::shared_ptr& IO = nullptr); - ~tcp_client(void); - - //! assignment operator & copy ctor - tcp_client(const tcp_client&) = delete; - tcp_client& operator=(const tcp_client&) = delete; - - //! returns whether the client is connected or not - bool is_connected(void); - - //! handle connection & disconnection - typedef std::function disconnection_handler_t; - typedef std::function& buffer)> receive_handler_t; - void connect(const std::string& host, unsigned int port, - const disconnection_handler_t& disconnection_handler = nullptr, - const receive_handler_t& receive_handler = nullptr); - void disconnect(void); - - //! send data - void send(const std::string& buffer); - void send(const std::vector& buffer); - -private: - //! make async read and write operations - void async_read(void); - void async_write(void); - - //! io service callback - void io_service_disconnection_handler(io_service&); - - void reset_state(void); - void clear_buffer(void); - -private: - //! io service instance - const std::shared_ptr m_io_service; - - //! socket - SOCKET m_sock; - - //! is connected - std::atomic_bool m_is_connected; - - //! buffers - static const unsigned int READ_SIZE = __CPP_REDIS_READ_SIZE; - std::vector m_read_buffer; - std::list> m_write_buffer; - - //! handlers - receive_handler_t m_receive_handler; - disconnection_handler_t m_disconnection_handler; - - //! thread safety - std::mutex m_write_buffer_mutex; -}; - -} //! network - -} //! cpp_redis diff --git a/includes/cpp_redis/redis_client.hpp b/includes/cpp_redis/redis_client.hpp index a1f6cbaa..8bedca94 100644 --- a/includes/cpp_redis/redis_client.hpp +++ b/includes/cpp_redis/redis_client.hpp @@ -45,7 +45,7 @@ class redis_client { std::unique_lock lock_callback(m_callbacks_mutex); __CPP_REDIS_LOG(debug, "cpp_redis::redis_client waits for callbacks to complete"); - m_sync_condvar.wait_for(lock_callback, timeout, [=] { return m_callbacks.empty(); }); + m_sync_condvar.wait_for(lock_callback, timeout, [=] { return m_callbacks_running == 0 && m_callbacks.empty(); }); __CPP_REDIS_LOG(debug, "cpp_redis::redis_client finished to wait for callbacks completion (or timeout reached)"); return *this; @@ -282,6 +282,7 @@ class redis_client { std::mutex m_callbacks_mutex; std::mutex m_send_mutex; std::condition_variable m_sync_condvar; + std::atomic_uint m_callbacks_running; }; } //! cpp_redis diff --git a/includes/cpp_redis/tools/thread_pool.hpp b/includes/cpp_redis/tools/thread_pool.hpp new file mode 100644 index 00000000..4b48239a --- /dev/null +++ b/includes/cpp_redis/tools/thread_pool.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#define __CPP_REDIS_DEFAULT_NB_THREADS 16 + +namespace cpp_redis { + +namespace tools { + +class thread_pool { +public: + //! singleton pattern: instance getter + //! first parameter defines the number of threads to use + //! the parameter is only processed for the first call, and is ignored on future calls + static const std::shared_ptr& get_instance(size_t nb_threads = __CPP_REDIS_DEFAULT_NB_THREADS); + + //! dtor + ~thread_pool(void); + +private: + //! ctor + thread_pool(size_t nb_threads); + + //! copy ctor & assignment operator + thread_pool(const thread_pool&) = delete; + thread_pool& operator=(const thread_pool&) = delete; + +public: + //! whether the thread_pool is running or not + bool is_running(void) const; + + //! return number of threads + size_t get_nb_threads(void) const; + + //! add tasks + typedef std::function task_t; + void add_task(const task_t& task); + void operator<<(const task_t& task); + +private: + //! workers processing loop + void run(void); + + //! retrieve available task + task_t fetch_task(void); + +private: + //! whether the thread_pool is running or not + std::atomic_bool m_is_running; + + //! workers + std::vector m_workers; + + //! taks + std::queue m_tasks; + + //! thread safety + std::mutex m_tasks_mutex; + std::condition_variable m_tasks_condvar; +}; + +} //! tools + +} //! cpp_redis diff --git a/sources/network/io_service.cpp b/sources/network/io_service.cpp new file mode 100644 index 00000000..2817f41a --- /dev/null +++ b/sources/network/io_service.cpp @@ -0,0 +1,48 @@ +#include + +#ifdef _WIN32 +#include +#else +#include +#endif /* _WIN32 */ + +namespace cpp_redis { + +namespace network { + +static std::shared_ptr global_instance = nullptr; + +const std::shared_ptr& +io_service::get_global_instance(void) { + if (!global_instance) + global_instance = create_io_service(); + + return global_instance; +} + +void +io_service::set_global_instance(const std::shared_ptr& io_service) { + global_instance = io_service; +} + +io_service::io_service(size_t nb_io_service_workers) +: m_nb_workers(nb_io_service_workers) +{} + +size_t +io_service::get_nb_workers(void) const { + return m_nb_workers; +} + +std::shared_ptr +create_io_service(size_t nb_io_service_workers) { +#ifdef _WIN32 + return std::make_shared(nb_io_service_workers); +#else + return std::make_shared(nb_io_service_workers); +#endif /* _WIN32 */ +} + +} //! network + +} //! cpp_redis diff --git a/sources/network/redis_connection.cpp b/sources/network/redis_connection.cpp index 7283b768..26b6d5dd 100644 --- a/sources/network/redis_connection.cpp +++ b/sources/network/redis_connection.cpp @@ -5,8 +5,8 @@ namespace cpp_redis { namespace network { -redis_connection::redis_connection(const std::shared_ptr& IO) -: m_client(IO) +redis_connection::redis_connection(const std::shared_ptr& io_service) +: m_client(io_service) , m_reply_callback(nullptr) , m_disconnection_handler(nullptr) { __CPP_REDIS_LOG(debug, "cpp_redis::network::redis_connection created"); diff --git a/sources/network/windows/tcp_client.cpp b/sources/network/tcp_client.cpp similarity index 68% rename from sources/network/windows/tcp_client.cpp rename to sources/network/tcp_client.cpp index 2230fda6..49b5b69e 100644 --- a/sources/network/windows/tcp_client.cpp +++ b/sources/network/tcp_client.cpp @@ -1,24 +1,25 @@ #include -#pragma warning(disable : 4996) //Disable "The POSIX name for this item is deprecated" warnings for gethostbyname() +//! Disable "The POSIX name for this item is deprecated" warnings for gethostbyname() +#ifdef _WIN32 +#pragma warning(disable : 4996) +#endif /* _WIN32 */ #include +#ifndef _WIN32 +#include +#include +#endif /* _WIN32 */ + #include -#include +#include namespace cpp_redis { namespace network { -//! note that we call io_service::get_instance in the init list -//! -//! this will force force io_service instance creation -//! this is a workaround to handle static object destructions order -//! -//! that way, any object containing a tcp_client has an attribute (or through its attributes) -//! is guaranteed to be destructed before the io_service is destructed, even if it is global -tcp_client::tcp_client(const std::shared_ptr& IO) -: m_io_service(IO ? IO : io_service::get_instance()) +tcp_client::tcp_client(const std::shared_ptr& io_service) +: m_io_service(io_service ? io_service : io_service::get_global_instance()) , m_sock(-1) , m_is_connected(false) , m_receive_handler(nullptr) @@ -31,6 +32,39 @@ tcp_client::~tcp_client(void) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client destroyed"); } +void +tcp_client::setup_socket(void) { +#ifdef _WIN32 + //! create the socket + //! Enable socket for overlapped i/o + m_sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); + if (m_sock < 0) { + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not create socket"); + throw redis_error("Can't open a socket"); + } + + //! Instruct the TCP stack to directly perform I/O using the buffer provided in our I/O call. + //! The advantage is performance because we save a buffer copy between the TCP stack buffer + //! and our user buffer for each I/O call. + //! BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! + int nZero = 0; + if (setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char*) &nZero, sizeof(nZero))) + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not disable buffering"); + + //! Set socket to non blocking. + u_long ulValue = 1; + if (ioctlsocket(m_sock, FIONBIO, &ulValue)) + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); +#else + //! create the socket + m_sock = socket(AF_INET, SOCK_STREAM, 0); + if (m_sock < 0) { + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not create socket"); + throw redis_error("Can't open a socket"); + } +#endif /* _WIN32 */ +} + void tcp_client::connect(const std::string& host, unsigned int port, const disconnection_handler_t& disconnection_handler, @@ -42,20 +76,7 @@ tcp_client::connect(const std::string& host, unsigned int port, return throw cpp_redis::redis_error("Client already connected"); } - //! create the socket - int nZero = 0; - //Enable socket for overlapped i/o - m_sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); - if (m_sock < 0) - throw redis_error("Can't open a socket"); - - //Instruct the TCP stack to directly perform I/O using the buffer provided in our I/O call. - //The advantage is performance because we save a buffer copy between the TCP stack buffer - //and our user buffer for each I/O call. - //BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! - if (0 != setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char*) &nZero, sizeof(nZero))) { - return throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to disable buffering"); - } + setup_socket(); //! get the server's DNS entry struct hostent* server = gethostbyname(host.c_str()); @@ -73,17 +94,13 @@ tcp_client::connect(const std::string& host, unsigned int port, //! create a connection with the server if (::connect(m_sock, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) { + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not connect"); throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); } - //! add socket to the io_service and set the disconnection & recv handlers + //! add fd to the io_service and set the disconnection & recv handlers m_disconnection_handler = disconnection_handler; m_receive_handler = receive_handler; - - u_long ulValue = 1; - if (0 != ioctlsocket(m_sock, FIONBIO, &ulValue)) //Set socket to non blocking. - throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to set socket to non-blocking"); - m_io_service->track(m_sock, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); m_is_connected = true; @@ -95,18 +112,14 @@ tcp_client::connect(const std::string& host, unsigned int port, void tcp_client::disconnect(void) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client attemps to disconnect"); + if (!m_is_connected) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client already disconnected"); return; } - m_is_connected = false; m_io_service->untrack(m_sock); - - closesocket(m_sock); - m_sock = INVALID_SOCKET; - - clear_buffer(); reset_state(); __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client disconnected"); @@ -119,6 +132,8 @@ tcp_client::send(const std::string& buffer) { void tcp_client::send(const std::vector& buffer) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client attemps to send data"); + if (!m_is_connected) { __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client is not connected"); throw redis_error("Not connected"); @@ -148,22 +163,21 @@ void tcp_client::async_read(void) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client starts async_read"); - m_io_service->async_read(m_sock, m_read_buffer, READ_SIZE, + m_io_service->async_read(m_sock, m_read_buffer, __CPP_REDIS_READ_SIZE, [&](std::size_t length) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client received data"); if (m_receive_handler) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client calls receive_handler"); - if (!m_receive_handler(*this, - {m_read_buffer.begin(), m_read_buffer.begin() + length})) { + if (!m_receive_handler(*this, {m_read_buffer.begin(), m_read_buffer.begin() + length})) { __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client has been asked for disconnection by receive_handler"); disconnect(); return; } } - //! clear read buffer and re-issue async read to receive more incoming bytes + //! clear read buffer keep waiting for incoming bytes m_read_buffer.clear(); if (m_is_connected) @@ -180,13 +194,13 @@ tcp_client::async_write(void) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client wrote data and cleans write_buffer"); std::lock_guard lock(m_write_buffer_mutex); - //Remove what has already been sent and see if we have any more to send + //! Remove what has already been sent and see if we have any more to send if (length >= m_write_buffer.front().size()) m_write_buffer.pop_front(); else m_write_buffer.front().erase(m_write_buffer.front().begin(), m_write_buffer.front().begin() + length); - //If we still have data to write the call ourselves recursivly until the buffer is completely sent + //! If we still have data to write the call ourselves recursivly until the buffer is completely sent if (m_is_connected && m_write_buffer.size()) async_write(); }); @@ -211,10 +225,16 @@ tcp_client::io_service_disconnection_handler(network::io_service&) { void tcp_client::reset_state(void) { - if (m_sock != INVALID_SOCKET) - closesocket(m_sock); m_is_connected = false; - m_sock = INVALID_SOCKET; + + if (m_sock != INVALID_SOCKET) { +#ifdef _WIN32 + closesocket(m_sock); +#else + close(m_sock); +#endif /* _WIN32 */ + m_sock = INVALID_SOCKET; + } clear_buffer(); } diff --git a/sources/network/unix/io_service.cpp b/sources/network/unix/io_service.cpp index 484b8f3f..00889afa 100644 --- a/sources/network/unix/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -8,14 +8,11 @@ namespace cpp_redis { namespace network { -const std::shared_ptr& -io_service::get_instance(void) { - static std::shared_ptr instance = std::shared_ptr{new io_service}; - return instance; -} +namespace unix { -io_service::io_service(void) -: m_should_stop(false) +io_service::io_service(size_t nb_io_service_workers) +: network::io_service(nb_io_service_workers) +, m_should_stop(false) , m_notif_pipe_fds{1, 1} { if (pipe(m_notif_pipe_fds) == -1) { __CPP_REDIS_LOG(error, "cpp_redis::network::io_service could not create pipe"); @@ -28,7 +25,7 @@ io_service::io_service(void) throw cpp_redis::redis_error("Could not init cpp_redis::io_service, fcntl() failure"); } - m_worker = std::thread(&io_service::listen, this); + m_worker = std::thread(&io_service::process_io, this); __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service created"); } @@ -172,7 +169,7 @@ io_service::process_sets(struct pollfd* fds, unsigned int nfds) { } void -io_service::listen(void) { +io_service::process_io(void) { struct pollfd fds[1024]; __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service starts poll loop in worker thread"); @@ -289,6 +286,8 @@ io_service::notify_poll(void) { (void) write(m_notif_pipe_fds[1], "a", 1); } +} //! unix + } //! network } //! cpp_redis diff --git a/sources/network/unix/tcp_client.cpp b/sources/network/unix/tcp_client.cpp deleted file mode 100644 index b66791a6..00000000 --- a/sources/network/unix/tcp_client.cpp +++ /dev/null @@ -1,212 +0,0 @@ -#include -#include -#include - -#include -#include - -namespace cpp_redis { - -namespace network { - -//! note that we call io_service::get_instance in the init list -//! -//! this will force force io_service instance creation -//! this is a workaround to handle static object destructions order -//! -//! that way, any object containing a tcp_client has an attribute (or through its attributes) -//! is guaranteed to be destructed before the io_service is destructed, even if it is global -tcp_client::tcp_client(const std::shared_ptr& IO) -: m_io_service(IO ? IO : io_service::get_instance()) -, m_fd(-1) -, m_is_connected(false) -, m_receive_handler(nullptr) -, m_disconnection_handler(nullptr) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client created"); -} - -tcp_client::~tcp_client(void) { - disconnect(); - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client destroyed"); -} - -void -tcp_client::connect(const std::string& host, unsigned int port, - const disconnection_handler_t& disconnection_handler, - const receive_handler_t& receive_handler) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client attempts to connect"); - - if (m_is_connected) { - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client is already connected"); - return throw cpp_redis::redis_error("Client already connected"); - } - - //! create the socket - m_fd = socket(AF_INET, SOCK_STREAM, 0); - if (m_fd < 0) { - __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not create socket"); - throw redis_error("Can't open a socket"); - } - - //! get the server's DNS entry - struct hostent* server = gethostbyname(host.c_str()); - if (!server) { - __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not resolve DNS"); - throw redis_error("No such host: " + host); - } - - //! build the server's Internet address - struct sockaddr_in server_addr; - std::memset(&server_addr, 0, sizeof(server_addr)); - std::memcpy(&server_addr.sin_addr.s_addr, server->h_addr, server->h_length); - server_addr.sin_port = htons(port); - server_addr.sin_family = AF_INET; - - //! create a connection with the server - if (::connect(m_fd, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) { - __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not connect"); - throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); - } - - //! add fd to the io_service and set the disconnection & recv handlers - m_disconnection_handler = disconnection_handler; - m_receive_handler = receive_handler; - m_io_service->track(m_fd, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); - m_is_connected = true; - - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client connected"); - - //! start async read - async_read(); -} - -void -tcp_client::disconnect(void) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client attemps to disconnect"); - - if (!m_is_connected) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client already disconnected"); - return; - } - - m_io_service->untrack(m_fd); - reset_state(); - - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client disconnected"); -} - -void -tcp_client::send(const std::string& buffer) { - send(std::vector{buffer.begin(), buffer.end()}); -} - -void -tcp_client::send(const std::vector& buffer) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client attemps to send data"); - - if (!m_is_connected) { - __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client is not connected"); - throw redis_error("Not connected"); - } - - if (!buffer.size()) { - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client has nothing to send"); - return; - } - - std::lock_guard lock(m_write_buffer_mutex); - - bool bytes_in_buffer = m_write_buffer.size() > 0; - - //! concat buffer - m_write_buffer.insert(m_write_buffer.end(), buffer.begin(), buffer.end()); - - //! if there were already bytes in buffer, simply return - //! async_write callback will process the new buffer - if (bytes_in_buffer) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client is already processing an async_write"); - return; - } - - async_write(); -} - -void -tcp_client::async_read(void) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client starts async_read"); - - m_io_service->async_read(m_fd, m_read_buffer, READ_SIZE, - [&](std::size_t length) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client received data"); - - if (m_receive_handler) - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client calls receive_handler"); - if (!m_receive_handler(*this, {m_read_buffer.begin(), m_read_buffer.begin() + length})) { - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client has been asked for disconnection by receive_handler"); - disconnect(); - return; - } - - //! clear read buffer keep waiting for incoming bytes - m_read_buffer.clear(); - - if (m_is_connected) - async_read(); - }); -} - -void -tcp_client::async_write(void) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client starts async_write"); - - m_io_service->async_write(m_fd, m_write_buffer, m_write_buffer.size(), - [&](std::size_t length) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client wrote data and cleans write_buffer"); - std::lock_guard lock(m_write_buffer_mutex); - - m_write_buffer.erase(m_write_buffer.begin(), m_write_buffer.begin() + length); - - if (m_is_connected && m_write_buffer.size()) - async_write(); - }); -} - -bool -tcp_client::is_connected(void) { - return m_is_connected; -} - -void -tcp_client::io_service_disconnection_handler(network::io_service&) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client has been disconnected"); - - reset_state(); - - if (m_disconnection_handler) { - __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client calls disconnection handler"); - m_disconnection_handler(*this); - } -} - -void -tcp_client::reset_state(void) { - m_is_connected = false; - - if (m_fd != -1) { - close(m_fd); - m_fd = -1; - } - - clear_buffer(); -} - -void -tcp_client::clear_buffer(void) { - std::lock_guard lock(m_write_buffer_mutex); - m_write_buffer.clear(); - m_read_buffer.clear(); -} - -} //! network - -} //! cpp_redis diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index db14d4b9..eb8300ab 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -5,38 +5,26 @@ namespace cpp_redis { namespace network { -const std::shared_ptr& -io_service::get_instance(void) { - static std::shared_ptr instance = std::shared_ptr{new io_service}; - return instance; -} - -io_service::io_service(size_t max_worker_threads) -: m_should_stop(false) { - //Determine the size of the thread pool dynamically. - //2 * number of processors in the system is our rule here. - SYSTEM_INFO info; - ::GetSystemInfo(&info); - m_worker_thread_pool_size = (info.dwNumberOfProcessors * 2); - - if (m_worker_thread_pool_size > max_worker_threads) - m_worker_thread_pool_size = max_worker_threads; +namespace windows { +io_service::io_service(size_t nb_io_service_workers) +: network::io_service(nb_io_service_workers) +, m_should_stop(false) { + //! Start winsock before any other socket calls. WSADATA wsaData; - int nRet = 0; - if ((nRet = WSAStartup(0x202, &wsaData)) != 0) //Start winsock before any other socket calls. + int nRet = WSAStartup(0x202, &wsaData); + if (nRet) throw cpp_redis::redis_error("Could not init cpp_redis::io_service, WSAStartup() failure"); //Create completion port. Pass 0 for parameter 4 to allow as many threads as there are processors in the system m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); - ; - if (INVALID_HANDLE_VALUE == m_completion_port) + if (m_completion_port == INVALID_HANDLE_VALUE) throw cpp_redis::redis_error("Could not init cpp_redis::io_service, CreateIoCompletionPort() failure"); - //Now startup worker thread pool which will service our async io requests - for (unsigned int i = 0; i < m_worker_thread_pool_size; i++) { - m_worker_threads.push_back(std::thread(&io_service::process_io, this)); - } + //! Now startup worker thread pool which will service our async io requests + auto& thread_pool = tools::thread_pool::get_instance(); + for (unsigned int i = 0; i < nb_io_service_workers; ++i) + thread_pool->add_task(std::bind(&io_service::process_io, this)); } io_service::~io_service(void) { @@ -47,28 +35,20 @@ void io_service::shutdown() { m_should_stop = true; - //Iterate all of our sockets and shutdown any IO worker threads by posting a issuing a special - //message to the thread to tell them to wake up and shut down. - io_context_info* pInfo = NULL; - - auto sock_it = m_sockets.begin(); - while (sock_it != m_sockets.end()) { - auto& info = sock_it->second; - //Post for each of our worker threads. - int workers = m_worker_threads.size(); - for (int i = 0; i < workers; i++) - PostQueuedCompletionStatus(m_completion_port, 0, NULL, NULL); //Use NULL for the completion key to wake them up. - sock_it++; - } + //! Iterate all of our sockets and shutdown any IO worker threads by posting a issuing a special + //! message to the thread to tell them to wake up and shut down. + io_context_info* pInfo = nullptr; - // Wait for the threads to finish - for (auto& t : m_worker_threads) - t.join(); + //! Post for each of our worker threads. + for (int i = 0; i < workers; i++) { + //! Use nullptr for the completion key to wake them up. + PostQueuedCompletionStatus(m_completion_port, 0, nullptr, nullptr); + } - //close the completion port otherwise the worker threads will all be waiting on GetQueuedCompletionStatus() + //! close the completion port otherwise the worker threads will all be waiting on GetQueuedCompletionStatus() if (m_completion_port) { CloseHandle(m_completion_port); - m_completion_port = NULL; + m_completion_port = nullptr; } } @@ -263,5 +243,8 @@ io_service::process_io(void) { return 0; } +} //! windows + } //! network + } //! cpp_redis diff --git a/sources/redis_client.cpp b/sources/redis_client.cpp index 96f888be..ad5d7146 100644 --- a/sources/redis_client.cpp +++ b/sources/redis_client.cpp @@ -3,8 +3,9 @@ namespace cpp_redis { -redis_client::redis_client(const std::shared_ptr& IO) -: m_client(IO) { +redis_client::redis_client(const std::shared_ptr& io_service) +: m_client(io_service) +, m_callbacks_running(0) { __CPP_REDIS_LOG(debug, "cpp_redis::redis_client created"); } @@ -65,7 +66,7 @@ redis_client::sync_commit(void) { std::unique_lock lock_callback(m_callbacks_mutex); __CPP_REDIS_LOG(debug, "cpp_redis::redis_client waits for callbacks to complete"); - m_sync_condvar.wait(lock_callback, [=] { return m_callbacks.empty(); }); + m_sync_condvar.wait(lock_callback, [=] { return m_callbacks_running == 0 && m_callbacks.empty(); }); __CPP_REDIS_LOG(debug, "cpp_redis::redis_client finished to wait for callbacks completion"); return *this; @@ -95,6 +96,7 @@ redis_client::connection_receive_handler(network::redis_connection&, reply& repl if (m_callbacks.size()) { callback = m_callbacks.front(); + m_callbacks_running += 1; m_callbacks.pop(); } } @@ -104,6 +106,7 @@ redis_client::connection_receive_handler(network::redis_connection&, reply& repl callback(reply); } + m_callbacks_running -= 1; m_sync_condvar.notify_all(); } diff --git a/sources/redis_subscriber.cpp b/sources/redis_subscriber.cpp index eefc954f..ababc985 100644 --- a/sources/redis_subscriber.cpp +++ b/sources/redis_subscriber.cpp @@ -4,8 +4,8 @@ namespace cpp_redis { -redis_subscriber::redis_subscriber(const std::shared_ptr& IO) -: m_client(IO) { +redis_subscriber::redis_subscriber(const std::shared_ptr& io_service) +: m_client(io_service) { __CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber created"); } diff --git a/sources/tools/thread_pool.cpp b/sources/tools/thread_pool.cpp new file mode 100644 index 00000000..7558b887 --- /dev/null +++ b/sources/tools/thread_pool.cpp @@ -0,0 +1,78 @@ +#include + +namespace cpp_redis { + +namespace tools { + +const std::shared_ptr& +thread_pool::get_instance(size_t nb_threads) { + static std::shared_ptr instance = std::shared_ptr{ new thread_pool(nb_threads) }; + + return instance; +} + +thread_pool::thread_pool(size_t nb_thread) +: m_is_running(true) +, m_workers(nb_thread) { + for (auto& worker : m_workers) + worker = std::thread(std::bind(&thread_pool::run, this)); +} + +thread_pool::~thread_pool(void) { + m_is_running = false; + + for (auto& worker : m_workers) + worker.join(); +} + +bool +thread_pool::is_running(void) const { + return m_is_running; +} + +size_t +thread_pool::get_nb_threads(void) const { + return m_workers.size(); +} + +void +thread_pool::add_task(const task_t& task) { + std::lock_guard lock(m_tasks_mutex); + + m_tasks.push(task); +} + +void +thread_pool::operator<<(const task_t& task) { + add_task(task); +} + +thread_pool::task_t +thread_pool::fetch_task(void) { + std::unique_lock lock(m_tasks_mutex); + + while (m_is_running && m_tasks.empty()) + m_tasks_condvar.wait(lock); + + if (m_tasks.size()) { + task_t task = m_tasks.front(); + m_tasks.pop(); + return task; + } + + return nullptr; +} + +void +thread_pool::run(void) { + while (m_is_running) { + auto task = fetch_task(); + + if (task) + task(); + } +} + +} //! tools + +} //! cpp_redis diff --git a/tests/sources/spec/redis_subscriber_spec.cpp b/tests/sources/spec/redis_subscriber_spec.cpp index a0804a45..d66c689a 100644 --- a/tests/sources/spec/redis_subscriber_spec.cpp +++ b/tests/sources/spec/redis_subscriber_spec.cpp @@ -298,6 +298,14 @@ TEST(RedisSubscriber, MultipleSubscribeSomethingPublished) { sub.connect(); client.connect(); + auto ack_callback = [&](int nb_chans) { + if (nb_chans == 2) { + client.publish("/chan_1", "hello"); + client.publish("/chan_2", "world"); + client.commit(); + } + }; + std::atomic_bool callback_1_run(false); std::atomic_bool callback_2_run(false); sub.subscribe("/chan_1", @@ -308,14 +316,7 @@ TEST(RedisSubscriber, MultipleSubscribeSomethingPublished) { if (callback_2_run) cv.notify_all(); - }, - [&](int nb_chans) { - if (nb_chans == 2) { - client.publish("/chan_1", "hello"); - client.publish("/chan_2", "world"); - client.commit(); - } - }); + }, ack_callback); sub.subscribe("/chan_2", [&](const std::string& channel, const std::string& message) { EXPECT_TRUE(channel == "/chan_2"); @@ -324,14 +325,7 @@ TEST(RedisSubscriber, MultipleSubscribeSomethingPublished) { if (callback_1_run) cv.notify_all(); - }, - [&](int nb_chans) { - if (nb_chans == 2) { - client.publish("/chan_1", "hello"); - client.publish("/chan_2", "world"); - client.commit(); - } - }); + }, ack_callback); sub.commit(); @@ -446,6 +440,14 @@ TEST(RedisSubscriber, MultiplePSubscribeSomethingPublished) { sub.connect(); client.connect(); + auto ack_callback = [&](int nb_chans) { + if (nb_chans == 2) { + client.publish("/chan/1", "hello"); + client.publish("/other_chan/2", "world"); + client.commit(); + } + }; + std::atomic_bool callback_1_run(false); std::atomic_bool callback_2_run(false); sub.psubscribe("/chan/*", @@ -456,14 +458,7 @@ TEST(RedisSubscriber, MultiplePSubscribeSomethingPublished) { if (callback_2_run) cv.notify_all(); - }, - [&](int nb_chans) { - if (nb_chans == 2) { - client.publish("/chan/1", "hello"); - client.publish("/other_chan/2", "world"); - client.commit(); - } - }); + }, ack_callback); sub.psubscribe("/other_chan/*", [&](const std::string& channel, const std::string& message) { EXPECT_TRUE(channel == "/other_chan/2"); @@ -472,14 +467,7 @@ TEST(RedisSubscriber, MultiplePSubscribeSomethingPublished) { if (callback_1_run) cv.notify_all(); - }, - [&](int nb_chans) { - if (nb_chans == 2) { - client.publish("/chan/1", "hello"); - client.publish("/other_chan/2", "world"); - client.commit(); - } - }); + }, ack_callback); sub.commit(); @@ -499,31 +487,25 @@ TEST(RedisSubscriber, Unsubscribe) { sub.connect(); client.connect(); + auto ack_callback = [&](int nb_chans) { + if (nb_chans == 2) { + client.publish("/chan_1", "hello"); + client.publish("/chan_2", "hello"); + client.commit(); + } + }; + std::atomic_bool callback_1_run(false); std::atomic_bool callback_2_run(false); sub.subscribe("/chan_1", [&](const std::string&, const std::string&) { callback_1_run = true; - }, - [&](int nb_chans) { - if (nb_chans == 2) { - client.publish("/chan_1", "hello"); - client.publish("/chan_2", "hello"); - client.commit(); - } - }); + }, ack_callback); sub.subscribe("/chan_2", [&](const std::string&, const std::string&) { callback_2_run = true; cv.notify_all(); - }, - [&](int nb_chans) { - if (nb_chans == 2) { - client.publish("/chan_1", "hello"); - client.publish("/chan_2", "hello"); - client.commit(); - } - }); + }, ack_callback); sub.unsubscribe("/chan_1"); sub.commit(); @@ -544,31 +526,25 @@ TEST(RedisSubscriber, PUnsubscribe) { sub.connect(); client.connect(); + auto ack_callback = [&](int nb_chans) { + if (nb_chans == 2) { + client.publish("/chan_1/hello", "hello"); + client.publish("/chan_2/hello", "hello"); + client.commit(); + } + }; + std::atomic_bool callback_1_run(false); std::atomic_bool callback_2_run(false); sub.psubscribe("/chan_1/*", [&](const std::string&, const std::string&) { callback_1_run = true; - }, - [&](int nb_chans) { - if (nb_chans == 2) { - client.publish("/chan_1/hello", "hello"); - client.publish("/chan_2/hello", "hello"); - client.commit(); - } - }); + }, ack_callback); sub.psubscribe("/chan_2/*", [&](const std::string&, const std::string&) { callback_2_run = true; cv.notify_all(); - }, - [&](int nb_chans) { - if (nb_chans == 2) { - client.publish("/chan_1/hello", "hello"); - client.publish("/chan_2/hello", "hello"); - client.commit(); - } - }); + }, ack_callback); sub.punsubscribe("/chan_1/*"); sub.commit(); From d555ea7b808242438f199822c7e64c39e2416f07 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 21:49:12 +0200 Subject: [PATCH 02/13] remove thread pool, this is adding way to overhead for the benefits it brings. --- includes/cpp_redis/network/io_service.hpp | 9 +-- .../cpp_redis/network/unix/io_service.hpp | 2 +- .../cpp_redis/network/windows/io_service.hpp | 9 ++- includes/cpp_redis/tools/thread_pool.hpp | 71 ----------------- sources/network/io_service.cpp | 15 +--- sources/network/unix/io_service.cpp | 5 +- sources/network/windows/io_service.cpp | 14 ++-- sources/tools/thread_pool.cpp | 78 ------------------- 8 files changed, 24 insertions(+), 179 deletions(-) delete mode 100644 includes/cpp_redis/tools/thread_pool.hpp delete mode 100644 sources/tools/thread_pool.cpp diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp index 2d99e3d6..701a9b87 100644 --- a/includes/cpp_redis/network/io_service.hpp +++ b/includes/cpp_redis/network/io_service.hpp @@ -19,7 +19,7 @@ class io_service { public: //! ctor & dtor - io_service(size_t nb_io_service_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); + io_service(void) = default; virtual ~io_service(void) = default; //! copy ctor & assignment operator @@ -47,18 +47,13 @@ class io_service { typedef std::function write_callback_t; virtual bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) = 0; - size_t get_nb_workers(void) const; - private: //! listen for incoming events and notify virtual void process_io(void) = 0; - -private: - size_t m_nb_workers; }; //! multi-platform instance builder -std::shared_ptr create_io_service(size_t nb_io_service_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); +std::shared_ptr create_io_service(void); } //! network diff --git a/includes/cpp_redis/network/unix/io_service.hpp b/includes/cpp_redis/network/unix/io_service.hpp index a11c0935..2de47a65 100644 --- a/includes/cpp_redis/network/unix/io_service.hpp +++ b/includes/cpp_redis/network/unix/io_service.hpp @@ -22,7 +22,7 @@ namespace unix { class io_service : public network::io_service { public: //! ctor & dtor - io_service(size_t nb_io_service_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); + io_service(void); ~io_service(void); //! copy ctor & assignment operator diff --git a/includes/cpp_redis/network/windows/io_service.hpp b/includes/cpp_redis/network/windows/io_service.hpp index a45f2919..be7153bb 100644 --- a/includes/cpp_redis/network/windows/io_service.hpp +++ b/includes/cpp_redis/network/windows/io_service.hpp @@ -10,6 +10,10 @@ #include +#ifndef __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS +#define __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS 16 +#endif /* __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS */ + namespace cpp_redis { namespace network { @@ -25,7 +29,7 @@ typedef enum _enIoOperation { class io_service : public network::io_service { public: //! ctor & dtor - io_service(size_t nb_io_service_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); + io_service(void); ~io_service(void); private: @@ -112,6 +116,9 @@ class io_service : public network::io_service { //! completion port HANDLE m_completion_port; + //! vector containing all the threads we start to service our i/o requests + std::vector m_worker_threads; + private: //! whether the worker should terminate or not std::atomic_bool m_should_stop; diff --git a/includes/cpp_redis/tools/thread_pool.hpp b/includes/cpp_redis/tools/thread_pool.hpp deleted file mode 100644 index 4b48239a..00000000 --- a/includes/cpp_redis/tools/thread_pool.hpp +++ /dev/null @@ -1,71 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#define __CPP_REDIS_DEFAULT_NB_THREADS 16 - -namespace cpp_redis { - -namespace tools { - -class thread_pool { -public: - //! singleton pattern: instance getter - //! first parameter defines the number of threads to use - //! the parameter is only processed for the first call, and is ignored on future calls - static const std::shared_ptr& get_instance(size_t nb_threads = __CPP_REDIS_DEFAULT_NB_THREADS); - - //! dtor - ~thread_pool(void); - -private: - //! ctor - thread_pool(size_t nb_threads); - - //! copy ctor & assignment operator - thread_pool(const thread_pool&) = delete; - thread_pool& operator=(const thread_pool&) = delete; - -public: - //! whether the thread_pool is running or not - bool is_running(void) const; - - //! return number of threads - size_t get_nb_threads(void) const; - - //! add tasks - typedef std::function task_t; - void add_task(const task_t& task); - void operator<<(const task_t& task); - -private: - //! workers processing loop - void run(void); - - //! retrieve available task - task_t fetch_task(void); - -private: - //! whether the thread_pool is running or not - std::atomic_bool m_is_running; - - //! workers - std::vector m_workers; - - //! taks - std::queue m_tasks; - - //! thread safety - std::mutex m_tasks_mutex; - std::condition_variable m_tasks_condvar; -}; - -} //! tools - -} //! cpp_redis diff --git a/sources/network/io_service.cpp b/sources/network/io_service.cpp index 2817f41a..7a50703d 100644 --- a/sources/network/io_service.cpp +++ b/sources/network/io_service.cpp @@ -25,21 +25,12 @@ io_service::set_global_instance(const std::shared_ptr& io_s global_instance = io_service; } -io_service::io_service(size_t nb_io_service_workers) -: m_nb_workers(nb_io_service_workers) -{} - -size_t -io_service::get_nb_workers(void) const { - return m_nb_workers; -} - std::shared_ptr -create_io_service(size_t nb_io_service_workers) { +create_io_service(void) { #ifdef _WIN32 - return std::make_shared(nb_io_service_workers); + return std::make_shared(); #else - return std::make_shared(nb_io_service_workers); + return std::make_shared(); #endif /* _WIN32 */ } diff --git a/sources/network/unix/io_service.cpp b/sources/network/unix/io_service.cpp index 00889afa..3c34fbe2 100644 --- a/sources/network/unix/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -10,9 +10,8 @@ namespace network { namespace unix { -io_service::io_service(size_t nb_io_service_workers) -: network::io_service(nb_io_service_workers) -, m_should_stop(false) +io_service::io_service(void) +: m_should_stop(false) , m_notif_pipe_fds{1, 1} { if (pipe(m_notif_pipe_fds) == -1) { __CPP_REDIS_LOG(error, "cpp_redis::network::io_service could not create pipe"); diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index eb8300ab..df867df3 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -7,9 +7,8 @@ namespace network { namespace windows { -io_service::io_service(size_t nb_io_service_workers) -: network::io_service(nb_io_service_workers) -, m_should_stop(false) { +io_service::io_service(void) +: m_should_stop(false) { //! Start winsock before any other socket calls. WSADATA wsaData; int nRet = WSAStartup(0x202, &wsaData); @@ -22,9 +21,8 @@ io_service::io_service(size_t nb_io_service_workers) throw cpp_redis::redis_error("Could not init cpp_redis::io_service, CreateIoCompletionPort() failure"); //! Now startup worker thread pool which will service our async io requests - auto& thread_pool = tools::thread_pool::get_instance(); - for (unsigned int i = 0; i < nb_io_service_workers; ++i) - thread_pool->add_task(std::bind(&io_service::process_io, this)); + for (unsigned int i = 0; i < __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS; ++i) + m_worker_threads.push_back(std::thread(&io_service::process_io, this)); } io_service::~io_service(void) { @@ -50,6 +48,10 @@ io_service::shutdown() { CloseHandle(m_completion_port); m_completion_port = nullptr; } + + //! Wait for the threads to finish + for (auto& worker : m_worker_threads) + worker.join(); } //! add or remove a given socket from the io service diff --git a/sources/tools/thread_pool.cpp b/sources/tools/thread_pool.cpp deleted file mode 100644 index 7558b887..00000000 --- a/sources/tools/thread_pool.cpp +++ /dev/null @@ -1,78 +0,0 @@ -#include - -namespace cpp_redis { - -namespace tools { - -const std::shared_ptr& -thread_pool::get_instance(size_t nb_threads) { - static std::shared_ptr instance = std::shared_ptr{ new thread_pool(nb_threads) }; - - return instance; -} - -thread_pool::thread_pool(size_t nb_thread) -: m_is_running(true) -, m_workers(nb_thread) { - for (auto& worker : m_workers) - worker = std::thread(std::bind(&thread_pool::run, this)); -} - -thread_pool::~thread_pool(void) { - m_is_running = false; - - for (auto& worker : m_workers) - worker.join(); -} - -bool -thread_pool::is_running(void) const { - return m_is_running; -} - -size_t -thread_pool::get_nb_threads(void) const { - return m_workers.size(); -} - -void -thread_pool::add_task(const task_t& task) { - std::lock_guard lock(m_tasks_mutex); - - m_tasks.push(task); -} - -void -thread_pool::operator<<(const task_t& task) { - add_task(task); -} - -thread_pool::task_t -thread_pool::fetch_task(void) { - std::unique_lock lock(m_tasks_mutex); - - while (m_is_running && m_tasks.empty()) - m_tasks_condvar.wait(lock); - - if (m_tasks.size()) { - task_t task = m_tasks.front(); - m_tasks.pop(); - return task; - } - - return nullptr; -} - -void -thread_pool::run(void) { - while (m_is_running) { - auto task = fetch_task(); - - if (task) - task(); - } -} - -} //! tools - -} //! cpp_redis From a22d9cb9a9b27d2875974c6161df5e9bf6269bab Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 21:49:25 +0200 Subject: [PATCH 03/13] clang-format --- includes/cpp_redis/network/io_service.hpp | 2 +- .../cpp_redis/network/redis_connection.hpp | 2 +- includes/cpp_redis/network/tcp_client.hpp | 4 ++-- sources/network/tcp_client.cpp | 2 +- tests/sources/spec/redis_subscriber_spec.cpp | 24 ++++++++++++------- 5 files changed, 21 insertions(+), 13 deletions(-) diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp index 701a9b87..09603d7c 100644 --- a/includes/cpp_redis/network/io_service.hpp +++ b/includes/cpp_redis/network/io_service.hpp @@ -19,7 +19,7 @@ class io_service { public: //! ctor & dtor - io_service(void) = default; + io_service(void) = default; virtual ~io_service(void) = default; //! copy ctor & assignment operator diff --git a/includes/cpp_redis/network/redis_connection.hpp b/includes/cpp_redis/network/redis_connection.hpp index 4b7e6743..fa357165 100644 --- a/includes/cpp_redis/network/redis_connection.hpp +++ b/includes/cpp_redis/network/redis_connection.hpp @@ -5,8 +5,8 @@ #include #include -#include #include +#include namespace cpp_redis { diff --git a/includes/cpp_redis/network/tcp_client.hpp b/includes/cpp_redis/network/tcp_client.hpp index a10f37ed..2a53b826 100644 --- a/includes/cpp_redis/network/tcp_client.hpp +++ b/includes/cpp_redis/network/tcp_client.hpp @@ -16,9 +16,9 @@ #endif /* __CPP_REDIS_READ_SIZE */ #ifdef _WIN32 - typedef SOCKET _sock_t; +typedef SOCKET _sock_t; #else - typedef int _sock_t; +typedef int _sock_t; #endif /* _WIN32 */ #ifndef INVALID_SOCKET diff --git a/sources/network/tcp_client.cpp b/sources/network/tcp_client.cpp index 49b5b69e..7689bf8e 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/tcp_client.cpp @@ -231,7 +231,7 @@ tcp_client::reset_state(void) { #ifdef _WIN32 closesocket(m_sock); #else - close(m_sock); + close(m_sock); #endif /* _WIN32 */ m_sock = INVALID_SOCKET; } diff --git a/tests/sources/spec/redis_subscriber_spec.cpp b/tests/sources/spec/redis_subscriber_spec.cpp index d66c689a..0cf6ac9f 100644 --- a/tests/sources/spec/redis_subscriber_spec.cpp +++ b/tests/sources/spec/redis_subscriber_spec.cpp @@ -316,7 +316,8 @@ TEST(RedisSubscriber, MultipleSubscribeSomethingPublished) { if (callback_2_run) cv.notify_all(); - }, ack_callback); + }, + ack_callback); sub.subscribe("/chan_2", [&](const std::string& channel, const std::string& message) { EXPECT_TRUE(channel == "/chan_2"); @@ -325,7 +326,8 @@ TEST(RedisSubscriber, MultipleSubscribeSomethingPublished) { if (callback_1_run) cv.notify_all(); - }, ack_callback); + }, + ack_callback); sub.commit(); @@ -458,7 +460,8 @@ TEST(RedisSubscriber, MultiplePSubscribeSomethingPublished) { if (callback_2_run) cv.notify_all(); - }, ack_callback); + }, + ack_callback); sub.psubscribe("/other_chan/*", [&](const std::string& channel, const std::string& message) { EXPECT_TRUE(channel == "/other_chan/2"); @@ -467,7 +470,8 @@ TEST(RedisSubscriber, MultiplePSubscribeSomethingPublished) { if (callback_1_run) cv.notify_all(); - }, ack_callback); + }, + ack_callback); sub.commit(); @@ -500,12 +504,14 @@ TEST(RedisSubscriber, Unsubscribe) { sub.subscribe("/chan_1", [&](const std::string&, const std::string&) { callback_1_run = true; - }, ack_callback); + }, + ack_callback); sub.subscribe("/chan_2", [&](const std::string&, const std::string&) { callback_2_run = true; cv.notify_all(); - }, ack_callback); + }, + ack_callback); sub.unsubscribe("/chan_1"); sub.commit(); @@ -539,12 +545,14 @@ TEST(RedisSubscriber, PUnsubscribe) { sub.psubscribe("/chan_1/*", [&](const std::string&, const std::string&) { callback_1_run = true; - }, ack_callback); + }, + ack_callback); sub.psubscribe("/chan_2/*", [&](const std::string&, const std::string&) { callback_2_run = true; cv.notify_all(); - }, ack_callback); + }, + ack_callback); sub.punsubscribe("/chan_1/*"); sub.commit(); From 34b5c194b54299579dc63bf3692575b28f9e736d Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 23:11:40 +0200 Subject: [PATCH 04/13] fix windows compilation --- includes/cpp_redis/network/io_service.hpp | 10 +++++----- includes/cpp_redis/network/socket.hpp | 19 +++++++++++++++++++ includes/cpp_redis/network/tcp_client.hpp | 11 +---------- .../cpp_redis/network/unix/io_service.hpp | 8 ++++---- .../cpp_redis/network/windows/io_service.hpp | 10 +++++----- sources/network/unix/io_service.cpp | 8 ++++---- sources/network/windows/io_service.cpp | 16 ++++++++-------- 7 files changed, 46 insertions(+), 36 deletions(-) create mode 100644 includes/cpp_redis/network/socket.hpp diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp index 09603d7c..c94e7047 100644 --- a/includes/cpp_redis/network/io_service.hpp +++ b/includes/cpp_redis/network/io_service.hpp @@ -4,7 +4,7 @@ #include #include -#define __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS 1 +#include namespace cpp_redis { @@ -32,20 +32,20 @@ class io_service { //! add or remove a given fd from the io service //! untrack should never be called from inside a callback - virtual void track(int fd, const disconnection_handler_t& handler) = 0; - virtual void untrack(int fd) = 0; + virtual void track(_sock_t sock, const disconnection_handler_t& handler) = 0; + virtual void untrack(_sock_t sock) = 0; //! asynchronously read read_size bytes and append them to the given buffer //! on completion, call the read_callback to notify of the success or failure of the operation //! return false if another async_read operation is in progress or fd is not registered typedef std::function read_callback_t; - virtual bool async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) = 0; + virtual bool async_read(_sock_t sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) = 0; //! asynchronously write write_size bytes from buffer to the specified fd //! on completion, call the write_callback to notify of the success or failure of the operation //! return false if another async_write operation is in progress or fd is not registered typedef std::function write_callback_t; - virtual bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) = 0; + virtual bool async_write(_sock_t sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) = 0; private: //! listen for incoming events and notify diff --git a/includes/cpp_redis/network/socket.hpp b/includes/cpp_redis/network/socket.hpp new file mode 100644 index 00000000..e63bcede --- /dev/null +++ b/includes/cpp_redis/network/socket.hpp @@ -0,0 +1,19 @@ +#pragma once + +namespace cpp_redis { + +namespace network { + +#ifdef _WIN32 +typedef SOCKET _sock_t; +#else +typedef int _sock_t; +#endif /* _WIN32 */ + +#ifndef INVALID_SOCKET +#define INVALID_SOCKET -1 +#endif /* INVALID_SOCKET */ + +} //! network + +} //! cpp_redis diff --git a/includes/cpp_redis/network/tcp_client.hpp b/includes/cpp_redis/network/tcp_client.hpp index 2a53b826..631b7b63 100644 --- a/includes/cpp_redis/network/tcp_client.hpp +++ b/includes/cpp_redis/network/tcp_client.hpp @@ -9,22 +9,13 @@ #include #include +#include #include #ifndef __CPP_REDIS_READ_SIZE #define __CPP_REDIS_READ_SIZE 4096 #endif /* __CPP_REDIS_READ_SIZE */ -#ifdef _WIN32 -typedef SOCKET _sock_t; -#else -typedef int _sock_t; -#endif /* _WIN32 */ - -#ifndef INVALID_SOCKET -#define INVALID_SOCKET -1 -#endif /* INVALID_SOCKET */ - namespace cpp_redis { namespace network { diff --git a/includes/cpp_redis/network/unix/io_service.hpp b/includes/cpp_redis/network/unix/io_service.hpp index 2de47a65..278b126a 100644 --- a/includes/cpp_redis/network/unix/io_service.hpp +++ b/includes/cpp_redis/network/unix/io_service.hpp @@ -30,11 +30,11 @@ class io_service : public network::io_service { io_service& operator=(const io_service&) = delete; public: - void track(int fd, const disconnection_handler_t& handler) override; - void untrack(int fd) override; + void track(_sock_t fd, const disconnection_handler_t& handler) override; + void untrack(_sock_t fd) override; - bool async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) override; - bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) override; + bool async_read(_sock_t fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) override; + bool async_write(_sock_t fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) override; private: //! simple struct to keep track of ongoing operations on a given fd diff --git a/includes/cpp_redis/network/windows/io_service.hpp b/includes/cpp_redis/network/windows/io_service.hpp index be7153bb..e790970d 100644 --- a/includes/cpp_redis/network/windows/io_service.hpp +++ b/includes/cpp_redis/network/windows/io_service.hpp @@ -38,15 +38,15 @@ class io_service : public network::io_service { io_service& operator=(const io_service&) = delete; public: - void track(SOCKET sock, const disconnection_handler_t& handler) override; - void untrack(SOCKET sock) override; + void track(_sock_t sock, const disconnection_handler_t& handler) override; + void untrack(_sock_t sock) override; - bool async_read(SOCKET socket, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) override; - bool async_write(SOCKET socket, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) override; + bool async_read(_sock_t socket, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) override; + bool async_write(_sock_t socket, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) override; private: //! wait for incoming events and notify - int process_io(void) override; + void process_io(void) override; //! shutdown void shutdown(void); diff --git a/sources/network/unix/io_service.cpp b/sources/network/unix/io_service.cpp index 3c34fbe2..7d32dc5f 100644 --- a/sources/network/unix/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -188,7 +188,7 @@ io_service::process_io(void) { } void -io_service::track(int fd, const disconnection_handler_t& handler) { +io_service::track(_sock_t fd, const disconnection_handler_t& handler) { std::lock_guard lock(m_fds_mutex); auto& info = m_fds[fd]; @@ -202,7 +202,7 @@ io_service::track(int fd, const disconnection_handler_t& handler) { } void -io_service::untrack(int fd) { +io_service::untrack(_sock_t fd) { std::unique_lock lock(m_fds_mutex); __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service requests to untrack fd #" + std::to_string(fd)); @@ -224,7 +224,7 @@ io_service::untrack(int fd) { } bool -io_service::async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { +io_service::async_read(_sock_t fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { std::lock_guard lock(m_fds_mutex); __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service is requested async_read for fd #" + std::to_string(fd)); @@ -252,7 +252,7 @@ io_service::async_read(int fd, std::vector& buffer, std::size_t read_size, } bool -io_service::async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { +io_service::async_write(_sock_t fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { std::lock_guard lock(m_fds_mutex); __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service is requested async_write for fd #" + std::to_string(fd)); diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index df867df3..51208851 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -57,7 +57,7 @@ io_service::shutdown() { //! add or remove a given socket from the io service //! untrack should never be called from inside a callback void -io_service::track(SOCKET sock, const disconnection_handler_t& handler) { +io_service::track(_sock_t sock, const disconnection_handler_t& handler) { std::lock_guard lock(m_socket_mutex); //Add the socket to our map and return the allocated struct @@ -73,13 +73,13 @@ io_service::track(SOCKET sock, const disconnection_handler_t& handler) { } void -io_service::untrack(SOCKET sock) { +io_service::untrack(_sock_t sock) { std::lock_guard lock(m_socket_mutex); m_sockets.erase(sock); } bool -io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { +io_service::async_read(_sock_t sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { std::lock_guard lock(m_socket_mutex); auto sock_it = m_sockets.find(sock); @@ -117,7 +117,7 @@ io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_ } bool -io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { +io_service::async_write(_sock_t sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { std::lock_guard lock(m_socket_mutex); auto sock_it = m_sockets.find(sock); @@ -154,7 +154,7 @@ io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_ } //function used by worker thread(s) used to process io requests -int +void io_service::process_io(void) { BOOL bSuccess = FALSE; int nRet = 0; @@ -185,7 +185,7 @@ io_service::process_io(void) { continue; } if (m_should_stop) - return 0; + return; } //get the base address of the struct holding lpOverlapped (the io_context_info) pointer. @@ -197,7 +197,7 @@ io_service::process_io(void) { // Somebody used PostQueuedCompletionStatus to post an I/O packet with // a NULL CompletionKey (or if we get one for any reason). It is time to exit. if (!psock_info || !pOverlapped) - return 0; + return; e_op = pio_info->eOperation; @@ -242,7 +242,7 @@ io_service::process_io(void) { } //switch } //while - return 0; + return; } } //! windows From 1206dbd29d53efbbb71864441d693406ae77f33e Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 23:35:08 +0200 Subject: [PATCH 05/13] patch windows compilation and behavior --- CMakeLists.txt | 2 +- includes/cpp_redis/network/socket.hpp | 1 + .../cpp_redis/network/windows/io_service.hpp | 2 +- sources/network/tcp_client.cpp | 19 +++++++---- sources/network/windows/io_service.cpp | 34 +++++++++---------- 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 09484e93..db8f6bc2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,7 +55,7 @@ link_directories(${GTEST_LIBS}) ### # sources ### -set(SRC_DIRS "sources" "sources/network" "sources/builders" "sources/tools") +set(SRC_DIRS "sources" "sources/network" "sources/builders") IF (WIN32) set(SRC_DIRS ${SRC_DIRS} "sources/network/windows") diff --git a/includes/cpp_redis/network/socket.hpp b/includes/cpp_redis/network/socket.hpp index e63bcede..91809b57 100644 --- a/includes/cpp_redis/network/socket.hpp +++ b/includes/cpp_redis/network/socket.hpp @@ -5,6 +5,7 @@ namespace cpp_redis { namespace network { #ifdef _WIN32 +#include typedef SOCKET _sock_t; #else typedef int _sock_t; diff --git a/includes/cpp_redis/network/windows/io_service.hpp b/includes/cpp_redis/network/windows/io_service.hpp index e790970d..92513051 100644 --- a/includes/cpp_redis/network/windows/io_service.hpp +++ b/includes/cpp_redis/network/windows/io_service.hpp @@ -21,7 +21,7 @@ namespace network { namespace windows { typedef enum _enIoOperation { - IO_OP_ACCEPT, + //IO_OP_ACCEPT, IO_OP_READ, IO_OP_WRITE } enIoOperation; diff --git a/sources/network/tcp_client.cpp b/sources/network/tcp_client.cpp index 7689bf8e..53b5cd84 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/tcp_client.cpp @@ -6,7 +6,9 @@ #endif /* _WIN32 */ #include -#ifndef _WIN32 +#ifdef _WIN32 +#include +#else #include #include #endif /* _WIN32 */ @@ -50,11 +52,6 @@ tcp_client::setup_socket(void) { int nZero = 0; if (setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char*) &nZero, sizeof(nZero))) __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not disable buffering"); - - //! Set socket to non blocking. - u_long ulValue = 1; - if (ioctlsocket(m_sock, FIONBIO, &ulValue)) - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); #else //! create the socket m_sock = socket(AF_INET, SOCK_STREAM, 0); @@ -94,10 +91,18 @@ tcp_client::connect(const std::string& host, unsigned int port, //! create a connection with the server if (::connect(m_sock, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) { - __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not connect"); + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not connect"); throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); } +#ifdef _WIN32 + //! Set socket to non blocking. + //! Must only be done once connected + u_long ulValue = 1; + if (ioctlsocket(m_sock, FIONBIO, &ulValue)) + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); +#endif /* _WIN32 */ + //! add fd to the io_service and set the disconnection & recv handlers m_disconnection_handler = disconnection_handler; m_receive_handler = receive_handler; diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index 51208851..b9686f07 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -35,29 +35,29 @@ io_service::shutdown() { //! Iterate all of our sockets and shutdown any IO worker threads by posting a issuing a special //! message to the thread to tell them to wake up and shut down. - io_context_info* pInfo = nullptr; - - //! Post for each of our worker threads. - for (int i = 0; i < workers; i++) { - //! Use nullptr for the completion key to wake them up. - PostQueuedCompletionStatus(m_completion_port, 0, nullptr, nullptr); + for (const auto& sock : m_sockets) { + //! Post for each of our worker threads. + for (int i = 0; i < __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS; i++) { + //! Use nullptr for the completion key to wake them up. + PostQueuedCompletionStatus(m_completion_port, 0, NULL, NULL); + } } + //! Wait for the threads to finish + for (auto& worker : m_worker_threads) + worker.join(); + //! close the completion port otherwise the worker threads will all be waiting on GetQueuedCompletionStatus() if (m_completion_port) { CloseHandle(m_completion_port); m_completion_port = nullptr; } - - //! Wait for the threads to finish - for (auto& worker : m_worker_threads) - worker.join(); } //! add or remove a given socket from the io service //! untrack should never be called from inside a callback void -io_service::track(_sock_t sock, const disconnection_handler_t& handler) { +io_service::track(SOCKET sock, const disconnection_handler_t& handler) { std::lock_guard lock(m_socket_mutex); //Add the socket to our map and return the allocated struct @@ -73,13 +73,13 @@ io_service::track(_sock_t sock, const disconnection_handler_t& handler) { } void -io_service::untrack(_sock_t sock) { +io_service::untrack(SOCKET sock) { std::lock_guard lock(m_socket_mutex); m_sockets.erase(sock); } bool -io_service::async_read(_sock_t sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { +io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { std::lock_guard lock(m_socket_mutex); auto sock_it = m_sockets.find(sock); @@ -117,7 +117,7 @@ io_service::async_read(_sock_t sock, std::vector& buffer, std::size_t read } bool -io_service::async_write(_sock_t sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { +io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { std::lock_guard lock(m_socket_mutex); auto sock_it = m_sockets.find(sock); @@ -185,7 +185,7 @@ io_service::process_io(void) { continue; } if (m_should_stop) - return; + return ; } //get the base address of the struct holding lpOverlapped (the io_context_info) pointer. @@ -197,7 +197,7 @@ io_service::process_io(void) { // Somebody used PostQueuedCompletionStatus to post an I/O packet with // a NULL CompletionKey (or if we get one for any reason). It is time to exit. if (!psock_info || !pOverlapped) - return; + return ; e_op = pio_info->eOperation; @@ -241,8 +241,6 @@ io_service::process_io(void) { break; } //switch } //while - - return; } } //! windows From 369871aebf633c27bff2c911a1e97f6dfa581a1d Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 23:35:48 +0200 Subject: [PATCH 06/13] clang-format --- sources/network/tcp_client.cpp | 4 ++-- sources/network/windows/io_service.cpp | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sources/network/tcp_client.cpp b/sources/network/tcp_client.cpp index 53b5cd84..b5480fa9 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/tcp_client.cpp @@ -91,7 +91,7 @@ tcp_client::connect(const std::string& host, unsigned int port, //! create a connection with the server if (::connect(m_sock, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) { - __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not connect"); + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not connect"); throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); } @@ -100,7 +100,7 @@ tcp_client::connect(const std::string& host, unsigned int port, //! Must only be done once connected u_long ulValue = 1; if (ioctlsocket(m_sock, FIONBIO, &ulValue)) - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); #endif /* _WIN32 */ //! add fd to the io_service and set the disconnection & recv handlers diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index b9686f07..b04cdfbf 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -36,16 +36,16 @@ io_service::shutdown() { //! Iterate all of our sockets and shutdown any IO worker threads by posting a issuing a special //! message to the thread to tell them to wake up and shut down. for (const auto& sock : m_sockets) { - //! Post for each of our worker threads. - for (int i = 0; i < __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS; i++) { - //! Use nullptr for the completion key to wake them up. - PostQueuedCompletionStatus(m_completion_port, 0, NULL, NULL); - } + //! Post for each of our worker threads. + for (int i = 0; i < __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS; i++) { + //! Use nullptr for the completion key to wake them up. + PostQueuedCompletionStatus(m_completion_port, 0, NULL, NULL); + } } //! Wait for the threads to finish for (auto& worker : m_worker_threads) - worker.join(); + worker.join(); //! close the completion port otherwise the worker threads will all be waiting on GetQueuedCompletionStatus() if (m_completion_port) { @@ -185,7 +185,7 @@ io_service::process_io(void) { continue; } if (m_should_stop) - return ; + return; } //get the base address of the struct holding lpOverlapped (the io_context_info) pointer. @@ -197,7 +197,7 @@ io_service::process_io(void) { // Somebody used PostQueuedCompletionStatus to post an I/O packet with // a NULL CompletionKey (or if we get one for any reason). It is time to exit. if (!psock_info || !pOverlapped) - return ; + return; e_op = pio_info->eOperation; From 7777063a593d3a21fcf215a9e3402d27b31d80a0 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 23:39:50 +0200 Subject: [PATCH 07/13] reverse default logging behavior: NO_LOGGING is removed and replaced by LOGGING_ENABLED. by default, no logging in source code --- CMakeLists.txt | 6 +++--- includes/cpp_redis/logger.hpp | 9 ++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index db8f6bc2..1600ae79 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,11 +96,11 @@ set_target_properties(${PROJECT} COMPILE_DEFINITIONS "__CPP_REDIS_READ_SIZE=${READ_SIZE}") ENDIF (READ_SIZE) -IF (NO_LOGGING) +IF (LOGGING_ENABLED) set_target_properties(${PROJECT} PROPERTIES - COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}") -ENDIF (NO_LOGGING) + COMPILE_DEFINITIONS "__CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}") +ENDIF (LOGGING_ENABLED) ### diff --git a/includes/cpp_redis/logger.hpp b/includes/cpp_redis/logger.hpp index 281a87d2..22bbb285 100644 --- a/includes/cpp_redis/logger.hpp +++ b/includes/cpp_redis/logger.hpp @@ -67,11 +67,10 @@ void warn(const std::string& msg, const std::string& file, unsigned int line); void error(const std::string& msg, const std::string& file, unsigned int line); //! convenience macro to log with file and line information -//! if __CPP_REDIS_NO_LOGGING, all logging related lines are removed from source code -#ifdef __CPP_REDIS_NO_LOGGING -#define __CPP_REDIS_LOG(level, msg) -#else +#ifdef __CPP_REDIS_LOGGING_ENABLED #define __CPP_REDIS_LOG(level, msg) cpp_redis::level(msg, __FILE__, __LINE__); -#endif /* __CPP_REDIS_NO_LOGGING */ +#else +#define __CPP_REDIS_LOG(level, msg) +#endif /* __CPP_REDIS_LOGGING_ENABLED */ } //! cpp_redis From ab2a1fcd3d399b7aa43e65586a599a49dd2de2e1 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 23:51:18 +0200 Subject: [PATCH 08/13] provide define for defining maximum number of monitored fds under unix --- CMakeLists.txt | 20 +++++++++++++------ .../cpp_redis/network/unix/io_service.hpp | 4 ++++ sources/network/unix/io_service.cpp | 2 +- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1600ae79..92b5f9bc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -90,18 +90,26 @@ ELSE () target_link_libraries(${PROJECT} pthread) ENDIF (WIN32) +# __CPP_REDIS_READ_SIZE IF (READ_SIZE) -set_target_properties(${PROJECT} - PROPERTIES - COMPILE_DEFINITIONS "__CPP_REDIS_READ_SIZE=${READ_SIZE}") +set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_READ_SIZE=${READ_SIZE}") ENDIF (READ_SIZE) +# __CPP_REDIS_LOGGING_ENABLED IF (LOGGING_ENABLED) -set_target_properties(${PROJECT} - PROPERTIES - COMPILE_DEFINITIONS "__CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}") +set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}") ENDIF (LOGGING_ENABLED) +# _CPP_REDIS_UNIX_MAX_NB_FDS +IF (UNIX_MAX_NB_FDS) +set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "_CPP_REDIS_UNIX_MAX_NB_FDS=${UNIX_MAX_NB_FDS}") +ENDIF (UNIX_MAX_NB_FDS) + +# __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS +IF (WIN_NB_IO_SERVICE_WORKERS) +set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS=${WIN_NB_IO_SERVICE_WORKERS}") +ENDIF (WIN_NB_IO_SERVICE_WORKERS) + ### # install diff --git a/includes/cpp_redis/network/unix/io_service.hpp b/includes/cpp_redis/network/unix/io_service.hpp index 278b126a..6b75c96a 100644 --- a/includes/cpp_redis/network/unix/io_service.hpp +++ b/includes/cpp_redis/network/unix/io_service.hpp @@ -13,6 +13,10 @@ #include +#ifndef _CPP_REDIS_UNIX_MAX_NB_FDS +#define _CPP_REDIS_UNIX_MAX_NB_FDS 1024 +#endif /* _CPP_REDIS_UNIX_MAX_FDS */ + namespace cpp_redis { namespace network { diff --git a/sources/network/unix/io_service.cpp b/sources/network/unix/io_service.cpp index 7d32dc5f..fc27cb43 100644 --- a/sources/network/unix/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -169,7 +169,7 @@ io_service::process_sets(struct pollfd* fds, unsigned int nfds) { void io_service::process_io(void) { - struct pollfd fds[1024]; + struct pollfd fds[_CPP_REDIS_UNIX_MAX_NB_FDS]; __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service starts poll loop in worker thread"); From 3c413cd20633f841cecd44facb93cd4d4c920f93 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Fri, 14 Oct 2016 23:52:57 +0200 Subject: [PATCH 09/13] fix warning under unix --- sources/network/unix/io_service.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sources/network/unix/io_service.cpp b/sources/network/unix/io_service.cpp index fc27cb43..7af70de0 100644 --- a/sources/network/unix/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -180,8 +180,9 @@ io_service::process_io(void) { __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service woke up by poll"); process_sets(fds, nfds); } - else + else { __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service woke up by poll, but nothing to process"); + } } __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service ends poll loop in worker thread"); From 6a0dba1c732b1d152fbb695fdb15ccd18141eb96 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Sat, 15 Oct 2016 21:05:36 +0200 Subject: [PATCH 10/13] rename some internal defines. create_io_service and io_service constructors now take again the nb_workers as parameter. --- CMakeLists.txt | 18 +++++++++--------- includes/cpp_redis/network/io_service.hpp | 14 ++++++++++++-- includes/cpp_redis/network/unix/io_service.hpp | 8 ++++---- .../cpp_redis/network/windows/io_service.hpp | 6 +----- sources/network/io_service.cpp | 9 ++++++--- sources/network/unix/io_service.cpp | 7 ++++--- sources/network/windows/io_service.cpp | 7 ++++--- 7 files changed, 40 insertions(+), 29 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 92b5f9bc..d49fe66f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -100,15 +100,15 @@ IF (LOGGING_ENABLED) set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}") ENDIF (LOGGING_ENABLED) -# _CPP_REDIS_UNIX_MAX_NB_FDS -IF (UNIX_MAX_NB_FDS) -set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "_CPP_REDIS_UNIX_MAX_NB_FDS=${UNIX_MAX_NB_FDS}") -ENDIF (UNIX_MAX_NB_FDS) - -# __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS -IF (WIN_NB_IO_SERVICE_WORKERS) -set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS=${WIN_NB_IO_SERVICE_WORKERS}") -ENDIF (WIN_NB_IO_SERVICE_WORKERS) +# _CPP_REDIS_MAX_NB_FDS +IF (MAX_NB_FDS) +set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "_CPP_REDIS_MAX_NB_FDS=${MAX_NB_FDS}") +ENDIF (MAX_NB_FDS) + +# __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS +IF (DEFAULT_NB_IO_SERVICE_WORKERS) +set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS=${DEFAULT_NB_IO_SERVICE_WORKERS}") +ENDIF (DEFAULT_NB_IO_SERVICE_WORKERS) ### diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp index c94e7047..77894ea1 100644 --- a/includes/cpp_redis/network/io_service.hpp +++ b/includes/cpp_redis/network/io_service.hpp @@ -6,6 +6,10 @@ #include +#ifndef __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS +#define __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS 16 +#endif /* __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS */ + namespace cpp_redis { namespace network { @@ -19,7 +23,7 @@ class io_service { public: //! ctor & dtor - io_service(void) = default; + io_service(size_t nb_workers); virtual ~io_service(void) = default; //! copy ctor & assignment operator @@ -47,13 +51,19 @@ class io_service { typedef std::function write_callback_t; virtual bool async_write(_sock_t sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) = 0; +public: + size_t get_nb_workers(void) const; + private: //! listen for incoming events and notify virtual void process_io(void) = 0; + +private: + size_t m_nb_workers; }; //! multi-platform instance builder -std::shared_ptr create_io_service(void); +std::shared_ptr create_io_service(size_t nb_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS); } //! network diff --git a/includes/cpp_redis/network/unix/io_service.hpp b/includes/cpp_redis/network/unix/io_service.hpp index 6b75c96a..5e21a841 100644 --- a/includes/cpp_redis/network/unix/io_service.hpp +++ b/includes/cpp_redis/network/unix/io_service.hpp @@ -13,9 +13,9 @@ #include -#ifndef _CPP_REDIS_UNIX_MAX_NB_FDS -#define _CPP_REDIS_UNIX_MAX_NB_FDS 1024 -#endif /* _CPP_REDIS_UNIX_MAX_FDS */ +#ifndef _CPP_REDIS_MAX_NB_FDS +#define _CPP_REDIS_MAX_NB_FDS 1024 +#endif /* _CPP_REDIS_MAX_NB_FDS */ namespace cpp_redis { @@ -26,7 +26,7 @@ namespace unix { class io_service : public network::io_service { public: //! ctor & dtor - io_service(void); + io_service(size_t nb_workers); ~io_service(void); //! copy ctor & assignment operator diff --git a/includes/cpp_redis/network/windows/io_service.hpp b/includes/cpp_redis/network/windows/io_service.hpp index 92513051..441ede41 100644 --- a/includes/cpp_redis/network/windows/io_service.hpp +++ b/includes/cpp_redis/network/windows/io_service.hpp @@ -10,10 +10,6 @@ #include -#ifndef __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS -#define __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS 16 -#endif /* __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS */ - namespace cpp_redis { namespace network { @@ -29,7 +25,7 @@ typedef enum _enIoOperation { class io_service : public network::io_service { public: //! ctor & dtor - io_service(void); + io_service(size_t nb_workers); ~io_service(void); private: diff --git a/sources/network/io_service.cpp b/sources/network/io_service.cpp index 7a50703d..fc02950f 100644 --- a/sources/network/io_service.cpp +++ b/sources/network/io_service.cpp @@ -25,12 +25,15 @@ io_service::set_global_instance(const std::shared_ptr& io_s global_instance = io_service; } +io_service::io_service(size_t nb_workers) +: m_nb_workers(nb_workers) {} + std::shared_ptr -create_io_service(void) { +create_io_service(size_t nb_workers) { #ifdef _WIN32 - return std::make_shared(); + return std::make_shared(nb_workers); #else - return std::make_shared(); + return std::make_shared(nb_workers); #endif /* _WIN32 */ } diff --git a/sources/network/unix/io_service.cpp b/sources/network/unix/io_service.cpp index 7af70de0..922b4741 100644 --- a/sources/network/unix/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -10,8 +10,9 @@ namespace network { namespace unix { -io_service::io_service(void) -: m_should_stop(false) +io_service::io_service(size_t nb_workers) +: network::io_service(nb_workers) +, m_should_stop(false) , m_notif_pipe_fds{1, 1} { if (pipe(m_notif_pipe_fds) == -1) { __CPP_REDIS_LOG(error, "cpp_redis::network::io_service could not create pipe"); @@ -169,7 +170,7 @@ io_service::process_sets(struct pollfd* fds, unsigned int nfds) { void io_service::process_io(void) { - struct pollfd fds[_CPP_REDIS_UNIX_MAX_NB_FDS]; + struct pollfd fds[_CPP_REDIS_MAX_NB_FDS]; __CPP_REDIS_LOG(debug, "cpp_redis::network::io_service starts poll loop in worker thread"); diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index b04cdfbf..77041e26 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -8,7 +8,8 @@ namespace network { namespace windows { io_service::io_service(void) -: m_should_stop(false) { +: nework::io_service(nb_workers) +, m_should_stop(false) { //! Start winsock before any other socket calls. WSADATA wsaData; int nRet = WSAStartup(0x202, &wsaData); @@ -21,7 +22,7 @@ io_service::io_service(void) throw cpp_redis::redis_error("Could not init cpp_redis::io_service, CreateIoCompletionPort() failure"); //! Now startup worker thread pool which will service our async io requests - for (unsigned int i = 0; i < __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS; ++i) + for (unsigned int i = 0; i < __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS; ++i) m_worker_threads.push_back(std::thread(&io_service::process_io, this)); } @@ -37,7 +38,7 @@ io_service::shutdown() { //! message to the thread to tell them to wake up and shut down. for (const auto& sock : m_sockets) { //! Post for each of our worker threads. - for (int i = 0; i < __CPP_REDIS_WIN_NB_IO_SERVICE_WORKERS; i++) { + for (int i = 0; i < __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS; i++) { //! Use nullptr for the completion key to wake them up. PostQueuedCompletionStatus(m_completion_port, 0, NULL, NULL); } From 7e97d46831e2a29106ed916aa446a32d90de4c22 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Sat, 15 Oct 2016 21:23:03 +0200 Subject: [PATCH 11/13] fix windows compilation --- includes/cpp_redis/network/io_service.hpp | 1 + sources/network/io_service.cpp | 5 +++++ sources/network/tcp_client.cpp | 10 ++++++---- sources/network/windows/io_service.cpp | 12 ++++++------ 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp index 77894ea1..e1779484 100644 --- a/includes/cpp_redis/network/io_service.hpp +++ b/includes/cpp_redis/network/io_service.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include diff --git a/sources/network/io_service.cpp b/sources/network/io_service.cpp index fc02950f..1c05ca8b 100644 --- a/sources/network/io_service.cpp +++ b/sources/network/io_service.cpp @@ -28,6 +28,11 @@ io_service::set_global_instance(const std::shared_ptr& io_s io_service::io_service(size_t nb_workers) : m_nb_workers(nb_workers) {} +size_t +io_service::get_nb_workers(void) const { + return m_nb_workers; +} + std::shared_ptr create_io_service(size_t nb_workers) { #ifdef _WIN32 diff --git a/sources/network/tcp_client.cpp b/sources/network/tcp_client.cpp index b5480fa9..a261c447 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/tcp_client.cpp @@ -50,8 +50,9 @@ tcp_client::setup_socket(void) { //! and our user buffer for each I/O call. //! BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! int nZero = 0; - if (setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char*) &nZero, sizeof(nZero))) - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not disable buffering"); + if (setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero))) { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not disable buffering"); + } #else //! create the socket m_sock = socket(AF_INET, SOCK_STREAM, 0); @@ -99,8 +100,9 @@ tcp_client::connect(const std::string& host, unsigned int port, //! Set socket to non blocking. //! Must only be done once connected u_long ulValue = 1; - if (ioctlsocket(m_sock, FIONBIO, &ulValue)) - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); + if (ioctlsocket(m_sock, FIONBIO, &ulValue)) { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); + } #endif /* _WIN32 */ //! add fd to the io_service and set the disconnection & recv handlers diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index 77041e26..31a48dcc 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -1,5 +1,5 @@ -#include "cpp_redis/network/windows/io_service.hpp" -#include "cpp_redis/redis_error.hpp" +#include +#include namespace cpp_redis { @@ -7,8 +7,8 @@ namespace network { namespace windows { -io_service::io_service(void) -: nework::io_service(nb_workers) +io_service::io_service(size_t nb_workers) +: network::io_service(nb_workers) , m_should_stop(false) { //! Start winsock before any other socket calls. WSADATA wsaData; @@ -22,7 +22,7 @@ io_service::io_service(void) throw cpp_redis::redis_error("Could not init cpp_redis::io_service, CreateIoCompletionPort() failure"); //! Now startup worker thread pool which will service our async io requests - for (unsigned int i = 0; i < __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS; ++i) + for (unsigned int i = 0; i < get_nb_workers(); ++i) m_worker_threads.push_back(std::thread(&io_service::process_io, this)); } @@ -38,7 +38,7 @@ io_service::shutdown() { //! message to the thread to tell them to wake up and shut down. for (const auto& sock : m_sockets) { //! Post for each of our worker threads. - for (int i = 0; i < __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS; i++) { + for (size_t i = 0; i < get_nb_workers(); i++) { //! Use nullptr for the completion key to wake them up. PostQueuedCompletionStatus(m_completion_port, 0, NULL, NULL); } From ee7eac53934c9b613b5a585f750b1ff487c83def Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Sat, 15 Oct 2016 21:23:43 +0200 Subject: [PATCH 12/13] clang-format --- sources/network/io_service.cpp | 2 +- sources/network/tcp_client.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sources/network/io_service.cpp b/sources/network/io_service.cpp index 1c05ca8b..0b14b037 100644 --- a/sources/network/io_service.cpp +++ b/sources/network/io_service.cpp @@ -30,7 +30,7 @@ io_service::io_service(size_t nb_workers) size_t io_service::get_nb_workers(void) const { - return m_nb_workers; + return m_nb_workers; } std::shared_ptr diff --git a/sources/network/tcp_client.cpp b/sources/network/tcp_client.cpp index a261c447..a9a0510b 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/tcp_client.cpp @@ -50,8 +50,8 @@ tcp_client::setup_socket(void) { //! and our user buffer for each I/O call. //! BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! int nZero = 0; - if (setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero))) { - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not disable buffering"); + if (setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char*) &nZero, sizeof(nZero))) { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not disable buffering"); } #else //! create the socket @@ -101,7 +101,7 @@ tcp_client::connect(const std::string& host, unsigned int port, //! Must only be done once connected u_long ulValue = 1; if (ioctlsocket(m_sock, FIONBIO, &ulValue)) { - __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client could not enable non-blocking mode on socket"); } #endif /* _WIN32 */ From 617016862b943e1534ee4e77c731c6fa9ee13f41 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Sat, 15 Oct 2016 21:25:00 +0200 Subject: [PATCH 13/13] angle brackets include --- sources/reply.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sources/reply.cpp b/sources/reply.cpp index fbdd5f75..eaa92f23 100644 --- a/sources/reply.cpp +++ b/sources/reply.cpp @@ -1,5 +1,5 @@ -#include "cpp_redis/reply.hpp" -#include "cpp_redis/redis_error.hpp" +#include +#include namespace cpp_redis {