From 983764fd93092c45008d628b894b8e664556e7bb Mon Sep 17 00:00:00 2001 From: MikesAracade Date: Tue, 11 Oct 2016 15:53:10 -0500 Subject: [PATCH] windows port stage 1 --- .gitignore | 19 +- examples/redis_client.cpp | 4 +- includes/cpp_redis/network/io_service.hpp | 112 ++-- .../cpp_redis/network/redis_connection.hpp | 2 +- includes/cpp_redis/network/tcp_client.hpp | 10 +- includes/cpp_redis/redis_client.hpp | 6 +- sources/builders/array_builder.cpp | 12 +- sources/builders/bulk_string_builder.cpp | 6 +- sources/builders/integer_builder.cpp | 8 +- sources/builders/reply_builder.cpp | 8 +- sources/network/io_service.cpp | 230 -------- sources/network/redis_connection.cpp | 3 +- sources/network/win_io_service.cpp | 547 ++++++++++++++++++ .../{tcp_client.cpp => win_tcp_client.cpp} | 63 +- sources/redis_client.cpp | 12 +- sources/redis_subscriber.cpp | 16 +- sources/reply.cpp | 8 +- tests/sources/spec/redis_client_spec.cpp | 10 +- tests/sources/spec/redis_subscriber_spec.cpp | 26 +- 19 files changed, 754 insertions(+), 348 deletions(-) delete mode 100644 sources/network/io_service.cpp create mode 100644 sources/network/win_io_service.cpp rename sources/network/{tcp_client.cpp => win_tcp_client.cpp} (62%) diff --git a/.gitignore b/.gitignore index 6854a22f..78a41448 100644 --- a/.gitignore +++ b/.gitignore @@ -22,10 +22,25 @@ *.a *.lib +build +deps + # Executables *.exe *.out *.app +*.sln +*.vcxproj +*.suo +*.user +*.filters +*.VC.db +*.opendb +.gitignore +cmake_install.cmake -build -deps +rts-redis-test/ +CMakeCache.txt +CMakeFiles/ +Release/ +Debug/ \ No newline at end of file diff --git a/examples/redis_client.cpp b/examples/redis_client.cpp index 5f1083fd..c1fcee7f 100644 --- a/examples/redis_client.cpp +++ b/examples/redis_client.cpp @@ -40,7 +40,7 @@ main(void) { // client.sync_commit(std::chrono::milliseconds(100)); signal(SIGINT, &sigint_handler); - while (not should_exit); - + while (!should_exit); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); return 0; } diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/io_service.hpp index 462bba85..680147ac 100644 --- a/includes/cpp_redis/network/io_service.hpp +++ b/includes/cpp_redis/network/io_service.hpp @@ -6,24 +6,38 @@ #include #include +#ifndef _MSC_VER #include #include #include +#else +#include + +#define MAX_BUFF_SIZE CPP_REDIS_READ_SIZE +#define MAX_WORKER_THREADS 32 +#endif namespace cpp_redis { namespace network { +typedef enum _enIoOperation { + //IO_OP_ACCEPT, + IO_OP_READ, + IO_OP_WRITE + } enIoOperation; + + class io_service { public: //! instance getter (singleton pattern) static io_service& get_instance(void); - -private: - //! ctor & dtor - io_service(void); + io_service(size_t max_worker_threads = MAX_WORKER_THREADS); ~io_service(void); + void shutdown(); + +private: //! copy ctor & assignment operator io_service(const io_service&) = delete; io_service& operator=(const io_service&) = delete; @@ -32,42 +46,64 @@ class io_service { //! disconnection handler declaration typedef std::function disconnection_handler_t; - //! add or remove a given fd from the io service + //! add or remove a given socket from the io service //! untrack should never be called from inside a callback - void track(int fd, const disconnection_handler_t& handler); - void untrack(int fd); + void track(SOCKET sock, const disconnection_handler_t& handler); + void untrack(SOCKET sock); //! asynchronously read read_size bytes and append them to the given buffer //! on completion, call the read_callback to notify of the success or failure of the operation - //! return false if another async_read operation is in progress or fd is not registered + //! return false if another async_read operation is in progress or socket is not registered typedef std::function read_callback_t; - bool async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); + bool async_read(SOCKET socket, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); //! asynchronously write write_size bytes from buffer to the specified fd //!on completion, call the write_callback to notify of the success or failure of the operation //! return false if another async_write operation is in progress or fd is not registered typedef std::function write_callback_t; - bool async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); + bool async_write(SOCKET socket, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); private: - //! simple struct to keep track of ongoing operations on a given fd - struct fd_info { + +#ifdef _WIN32 + struct io_context_info { + WSAOVERLAPPED overlapped; + enIoOperation eOperation; + }; +#endif + +//! simple struct to keep track of ongoing operations on a given sockeet + struct sock_info { +#ifdef _WIN32 + SOCKET hsock; + std::size_t sent_bytes; + + //Must protect the members of our structure from access by multiple threads during IO Completion + std::recursive_mutex sock_info_mutex; +#endif disconnection_handler_t disconnection_handler; - std::atomic_bool async_read; + std::atomic_bool async_read; std::vector* read_buffer; - std::size_t read_size; - read_callback_t read_callback; + std::size_t read_size; + read_callback_t read_callback; + + std::atomic_bool async_write; + std::vector write_buffer; + std::size_t write_size; + write_callback_t write_callback; - std::atomic_bool async_write; - std::vector write_buffer; - std::size_t write_size; - write_callback_t write_callback; +#ifdef _WIN32 + io_context_info io_info; +#endif }; +typedef std::function callback_t; + +#ifndef _WIN32 private: - //! listen for incoming events and notify - void listen(void); + //! wait for incoming events and notify + void wait_for_events(void); //! notify the select call so that it can wake up to process new events void notify_select(void); @@ -77,30 +113,36 @@ class io_service { int init_sets(fd_set* rd_set, fd_set* wr_set); void process_sets(fd_set* rd_set, fd_set* wr_set); - typedef std::function callback_t; - callback_t read_fd(int fd); - callback_t write_fd(int fd); + callback_t read_sock(SOCKET sock); + callback_t write_sock(SOCKET sock); +#else + //! wait for incoming events and notify + int process_io(void); + + HANDLE m_completion_port; + unsigned int m_worker_thread_pool_size; + std::vector m_worker_threads; //vector containing all the threads we start to service our i/o requests +#endif private: //! whether the worker should terminate or not std::atomic_bool m_should_stop; - //! worker in the background, listening for events - std::thread m_worker; - - //! tracked fds - std::unordered_map m_fds; + //! tracked sockets + std::unordered_map m_sockets; - //! fd associated to the pipe used to wake up the select call - int m_notif_pipe_fds[2]; +#ifndef _WIN32 + //! socket associated used to wake up the select call + SOCKET m_notify_socket; +#endif - //! mutex to protect m_fds access against race condition + //! mutex to protect m_notify_socket access against race condition //! - //! specific mutex for untrack: we dont want someone to untrack a fd while we process it + //! specific mutex for untrack: we dont want someone to untrack a socket while we process it //! this behavior could cause some issues when executing callbacks in another thread - //! for example, obj is destroyed, in its dtor it untracks the fd, but at the same time + //! for example, obj is destroyed, in its dtor it untracks the socket, but at the same time //! a callback is executed from within another thread: the untrack mutex avoid this without being costly - std::recursive_mutex m_fds_mutex; + std::recursive_mutex m_socket_mutex; }; } //! network diff --git a/includes/cpp_redis/network/redis_connection.hpp b/includes/cpp_redis/network/redis_connection.hpp index 7fa92002..157298f1 100644 --- a/includes/cpp_redis/network/redis_connection.hpp +++ b/includes/cpp_redis/network/redis_connection.hpp @@ -15,7 +15,7 @@ namespace network { class redis_connection { public: //! ctor & dtor - redis_connection(void); + redis_connection(network::io_service* pIO=NULL); ~redis_connection(void) = default; //! copy ctor & assignment operator diff --git a/includes/cpp_redis/network/tcp_client.hpp b/includes/cpp_redis/network/tcp_client.hpp index 13899bef..71b199eb 100644 --- a/includes/cpp_redis/network/tcp_client.hpp +++ b/includes/cpp_redis/network/tcp_client.hpp @@ -23,7 +23,7 @@ namespace network { class tcp_client { public: //! ctor & dtor - tcp_client(void); + tcp_client(io_service* pIO=NULL); ~tcp_client(void); //! assignment operator & copy ctor @@ -46,7 +46,7 @@ class tcp_client { void send(const std::vector& buffer); private: - //! make boost asio async read and write operations + //! make async read and write operations void async_read(void); void async_write(void); @@ -57,10 +57,10 @@ class tcp_client { private: //! io service instance - io_service& m_io_service; + io_service* m_pio_service; - //! socket fd - int m_fd; + //! socket + SOCKET m_sock; //! is connected std::atomic_bool m_is_connected; diff --git a/includes/cpp_redis/redis_client.hpp b/includes/cpp_redis/redis_client.hpp index 0f9a2b7e..6a2b306c 100644 --- a/includes/cpp_redis/redis_client.hpp +++ b/includes/cpp_redis/redis_client.hpp @@ -14,8 +14,8 @@ namespace cpp_redis { class redis_client { public: //! ctor & dtor - redis_client(void); - ~redis_client(void) = default; + redis_client(network::io_service* pIO=NULL); + ~redis_client(void); //! copy ctor & assignment operator redis_client(const redis_client&) = delete; @@ -42,7 +42,7 @@ class redis_client { try_commit(); std::unique_lock lock_callback(m_callbacks_mutex); - m_sync_condvar.wait_for(lock_callback, timeout, [=]{ return m_callbacks_running == 0 and m_callbacks.empty(); }); + m_sync_condvar.wait_for(lock_callback, timeout, [=]{ return m_callbacks_running == 0 && m_callbacks.empty(); }); return *this; } diff --git a/sources/builders/array_builder.cpp b/sources/builders/array_builder.cpp index 814c4a6e..24c5e640 100644 --- a/sources/builders/array_builder.cpp +++ b/sources/builders/array_builder.cpp @@ -17,7 +17,7 @@ array_builder::fetch_array_size(std::string& buffer) { return true; m_int_builder << buffer; - if (not m_int_builder.reply_ready()) + if (!m_int_builder.reply_ready()) return false; int size = m_int_builder.get_integer(); @@ -33,13 +33,13 @@ array_builder::fetch_array_size(std::string& buffer) { bool array_builder::build_row(std::string& buffer) { - if (not m_current_builder) { + if (!m_current_builder) { m_current_builder = create_builder(buffer.front()); buffer.erase(0, 1); } *m_current_builder << buffer; - if (not m_current_builder->reply_ready()) + if (!m_current_builder->reply_ready()) return false; m_reply << m_current_builder->get_reply(); @@ -56,11 +56,11 @@ array_builder::operator<<(std::string& buffer) { if (m_reply_ready) return *this; - if (not fetch_array_size(buffer)) + if (!fetch_array_size(buffer)) return *this; - while (buffer.size() and not m_reply_ready) - if (not build_row(buffer)) + while (buffer.size() && !m_reply_ready) + if (!build_row(buffer)) return *this; return *this; diff --git a/sources/builders/bulk_string_builder.cpp b/sources/builders/bulk_string_builder.cpp index 575241a0..56937093 100644 --- a/sources/builders/bulk_string_builder.cpp +++ b/sources/builders/bulk_string_builder.cpp @@ -27,7 +27,7 @@ bulk_string_builder::fetch_size(std::string& buffer) { return true; m_int_builder << buffer; - if (not m_int_builder.reply_ready()) + if (!m_int_builder.reply_ready()) return false; m_str_size = m_int_builder.get_integer(); @@ -44,7 +44,7 @@ bulk_string_builder::fetch_str(std::string& buffer) { if (buffer.size() < static_cast(m_str_size) + 2) // also wait for end sequence return ; - if (buffer[m_str_size] != '\r' or buffer[m_str_size + 1] != '\n') + if (buffer[m_str_size] != '\r' || buffer[m_str_size + 1] != '\n') throw redis_error("Wrong ending sequence"); m_str = buffer.substr(0, m_str_size); @@ -58,7 +58,7 @@ bulk_string_builder::operator<<(std::string& buffer) { return *this; //! if we don't have the size, try to get it with the current buffer - if (not fetch_size(buffer) or m_reply_ready) + if (!fetch_size(buffer) || m_reply_ready) return *this; fetch_str(buffer); diff --git a/sources/builders/integer_builder.cpp b/sources/builders/integer_builder.cpp index cd400b21..a600a575 100644 --- a/sources/builders/integer_builder.cpp +++ b/sources/builders/integer_builder.cpp @@ -22,11 +22,15 @@ integer_builder::operator<<(std::string& buffer) { unsigned int i; for (i = 0; i < end_sequence; i++) { //! check for negative numbers - if (not i and m_negative_multiplicator == 1 and buffer[i] == '-') { + if (!i && m_negative_multiplicator == 1 && buffer[i] == '-') { m_negative_multiplicator = -1; continue; } - else if (not std::isdigit(buffer[i])) +#ifdef _MSC_VER + else if (!isdigit(buffer[i])) +#else + else if (!std::isdigit(buffer[i])) +#endif throw redis_error("Invalid character for integer redis reply"); m_nbr *= 10; diff --git a/sources/builders/reply_builder.cpp b/sources/builders/reply_builder.cpp index 8a5166a2..1a5652f4 100644 --- a/sources/builders/reply_builder.cpp +++ b/sources/builders/reply_builder.cpp @@ -20,10 +20,10 @@ reply_builder::operator<<(const std::string& data) { bool reply_builder::build_reply(void) { - if (not m_buffer.size()) + if (!m_buffer.size()) return false; - if (not m_builder) { + if (!m_builder) { m_builder = create_builder(m_buffer.front()); m_buffer.erase(0, 1); } @@ -47,7 +47,7 @@ reply_builder::operator>>(reply& reply) { const reply& reply_builder::get_front(void) const { - if (not reply_available()) + if (!reply_available()) throw redis_error("No available reply"); return m_available_replies.front(); @@ -55,7 +55,7 @@ reply_builder::get_front(void) const { void reply_builder::pop_front(void) { - if (not reply_available()) + if (!reply_available()) throw redis_error("No available reply"); m_available_replies.pop_front(); diff --git a/sources/network/io_service.cpp b/sources/network/io_service.cpp deleted file mode 100644 index 0eab9790..00000000 --- a/sources/network/io_service.cpp +++ /dev/null @@ -1,230 +0,0 @@ -#include "cpp_redis/network/io_service.hpp" -#include "cpp_redis/redis_error.hpp" - -#include - -namespace cpp_redis { - -namespace network { - -io_service& -io_service::get_instance(void) { - static io_service instance; - return instance; -} - -io_service::io_service(void) -: m_should_stop(false) -{ - if (pipe(m_notif_pipe_fds) == -1) - throw cpp_redis::redis_error("Could not init cpp_redis::io_service, pipe() failure"); - - int flags = fcntl(m_notif_pipe_fds[1], F_GETFL, 0); - if (flags == -1 or fcntl(m_notif_pipe_fds[1], F_SETFL, flags | O_NONBLOCK) == -1) - throw cpp_redis::redis_error("Could not init cpp_redis::io_service, fcntl() failure"); - - m_worker = std::thread(&io_service::listen, this); -} - -io_service::~io_service(void) { - m_should_stop = true; - notify_select(); - - m_worker.join(); - - close(m_notif_pipe_fds[0]); - close(m_notif_pipe_fds[1]); -} - -int -io_service::init_sets(fd_set* rd_set, fd_set* wr_set) { - int max_fd = m_notif_pipe_fds[0]; - - FD_ZERO(rd_set); - FD_ZERO(wr_set); - FD_SET(m_notif_pipe_fds[0], rd_set); - - std::lock_guard lock(m_fds_mutex); - for (const auto& fd : m_fds) { - if (fd.second.async_read) - FD_SET(fd.first, rd_set); - - if (fd.second.async_write) - FD_SET(fd.first, wr_set); - - if ((fd.second.async_read or fd.second.async_write) and fd.first > max_fd) - max_fd = fd.first; - } - - return max_fd; -} - -io_service::callback_t -io_service::read_fd(int fd) { - std::lock_guard lock(m_fds_mutex); - - auto fd_it = m_fds.find(fd); - if (fd_it == m_fds.end()) - return nullptr; - - auto& buffer = *fd_it->second.read_buffer; - int original_buffer_size = buffer.size(); - buffer.resize(original_buffer_size + fd_it->second.read_size); - - int nb_bytes_read = recv(fd_it->first, buffer.data() + original_buffer_size, fd_it->second.read_size, 0); - fd_it->second.async_read = false; - - if (nb_bytes_read <= 0) { - buffer.resize(original_buffer_size); - fd_it->second.disconnection_handler(*this); - m_fds.erase(fd_it); - - return nullptr; - } - else { - buffer.resize(original_buffer_size + nb_bytes_read); - - return std::bind(fd_it->second.read_callback, nb_bytes_read); - } -} - -io_service::callback_t -io_service::write_fd(int fd) { - std::lock_guard lock(m_fds_mutex); - - auto fd_it = m_fds.find(fd); - if (fd_it == m_fds.end()) - return nullptr; - - int nb_bytes_written = send(fd_it->first, fd_it->second.write_buffer.data(), fd_it->second.write_size, 0); - fd_it->second.async_write = false; - - if (nb_bytes_written <= 0) { - fd_it->second.disconnection_handler(*this); - m_fds.erase(fd_it); - - return nullptr; - } - else - return std::bind(fd_it->second.write_callback, nb_bytes_written); -} - -void -io_service::process_sets(fd_set* rd_set, fd_set* wr_set) { - std::vector fds_to_read; - std::vector fds_to_write; - - //! Quickly fetch fds that are readable/writeable - //! This reduce lock time and avoid possible deadlock in callbacks - { - std::lock_guard lock(m_fds_mutex); - - for (const auto& fd : m_fds) { - if (fd.second.async_read and FD_ISSET(fd.first, rd_set)) - fds_to_read.push_back(fd.first); - - if (fd.second.async_write and FD_ISSET(fd.first, wr_set)) - fds_to_write.push_back(fd.first); - } - } - - for (int fd : fds_to_read) { - auto callback = read_fd(fd); - if (callback) - callback(); - } - for (int fd : fds_to_write) { - auto callback = write_fd(fd); - if (callback) - callback(); - } - - if (FD_ISSET(m_notif_pipe_fds[0], rd_set)) { - char buf[1024]; - (void)read(m_notif_pipe_fds[0], buf, 1024); - } -} - -void -io_service::listen(void) { - fd_set rd_set; - fd_set wr_set; - - while (not m_should_stop) { - int max_fd = init_sets(&rd_set, &wr_set); - - if (select(max_fd + 1, &rd_set, &wr_set, nullptr, nullptr) > 0) - process_sets(&rd_set, &wr_set); - } -} - -void -io_service::track(int fd, const disconnection_handler_t& handler) { - std::lock_guard lock(m_fds_mutex); - - auto& info = m_fds[fd]; - info.async_read = false; - info.async_write = false; - info.disconnection_handler = handler; - - notify_select(); -} - -void -io_service::untrack(int fd) { - std::lock_guard lock(m_fds_mutex); - m_fds.erase(fd); -} - -bool -io_service::async_read(int fd, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { - std::lock_guard lock(m_fds_mutex); - - auto reg_fd_it = m_fds.find(fd); - if (reg_fd_it == m_fds.end()) - return false; - - auto& reg_fd = reg_fd_it->second; - bool expected = false; - if (not reg_fd.async_read.compare_exchange_strong(expected, true)) - return false; - - reg_fd.read_buffer = &buffer; - reg_fd.read_size = read_size; - reg_fd.read_callback = callback; - - notify_select(); - - return true; -} - -bool -io_service::async_write(int fd, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { - std::lock_guard lock(m_fds_mutex); - - auto reg_fd_it = m_fds.find(fd); - if (reg_fd_it == m_fds.end()) - return false; - - auto& reg_fd = reg_fd_it->second; - bool expected = false; - if (not reg_fd.async_write.compare_exchange_strong(expected, true)) - return false; - - reg_fd.write_buffer = buffer; - reg_fd.write_size = write_size; - reg_fd.write_callback = callback; - - notify_select(); - - return true; -} - -void -io_service::notify_select(void) { - (void)write(m_notif_pipe_fds[1], "a", 1); -} - -} //! network - -} //! cpp_redis diff --git a/sources/network/redis_connection.cpp b/sources/network/redis_connection.cpp index 52f52b20..5f7ba9bc 100644 --- a/sources/network/redis_connection.cpp +++ b/sources/network/redis_connection.cpp @@ -4,9 +4,10 @@ namespace cpp_redis { namespace network { -redis_connection::redis_connection(void) +redis_connection::redis_connection(network::io_service* pIO/*=NULL*/) : m_reply_callback(nullptr) , m_disconnection_handler(nullptr) +, m_client(pIO) {} void diff --git a/sources/network/win_io_service.cpp b/sources/network/win_io_service.cpp new file mode 100644 index 00000000..533e9551 --- /dev/null +++ b/sources/network/win_io_service.cpp @@ -0,0 +1,547 @@ +#include "cpp_redis/network/io_service.hpp" +#include "cpp_redis/redis_error.hpp" + +#ifdef _MSC_VER +#include //needed for fcntl, open etc. +#pragma warning(disable:4996) //Disable "The POSIX name for this item is deprecated" warnings +#endif + +namespace cpp_redis { + +namespace network { + +io_service& + io_service::get_instance(void) { + static io_service instance; + return instance; +} + +#ifdef _MSC_VER +io_service::io_service(size_t max_worker_threads /*= MAX_WORKER_THREADS*/) + : m_should_stop(false) +{ + //Determine the size of the thread pool dynamically. + //2 * number of processors in the system is our rule here. + SYSTEM_INFO info; + ::GetSystemInfo(&info); + m_worker_thread_pool_size = (info.dwNumberOfProcessors * 2); + + if (m_worker_thread_pool_size > MAX_WORKER_THREADS) + m_worker_thread_pool_size = MAX_WORKER_THREADS; + + WSADATA wsaData; + int nRet = 0; + if ((nRet = WSAStartup(0x202, &wsaData)) != 0) //Start winsock before any other socket calls. + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, WSAStartup() failure"); + + //Create completion port. Pass 0 for parameter 4 to allow as many threads as there are processors in the system + m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);; + if( INVALID_HANDLE_VALUE == m_completion_port) + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, CreateIoCompletionPort() failure"); + + //Now startup worker thread pool which will service our async io requests + for (unsigned int i=0; i lock(m_socket_mutex); + + //Add the socket to our map and return the allocated struct + auto& info = m_sockets[sock]; + + memset(&info.io_info.overlapped, 0, sizeof(OVERLAPPED)); + info.hsock = sock; + info.async_read = false; + info.async_write = false; + info.disconnection_handler = handler; + + //Associate the socket with our io completion port. + if( NULL == CreateIoCompletionPort((HANDLE)sock, m_completion_port, (DWORD_PTR)&m_sockets[sock], 0)) + { + printf("CreateIoCompletionPort() failed: %d\n", GetLastError()); + throw cpp_redis::redis_error("Track() failed to create CreateIoCompletionPort"); + } + + printf("Socket(%d) added to IOCP ok\n", sock); +} + +void +io_service::untrack(SOCKET sock) +{ + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return; + auto& sockinfo = sock_it->second; + + //Wait until the posted i/o has completed. + //while (m_completion_port && !HasOverlappedIoCompleted((LPOVERLAPPED)&sockinfo.io_info.overlapped)) + // std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + std::lock_guard lock(m_socket_mutex); + m_sockets.erase(sock); +} + +bool +io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return false; + + auto& sockinfo = sock_it->second; + bool expected = false; + if (!sockinfo.async_read.compare_exchange_strong(expected, true)) + return false; + + //Resize the buffer to the correct size if we need to. + if (buffer.size() < read_size) + buffer.resize(read_size); + + sockinfo.read_buffer = &buffer; + sockinfo.read_size = read_size; + sockinfo.read_callback = callback; + sockinfo.io_info.eOperation = IO_OP_READ; + + DWORD dwRecvNumBytes = 0; + DWORD dwFlags = 0; + WSABUF buffRecv; + + buffRecv.buf = sockinfo.read_buffer->data(); + buffRecv.len = sockinfo.read_size; + + int nRet = WSARecv(sock, &buffRecv, 1, &dwRecvNumBytes, &dwFlags, &sockinfo.io_info.overlapped, NULL); + + if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) + { + printf("WSARecv() failed: %d Closing socket\n", WSAGetLastError()); + + //Fire the disconnect handler + sockinfo.disconnection_handler(*this); + return false; + } + + return true; +} + +bool +io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return false; + + auto& sockinfo = sock_it->second; + bool expected = false; + if (!sockinfo.async_write.compare_exchange_strong(expected, true)) + return false; + + sockinfo.write_buffer = buffer; + sockinfo.write_size = write_size; + sockinfo.write_callback = callback; + sockinfo.io_info.eOperation = IO_OP_WRITE; + + auto& sbuffer = sockinfo.write_buffer; + + WSABUF buffSend; + DWORD dwSendNumBytes = 0; + DWORD dwFlags = 0; + buffSend.buf = sockinfo.write_buffer.data(); + buffSend.len = write_size; + + int nRet = WSASend(sock, &buffSend, 1, &dwSendNumBytes, dwFlags, &(sockinfo.io_info.overlapped), NULL); + if (SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())) + { + printf("WSASend() failed: %d\n", WSAGetLastError()); + + //Fire the disconnect handler + sockinfo.disconnection_handler(*this); + return false; + } + + return true; +} + +//function used by worker thread(s) used to process io requests +int +io_service::process_io(void) +{ + BOOL bSuccess = FALSE; + int nRet = 0; + LPWSAOVERLAPPED pOverlapped = NULL; + sock_info* pPerSocketInfo = NULL; + + WSABUF buffRecv; + WSABUF buffSend; + DWORD dwRecvNumBytes = 0; + DWORD dwSendNumBytes = 0; + DWORD dwFlags = 0; + DWORD io_size = 0; + + while (!m_should_stop) + { + // continually loop to service io completion packets + pPerSocketInfo = NULL; + bSuccess = GetQueuedCompletionStatus(m_completion_port, &io_size, (PDWORD_PTR)&pPerSocketInfo, (LPOVERLAPPED *)&pOverlapped, INFINITE); + if(!bSuccess && m_should_stop) + return 0; + + //assert(bSuccess==TRUE, "GetQueuedCompletionStatus failed."); + + // First check to see if an error has occurred on the socket and if so + // then close the socket and cleanup the SOCKET_INFORMATION structure + // associated with the socket + if (0 == io_size) + { + //Fire the disconnect handler. Check if stopped first because of some weird timing issues that may get us here. + if(bSuccess && false == m_should_stop && pPerSocketInfo) + pPerSocketInfo->disconnection_handler(*this); + continue; + } + + if (!pPerSocketInfo || !pOverlapped) + { + // Somebody used PostQueuedCompletionStatus to post an I/O packet with + // a NULL CompletionKey (or if we get one for any reason). It is time to exit. + return 0; + } + + //Lock our socket structure to prevent multiple access + std::lock_guard socklock(pPerSocketInfo->sock_info_mutex); + auto& io_info = pPerSocketInfo->io_info; + + // determine what type of IO packet has completed by checking the PER_IO_CONTEXT + // associated with this socket. This will determine what action to take. + switch (io_info.eOperation) + { + case IO_OP_READ: + // a read operation has completed + printf("WorkerThread %d: Socket(%d) Received %d bytes\n", GetCurrentThreadId(), pPerSocketInfo->hsock, io_size); + //TODO Resize Read buffer to the size read??? + pPerSocketInfo->read_callback(io_size); + break; + + case IO_OP_WRITE: + // a write operation has completed, determine if all the data intended to be + // sent actually was sent. + io_info.eOperation = IO_OP_WRITE; + pPerSocketInfo->sent_bytes += io_size; + dwFlags = 0; + + auto& buffer = pPerSocketInfo->write_buffer; + + // the previous write operation didn't send all the data, + // post another send to complete the operation + if (pPerSocketInfo->sent_bytes < pPerSocketInfo->write_size) + { + //Advance pointer past what we sent already and send the remaining. + size_t size = pPerSocketInfo->write_size; + buffSend.buf = buffer.data() + pPerSocketInfo->sent_bytes; + buffSend.len = pPerSocketInfo->write_size - pPerSocketInfo->sent_bytes; + + nRet = WSASend(pPerSocketInfo->hsock, &buffSend, 1, &dwSendNumBytes, dwFlags, &(io_info.overlapped), NULL); + + if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) + { + printf ("WSASend() failed: %d\n", WSAGetLastError()); + //Fire the disconnect handler + pPerSocketInfo->disconnection_handler(*this); + } + else + { + printf("WorkerThread %d: Socket(%d) Partial send %d of %d bytes. Posting another send\n", GetCurrentThreadId(), pPerSocketInfo->hsock, pPerSocketInfo->sent_bytes, size); + pPerSocketInfo->write_callback(io_size); + } + } + else + { + // previous write operation completed for this socket, post another recv + io_info.eOperation = IO_OP_READ; + dwRecvNumBytes = 0; + dwFlags = 0; + buffRecv.buf = pPerSocketInfo->read_buffer->data(); + buffRecv.len = pPerSocketInfo->read_size; + + int nRet = WSARecv(pPerSocketInfo->hsock, &buffRecv, 1, &dwRecvNumBytes, &dwFlags, &io_info.overlapped, NULL); + + if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) + { + printf ("WSARecv() failed. Error: %d\n", WSAGetLastError()); + //Fire the disconnect handler + pPerSocketInfo->disconnection_handler(*this); + } + else + { + printf("WorkerThread %d: Socket(%d) Sent %d bytes, Posted Recv\n", GetCurrentThreadId(), pPerSocketInfo->hsock, io_size); + pPerSocketInfo->write_callback(io_size); + } + } + break; + } //switch + } //while + + return 0; +} + +#else //End of Windows only code + +//Start of Linux code +io_service::io_service() +: m_should_stop(false) +{ +#ifdef _MSC_VER + WSADATA wsaData; + int nRet = 0; + if ((nRet = WSAStartup(0x202, &wsaData)) != 0) //Start winsock before any other socket calls. + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, WSAStartup() failure"); +#endif + + m_notify_socket = socket(AF_INET, SOCK_STREAM, 0); + if(m_notify_socket < 0) + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, socket() creation failure"); + + u_long ulValue = 1; + if (0 != ioctlsocket(m_notify_socket, FIONBIO, &ulValue)) //Set sockets to non blocking. + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, ioctlsocket() failure"); + + m_worker = std::thread(&io_service::wait_for_events, this); +} + +io_service::~io_service(void) { + m_should_stop = true; + notify_select(); + + m_worker.join(); +} + +int +io_service::init_sets(fd_set* rd_set, fd_set* wr_set) { + SOCKET max_sock = m_notify_socket; + + FD_ZERO(rd_set); + FD_ZERO(wr_set); + FD_SET(m_notify_socket, rd_set); + + std::lock_guard lock(m_socket_mutex); + for (const auto& sock : m_sockets) { + if (sock.second.async_read) + FD_SET(sock.first, rd_set); + + if (sock.second.async_write) + FD_SET(sock.first, wr_set); + + if ((sock.second.async_read || sock.second.async_write) && sock.first > max_sock) + max_sock = sock.first; + } + + return max_sock; +} + +io_service::callback_t +io_service::read_sock(SOCKET sock) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return nullptr; + + auto& buffer = *sock_it->second.read_buffer; + int original_buffer_size = buffer.size(); + buffer.resize(original_buffer_size + sock_it->second.read_size); + + int nb_bytes_read = recv(sock_it->first, buffer.data() + original_buffer_size, sock_it->second.read_size, 0); + sock_it->second.async_read = false; + + if (nb_bytes_read <= 0) { + buffer.resize(original_buffer_size); + sock_it->second.disconnection_handler(*this); + m_sockets.erase(sock_it); + + return nullptr; + } + else { + buffer.resize(original_buffer_size + nb_bytes_read); + + return std::bind(sock_it->second.read_callback, nb_bytes_read); + } +} + +io_service::callback_t +io_service::write_sock(SOCKET sock) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return nullptr; + + int nb_bytes_written = send(sock_it->first, sock_it->second.write_buffer.data(), sock_it->second.write_size, 0); + sock_it->second.async_write = false; + + if (nb_bytes_written <= 0) { + sock_it->second.disconnection_handler(*this); + m_sockets.erase(sock_it); + + return nullptr; + } + else + return std::bind(sock_it->second.write_callback, nb_bytes_written); +} + +void +io_service::process_sets(fd_set* rd_set, fd_set* wr_set) { + std::vector socks_to_read; + std::vector socks_to_write; + + //! Quickly fetch fds that are readable/writeable + //! This reduce lock time and avoid possible deadlock in callbacks + { + std::lock_guard lock(m_socket_mutex); + + for (const auto& sock : m_sockets) { + if (sock.second.async_read && FD_ISSET(sock.first, rd_set)) + socks_to_read.push_back(sock.first); + + if (sock.second.async_write && FD_ISSET(sock.first, wr_set)) + socks_to_write.push_back(sock.first); + } + } + + for (SOCKET sock : socks_to_read) { + auto callback = read_sock(sock); + if (callback) + callback(); + } + for (SOCKET sock : socks_to_write) { + auto callback = write_sock(sock); + if (callback) + callback(); + } + + if (FD_ISSET(m_notify_socket, rd_set)) { + char buf[1024]; + (void)read(m_notify_socket, buf, 1024); + } +} + +void +io_service::wait_for_events(void) { + fd_set rd_set; + fd_set wr_set; + + while (!m_should_stop) { + int max_fd = init_sets(&rd_set, &wr_set); + + //Wait here until select returns and process any reads or writes that are pending. + if (select(max_fd + 1, &rd_set, &wr_set, nullptr, nullptr) > 0) + process_sets(&rd_set, &wr_set); + } +} + +void +io_service::track(SOCKET socket, const disconnection_handler_t& handler) { + std::lock_guard lock(m_socket_mutex); + + auto& info = m_sockets[socket]; + info.async_read = false; + info.async_write = false; + info.disconnection_handler = handler; + + notify_select(); +} + +void +io_service::untrack(SOCKET sock) { + std::lock_guard lock(m_socket_mutex); + m_sockets.erase(sock); +} + +bool +io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto reg_fd_it = m_sockets.find(sock); + if (reg_fd_it == m_sockets.end()) + return false; + + auto& reg_fd = reg_fd_it->second; + bool expected = false; + if (!reg_fd.async_read.compare_exchange_strong(expected, true)) + return false; + + reg_fd.read_buffer = &buffer; + reg_fd.read_size = read_size; + reg_fd.read_callback = callback; + + notify_select(); + + return true; +} + +bool +io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto reg_sock_it = m_sockets.find(sock); + if (reg_sock_it == m_sockets.end()) + return false; + + auto& reg_sock = reg_sock_it->second; + bool expected = false; + if (!reg_sock.async_write.compare_exchange_strong(expected, true)) + return false; + + reg_sock.write_buffer = buffer; + reg_sock.write_size = write_size; + reg_sock.write_callback = callback; + + notify_select(); + + return true; +} + +void +io_service::notify_select(void) { + (void)send(m_notify_socket, "a", 1, 0); +} +#endif + +} //! network +} //! cpp_redis diff --git a/sources/network/tcp_client.cpp b/sources/network/win_tcp_client.cpp similarity index 62% rename from sources/network/tcp_client.cpp rename to sources/network/win_tcp_client.cpp index 50e36460..9df4be17 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/win_tcp_client.cpp @@ -1,5 +1,7 @@ #include -#include + +#include //needed for fcntl, open etc. +#pragma warning(disable:4996) //Disable "The POSIX name for this item is deprecated" warnings #include #include "cpp_redis/network/tcp_client.hpp" @@ -15,9 +17,9 @@ namespace network { //! //! that way, any object containing a tcp_client has an attribute (or through its attributes) //! is guaranteed to be destructed before the io_service is destructed, even if it is global -tcp_client::tcp_client(void) -: m_io_service(io_service::get_instance()) -, m_fd(-1) +tcp_client::tcp_client(io_service* pIO/*= NULL*/) +: m_pio_service((pIO?pIO:&io_service::get_instance())) +, m_sock(-1) , m_is_connected(false) , m_receive_handler(nullptr) , m_disconnection_handler(nullptr) @@ -37,13 +39,23 @@ tcp_client::connect(const std::string& host, unsigned int port, return throw cpp_redis::redis_error("Client already connected"); //! create the socket - m_fd = socket(AF_INET, SOCK_STREAM, 0); - if (m_fd < 0) - throw redis_error("Can't open a socket"); + int nZero = 0; + //Enable socket for overlapped i/o + m_sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); + if (m_sock < 0) + throw redis_error("Can't open a socket"); + + //Instruct the TCP stack to directly perform I/O using the buffer provided in our I/O call. + //The advantage is performance because we save a buffer copy between the TCP stack buffer + //and our user buffer for each I/O call. + //BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! + if (0 != setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char *)&nZero, sizeof(nZero))) { + return throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to disable buffering"); + } //! get the server's DNS entry struct hostent *server = gethostbyname(host.c_str()); - if (not server) + if (!server) throw redis_error("No such host: " + host); //! build the server's Internet address @@ -54,13 +66,18 @@ tcp_client::connect(const std::string& host, unsigned int port, server_addr.sin_family = AF_INET; //! create a connection with the server - if (::connect(m_fd, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) + if (::connect(m_sock, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); - //! add fd to the io_service and set the disconnection & recv handlers + //! add sockeet to the io_service and set the disconnection & recv handlers m_disconnection_handler = disconnection_handler; m_receive_handler = receive_handler; - m_io_service.track(m_fd, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); + + u_long ulValue = 1; + if (0 != ioctlsocket(m_sock, FIONBIO, &ulValue)) //Set socket to non blocking. + throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to set socket to non-blocking"); + + m_pio_service->track(m_sock, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); m_is_connected = true; //! start async read @@ -69,12 +86,14 @@ tcp_client::connect(const std::string& host, unsigned int port, void tcp_client::disconnect(void) { - if (not m_is_connected) + if (!m_is_connected) return ; m_is_connected = false; - m_io_service.untrack(m_fd); - close(m_fd); + m_pio_service->untrack(m_sock); + + closesocket(m_sock); + clear_buffer(); } @@ -85,10 +104,10 @@ tcp_client::send(const std::string& buffer) { void tcp_client::send(const std::vector& buffer) { - if (not m_is_connected) + if (!m_is_connected) throw redis_error("Not connected"); - if (not buffer.size()) + if (!buffer.size()) return ; std::lock_guard lock(m_write_buffer_mutex); @@ -108,10 +127,10 @@ tcp_client::send(const std::vector& buffer) { void tcp_client::async_read(void) { - m_io_service.async_read(m_fd, m_read_buffer, READ_SIZE, + m_pio_service->async_read(m_sock, m_read_buffer, READ_SIZE, [&](std::size_t length) { if (m_receive_handler) - if (not m_receive_handler(*this, { m_read_buffer.begin(), m_read_buffer.begin() + length })) { + if (!m_receive_handler(*this, { m_read_buffer.begin(), m_read_buffer.begin() + length })) { disconnect(); return ; } @@ -126,13 +145,13 @@ tcp_client::async_read(void) { void tcp_client::async_write(void) { - m_io_service.async_write(m_fd, m_write_buffer, m_write_buffer.size(), + m_pio_service->async_write(m_sock, m_write_buffer, m_write_buffer.size(), [&](std::size_t length) { std::lock_guard lock(m_write_buffer_mutex); m_write_buffer.erase(m_write_buffer.begin(), m_write_buffer.begin() + length); - if (m_is_connected and m_write_buffer.size()) + if (m_is_connected && m_write_buffer.size()) async_write(); }); } @@ -145,7 +164,9 @@ tcp_client::is_connected(void) { void tcp_client::io_service_disconnection_handler(network::io_service&) { m_is_connected = false; - close(m_fd); + + closesocket(m_sock); + clear_buffer(); if (m_disconnection_handler) diff --git a/sources/redis_client.cpp b/sources/redis_client.cpp index 18ddc6b2..515eda4d 100644 --- a/sources/redis_client.cpp +++ b/sources/redis_client.cpp @@ -3,8 +3,8 @@ namespace cpp_redis { -redis_client::redis_client(void) -: m_callbacks_running(0) +redis_client::redis_client(network::io_service* pIO/*= NULL*/) +: m_callbacks_running(0), m_client(pIO) {} void @@ -18,6 +18,12 @@ redis_client::connect(const std::string& host, unsigned int port, m_disconnection_handler = client_disconnection_handler; } +redis_client::~redis_client(void) +{ + if(m_client.is_connected()) + m_client.disconnect(); +} + void redis_client::disconnect(void) { m_client.disconnect(); @@ -51,7 +57,7 @@ redis_client::sync_commit(void) { try_commit(); std::unique_lock lock_callback(m_callbacks_mutex); - m_sync_condvar.wait(lock_callback, [=]{ return m_callbacks_running == 0 and m_callbacks.empty(); }); + m_sync_condvar.wait(lock_callback, [=]{ return m_callbacks_running == 0 && m_callbacks.empty(); }); return *this; } diff --git a/sources/redis_subscriber.cpp b/sources/redis_subscriber.cpp index 49822eae..fcd550b4 100644 --- a/sources/redis_subscriber.cpp +++ b/sources/redis_subscriber.cpp @@ -88,9 +88,9 @@ redis_subscriber::handle_subscribe_reply(const std::vector& reply) { const auto& channel = reply[1]; const auto& message = reply[2]; - if (not title.is_string() - or not channel.is_string() - or not message.is_string()) + if (!title.is_string() + || !channel.is_string() + || !message.is_string()) return ; if (title.as_string() != "message") @@ -115,10 +115,10 @@ redis_subscriber::handle_psubscribe_reply(const std::vector& reply) { const auto& channel = reply[2]; const auto& message = reply[3]; - if (not title.is_string() - or not pchannel.is_string() - or not channel.is_string() - or not message.is_string()) + if (!title.is_string() + || !pchannel.is_string() + || !channel.is_string() + || !message.is_string()) return ; if (title.as_string() != "pmessage") @@ -136,7 +136,7 @@ redis_subscriber::handle_psubscribe_reply(const std::vector& reply) { void redis_subscriber::connection_receive_handler(network::redis_connection&, reply& reply) { //! alaway return an array - if (not reply.is_array()) + if (!reply.is_array()) return ; auto& array = reply.as_array(); diff --git a/sources/reply.cpp b/sources/reply.cpp index 436fa8d4..36944ad8 100644 --- a/sources/reply.cpp +++ b/sources/reply.cpp @@ -56,7 +56,7 @@ reply::is_array(void) const { bool reply::is_string(void) const { - return is_simple_string() or is_bulk_string() or is_error(); + return is_simple_string() || is_bulk_string() || is_error(); } bool @@ -86,7 +86,7 @@ reply::is_null(void) const { const std::vector& reply::as_array(void) const { - if (not is_array()) + if (!is_array()) throw cpp_redis::redis_error("Reply is not an array"); return m_rows; @@ -94,7 +94,7 @@ reply::as_array(void) const { const std::string& reply::as_string(void) const { - if (not is_string()) + if (!is_string()) throw cpp_redis::redis_error("Reply is not a string"); return m_strval; @@ -102,7 +102,7 @@ reply::as_string(void) const { int reply::as_integer(void) const { - if (not is_integer()) + if (!is_integer()) throw cpp_redis::redis_error("Reply is not an integer"); return m_intval; diff --git a/tests/sources/spec/redis_client_spec.cpp b/tests/sources/spec/redis_client_spec.cpp index e2f068f4..ca3ecefe 100644 --- a/tests/sources/spec/redis_client_spec.cpp +++ b/tests/sources/spec/redis_client_spec.cpp @@ -100,12 +100,12 @@ TEST(RedisClient, SyncCommitTimeout) { client.connect(); volatile std::atomic_bool callback_exit(false); client.send({ "GET", "HELLO" }, [&] (cpp_redis::reply&) { - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); callback_exit = true; }); EXPECT_NO_THROW(client.sync_commit(std::chrono::milliseconds(100))); EXPECT_FALSE(callback_exit); - while (not callback_exit); + while (!callback_exit); } TEST(RedisClient, SyncCommitNoTimeout) { @@ -114,7 +114,7 @@ TEST(RedisClient, SyncCommitNoTimeout) { client.connect(); std::atomic_bool callback_exit(false); client.send({ "GET", "HELLO" }, [&] (cpp_redis::reply&) { - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); callback_exit = true; }); EXPECT_NO_THROW(client.sync_commit()); @@ -241,7 +241,7 @@ TEST(RedisClient, DisconnectionHandlerWithQuit) { client.send({ "QUIT" }); client.sync_commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(disconnection_handler_called); } @@ -254,7 +254,7 @@ TEST(RedisClient, DisconnectionHandlerWithoutQuit) { }); client.sync_commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(disconnection_handler_called); } diff --git a/tests/sources/spec/redis_subscriber_spec.cpp b/tests/sources/spec/redis_subscriber_spec.cpp index 7af3f4e1..2678f00f 100644 --- a/tests/sources/spec/redis_subscriber_spec.cpp +++ b/tests/sources/spec/redis_subscriber_spec.cpp @@ -136,7 +136,7 @@ TEST(RedisSubscriber, SubConnectedCommitConnected) { sub.commit(); client.publish("/chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_run); } @@ -155,7 +155,7 @@ TEST(RedisSubscriber, SubNotConnectedCommitConnected) { sub.commit(); client.publish("/chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_run); } @@ -175,7 +175,7 @@ TEST(RedisSubscriber, SubNotConnectedCommitNotConnectedCommitConnected) { sub.commit(); client.publish("/chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_run); } @@ -196,7 +196,7 @@ TEST(RedisSubscriber, SubscribeSomethingPublished) { sub.commit(); client.publish("/chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_run); } @@ -220,7 +220,7 @@ TEST(RedisSubscriber, SubscribeMultiplePublished) { client.publish("/chan", "first"); client.publish("/chan", "second"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(number_times_called == 2); } @@ -239,7 +239,7 @@ TEST(RedisSubscriber, SubscribeNothingPublished) { sub.commit(); client.publish("/other_chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_run); } @@ -267,7 +267,7 @@ TEST(RedisSubscriber, MultipleSubscribeSomethingPublished) { client.publish("/chan_1", "hello"); client.publish("/chan_2", "world"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_1_run); EXPECT_TRUE(callback_2_run); } @@ -289,7 +289,7 @@ TEST(RedisSubscriber, PSubscribeSomethingPublished) { sub.commit(); client.publish("/chan/hello", "world"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_run); } @@ -319,7 +319,7 @@ TEST(RedisSubscriber, PSubscribeMultiplePublished) { client.publish("/chan/hello", "first"); client.publish("/chan/world", "second"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(number_times_called == 2); } @@ -338,7 +338,7 @@ TEST(RedisSubscriber, PSubscribeNothingPublished) { sub.commit(); client.publish("/other_chan", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_run); } @@ -366,7 +366,7 @@ TEST(RedisSubscriber, MultiplePSubscribeSomethingPublished) { client.publish("/chan/1", "hello"); client.publish("/other_chan/2", "world"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_TRUE(callback_1_run); EXPECT_TRUE(callback_2_run); } @@ -392,7 +392,7 @@ TEST(RedisSubscriber, Unsubscribe) { client.publish("/chan_1", "hello"); client.publish("/chan_2", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_1_run); EXPECT_TRUE(callback_2_run); } @@ -418,7 +418,7 @@ TEST(RedisSubscriber, PUnsubscribe) { client.publish("/chan_1/hello", "hello"); client.publish("/chan_2/hello", "hello"); client.commit(); - sleep(1); + std::this_thread::sleep_for(std::chrono::seconds(1)); EXPECT_FALSE(callback_1_run); EXPECT_TRUE(callback_2_run); }