From 983764fd93092c45008d628b894b8e664556e7bb Mon Sep 17 00:00:00 2001 From: MikesAracade Date: Tue, 11 Oct 2016 15:53:10 -0500 Subject: [PATCH 1/7] windows port stage 1 --- .gitignore | 19 +- examples/redis_client.cpp | 4 +- includes/cpp_redis/network/io_service.hpp | 112 ++-- .../cpp_redis/network/redis_connection.hpp | 2 +- includes/cpp_redis/network/tcp_client.hpp | 10 +- includes/cpp_redis/redis_client.hpp | 6 +- sources/builders/array_builder.cpp | 12 +- sources/builders/bulk_string_builder.cpp | 6 +- sources/builders/integer_builder.cpp | 8 +- sources/builders/reply_builder.cpp | 8 +- sources/network/io_service.cpp | 230 -------- sources/network/redis_connection.cpp | 3 +- sources/network/win_io_service.cpp | 547 ++++++++++++++++++ .../{tcp_client.cpp => win_tcp_client.cpp} | 63 +- sources/redis_client.cpp | 12 +- sources/redis_subscriber.cpp | 16 +- sources/reply.cpp | 8 +- tests/sources/spec/redis_client_spec.cpp | 10 +- tests/sources/spec/redis_subscriber_spec.cpp | 26 +- 19 files changed, 754 insertions(+), 348 deletions(-) delete mode 100644 sources/network/io_service.cpp create mode 100644 sources/network/win_io_service.cpp rename sources/network/{tcp_client.cpp => win_tcp_client.cpp} (62%) diff --git a/.gitignore b/.gitignore index 6854a22f..78a41448 100644 --- a/.gitignore +++ b/.gitignore @@ -22,10 +22,25 @@ *.a *.lib +build +deps + # Executables *.exe *.out *.app +*.sln +*.vcxproj +*.suo +*.user +*.filters +*.VC.db +*.opendb +.gitignore +cmake_install.cmake -build -deps +rts-redis-test/ +CMakeCache.txt +CMakeFiles/ +Release/ +Debug/ \ No newline at end of file diff --git a/examples/redis_client.cpp b/examples/redis_client.cpp index 5f1083fd..c1fcee7f 100644 --- a/examples/redis_client.cpp +++ b/examples/redis_client.cpp @@ -40,7 +40,7 @@ main(void) { // client.sync_commit(std::chrono::milliseconds(100)); signal(SIGINT, &sigint_handler); - while (not should_exit); - + while (!should_exit); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); return 0; } diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp index 462bba85..680147ac 100644 --- a/includes/cpp_redis/network/io_service.hpp +++ b/includes/cpp_redis/network/io_service.hpp @@ -6,24 +6,38 @@ #include #include +#ifndef _MSC_VER #include #include #include +#else +#include + +#define MAX_BUFF_SIZE CPP_REDIS_READ_SIZE +#define MAX_WORKER_THREADS 32 +#endif namespace cpp_redis { namespace network { +typedef enum _enIoOperation { + //IO_OP_ACCEPT, + IO_OP_READ, + IO_OP_WRITE + } enIoOperation; + + class io_service { public: //! instance getter (singleton pattern) static io_service& get_instance(void); - -private: - //! ctor & dtor - io_service(void); + io_service(size_t max_worker_threads = MAX_WORKER_THREADS); ~io_service(void); + void shutdown(); + +private: //! copy ctor & assignment operator io_service(const io_service&) = delete; io_service& operator=(const io_service&) = delete; @@ -32,42 +46,64 @@ class io_service { //! disconnection handler declaration typedef std::function disconnection_handler_t; - //! add or remove a given fd from the io service + //! add or remove a given socket from the io service //! untrack should never be called from inside a callback - void track(int fd, const disconnection_handler_t& handler); - void untrack(int fd); + void track(SOCKET sock, const disconnection_handler_t& handler); + void untrack(SOCKET sock); //! asynchronously read read_size bytes and append them to the given buffer //! on completion, call the read_callback to notify of the success or failure of the operation - //! return false if another async_read operation is in progress or fd is not registered + //! return false if another async_read operation is in progress or socket is not registered typedef std::function read_callback_t; - bool async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); + bool async_read(SOCKET socket, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); //! asynchronously write write_size bytes from buffer to the specified fd //!on completion, call the write_callback to notify of the success or failure of the operation //! return false if another async_write operation is in progress or fd is not registered typedef std::function write_callback_t; - bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); + bool async_write(SOCKET socket, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); private: - //! simple struct to keep track of ongoing operations on a given fd - struct fd_info { + +#ifdef _WIN32 + struct io_context_info { + WSAOVERLAPPED overlapped; + enIoOperation eOperation; + }; +#endif + +//! simple struct to keep track of ongoing operations on a given sockeet + struct sock_info { +#ifdef _WIN32 + SOCKET hsock; + std::size_t sent_bytes; + + //Must protect the members of our structure from access by multiple threads during IO Completion + std::recursive_mutex sock_info_mutex; +#endif disconnection_handler_t disconnection_handler; - std::atomic_bool async_read; + std::atomic_bool async_read; std::vector* read_buffer; - std::size_t read_size; - read_callback_t read_callback; + std::size_t read_size; + read_callback_t read_callback; + + std::atomic_bool async_write; + std::vector write_buffer; + std::size_t write_size; + write_callback_t write_callback; - std::atomic_bool async_write; - std::vector write_buffer; - std::size_t write_size; - write_callback_t write_callback; +#ifdef _WIN32 + io_context_info io_info; +#endif }; +typedef std::function callback_t; + +#ifndef _WIN32 private: - //! listen for incoming events and notify - void listen(void); + //! wait for incoming events and notify + void wait_for_events(void); //! notify the select call so that it can wake up to process new events void notify_select(void); @@ -77,30 +113,36 @@ class io_service { int init_sets(fd_set* rd_set, fd_set* wr_set); void process_sets(fd_set* rd_set, fd_set* wr_set); - typedef std::function callback_t; - callback_t read_fd(int fd); - callback_t write_fd(int fd); + callback_t read_sock(SOCKET sock); + callback_t write_sock(SOCKET sock); +#else + //! wait for incoming events and notify + int process_io(void); + + HANDLE m_completion_port; + unsigned int m_worker_thread_pool_size; + std::vector m_worker_threads; //vector containing all the threads we start to service our i/o requests +#endif private: //! whether the worker should terminate or not std::atomic_bool m_should_stop; - //! worker in the background, listening for events - std::thread m_worker; - - //! tracked fds - std::unordered_map m_fds; + //! tracked sockets + std::unordered_map m_sockets; - //! fd associated to the pipe used to wake up the select call - int m_notif_pipe_fds[2]; +#ifndef _WIN32 + //! socket associated used to wake up the select call + SOCKET m_notify_socket; +#endif - //! mutex to protect m_fds access against race condition + //! mutex to protect m_notify_socket access against race condition //! - //! specific mutex for untrack: we dont want someone to untrack a fd while we process it + //! specific mutex for untrack: we dont want someone to untrack a socket while we process it //! this behavior could cause some issues when executing callbacks in another thread - //! for example, obj is destroyed, in its dtor it untracks the fd, but at the same time + //! for example, obj is destroyed, in its dtor it untracks the socket, but at the same time //! a callback is executed from within another thread: the untrack mutex avoid this without being costly - std::recursive_mutex m_fds_mutex; + std::recursive_mutex m_socket_mutex; }; } //! network diff --git a/includes/cpp_redis/network/redis_connection.hpp b/includes/cpp_redis/network/redis_connection.hpp index 7fa92002..157298f1 100644 --- a/includes/cpp_redis/network/redis_connection.hpp +++ b/includes/cpp_redis/network/redis_connection.hpp @@ -15,7 +15,7 @@ namespace network { class redis_connection { public: //! ctor & dtor - redis_connection(void); + redis_connection(network::io_service* pIO=NULL); ~redis_connection(void) = default; //! copy ctor & assignment operator diff --git a/includes/cpp_redis/network/tcp_client.hpp b/includes/cpp_redis/network/tcp_client.hpp index 13899bef..71b199eb 100644 --- a/includes/cpp_redis/network/tcp_client.hpp +++ b/includes/cpp_redis/network/tcp_client.hpp @@ -23,7 +23,7 @@ namespace network { class tcp_client { public: //! ctor & dtor - tcp_client(void); + tcp_client(io_service* pIO=NULL); ~tcp_client(void); //! assignment operator & copy ctor @@ -46,7 +46,7 @@ class tcp_client { void send(const std::vector& buffer); private: - //! make boost asio async read and write operations + //! make async read and write operations void async_read(void); void async_write(void); @@ -57,10 +57,10 @@ class tcp_client { private: //! io service instance - io_service& m_io_service; + io_service* m_pio_service; - //! socket fd - int m_fd; + //! socket + SOCKET m_sock; //! is connected std::atomic_bool m_is_connected; diff --git a/includes/cpp_redis/redis_client.hpp b/includes/cpp_redis/redis_client.hpp index 0f9a2b7e..6a2b306c 100644 --- a/includes/cpp_redis/redis_client.hpp +++ b/includes/cpp_redis/redis_client.hpp @@ -14,8 +14,8 @@ namespace cpp_redis { class redis_client { public: //! ctor & dtor - redis_client(void); - ~redis_client(void) = default; + redis_client(network::io_service* pIO=NULL); + ~redis_client(void); //! copy ctor & assignment operator redis_client(const redis_client&) = delete; @@ -42,7 +42,7 @@ class redis_client { try_commit(); std::unique_lock lock_callback(m_callbacks_mutex); - m_sync_condvar.wait_for(lock_callback, timeout, [=]{ return m_callbacks_running == 0 and m_callbacks.empty(); }); + m_sync_condvar.wait_for(lock_callback, timeout, [=]{ return m_callbacks_running == 0 && m_callbacks.empty(); }); return *this; } diff --git a/sources/builders/array_builder.cpp b/sources/builders/array_builder.cpp index 814c4a6e..24c5e640 100644 --- a/sources/builders/array_builder.cpp +++ b/sources/builders/array_builder.cpp @@ -17,7 +17,7 @@ array_builder::fetch_array_size(std::string& buffer) { return true; m_int_builder << buffer; - if (not m_int_builder.reply_ready()) + if (!m_int_builder.reply_ready()) return false; int size = m_int_builder.get_integer(); @@ -33,13 +33,13 @@ array_builder::fetch_array_size(std::string& buffer) { bool array_builder::build_row(std::string& buffer) { - if (not m_current_builder) { + if (!m_current_builder) { m_current_builder = create_builder(buffer.front()); buffer.erase(0, 1); } *m_current_builder << buffer; - if (not m_current_builder->reply_ready()) + if (!m_current_builder->reply_ready()) return false; m_reply << m_current_builder->get_reply(); @@ -56,11 +56,11 @@ array_builder::operator<<(std::string& buffer) { if (m_reply_ready) return *this; - if (not fetch_array_size(buffer)) + if (!fetch_array_size(buffer)) return *this; - while (buffer.size() and not m_reply_ready) - if (not build_row(buffer)) + while (buffer.size() && !m_reply_ready) + if (!build_row(buffer)) return *this; return *this; diff --git a/sources/builders/bulk_string_builder.cpp b/sources/builders/bulk_string_builder.cpp index 575241a0..56937093 100644 --- a/sources/builders/bulk_string_builder.cpp +++ b/sources/builders/bulk_string_builder.cpp @@ -27,7 +27,7 @@ bulk_string_builder::fetch_size(std::string& buffer) { return true; m_int_builder << buffer; - if (not m_int_builder.reply_ready()) + if (!m_int_builder.reply_ready()) return false; m_str_size = m_int_builder.get_integer(); @@ -44,7 +44,7 @@ bulk_string_builder::fetch_str(std::string& buffer) { if (buffer.size() < static_cast(m_str_size) + 2) // also wait for end sequence return ; - if (buffer[m_str_size] != '\r' or buffer[m_str_size + 1] != '\n') + if (buffer[m_str_size] != '\r' || buffer[m_str_size + 1] != '\n') throw redis_error("Wrong ending sequence"); m_str = buffer.substr(0, m_str_size); @@ -58,7 +58,7 @@ bulk_string_builder::operator<<(std::string& buffer) { return *this; //! if we don't have the size, try to get it with the current buffer - if (not fetch_size(buffer) or m_reply_ready) + if (!fetch_size(buffer) || m_reply_ready) return *this; fetch_str(buffer); diff --git a/sources/builders/integer_builder.cpp b/sources/builders/integer_builder.cpp index cd400b21..a600a575 100644 --- a/sources/builders/integer_builder.cpp +++ b/sources/builders/integer_builder.cpp @@ -22,11 +22,15 @@ integer_builder::operator<<(std::string& buffer) { unsigned int i; for (i = 0; i < end_sequence; i++) { //! check for negative numbers - if (not i and m_negative_multiplicator == 1 and buffer[i] == '-') { + if (!i && m_negative_multiplicator == 1 && buffer[i] == '-') { m_negative_multiplicator = -1; continue; } - else if (not std::isdigit(buffer[i])) +#ifdef _MSC_VER + else if (!isdigit(buffer[i])) +#else + else if (!std::isdigit(buffer[i])) +#endif throw redis_error("Invalid character for integer redis reply"); m_nbr *= 10; diff --git a/sources/builders/reply_builder.cpp b/sources/builders/reply_builder.cpp index 8a5166a2..1a5652f4 100644 --- a/sources/builders/reply_builder.cpp +++ b/sources/builders/reply_builder.cpp @@ -20,10 +20,10 @@ reply_builder::operator<<(const std::string& data) { bool reply_builder::build_reply(void) { - if (not m_buffer.size()) + if (!m_buffer.size()) return false; - if (not m_builder) { + if (!m_builder) { m_builder = create_builder(m_buffer.front()); m_buffer.erase(0, 1); } @@ -47,7 +47,7 @@ reply_builder::operator>>(reply& reply) { const reply& reply_builder::get_front(void) const { - if (not reply_available()) + if (!reply_available()) throw redis_error("No available reply"); return m_available_replies.front(); @@ -55,7 +55,7 @@ reply_builder::get_front(void) const { void reply_builder::pop_front(void) { - if (not reply_available()) + if (!reply_available()) throw redis_error("No available reply"); m_available_replies.pop_front(); diff --git a/sources/network/io_service.cpp b/sources/network/io_service.cpp deleted file mode 100644 index 0eab9790..00000000 --- a/sources/network/io_service.cpp +++ /dev/null @@ -1,230 +0,0 @@ -#include "cpp_redis/network/io_service.hpp" -#include "cpp_redis/redis_error.hpp" - -#include - -namespace cpp_redis { - -namespace network { - -io_service& -io_service::get_instance(void) { - static io_service instance; - return instance; -} - -io_service::io_service(void) -: m_should_stop(false) -{ - if (pipe(m_notif_pipe_fds) == -1) - throw cpp_redis::redis_error("Could not init cpp_redis::io_service, pipe() failure"); - - int flags = fcntl(m_notif_pipe_fds[1], F_GETFL, 0); - if (flags == -1 or fcntl(m_notif_pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1) - throw cpp_redis::redis_error("Could not init cpp_redis::io_service, fcntl() failure"); - - m_worker = std::thread(&io_service::listen, this); -} - -io_service::~io_service(void) { - m_should_stop = true; - notify_select(); - - m_worker.join(); - - close(m_notif_pipe_fds[0]); - close(m_notif_pipe_fds[1]); -} - -int -io_service::init_sets(fd_set* rd_set, fd_set* wr_set) { - int max_fd = m_notif_pipe_fds[0]; - - FD_ZERO(rd_set); - FD_ZERO(wr_set); - FD_SET(m_notif_pipe_fds[0], rd_set); - - std::lock_guard lock(m_fds_mutex); - for (const auto& fd : m_fds) { - if (fd.second.async_read) - FD_SET(fd.first, rd_set); - - if (fd.second.async_write) - FD_SET(fd.first, wr_set); - - if ((fd.second.async_read or fd.second.async_write) and fd.first > max_fd) - max_fd = fd.first; - } - - return max_fd; -} - -io_service::callback_t -io_service::read_fd(int fd) { - std::lock_guard lock(m_fds_mutex); - - auto fd_it = m_fds.find(fd); - if (fd_it == m_fds.end()) - return nullptr; - - auto& buffer = *fd_it->second.read_buffer; - int original_buffer_size = buffer.size(); - buffer.resize(original_buffer_size + fd_it->second.read_size); - - int nb_bytes_read = recv(fd_it->first, buffer.data() + original_buffer_size, fd_it->second.read_size, 0); - fd_it->second.async_read = false; - - if (nb_bytes_read <= 0) { - buffer.resize(original_buffer_size); - fd_it->second.disconnection_handler(*this); - m_fds.erase(fd_it); - - return nullptr; - } - else { - buffer.resize(original_buffer_size + nb_bytes_read); - - return std::bind(fd_it->second.read_callback, nb_bytes_read); - } -} - -io_service::callback_t -io_service::write_fd(int fd) { - std::lock_guard lock(m_fds_mutex); - - auto fd_it = m_fds.find(fd); - if (fd_it == m_fds.end()) - return nullptr; - - int nb_bytes_written = send(fd_it->first, fd_it->second.write_buffer.data(), fd_it->second.write_size, 0); - fd_it->second.async_write = false; - - if (nb_bytes_written <= 0) { - fd_it->second.disconnection_handler(*this); - m_fds.erase(fd_it); - - return nullptr; - } - else - return std::bind(fd_it->second.write_callback, nb_bytes_written); -} - -void -io_service::process_sets(fd_set* rd_set, fd_set* wr_set) { - std::vector fds_to_read; - std::vector fds_to_write; - - //! Quickly fetch fds that are readable/writeable - //! This reduce lock time and avoid possible deadlock in callbacks - { - std::lock_guard lock(m_fds_mutex); - - for (const auto& fd : m_fds) { - if (fd.second.async_read and FD_ISSET(fd.first, rd_set)) - fds_to_read.push_back(fd.first); - - if (fd.second.async_write and FD_ISSET(fd.first, wr_set)) - fds_to_write.push_back(fd.first); - } - } - - for (int fd : fds_to_read) { - auto callback = read_fd(fd); - if (callback) - callback(); - } - for (int fd : fds_to_write) { - auto callback = write_fd(fd); - if (callback) - callback(); - } - - if (FD_ISSET(m_notif_pipe_fds[0], rd_set)) { - char buf[1024]; - (void)read(m_notif_pipe_fds[0], buf, 1024); - } -} - -void -io_service::listen(void) { - fd_set rd_set; - fd_set wr_set; - - while (not m_should_stop) { - int max_fd = init_sets(&rd_set, &wr_set); - - if (select(max_fd + 1, &rd_set, &wr_set, nullptr, nullptr) > 0) - process_sets(&rd_set, &wr_set); - } -} - -void -io_service::track(int fd, const disconnection_handler_t& handler) { - std::lock_guard lock(m_fds_mutex); - - auto& info = m_fds[fd]; - info.async_read = false; - info.async_write = false; - info.disconnection_handler = handler; - - notify_select(); -} - -void -io_service::untrack(int fd) { - std::lock_guard lock(m_fds_mutex); - m_fds.erase(fd); -} - -bool -io_service::async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { - std::lock_guard lock(m_fds_mutex); - - auto reg_fd_it = m_fds.find(fd); - if (reg_fd_it == m_fds.end()) - return false; - - auto& reg_fd = reg_fd_it->second; - bool expected = false; - if (not reg_fd.async_read.compare_exchange_strong(expected, true)) - return false; - - reg_fd.read_buffer = &buffer; - reg_fd.read_size = read_size; - reg_fd.read_callback = callback; - - notify_select(); - - return true; -} - -bool -io_service::async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { - std::lock_guard lock(m_fds_mutex); - - auto reg_fd_it = m_fds.find(fd); - if (reg_fd_it == m_fds.end()) - return false; - - auto& reg_fd = reg_fd_it->second; - bool expected = false; - if (not reg_fd.async_write.compare_exchange_strong(expected, true)) - return false; - - reg_fd.write_buffer = buffer; - reg_fd.write_size = write_size; - reg_fd.write_callback = callback; - - notify_select(); - - return true; -} - -void -io_service::notify_select(void) { - (void)write(m_notif_pipe_fds[1], "a", 1); -} - -} //! network - -} //! cpp_redis diff --git a/sources/network/redis_connection.cpp b/sources/network/redis_connection.cpp index 52f52b20..5f7ba9bc 100644 --- a/sources/network/redis_connection.cpp +++ b/sources/network/redis_connection.cpp @@ -4,9 +4,10 @@ namespace cpp_redis { namespace network { -redis_connection::redis_connection(void) +redis_connection::redis_connection(network::io_service* pIO/*=NULL*/) : m_reply_callback(nullptr) , m_disconnection_handler(nullptr) +, m_client(pIO) {} void diff --git a/sources/network/win_io_service.cpp b/sources/network/win_io_service.cpp new file mode 100644 index 00000000..533e9551 --- /dev/null +++ b/sources/network/win_io_service.cpp @@ -0,0 +1,547 @@ +#include "cpp_redis/network/io_service.hpp" +#include "cpp_redis/redis_error.hpp" + +#ifdef _MSC_VER +#include //needed for fcntl, open etc. +#pragma warning(disable:4996) //Disable "The POSIX name for this item is deprecated" warnings +#endif + +namespace cpp_redis { + +namespace network { + +io_service& + io_service::get_instance(void) { + static io_service instance; + return instance; +} + +#ifdef _MSC_VER +io_service::io_service(size_t max_worker_threads /*= MAX_WORKER_THREADS*/) + : m_should_stop(false) +{ + //Determine the size of the thread pool dynamically. + //2 * number of processors in the system is our rule here. + SYSTEM_INFO info; + ::GetSystemInfo(&info); + m_worker_thread_pool_size = (info.dwNumberOfProcessors * 2); + + if (m_worker_thread_pool_size > MAX_WORKER_THREADS) + m_worker_thread_pool_size = MAX_WORKER_THREADS; + + WSADATA wsaData; + int nRet = 0; + if ((nRet = WSAStartup(0x202, &wsaData)) != 0) //Start winsock before any other socket calls. + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, WSAStartup() failure"); + + //Create completion port. Pass 0 for parameter 4 to allow as many threads as there are processors in the system + m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);; + if( INVALID_HANDLE_VALUE == m_completion_port) + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, CreateIoCompletionPort() failure"); + + //Now startup worker thread pool which will service our async io requests + for (unsigned int i=0; i lock(m_socket_mutex); + + //Add the socket to our map and return the allocated struct + auto& info = m_sockets[sock]; + + memset(&info.io_info.overlapped, 0, sizeof(OVERLAPPED)); + info.hsock = sock; + info.async_read = false; + info.async_write = false; + info.disconnection_handler = handler; + + //Associate the socket with our io completion port. + if( NULL == CreateIoCompletionPort((HANDLE)sock, m_completion_port, (DWORD_PTR)&m_sockets[sock], 0)) + { + printf("CreateIoCompletionPort() failed: %d\n", GetLastError()); + throw cpp_redis::redis_error("Track() failed to create CreateIoCompletionPort"); + } + + printf("Socket(%d) added to IOCP ok\n", sock); +} + +void +io_service::untrack(SOCKET sock) +{ + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return; + auto& sockinfo = sock_it->second; + + //Wait until the posted i/o has completed. + //while (m_completion_port && !HasOverlappedIoCompleted((LPOVERLAPPED)&sockinfo.io_info.overlapped)) + // std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + std::lock_guard lock(m_socket_mutex); + m_sockets.erase(sock); +} + +bool +io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return false; + + auto& sockinfo = sock_it->second; + bool expected = false; + if (!sockinfo.async_read.compare_exchange_strong(expected, true)) + return false; + + //Resize the buffer to the correct size if we need to. + if (buffer.size() < read_size) + buffer.resize(read_size); + + sockinfo.read_buffer = &buffer; + sockinfo.read_size = read_size; + sockinfo.read_callback = callback; + sockinfo.io_info.eOperation = IO_OP_READ; + + DWORD dwRecvNumBytes = 0; + DWORD dwFlags = 0; + WSABUF buffRecv; + + buffRecv.buf = sockinfo.read_buffer->data(); + buffRecv.len = sockinfo.read_size; + + int nRet = WSARecv(sock, &buffRecv, 1, &dwRecvNumBytes, &dwFlags, &sockinfo.io_info.overlapped, NULL); + + if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) + { + printf("WSARecv() failed: %d Closing socket\n", WSAGetLastError()); + + //Fire the disconnect handler + sockinfo.disconnection_handler(*this); + return false; + } + + return true; +} + +bool +io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return false; + + auto& sockinfo = sock_it->second; + bool expected = false; + if (!sockinfo.async_write.compare_exchange_strong(expected, true)) + return false; + + sockinfo.write_buffer = buffer; + sockinfo.write_size = write_size; + sockinfo.write_callback = callback; + sockinfo.io_info.eOperation = IO_OP_WRITE; + + auto& sbuffer = sockinfo.write_buffer; + + WSABUF buffSend; + DWORD dwSendNumBytes = 0; + DWORD dwFlags = 0; + buffSend.buf = sockinfo.write_buffer.data(); + buffSend.len = write_size; + + int nRet = WSASend(sock, &buffSend, 1, &dwSendNumBytes, dwFlags, &(sockinfo.io_info.overlapped), NULL); + if (SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())) + { + printf("WSASend() failed: %d\n", WSAGetLastError()); + + //Fire the disconnect handler + sockinfo.disconnection_handler(*this); + return false; + } + + return true; +} + +//function used by worker thread(s) used to process io requests +int +io_service::process_io(void) +{ + BOOL bSuccess = FALSE; + int nRet = 0; + LPWSAOVERLAPPED pOverlapped = NULL; + sock_info* pPerSocketInfo = NULL; + + WSABUF buffRecv; + WSABUF buffSend; + DWORD dwRecvNumBytes = 0; + DWORD dwSendNumBytes = 0; + DWORD dwFlags = 0; + DWORD io_size = 0; + + while (!m_should_stop) + { + // continually loop to service io completion packets + pPerSocketInfo = NULL; + bSuccess = GetQueuedCompletionStatus(m_completion_port, &io_size, (PDWORD_PTR)&pPerSocketInfo, (LPOVERLAPPED *)&pOverlapped, INFINITE); + if(!bSuccess && m_should_stop) + return 0; + + //assert(bSuccess==TRUE, "GetQueuedCompletionStatus failed."); + + // First check to see if an error has occurred on the socket and if so + // then close the socket and cleanup the SOCKET_INFORMATION structure + // associated with the socket + if (0 == io_size) + { + //Fire the disconnect handler. Check if stopped first because of some weird timing issues that may get us here. + if(bSuccess && false == m_should_stop && pPerSocketInfo) + pPerSocketInfo->disconnection_handler(*this); + continue; + } + + if (!pPerSocketInfo || !pOverlapped) + { + // Somebody used PostQueuedCompletionStatus to post an I/O packet with + // a NULL CompletionKey (or if we get one for any reason). It is time to exit. + return 0; + } + + //Lock our socket structure to prevent multiple access + std::lock_guard socklock(pPerSocketInfo->sock_info_mutex); + auto& io_info = pPerSocketInfo->io_info; + + // determine what type of IO packet has completed by checking the PER_IO_CONTEXT + // associated with this socket. This will determine what action to take. + switch (io_info.eOperation) + { + case IO_OP_READ: + // a read operation has completed + printf("WorkerThread %d: Socket(%d) Received %d bytes\n", GetCurrentThreadId(), pPerSocketInfo->hsock, io_size); + //TODO Resize Read buffer to the size read??? + pPerSocketInfo->read_callback(io_size); + break; + + case IO_OP_WRITE: + // a write operation has completed, determine if all the data intended to be + // sent actually was sent. + io_info.eOperation = IO_OP_WRITE; + pPerSocketInfo->sent_bytes += io_size; + dwFlags = 0; + + auto& buffer = pPerSocketInfo->write_buffer; + + // the previous write operation didn't send all the data, + // post another send to complete the operation + if (pPerSocketInfo->sent_bytes < pPerSocketInfo->write_size) + { + //Advance pointer past what we sent already and send the remaining. + size_t size = pPerSocketInfo->write_size; + buffSend.buf = buffer.data() + pPerSocketInfo->sent_bytes; + buffSend.len = pPerSocketInfo->write_size - pPerSocketInfo->sent_bytes; + + nRet = WSASend(pPerSocketInfo->hsock, &buffSend, 1, &dwSendNumBytes, dwFlags, &(io_info.overlapped), NULL); + + if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) + { + printf ("WSASend() failed: %d\n", WSAGetLastError()); + //Fire the disconnect handler + pPerSocketInfo->disconnection_handler(*this); + } + else + { + printf("WorkerThread %d: Socket(%d) Partial send %d of %d bytes. Posting another send\n", GetCurrentThreadId(), pPerSocketInfo->hsock, pPerSocketInfo->sent_bytes, size); + pPerSocketInfo->write_callback(io_size); + } + } + else + { + // previous write operation completed for this socket, post another recv + io_info.eOperation = IO_OP_READ; + dwRecvNumBytes = 0; + dwFlags = 0; + buffRecv.buf = pPerSocketInfo->read_buffer->data(); + buffRecv.len = pPerSocketInfo->read_size; + + int nRet = WSARecv(pPerSocketInfo->hsock, &buffRecv, 1, &dwRecvNumBytes, &dwFlags, &io_info.overlapped, NULL); + + if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) + { + printf ("WSARecv() failed. Error: %d\n", WSAGetLastError()); + //Fire the disconnect handler + pPerSocketInfo->disconnection_handler(*this); + } + else + { + printf("WorkerThread %d: Socket(%d) Sent %d bytes, Posted Recv\n", GetCurrentThreadId(), pPerSocketInfo->hsock, io_size); + pPerSocketInfo->write_callback(io_size); + } + } + break; + } //switch + } //while + + return 0; +} + +#else //End of Windows only code + +//Start of Linux code +io_service::io_service() +: m_should_stop(false) +{ +#ifdef _MSC_VER + WSADATA wsaData; + int nRet = 0; + if ((nRet = WSAStartup(0x202, &wsaData)) != 0) //Start winsock before any other socket calls. + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, WSAStartup() failure"); +#endif + + m_notify_socket = socket(AF_INET, SOCK_STREAM, 0); + if(m_notify_socket < 0) + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, socket() creation failure"); + + u_long ulValue = 1; + if (0 != ioctlsocket(m_notify_socket, FIONBIO, &ulValue)) //Set sockets to non blocking. + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, ioctlsocket() failure"); + + m_worker = std::thread(&io_service::wait_for_events, this); +} + +io_service::~io_service(void) { + m_should_stop = true; + notify_select(); + + m_worker.join(); +} + +int +io_service::init_sets(fd_set* rd_set, fd_set* wr_set) { + SOCKET max_sock = m_notify_socket; + + FD_ZERO(rd_set); + FD_ZERO(wr_set); + FD_SET(m_notify_socket, rd_set); + + std::lock_guard lock(m_socket_mutex); + for (const auto& sock : m_sockets) { + if (sock.second.async_read) + FD_SET(sock.first, rd_set); + + if (sock.second.async_write) + FD_SET(sock.first, wr_set); + + if ((sock.second.async_read || sock.second.async_write) && sock.first > max_sock) + max_sock = sock.first; + } + + return max_sock; +} + +io_service::callback_t +io_service::read_sock(SOCKET sock) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return nullptr; + + auto& buffer = *sock_it->second.read_buffer; + int original_buffer_size = buffer.size(); + buffer.resize(original_buffer_size + sock_it->second.read_size); + + int nb_bytes_read = recv(sock_it->first, buffer.data() + original_buffer_size, sock_it->second.read_size, 0); + sock_it->second.async_read = false; + + if (nb_bytes_read <= 0) { + buffer.resize(original_buffer_size); + sock_it->second.disconnection_handler(*this); + m_sockets.erase(sock_it); + + return nullptr; + } + else { + buffer.resize(original_buffer_size + nb_bytes_read); + + return std::bind(sock_it->second.read_callback, nb_bytes_read); + } +} + +io_service::callback_t +io_service::write_sock(SOCKET sock) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return nullptr; + + int nb_bytes_written = send(sock_it->first, sock_it->second.write_buffer.data(), sock_it->second.write_size, 0); + sock_it->second.async_write = false; + + if (nb_bytes_written <= 0) { + sock_it->second.disconnection_handler(*this); + m_sockets.erase(sock_it); + + return nullptr; + } + else + return std::bind(sock_it->second.write_callback, nb_bytes_written); +} + +void +io_service::process_sets(fd_set* rd_set, fd_set* wr_set) { + std::vector socks_to_read; + std::vector socks_to_write; + + //! Quickly fetch fds that are readable/writeable + //! This reduce lock time and avoid possible deadlock in callbacks + { + std::lock_guard lock(m_socket_mutex); + + for (const auto& sock : m_sockets) { + if (sock.second.async_read && FD_ISSET(sock.first, rd_set)) + socks_to_read.push_back(sock.first); + + if (sock.second.async_write && FD_ISSET(sock.first, wr_set)) + socks_to_write.push_back(sock.first); + } + } + + for (SOCKET sock : socks_to_read) { + auto callback = read_sock(sock); + if (callback) + callback(); + } + for (SOCKET sock : socks_to_write) { + auto callback = write_sock(sock); + if (callback) + callback(); + } + + if (FD_ISSET(m_notify_socket, rd_set)) { + char buf[1024]; + (void)read(m_notify_socket, buf, 1024); + } +} + +void +io_service::wait_for_events(void) { + fd_set rd_set; + fd_set wr_set; + + while (!m_should_stop) { + int max_fd = init_sets(&rd_set, &wr_set); + + //Wait here until select returns and process any reads or writes that are pending. + if (select(max_fd + 1, &rd_set, &wr_set, nullptr, nullptr) > 0) + process_sets(&rd_set, &wr_set); + } +} + +void +io_service::track(SOCKET socket, const disconnection_handler_t& handler) { + std::lock_guard lock(m_socket_mutex); + + auto& info = m_sockets[socket]; + info.async_read = false; + info.async_write = false; + info.disconnection_handler = handler; + + notify_select(); +} + +void +io_service::untrack(SOCKET sock) { + std::lock_guard lock(m_socket_mutex); + m_sockets.erase(sock); +} + +bool +io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto reg_fd_it = m_sockets.find(sock); + if (reg_fd_it == m_sockets.end()) + return false; + + auto& reg_fd = reg_fd_it->second; + bool expected = false; + if (!reg_fd.async_read.compare_exchange_strong(expected, true)) + return false; + + reg_fd.read_buffer = &buffer; + reg_fd.read_size = read_size; + reg_fd.read_callback = callback; + + notify_select(); + + return true; +} + +bool +io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto reg_sock_it = m_sockets.find(sock); + if (reg_sock_it == m_sockets.end()) + return false; + + auto& reg_sock = reg_sock_it->second; + bool expected = false; + if (!reg_sock.async_write.compare_exchange_strong(expected, true)) + return false; + + reg_sock.write_buffer = buffer; + reg_sock.write_size = write_size; + reg_sock.write_callback = callback; + + notify_select(); + + return true; +} + +void +io_service::notify_select(void) { + (void)send(m_notify_socket, "a", 1, 0); +} +#endif + +} //! network +} //! cpp_redis diff --git a/sources/network/tcp_client.cpp b/sources/network/win_tcp_client.cpp similarity index 62% rename from sources/network/tcp_client.cpp rename to sources/network/win_tcp_client.cpp index 50e36460..9df4be17 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/win_tcp_client.cpp @@ -1,5 +1,7 @@ #include -#include + +#include //needed for fcntl, open etc. +#pragma warning(disable:4996) //Disable "The POSIX name for this item is deprecated" warnings #include #include "cpp_redis/network/tcp_client.hpp" @@ -15,9 +17,9 @@ namespace network { //! //! that way, any object containing a tcp_client has an attribute (or through its attributes) //! is guaranteed to be destructed before the io_service is destructed, even if it is global -tcp_client::tcp_client(void) -: m_io_service(io_service::get_instance()) -, m_fd(-1) +tcp_client::tcp_client(io_service* pIO/*= NULL*/) +: m_pio_service((pIO?pIO:&io_service::get_instance())) +, m_sock(-1) , m_is_connected(false) , m_receive_handler(nullptr) , m_disconnection_handler(nullptr) @@ -37,13 +39,23 @@ tcp_client::connect(const std::string& host, unsigned int port, return throw cpp_redis::redis_error("Client already connected"); //! create the socket - m_fd = socket(AF_INET, SOCK_STREAM, 0); - if (m_fd < 0) - throw redis_error("Can't open a socket"); + int nZero = 0; + //Enable socket for overlapped i/o + m_sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); + if (m_sock < 0) + throw redis_error("Can't open a socket"); + + //Instruct the TCP stack to directly perform I/O using the buffer provided in our I/O call. + //The advantage is performance because we save a buffer copy between the TCP stack buffer + //and our user buffer for each I/O call. + //BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! + if (0 != setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char *)&nZero, sizeof(nZero))) { + return throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to disable buffering"); + } //! get the server's DNS entry struct hostent *server = gethostbyname(host.c_str()); - if (not server) + if (!server) throw redis_error("No such host: " + host); //! build the server's Internet address @@ -54,13 +66,18 @@ tcp_client::connect(const std::string& host, unsigned int port, server_addr.sin_family = AF_INET; //! create a connection with the server - if (::connect(m_fd, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) + if (::connect(m_sock, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); - //! add fd to the io_service and set the disconnection & recv handlers + //! add sockeet to the io_service and set the disconnection & recv handlers m_disconnection_handler = disconnection_handler; m_receive_handler = receive_handler; - m_io_service.track(m_fd, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); + + u_long ulValue = 1; + if (0 != ioctlsocket(m_sock, FIONBIO, &ulValue)) //Set socket to non blocking. + throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to set socket to non-blocking"); + + m_pio_service->track(m_sock, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); m_is_connected = true; //! start async read @@ -69,12 +86,14 @@ tcp_client::connect(const std::string& host, unsigned int port, void tcp_client::disconnect(void) { - if (not m_is_connected) + if (!m_is_connected) return ; m_is_connected = false; - m_io_service.untrack(m_fd); - close(m_fd); + m_pio_service->untrack(m_sock); + + closesocket(m_sock); + clear_buffer(); } @@ -85,10 +104,10 @@ tcp_client::send(const std::string& buffer) { void tcp_client::send(const std::vector& buffer) { - if (not m_is_connected) + if (!m_is_connected) throw redis_error("Not connected"); - if (not buffer.size()) + if (!buffer.size()) return ; std::lock_guard lock(m_write_buffer_mutex); @@ -108,10 +127,10 @@ tcp_client::send(const std::vector& buffer) { void tcp_client::async_read(void) { - m_io_service.async_read(m_fd, m_read_buffer, READ_SIZE, + m_pio_service->async_read(m_sock, m_read_buffer, READ_SIZE, [&](std::size_t length) { if (m_receive_handler) - if (not m_receive_handler(*this, { m_read_buffer.begin(), m_read_buffer.begin() + length })) { + if (!m_receive_handler(*this, { m_read_buffer.begin(), m_read_buffer.begin() + length })) { disconnect(); return ; } @@ -126,13 +145,13 @@ tcp_client::async_read(void) { void tcp_client::async_write(void) { - m_io_service.async_write(m_fd, m_write_buffer, m_write_buffer.size(), + m_pio_service->async_write(m_sock, m_write_buffer, m_write_buffer.size(), [&](std::size_t length) { std::lock_guard lock(m_write_buffer_mutex); m_write_buffer.erase(m_write_buffer.begin(), m_write_buffer.begin() + length); - if (m_is_connected and m_write_buffer.size()) + if (m_is_connected && m_write_buffer.size()) async_write(); }); } @@ -145,7 +164,9 @@ tcp_client::is_connected(void) { void tcp_client::io_service_disconnection_handler(network::io_service&) { m_is_connected = false; - close(m_fd); + + closesocket(m_sock); + clear_buffer(); if (m_disconnection_handler) diff --git a/sources/redis_client.cpp b/sources/redis_client.cpp index 18ddc6b2..515eda4d 100644 --- a/sources/redis_client.cpp +++ b/sources/redis_client.cpp @@ -3,8 +3,8 @@ namespace cpp_redis { -redis_client::redis_client(void) -: m_callbacks_running(0) +redis_client::redis_client(network::io_service* pIO/*= NULL*/) +: m_callbacks_running(0), m_client(pIO) {} void @@ -18,6 +18,12 @@ redis_client::connect(const std::string& host, unsigned int port, m_disconnection_handler = client_disconnection_handler; } +redis_client::~redis_client(void) +{ + if(m_client.is_connected()) + m_client.disconnect(); +} + void redis_client::disconnect(void) { m_client.disconnect(); @@ -51,7 +57,7 @@ redis_client::sync_commit(void) { try_commit(); std::unique_lock lock_callback(m_callbacks_mutex); - m_sync_condvar.wait(lock_callback, [=]{ return m_callbacks_running == 0 and m_callbacks.empty(); }); + m_sync_condvar.wait(lock_callback, [=]{ return m_callbacks_running == 0 && m_callbacks.empty(); }); return *this; } diff --git a/sources/redis_subscriber.cpp b/sources/redis_subscriber.cpp index 49822eae..fcd550b4 100644 --- a/sources/redis_subscriber.cpp +++ b/sources/redis_subscriber.cpp @@ -88,9 +88,9 @@ redis_subscriber::handle_subscribe_reply(const std::vector& reply) { const auto& channel = reply[1]; const auto& message = reply[2]; - if (not title.is_string() - or not channel.is_string() - or not message.is_string()) + if (!title.is_string() + || !channel.is_string() + || !message.is_string()) return ; if (title.as_string() != "message") @@ -115,10 +115,10 @@ redis_subscriber::handle_psubscribe_reply(const std::vector& reply) { const auto& channel = reply[2]; const auto& message = reply[3]; - if (not title.is_string() - or not pchannel.is_string() - or not channel.is_string() - or not message.is_string()) + if (!title.is_string() + || !pchannel.is_string() + || !channel.is_string() + || !message.is_string()) return ; if (title.as_string() != "pmessage") @@ -136,7 +136,7 @@ redis_subscriber::handle_psubscribe_reply(const std::vector& reply) { void redis_subscriber::connection_receive_handler(network::redis_connection&, reply& reply) { //! alaway return an array - if (not reply.is_array()) + if (!reply.is_array()) return ; auto& array = reply.as_array(); diff --git a/sources/reply.cpp b/sources/reply.cpp index 436fa8d4..36944ad8 100644 --- a/sources/reply.cpp +++ b/sources/reply.cpp @@ -56,7 +56,7 @@ reply::is_array(void) const { bool reply::is_string(void) const { - return is_simple_string() or is_bulk_string() or is_error(); + return is_simple_string() || is_bulk_string() || is_error(); } bool @@ -86,7 +86,7 @@ reply::is_null(void) const { const std::vector& reply::as_array(void) const { - if (not is_array()) + if (!is_array()) throw cpp_redis::redis_error("Reply is not an array"); return m_rows; @@ -94,7 +94,7 @@ reply::as_array(void) const { const std::string& reply::as_string(void) const { - if (not is_string()) + if (!is_string()) throw cpp_redis::redis_error("Reply is not a string"); return m_strval; @@ -102,7 +102,7 @@ reply::as_string(void) const { int reply::as_integer(void) const { - if (not is_integer()) + if (!is_integer()) throw cpp_redis::redis_error("Reply is not an integer"); return m_intval; diff --git a/tests/sources/spec/redis_client_spec.cpp b/tests/sources/spec/redis_client_spec.cpp index e2f068f4..ca3ecefe 100644 --- a/tests/sources/spec/redis_client_spec.cpp +++ b/tests/sources/spec/redis_client_spec.cpp @@ -100,12 +100,12 @@ TEST(RedisClient, SyncCommitTimeout) { client.connect(); volatile std::atomic_bool callback_exit(false); client.send({ "GET", "HELLO" }, [&] (cpp_redis::reply&) { - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); callback_exit = true; }); EXPECT_NO_THROW(client.sync_commit(std::chrono::milliseconds(100))); EXPECT_FALSE(callback_exit); - while (not callback_exit); + while (!callback_exit); } TEST(RedisClient, SyncCommitNoTimeout) { @@ -114,7 +114,7 @@ TEST(RedisClient, SyncCommitNoTimeout) { client.connect(); std::atomic_bool callback_exit(false); client.send({ "GET", "HELLO" }, [&] (cpp_redis::reply&) { - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); callback_exit = true; }); EXPECT_NO_THROW(client.sync_commit()); @@ -241,7 +241,7 @@ TEST(RedisClient, DisconnectionHandlerWithQuit) { client.send({ "QUIT" }); client.sync_commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(disconnection_handler_called); } @@ -254,7 +254,7 @@ TEST(RedisClient, DisconnectionHandlerWithoutQuit) { }); client.sync_commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(disconnection_handler_called); } diff --git a/tests/sources/spec/redis_subscriber_spec.cpp b/tests/sources/spec/redis_subscriber_spec.cpp index 7af3f4e1..2678f00f 100644 --- a/tests/sources/spec/redis_subscriber_spec.cpp +++ b/tests/sources/spec/redis_subscriber_spec.cpp @@ -136,7 +136,7 @@ TEST(RedisSubscriber, SubConnectedCommitConnected) { sub.commit(); client.publish("/chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_run); } @@ -155,7 +155,7 @@ TEST(RedisSubscriber, SubNotConnectedCommitConnected) { sub.commit(); client.publish("/chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_run); } @@ -175,7 +175,7 @@ TEST(RedisSubscriber, SubNotConnectedCommitNotConnectedCommitConnected) { sub.commit(); client.publish("/chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_run); } @@ -196,7 +196,7 @@ TEST(RedisSubscriber, SubscribeSomethingPublished) { sub.commit(); client.publish("/chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_run); } @@ -220,7 +220,7 @@ TEST(RedisSubscriber, SubscribeMultiplePublished) { client.publish("/chan", "first"); client.publish("/chan", "second"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(number_times_called == 2); } @@ -239,7 +239,7 @@ TEST(RedisSubscriber, SubscribeNothingPublished) { sub.commit(); client.publish("/other_chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_run); } @@ -267,7 +267,7 @@ TEST(RedisSubscriber, MultipleSubscribeSomethingPublished) { client.publish("/chan_1", "hello"); client.publish("/chan_2", "world"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_1_run); EXPECT_TRUE(callback_2_run); } @@ -289,7 +289,7 @@ TEST(RedisSubscriber, PSubscribeSomethingPublished) { sub.commit(); client.publish("/chan/hello", "world"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_run); } @@ -319,7 +319,7 @@ TEST(RedisSubscriber, PSubscribeMultiplePublished) { client.publish("/chan/hello", "first"); client.publish("/chan/world", "second"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(number_times_called == 2); } @@ -338,7 +338,7 @@ TEST(RedisSubscriber, PSubscribeNothingPublished) { sub.commit(); client.publish("/other_chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_run); } @@ -366,7 +366,7 @@ TEST(RedisSubscriber, MultiplePSubscribeSomethingPublished) { client.publish("/chan/1", "hello"); client.publish("/other_chan/2", "world"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_1_run); EXPECT_TRUE(callback_2_run); } @@ -392,7 +392,7 @@ TEST(RedisSubscriber, Unsubscribe) { client.publish("/chan_1", "hello"); client.publish("/chan_2", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_1_run); EXPECT_TRUE(callback_2_run); } @@ -418,7 +418,7 @@ TEST(RedisSubscriber, PUnsubscribe) { client.publish("/chan_1/hello", "hello"); client.publish("/chan_2/hello", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_1_run); EXPECT_TRUE(callback_2_run); } From 4865a551dd1dbbafc1417303d55ecc218b25071a Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Wed, 12 Oct 2016 22:36:51 +0200 Subject: [PATCH 2/7] fix compilation under unix and store io_service and tcp_client source files into subfolders unix and windows. --- CMakeLists.txt | 11 +++++++-- .../cpp_redis/network/redis_connection.hpp | 8 +++---- .../network/{ => unix}/io_service.hpp | 0 .../network/{ => unix}/tcp_client.hpp | 4 ++-- .../io_service.hpp} | 0 .../tcp_client.hpp} | 4 ++-- includes/cpp_redis/redis_client.hpp | 2 +- sources/network/redis_connection.cpp | 6 ++--- sources/network/{ => unix}/io_service.cpp | 2 +- sources/network/{ => unix}/tcp_client.cpp | 6 ++--- .../io_service.cpp} | 24 +++++++++---------- .../tcp_client.cpp} | 14 +++++------ sources/redis_client.cpp | 9 ++----- 13 files changed, 46 insertions(+), 44 deletions(-) rename includes/cpp_redis/network/{ => unix}/io_service.hpp (100%) rename includes/cpp_redis/network/{ => unix}/tcp_client.hpp (94%) rename includes/cpp_redis/network/{win_io_service.hpp => windows/io_service.hpp} (100%) rename includes/cpp_redis/network/{win_tcp_client.hpp => windows/tcp_client.hpp} (94%) rename sources/network/{ => unix}/io_service.cpp (99%) rename sources/network/{ => unix}/tcp_client.cpp (97%) rename sources/network/{win_io_service.cpp => windows/io_service.cpp} (98%) rename sources/network/{win_tcp_client.cpp => windows/tcp_client.cpp} (98%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 34a1478a..f48e024d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,8 +47,15 @@ link_directories(${GTEST_LIBS}) ### # sources ### -set(DIRS "sources" "sources/network" "sources/builders") -foreach(dir ${DIRS}) +set(SRC_DIRS "sources" "sources/network" "sources/builders") + +IF (WIN32) + set(SRC_DIRS ${SRC_DIRS} "sources/network/windows") +ELSE () + set(SRC_DIRS ${SRC_DIRS} "sources/network/unix") +ENDIF (WIN32) + +foreach(dir ${SRC_DIRS}) # get directory sources file(GLOB s_${dir} "${dir}/*.cpp") # set sources diff --git a/includes/cpp_redis/network/redis_connection.hpp b/includes/cpp_redis/network/redis_connection.hpp index 76f2c918..bd79ccb3 100644 --- a/includes/cpp_redis/network/redis_connection.hpp +++ b/includes/cpp_redis/network/redis_connection.hpp @@ -6,10 +6,10 @@ #include #ifdef _MSC_VER -#include +# include #else -#include -#endif +# include +#endif /* _MSC_VER */ #include @@ -20,7 +20,7 @@ namespace network { class redis_connection { public: //! ctor & dtor - redis_connection::redis_connection(const std::shared_ptr pIO); + redis_connection(const std::shared_ptr& IO); ~redis_connection(void); //! copy ctor & assignment operator diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/unix/io_service.hpp similarity index 100% rename from includes/cpp_redis/network/io_service.hpp rename to includes/cpp_redis/network/unix/io_service.hpp diff --git a/includes/cpp_redis/network/tcp_client.hpp b/includes/cpp_redis/network/unix/tcp_client.hpp similarity index 94% rename from includes/cpp_redis/network/tcp_client.hpp rename to includes/cpp_redis/network/unix/tcp_client.hpp index d74204fa..1514afc6 100644 --- a/includes/cpp_redis/network/tcp_client.hpp +++ b/includes/cpp_redis/network/unix/tcp_client.hpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #ifndef __CPP_REDIS_READ_SIZE @@ -23,7 +23,7 @@ namespace network { class tcp_client { public: //! ctor & dtor - tcp_client(void); + tcp_client(const std::shared_ptr& IO = nullptr); ~tcp_client(void); //! assignment operator & copy ctor diff --git a/includes/cpp_redis/network/win_io_service.hpp b/includes/cpp_redis/network/windows/io_service.hpp similarity index 100% rename from includes/cpp_redis/network/win_io_service.hpp rename to includes/cpp_redis/network/windows/io_service.hpp diff --git a/includes/cpp_redis/network/win_tcp_client.hpp b/includes/cpp_redis/network/windows/tcp_client.hpp similarity index 94% rename from includes/cpp_redis/network/win_tcp_client.hpp rename to includes/cpp_redis/network/windows/tcp_client.hpp index 3d5754c0..3e4e0496 100644 --- a/includes/cpp_redis/network/win_tcp_client.hpp +++ b/includes/cpp_redis/network/windows/tcp_client.hpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #ifndef __CPP_REDIS_READ_SIZE @@ -23,7 +23,7 @@ namespace network { class tcp_client { public: //! ctor & dtor - tcp_client(const std::shared_ptr pIO=NULL); + tcp_client(const std::shared_ptr& IO = nullptr); ~tcp_client(void); //! assignment operator & copy ctor diff --git a/includes/cpp_redis/redis_client.hpp b/includes/cpp_redis/redis_client.hpp index 0ab7fab9..a633dcbf 100644 --- a/includes/cpp_redis/redis_client.hpp +++ b/includes/cpp_redis/redis_client.hpp @@ -15,7 +15,7 @@ namespace cpp_redis { class redis_client { public: //! ctor & dtor - redis_client(const std::shared_ptr pIO =NULL); + redis_client(const std::shared_ptr& IO = nullptr); ~redis_client(void); //! copy ctor & assignment operator diff --git a/sources/network/redis_connection.cpp b/sources/network/redis_connection.cpp index bbfa14de..6544a703 100644 --- a/sources/network/redis_connection.cpp +++ b/sources/network/redis_connection.cpp @@ -5,10 +5,10 @@ namespace cpp_redis { namespace network { -redis_connection::redis_connection(const std::shared_ptr pIO/*=NULL*/) -: m_reply_callback(nullptr) +redis_connection::redis_connection(const std::shared_ptr& IO) +: m_client(IO) +, m_reply_callback(nullptr) , m_disconnection_handler(nullptr) -, m_client(pIO) { __CPP_REDIS_LOG(debug, "cpp_redis::network::redis_connection created"); } diff --git a/sources/network/io_service.cpp b/sources/network/unix/io_service.cpp similarity index 99% rename from sources/network/io_service.cpp rename to sources/network/unix/io_service.cpp index 52ed2b91..ca0e7a12 100644 --- a/sources/network/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -1,6 +1,6 @@ #include -#include +#include #include #include diff --git a/sources/network/tcp_client.cpp b/sources/network/unix/tcp_client.cpp similarity index 97% rename from sources/network/tcp_client.cpp rename to sources/network/unix/tcp_client.cpp index fdfb3e54..56493e3c 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/unix/tcp_client.cpp @@ -3,7 +3,7 @@ #include #include -#include +#include namespace cpp_redis { @@ -16,8 +16,8 @@ namespace network { //! //! that way, any object containing a tcp_client has an attribute (or through its attributes) //! is guaranteed to be destructed before the io_service is destructed, even if it is global -tcp_client::tcp_client(void) -: m_io_service(io_service::get_instance()) +tcp_client::tcp_client(const std::shared_ptr& IO) +: m_io_service(IO ? IO : io_service::get_instance()) , m_fd(-1) , m_is_connected(false) , m_receive_handler(nullptr) diff --git a/sources/network/win_io_service.cpp b/sources/network/windows/io_service.cpp similarity index 98% rename from sources/network/win_io_service.cpp rename to sources/network/windows/io_service.cpp index a199103e..80140d6c 100644 --- a/sources/network/win_io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -1,4 +1,4 @@ -#include "cpp_redis/network/win_io_service.hpp" +#include "cpp_redis/network/windows/io_service.hpp" #include "cpp_redis/redis_error.hpp" namespace cpp_redis { @@ -19,7 +19,7 @@ io_service::io_service(size_t max_worker_threads) SYSTEM_INFO info; ::GetSystemInfo(&info); m_worker_thread_pool_size = (info.dwNumberOfProcessors * 2); - + if (m_worker_thread_pool_size > max_worker_threads) m_worker_thread_pool_size = max_worker_threads; @@ -44,11 +44,11 @@ io_service::~io_service(void) shutdown(); } -void +void io_service::shutdown() { m_should_stop = true; - + //Iterate all of our sockets and shutdown any IO worker threads by posting a issuing a special //message to the thread to tell them to wake up and shut down. io_context_info* pInfo = NULL; @@ -78,14 +78,14 @@ io_service::shutdown() //! add or remove a given socket from the io service //! untrack should never be called from inside a callback -void +void io_service::track(SOCKET sock, const disconnection_handler_t& handler) { std::lock_guard lock(m_socket_mutex); - + //Add the socket to our map and return the allocated struct auto& info = m_sockets[sock]; - + info.hsock = sock; info.disconnection_handler = handler; @@ -96,7 +96,7 @@ io_service::track(SOCKET sock, const disconnection_handler_t& handler) } } -void +void io_service::untrack(SOCKET sock) { auto sock_it = m_sockets.find(sock); @@ -104,7 +104,7 @@ io_service::untrack(SOCKET sock) return; auto& sockinfo = sock_it->second; - //Wait until the posted i/o has completed. + //Wait until the posted i/o has completed. //while (m_completion_port && !HasOverlappedIoCompleted((LPOVERLAPPED)&sockinfo.io_info.overlapped)) // std::this_thread::sleep_for(std::chrono::milliseconds(200)); @@ -134,7 +134,7 @@ io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_ buffRecv.buf = sockinfo.read_buffer->data(); buffRecv.len = read_size; - + //We need a new overlapped struct for EACH overlapped operation. //we reuse them over and over. io_context_info* p_io_info = sockinfo.get_pool_io_context(); @@ -256,8 +256,8 @@ io_service::process_io(void) psock_info->disconnection_handler(*this); continue; } - - // determine what type of IO packet has completed by checking the io_context_info + + // determine what type of IO packet has completed by checking the io_context_info // associated with this io operation. This will determine what action to take. switch (e_op) { diff --git a/sources/network/win_tcp_client.cpp b/sources/network/windows/tcp_client.cpp similarity index 98% rename from sources/network/win_tcp_client.cpp rename to sources/network/windows/tcp_client.cpp index 15979da7..feba659d 100644 --- a/sources/network/win_tcp_client.cpp +++ b/sources/network/windows/tcp_client.cpp @@ -4,7 +4,7 @@ #include #include -#include +#include namespace cpp_redis { @@ -17,8 +17,8 @@ namespace network { //! //! that way, any object containing a tcp_client has an attribute (or through its attributes) //! is guaranteed to be destructed before the io_service is destructed, even if it is global -tcp_client::tcp_client(const std::shared_ptr pIO) -: m_io_service(pIO?pIO:io_service::get_instance()) +tcp_client::tcp_client(const std::shared_ptr& IO) +: m_io_service(IO ? IO : io_service::get_instance()) , m_sock(-1) , m_is_connected(false) , m_receive_handler(nullptr) @@ -52,7 +52,7 @@ tcp_client::connect(const std::string& host, unsigned int port, throw redis_error("Can't open a socket"); //Instruct the TCP stack to directly perform I/O using the buffer provided in our I/O call. - //The advantage is performance because we save a buffer copy between the TCP stack buffer + //The advantage is performance because we save a buffer copy between the TCP stack buffer //and our user buffer for each I/O call. //BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! if (0 != setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char *)&nZero, sizeof(nZero))) { @@ -107,7 +107,7 @@ tcp_client::disconnect(void) { closesocket(m_sock); m_sock = INVALID_SOCKET; - + clear_buffer(); reset_state(); @@ -160,7 +160,7 @@ tcp_client::async_read(void) { if (m_receive_handler) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client calls receive_handler"); - + if (!m_receive_handler(*this, { m_read_buffer.begin(), m_read_buffer.begin() + length })) { @@ -169,7 +169,7 @@ tcp_client::async_read(void) { return; } } - + //! clear read buffer and re-issue async read to receive more incoming bytes m_read_buffer.clear(); diff --git a/sources/redis_client.cpp b/sources/redis_client.cpp index f1dafe8f..08536905 100644 --- a/sources/redis_client.cpp +++ b/sources/redis_client.cpp @@ -1,15 +1,10 @@ #include #include -#ifdef _MSC_VER -#include -#else -#include -#endif - namespace cpp_redis { -redis_client::redis_client(const std::shared_ptr pIO) : m_client(pIO){ +redis_client::redis_client(const std::shared_ptr& IO) +: m_client(IO) { __CPP_REDIS_LOG(debug, "cpp_redis::redis_client created"); } From 7dc2d30c8531407b0b07adbf2ea773a578d3f696 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Wed, 12 Oct 2016 22:43:31 +0200 Subject: [PATCH 3/7] merge master branch --- .travis.yml | 2 +- CMakeLists.txt | 33 ++++++++++++++----- examples/CMakeLists.txt | 13 ++------ .../cpp_redis/network/windows/io_service.hpp | 6 ++-- tests/CMakeLists.txt | 7 ++-- 5 files changed, 34 insertions(+), 27 deletions(-) diff --git a/.travis.yml b/.travis.yml index 32f90934..f4f6adfd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,4 +23,4 @@ install: - if [ "$CXX" = "g++" ]; then export CXX="g++-4.8" CC="gcc-4.8"; fi - ./install_deps.sh -script: mkdir build && cd build && cmake .. -DBUILD_TESTS=true -DBUILD_EXAMPLES=true && make && ./target/cpp_redis_tests +script: mkdir build && cd build && cmake .. -DBUILD_TESTS=true -DBUILD_EXAMPLES=true && make && ./bin/cpp_redis_tests diff --git a/CMakeLists.txt b/CMakeLists.txt index f48e024d..f3799dbb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,7 +20,11 @@ project(${PROJECT} CXX) ### # compilation options ### -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3") +IF (WIN32) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2 -D '_UNICODE' -D 'UNICODE' -D 'WIN32_LEAN_AND_MEAN'") +ELSE () + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3") +ENDIF (WIN32) ### @@ -63,27 +67,38 @@ foreach(dir ${SRC_DIRS}) endforeach() +### +# outputs +### +set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) +set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) + + ### # executable ### -add_library(${PROJECT} SHARED ${SOURCES}) +add_library(${PROJECT} STATIC ${SOURCES}) target_link_libraries(${PROJECT} pthread) + +IF (READ_SIZE) set_target_properties(${PROJECT} PROPERTIES - LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") + COMPILE_DEFINITIONS "__CPP_REDIS_READ_SIZE=${READ_SIZE}") +ENDIF (READ_SIZE) -IF (CPP_REDIS_READ_SIZE) +IF (NO_LOGGING) set_target_properties(${PROJECT} PROPERTIES - COMPILE_DEFINITIONS "CPP_REDIS_READ_SIZE=${CPP_REDIS_READ_SIZE}") -ENDIF (CPP_REDIS_READ_SIZE) - + COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}") +ENDIF (NO_LOGGING) ### # install ### -install (TARGETS ${PROJECT} DESTINATION lib) -install (DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include) +install (DIRECTORY ${CMAKE_BINARY_DIR}/lib/ DESTINATION lib USE_SOURCE_PERMISSIONS) +install (DIRECTORY ${CMAKE_BINARY_DIR}/bin/ DESTINATION bin USE_SOURCE_PERMISSIONS) +install (DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include USE_SOURCE_PERMISSIONS) ### diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index cc906fb2..344d45ee 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,7 +1,9 @@ ### # compilation options ### -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +IF (NOT WIN32) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +ENDIF (NOT WIN32) ### @@ -16,18 +18,9 @@ include_directories(${PROJECT_SOURCE_DIR}/includes ### add_executable(redis_client redis_client.cpp) target_link_libraries(redis_client cpp_redis) -set_target_properties(redis_client - PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") add_executable(redis_subscriber redis_subscriber.cpp) target_link_libraries(redis_subscriber cpp_redis) -set_target_properties(redis_subscriber - PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") add_executable(logger logger.cpp) target_link_libraries(logger cpp_redis) -set_target_properties(logger - PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") diff --git a/includes/cpp_redis/network/windows/io_service.hpp b/includes/cpp_redis/network/windows/io_service.hpp index 5c57b464..0d2f3c5d 100644 --- a/includes/cpp_redis/network/windows/io_service.hpp +++ b/includes/cpp_redis/network/windows/io_service.hpp @@ -8,7 +8,7 @@ #include -#define MAX_BUFF_SIZE CPP_REDIS_READ_SIZE +#define MAX_BUFF_SIZE __CPP_REDIS_READ_SIZE #define MAX_WORKER_THREADS 16 namespace cpp_redis { @@ -81,7 +81,7 @@ class io_service { SOCKET hsock; std::size_t sent_bytes; - //Must protect the members of our structure from access by multiple threads during IO Completion + //Must protect the members of our structure from access by multiple threads during IO Completion std::recursive_mutex sock_info_mutex; //We keep a simple vector of io_context_info structs to reuse for overlapped WSARecv and WSASend operations @@ -101,7 +101,7 @@ class io_service { io_context_info* get_pool_io_context() { io_context_info* pInfo = NULL; std::lock_guard socklock(sock_info_mutex); - if (!io_contexts_pool.empty()) + if (!io_contexts_pool.empty()) { pInfo = io_contexts_pool.back(); io_contexts_pool.pop_back(); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 02a67e0c..4a9eca18 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,7 +8,9 @@ project(${PROJECT} CXX) ### # compilation options ### -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +IF (NOT WIN32) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +ENDIF (NOT WIN32) ### @@ -37,6 +39,3 @@ endforeach() ### add_executable(${PROJECT} ${SOURCES}) target_link_libraries(${PROJECT} cpp_redis gtest pthread) -set_target_properties(${PROJECT} - PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") From 844894dc9d7f5b0b4cf3a2798ea0abb64bca6191 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Wed, 12 Oct 2016 23:33:46 +0200 Subject: [PATCH 4/7] fix windows compilaton --- CMakeLists.txt | 14 ++++++++++++-- sources/builders/integer_builder.cpp | 2 ++ sources/network/redis_connection.cpp | 2 +- tests/CMakeLists.txt | 9 ++++++++- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f3799dbb..db8f6bc2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,7 @@ cmake_minimum_required(VERSION 2.8.7) set(CMAKE_MACOSX_RPATH 1) + ### # verbose make ### @@ -21,7 +22,10 @@ project(${PROJECT} CXX) # compilation options ### IF (WIN32) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2 -D '_UNICODE' -D 'UNICODE' -D 'WIN32_LEAN_AND_MEAN'") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2") + add_definitions(-D_UNICODE) + add_definitions(-DUNICODE) + add_definitions(-DWIN32_LEAN_AND_MEAN) ELSE () set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3") ENDIF (WIN32) @@ -79,7 +83,12 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) # executable ### add_library(${PROJECT} STATIC ${SOURCES}) -target_link_libraries(${PROJECT} pthread) + +IF (WIN32) + target_link_libraries(${PROJECT} ws2_32) +ELSE () + target_link_libraries(${PROJECT} pthread) +ENDIF (WIN32) IF (READ_SIZE) set_target_properties(${PROJECT} @@ -93,6 +102,7 @@ set_target_properties(${PROJECT} COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}") ENDIF (NO_LOGGING) + ### # install ### diff --git a/sources/builders/integer_builder.cpp b/sources/builders/integer_builder.cpp index 2915d7c3..c7494dbf 100644 --- a/sources/builders/integer_builder.cpp +++ b/sources/builders/integer_builder.cpp @@ -1,3 +1,5 @@ +#include + #include #include #include diff --git a/sources/network/redis_connection.cpp b/sources/network/redis_connection.cpp index 6544a703..2e832126 100644 --- a/sources/network/redis_connection.cpp +++ b/sources/network/redis_connection.cpp @@ -86,7 +86,7 @@ redis_connection::tcp_client_receive_handler(network::tcp_client&, const std::ve __CPP_REDIS_LOG(debug, "cpp_redis::network::redis_connection receives packet, attempts to build reply"); m_builder << std::string(buffer.begin(), buffer.end()); } - catch (const redis_error& e) { + catch (const redis_error&) { __CPP_REDIS_LOG(error, "cpp_redis::network::redis_connection could not build reply (invalid format), disconnecting"); if (m_disconnection_handler) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4a9eca18..439d2e16 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -38,4 +38,11 @@ endforeach() # executable ### add_executable(${PROJECT} ${SOURCES}) -target_link_libraries(${PROJECT} cpp_redis gtest pthread) + +target_link_libraries(${PROJECT} cpp_redis gtest) + +IF (WIN32) + target_link_libraries(${PROJECT} ws2_32) +ELSE () + target_link_libraries(${PROJECT} pthread) +ENDIF (WIN32) From fd05ec028e29ecd0b3b126cdaf96ab2a66fdd6c5 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Thu, 13 Oct 2016 00:59:07 +0200 Subject: [PATCH 5/7] fix concurrency issues in the windows tcp_client/io_service --- .../cpp_redis/network/windows/tcp_client.hpp | 3 ++- sources/network/windows/io_service.cpp | 21 +++++++++---------- sources/network/windows/tcp_client.cpp | 13 ++++++------ 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/includes/cpp_redis/network/windows/tcp_client.hpp b/includes/cpp_redis/network/windows/tcp_client.hpp index 3e4e0496..24e235d7 100644 --- a/includes/cpp_redis/network/windows/tcp_client.hpp +++ b/includes/cpp_redis/network/windows/tcp_client.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -69,7 +70,7 @@ class tcp_client { //! buffers static const unsigned int READ_SIZE = __CPP_REDIS_READ_SIZE; std::vector m_read_buffer; - std::vector m_write_buffer; + std::list> m_write_buffer; //! handlers receive_handler_t m_receive_handler; diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp index 80140d6c..fc0a3959 100644 --- a/sources/network/windows/io_service.cpp +++ b/sources/network/windows/io_service.cpp @@ -99,15 +99,6 @@ io_service::track(SOCKET sock, const disconnection_handler_t& handler) void io_service::untrack(SOCKET sock) { - auto sock_it = m_sockets.find(sock); - if (sock_it == m_sockets.end()) - return; - auto& sockinfo = sock_it->second; - - //Wait until the posted i/o has completed. - //while (m_completion_port && !HasOverlappedIoCompleted((LPOVERLAPPED)&sockinfo.io_info.overlapped)) - // std::this_thread::sleep_for(std::chrono::milliseconds(200)); - std::lock_guard lock(m_socket_mutex); m_sockets.erase(sock); } @@ -263,7 +254,11 @@ io_service::process_io(void) { case IO_OP_READ: // a read operation has completed //TODO Resize Read buffer to the size that we read? - psock_info->read_callback(io_size); + { + std::lock_guard lock(m_socket_mutex); + if (m_sockets.find(psock_info->hsock) != m_sockets.end()) + psock_info->read_callback(io_size); + } break; case IO_OP_WRITE: // a write operation has completed @@ -272,7 +267,11 @@ io_service::process_io(void) std::lock_guard socklock(psock_info->sock_info_mutex); psock_info->write_size -= io_size; } - psock_info->write_callback(io_size); + { + std::lock_guard lock(m_socket_mutex); + if (m_sockets.find(psock_info->hsock) != m_sockets.end()) + psock_info->write_callback(io_size); + } break; } //switch } //while diff --git a/sources/network/windows/tcp_client.cpp b/sources/network/windows/tcp_client.cpp index feba659d..d5e33c93 100644 --- a/sources/network/windows/tcp_client.cpp +++ b/sources/network/windows/tcp_client.cpp @@ -133,14 +133,12 @@ tcp_client::send(const std::vector& buffer) { std::lock_guard lock(m_write_buffer_mutex); - bool bytes_in_buffer = m_write_buffer.size() > 0; - //! concat buffer - m_write_buffer.insert(m_write_buffer.end(), buffer.begin(), buffer.end()); + m_write_buffer.push_back(buffer); //! if there were already bytes in buffer, simply return //! otherwise async_write calls itself recursively until the write buffer is empty - if (bytes_in_buffer) { + if (m_write_buffer.size() > 1) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client is already processing an async_write"); return ; } @@ -183,14 +181,17 @@ void tcp_client::async_write(void) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client starts async_write"); - m_io_service->async_write(m_sock, m_write_buffer, m_write_buffer.size(), + m_io_service->async_write(m_sock, m_write_buffer.front(), m_write_buffer.front().size(), [&](std::size_t length) { __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client wrote data and cleans write_buffer"); std::lock_guard lock(m_write_buffer_mutex); //Remove what has already been sent and see if we have any more to send - m_write_buffer.erase(m_write_buffer.begin(), m_write_buffer.begin() + length); + if (length >= m_write_buffer.front().size()) + m_write_buffer.pop_front(); + else + m_write_buffer.front().erase(m_write_buffer.front().begin(), m_write_buffer.front().begin() + length); //If we still have data to write the call ourselves recursivly until the buffer is completely sent if (m_is_connected && m_write_buffer.size()) From 0d470a99a3479799be918e34db246e10d2827e23 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Thu, 13 Oct 2016 01:01:30 +0200 Subject: [PATCH 6/7] little code cleaning: change const param into const-ref param --- includes/cpp_redis/redis_subscriber.hpp | 2 +- sources/redis_subscriber.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/includes/cpp_redis/redis_subscriber.hpp b/includes/cpp_redis/redis_subscriber.hpp index 2c615fdb..f71a3a4f 100644 --- a/includes/cpp_redis/redis_subscriber.hpp +++ b/includes/cpp_redis/redis_subscriber.hpp @@ -12,7 +12,7 @@ namespace cpp_redis { class redis_subscriber { public: //! ctor & dtor - redis_subscriber(const std::shared_ptr pIO = NULL); + redis_subscriber(const std::shared_ptr& IO = nullptr); ~redis_subscriber(void); //! copy ctor & assignment operator diff --git a/sources/redis_subscriber.cpp b/sources/redis_subscriber.cpp index 74cb90ad..bc190ff2 100644 --- a/sources/redis_subscriber.cpp +++ b/sources/redis_subscriber.cpp @@ -4,8 +4,8 @@ namespace cpp_redis { -redis_subscriber::redis_subscriber(const std::shared_ptr pIO) : - m_client(pIO) { +redis_subscriber::redis_subscriber(const std::shared_ptr& IO) : + m_client(IO) { __CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber created"); } From bd7e5749e1011dd63979af754dac42405e988563 Mon Sep 17 00:00:00 2001 From: Simon Ninon Date: Thu, 13 Oct 2016 01:10:15 +0200 Subject: [PATCH 7/7] remove .gitignore.bak file --- .gitignore.bak | 46 ---------------------------------------------- 1 file changed, 46 deletions(-) delete mode 100644 .gitignore.bak diff --git a/.gitignore.bak b/.gitignore.bak deleted file mode 100644 index 78a41448..00000000 --- a/.gitignore.bak +++ /dev/null @@ -1,46 +0,0 @@ -# Compiled Object files -*.slo -*.lo -*.o -*.obj - -# Precompiled Headers -*.gch -*.pch - -# Compiled Dynamic libraries -*.so -*.dylib -*.dll - -# Fortran module files -*.mod - -# Compiled Static libraries -*.lai -*.la -*.a -*.lib - -build -deps - -# Executables -*.exe -*.out -*.app -*.sln -*.vcxproj -*.suo -*.user -*.filters -*.VC.db -*.opendb -.gitignore -cmake_install.cmake - -rts-redis-test/ -CMakeCache.txt -CMakeFiles/ -Release/ -Debug/ \ No newline at end of file