diff --git a/.gitignore b/.gitignore index 6854a22f..62e9524d 100644 --- a/.gitignore +++ b/.gitignore @@ -22,10 +22,24 @@ *.a *.lib +build +deps + # Executables *.exe *.out *.app +*.sln +*.vcxproj +*.suo +*.user +*.filters +*.VC.db +*.opendb +.gitignore +cmake_install.cmake -build -deps +CMakeCache.txt +CMakeFiles/ +Release/ +Debug/ \ No newline at end of file diff --git a/.gitignore.bak b/.gitignore.bak new file mode 100644 index 00000000..78a41448 --- /dev/null +++ b/.gitignore.bak @@ -0,0 +1,46 @@ +# Compiled Object files +*.slo +*.lo +*.o +*.obj + +# Precompiled Headers +*.gch +*.pch + +# Compiled Dynamic libraries +*.so +*.dylib +*.dll + +# Fortran module files +*.mod + +# Compiled Static libraries +*.lai +*.la +*.a +*.lib + +build +deps + +# Executables +*.exe +*.out +*.app +*.sln +*.vcxproj +*.suo +*.user +*.filters +*.VC.db +*.opendb +.gitignore +cmake_install.cmake + +rts-redis-test/ +CMakeCache.txt +CMakeFiles/ +Release/ +Debug/ \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a188713..34a1478a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -65,17 +65,12 @@ set_target_properties(${PROJECT} PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") -IF (READ_SIZE) +IF (CPP_REDIS_READ_SIZE) set_target_properties(${PROJECT} PROPERTIES - COMPILE_DEFINITIONS "__CPP_REDIS_READ_SIZE=${READ_SIZE}") -ENDIF (READ_SIZE) + COMPILE_DEFINITIONS "CPP_REDIS_READ_SIZE=${CPP_REDIS_READ_SIZE}") +ENDIF (CPP_REDIS_READ_SIZE) -IF (NO_LOGGING) -set_target_properties(${PROJECT} - PROPERTIES - COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}") -ENDIF (NO_LOGGING) ### # install diff --git a/README.md b/README.md index db490a5c..f7d87845 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,6 @@ A [Wiki](https://github.com/Cylix/cpp_redis/wiki) is available and provides docu Some examples are provided in this repository: * [redis_client.cpp](examples/redis_client.cpp) shows how to use the redis client class. * [redis_subscriber.cpp](examples/redis_subscriber.cpp) shows how to use the redis subscriber class. -* [logger.cpp](examples/logger.cpp) shows how to setup a logger for cpp_redis. These examples can also be found inside the [Wiki](https://github.com/Cylix/cpp_redis/wiki/Examples). diff --git a/examples/redis_subscriber.cpp.bak b/examples/redis_subscriber.cpp.bak new file mode 100644 index 00000000..ee40334d --- /dev/null +++ b/examples/redis_subscriber.cpp.bak @@ -0,0 +1,35 @@ +#include + +#include +#include + +volatile std::atomic_bool should_exit(false); +cpp_redis::redis_subscriber sub; + +void +sigint_handler(int) { + std::cout << "disconnected (sigint handler)" << std::endl; + sub.disconnect(); + should_exit = true; +} + +int +main(void) { + sub.connect("127.0.0.1", 6379, [](cpp_redis::redis_subscriber&) { + std::cout << "sub disconnected (disconnection handler)" << std::endl; + should_exit = true; + }); + + sub.subscribe("some_chan", [] (const std::string& chan, const std::string& msg) { + std::cout << "MESSAGE " << chan << ": " << msg << std::endl; + }); + sub.psubscribe("*", [] (const std::string& chan, const std::string& msg) { + std::cout << "PMESSAGE " << chan << ": " << msg << std::endl; + }); + sub.commit(); + + signal(SIGINT, &sigint_handler); + while (not should_exit); + + return 0; +} diff --git a/includes/cpp_redis/network/redis_connection.hpp b/includes/cpp_redis/network/redis_connection.hpp index 4b3a756c..76f2c918 100644 --- a/includes/cpp_redis/network/redis_connection.hpp +++ b/includes/cpp_redis/network/redis_connection.hpp @@ -5,7 +5,12 @@ #include #include +#ifdef _MSC_VER +#include +#else #include +#endif + #include namespace cpp_redis { @@ -15,7 +20,7 @@ namespace network { class redis_connection { public: //! ctor & dtor - redis_connection(void); + redis_connection::redis_connection(const std::shared_ptr pIO); ~redis_connection(void); //! copy ctor & assignment operator diff --git a/includes/cpp_redis/network/win_io_service.hpp b/includes/cpp_redis/network/win_io_service.hpp new file mode 100644 index 00000000..5c57b464 --- /dev/null +++ b/includes/cpp_redis/network/win_io_service.hpp @@ -0,0 +1,149 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +#define MAX_BUFF_SIZE CPP_REDIS_READ_SIZE +#define MAX_WORKER_THREADS 16 + +namespace cpp_redis { + +namespace network { + +typedef enum _enIoOperation { + //IO_OP_ACCEPT, + IO_OP_READ, + IO_OP_WRITE + } enIoOperation; + + +class io_service { +public: + //! instance getter (singleton pattern) + static const std::shared_ptr& get_instance(void); + io_service(size_t max_worker_threads = MAX_WORKER_THREADS); + ~io_service(void); + + void shutdown(); + +private: + //! copy ctor & assignment operator + io_service(const io_service&) = delete; + io_service& operator=(const io_service&) = delete; + +public: + //! disconnection handler declaration + typedef std::function disconnection_handler_t; + + //! add or remove a given socket from the io service + //! untrack should never be called from inside a callback + void track(SOCKET sock, const disconnection_handler_t& handler); + void untrack(SOCKET sock); + + //! asynchronously read read_size bytes and append them to the given buffer + //! on completion, call the read_callback to notify of the success or failure of the operation + //! return false if another async_read operation is in progress or socket is not registered + typedef std::function read_callback_t; + bool async_read(SOCKET socket, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); + + //! asynchronously write write_size bytes from buffer to the specified fd + //!on completion, call the write_callback to notify of the success or failure of the operation + //! return false if another async_write operation is in progress or socket is not registered + typedef std::function write_callback_t; + bool async_write(SOCKET socket, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); + +private: + + struct io_context_info : OVERLAPPED { + WSAOVERLAPPED overlapped; + enIoOperation eOperation; + }; + + //! simple struct to keep track of ongoing operations on a given sockeet + class sock_info + { + public: + sock_info(void) = default; + virtual ~sock_info(void) + { + std::lock_guard socklock(sock_info_mutex); + for (auto it = io_contexts_pool.begin(); it != io_contexts_pool.end(); it++) + delete *it; + + io_contexts_pool.clear(); + } + + SOCKET hsock; + std::size_t sent_bytes; + + //Must protect the members of our structure from access by multiple threads during IO Completion + std::recursive_mutex sock_info_mutex; + + //We keep a simple vector of io_context_info structs to reuse for overlapped WSARecv and WSASend operations + //Since each must have its OWN struct if we issue them at the same time. + //othewise things get tangled up and borked. + std::vector io_contexts_pool; + + disconnection_handler_t disconnection_handler; + + std::vector* read_buffer; + read_callback_t read_callback; + + std::vector write_buffer; + std::size_t write_size; + write_callback_t write_callback; + + io_context_info* get_pool_io_context() { + io_context_info* pInfo = NULL; + std::lock_guard socklock(sock_info_mutex); + if (!io_contexts_pool.empty()) + { + pInfo = io_contexts_pool.back(); + io_contexts_pool.pop_back(); + } + if (!pInfo) + pInfo = new io_context_info(); + //MUST clear the overlapped structure between IO calls! + memset(&pInfo->overlapped, 0, sizeof(OVERLAPPED)); + return pInfo; + } + + void return_pool_io_context(io_context_info* p_io) { + std::lock_guard socklock(sock_info_mutex); + io_contexts_pool.push_back(p_io); + } + }; + +typedef std::function callback_t; + + //! wait for incoming events and notify + int process_io(void); + + HANDLE m_completion_port; + unsigned int m_worker_thread_pool_size; + std::vector m_worker_threads; //vector containing all the threads we start to service our i/o requests + +private: + //! whether the worker should terminate or not + std::atomic_bool m_should_stop; + + //! tracked sockets + std::unordered_map m_sockets; + + //! mutex to protect m_notify_socket access against race condition + //! + //! specific mutex for untrack: we dont want someone to untrack a socket while we process it + //! this behavior could cause some issues when executing callbacks in another thread + //! for example, obj is destroyed, in its dtor it untracks the socket, but at the same time + //! a callback is executed from within another thread: the untrack mutex avoid this without being costly + std::recursive_mutex m_socket_mutex; +}; + +} //! network + +} //! cpp_redis diff --git a/includes/cpp_redis/network/win_tcp_client.hpp b/includes/cpp_redis/network/win_tcp_client.hpp new file mode 100644 index 00000000..3d5754c0 --- /dev/null +++ b/includes/cpp_redis/network/win_tcp_client.hpp @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include + +#ifndef __CPP_REDIS_READ_SIZE +# define __CPP_REDIS_READ_SIZE 4096 +#endif /* __CPP_REDIS_READ_SIZE */ + +namespace cpp_redis { + +namespace network { + +//! tcp_client +//! async tcp client based on boost asio +class tcp_client { +public: + //! ctor & dtor + tcp_client(const std::shared_ptr pIO=NULL); + ~tcp_client(void); + + //! assignment operator & copy ctor + tcp_client(const tcp_client&) = delete; + tcp_client& operator=(const tcp_client&) = delete; + + //! returns whether the client is connected or not + bool is_connected(void); + + //! handle connection & disconnection + typedef std::function disconnection_handler_t; + typedef std::function& buffer)> receive_handler_t; + void connect(const std::string& host, unsigned int port, + const disconnection_handler_t& disconnection_handler = nullptr, + const receive_handler_t& receive_handler = nullptr); + void disconnect(void); + + //! send data + void send(const std::string& buffer); + void send(const std::vector& buffer); + +private: + //! make async read and write operations + void async_read(void); + void async_write(void); + + //! io service callback + void io_service_disconnection_handler(io_service&); + + void reset_state(void); + void clear_buffer(void); + +private: + //! io service instance + const std::shared_ptr m_io_service; + + //! socket + SOCKET m_sock; + + //! is connected + std::atomic_bool m_is_connected; + + //! buffers + static const unsigned int READ_SIZE = __CPP_REDIS_READ_SIZE; + std::vector m_read_buffer; + std::vector m_write_buffer; + + //! handlers + receive_handler_t m_receive_handler; + disconnection_handler_t m_disconnection_handler; + + //! thread safety + std::mutex m_write_buffer_mutex; +}; + +} //! network + +} //! cpp_redis diff --git a/includes/cpp_redis/redis_client.hpp b/includes/cpp_redis/redis_client.hpp index 8345b1a6..0ab7fab9 100644 --- a/includes/cpp_redis/redis_client.hpp +++ b/includes/cpp_redis/redis_client.hpp @@ -15,7 +15,7 @@ namespace cpp_redis { class redis_client { public: //! ctor & dtor - redis_client(void); + redis_client(const std::shared_ptr pIO =NULL); ~redis_client(void); //! copy ctor & assignment operator diff --git a/includes/cpp_redis/redis_subscriber.hpp b/includes/cpp_redis/redis_subscriber.hpp index 5c5e1ae1..2c615fdb 100644 --- a/includes/cpp_redis/redis_subscriber.hpp +++ b/includes/cpp_redis/redis_subscriber.hpp @@ -12,7 +12,7 @@ namespace cpp_redis { class redis_subscriber { public: //! ctor & dtor - redis_subscriber(void); + redis_subscriber(const std::shared_ptr pIO = NULL); ~redis_subscriber(void); //! copy ctor & assignment operator diff --git a/rts-redis-test/rts-redis-test.cpp b/rts-redis-test/rts-redis-test.cpp new file mode 100644 index 00000000..d39348d3 --- /dev/null +++ b/rts-redis-test/rts-redis-test.cpp @@ -0,0 +1,120 @@ +// rts-redis-test.cpp : Defines the entry point for the console application. +// +#include "cpp_redis/redis_client.hpp" +#include "cpp_redis/redis_subscriber.hpp" +#include "cpp_redis/network/win_io_service.hpp" +#include + +#include +#include + +cpp_redis::redis_client* client; +volatile std::atomic_bool should_exit(false); + +void +sigint_handler(int) { + std::cout << "disconnected (sigint handler)" << std::endl; + if(client) + client->disconnect(); + should_exit = true; +} + +int +main(void) { + + std::shared_ptr m_pIOService = std::shared_ptr(new cpp_redis::network::io_service(1)); + + //client = new cpp_redis::redis_client(m_pIOService); + + cpp_redis::redis_subscriber sub(m_pIOService); + + sub.connect("127.0.0.1", 6379, [](cpp_redis::redis_subscriber&) { + printf("sub disconnected (disconnection handler)\n"); + should_exit = true; + }); + + sub.subscribe("some_chan", [](const std::string& chan, const std::string& msg) { + // a read operation has completed + printf("MESSAGE chan %s: %s\n", chan.c_str(), msg.c_str()); + }); + sub.psubscribe("*", [](const std::string& chan, const std::string& msg) { + printf("PMESSAGE chan %s: %s\n", chan.c_str(), msg.c_str()); + }); + sub.commit(); + + /* + cpp_redis::redis_subscriber sub(m_pIOService); + //cpp_redis::redis_client client; + + client->connect(); + + std::atomic_bool callback_run(false); + sub.subscribe("/chan", [&](const std::string&, const std::string&) + { + callback_run = true; + }); + + sub.connect(); + sub.commit(); + client->publish("/chan", "hello"); + client->commit(); + Sleep(120000); + //std::this_thread::sleep_for(std::chrono::seconds(1)); + assert(callback_run); + */ + + /* + std::atomic_bool disconnection_handler_called(false); + + client->connect("127.0.0.1", 6379, [&](cpp_redis::redis_client&) + { + disconnection_handler_called = true; + } + ); + + client->send({ "QUIT" }); + client->sync_commit(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + assert(true == disconnection_handler_called); + */ + + + /* + client->send({ "SET", "HELLO", "MultipleSend" }, + [&](cpp_redis::reply& reply) + { + if (!reply.is_string()) + printf("Reply is NOT a string!\n"); + if (!(reply.as_string() == "OK")) + printf("Reply expected OK!\n"); + else + printf("Reply received OK"); + } + ); + client->sync_commit(); + + client->send({ "GET", "HELLO" }, + [&](cpp_redis::reply& reply) + { + if (!reply.is_string()) + printf("Reply is NOT a string!\n"); + if (!(reply.as_string() == "MultipleSend")) + printf("Reply expected MultipleSend!\n"); + else + printf("Reply received MultipleSend\n"); + } + ); + client->sync_commit(); + */ + + signal(SIGINT, &sigint_handler); + while (!should_exit); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + client->disconnect(); + + m_pIOService->shutdown(); + m_pIOService.reset(); + + return 0; +} diff --git a/sources/network/redis_connection.cpp b/sources/network/redis_connection.cpp index 4c155209..bbfa14de 100644 --- a/sources/network/redis_connection.cpp +++ b/sources/network/redis_connection.cpp @@ -5,9 +5,10 @@ namespace cpp_redis { namespace network { -redis_connection::redis_connection(void) +redis_connection::redis_connection(const std::shared_ptr pIO/*=NULL*/) : m_reply_callback(nullptr) , m_disconnection_handler(nullptr) +, m_client(pIO) { __CPP_REDIS_LOG(debug, "cpp_redis::network::redis_connection created"); } diff --git a/sources/network/win_io_service.cpp b/sources/network/win_io_service.cpp new file mode 100644 index 00000000..a199103e --- /dev/null +++ b/sources/network/win_io_service.cpp @@ -0,0 +1,284 @@ +#include "cpp_redis/network/win_io_service.hpp" +#include "cpp_redis/redis_error.hpp" + +namespace cpp_redis { + +namespace network { + +const std::shared_ptr& + io_service::get_instance(void) { + static std::shared_ptr instance = std::shared_ptr{ new io_service }; + return instance; +} + +io_service::io_service(size_t max_worker_threads) + : m_should_stop(false) +{ + //Determine the size of the thread pool dynamically. + //2 * number of processors in the system is our rule here. + SYSTEM_INFO info; + ::GetSystemInfo(&info); + m_worker_thread_pool_size = (info.dwNumberOfProcessors * 2); + + if (m_worker_thread_pool_size > max_worker_threads) + m_worker_thread_pool_size = max_worker_threads; + + WSADATA wsaData; + int nRet = 0; + if ((nRet = WSAStartup(0x202, &wsaData)) != 0) //Start winsock before any other socket calls. + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, WSAStartup() failure"); + + //Create completion port. Pass 0 for parameter 4 to allow as many threads as there are processors in the system + m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);; + if( INVALID_HANDLE_VALUE == m_completion_port) + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, CreateIoCompletionPort() failure"); + + //Now startup worker thread pool which will service our async io requests + for (unsigned int i=0; isecond; + //Post for each of our worker threads. + int workers = m_worker_threads.size(); + for(int i=0; i lock(m_socket_mutex); + + //Add the socket to our map and return the allocated struct + auto& info = m_sockets[sock]; + + info.hsock = sock; + info.disconnection_handler = handler; + + //Associate the socket with our io completion port. + if( NULL == CreateIoCompletionPort((HANDLE)sock, m_completion_port, (DWORD_PTR)&m_sockets[sock], 0)) + { + throw cpp_redis::redis_error("Track() failed to create CreateIoCompletionPort"); + } +} + +void +io_service::untrack(SOCKET sock) +{ + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return; + auto& sockinfo = sock_it->second; + + //Wait until the posted i/o has completed. + //while (m_completion_port && !HasOverlappedIoCompleted((LPOVERLAPPED)&sockinfo.io_info.overlapped)) + // std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + std::lock_guard lock(m_socket_mutex); + m_sockets.erase(sock); +} + +bool +io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return false; + + //Resize the buffer to the correct size if we need it to be to hold the incoming data. + if (buffer.size() < read_size) + buffer.resize(read_size); + + auto& sockinfo = sock_it->second; + sockinfo.read_buffer = &buffer; + sockinfo.read_callback = callback; + + DWORD dwRecvNumBytes = 0; + DWORD dwFlags = 0; + WSABUF buffRecv; + + buffRecv.buf = sockinfo.read_buffer->data(); + buffRecv.len = read_size; + + //We need a new overlapped struct for EACH overlapped operation. + //we reuse them over and over. + io_context_info* p_io_info = sockinfo.get_pool_io_context(); + + int nRet = WSARecv(sock, &buffRecv, 1, &dwRecvNumBytes, &dwFlags, &(p_io_info->overlapped), NULL); + + if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) + { + //Fire the disconnect handler + sockinfo.disconnection_handler(*this); + return false; + } + + return true; +} + +bool +io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return false; + + auto& sockinfo = sock_it->second; + sockinfo.write_buffer = buffer; + sockinfo.write_size = write_size; + sockinfo.write_callback = callback; + + WSABUF buffSend; + DWORD dwSendNumBytes = 0; + DWORD dwFlags = 0; + buffSend.buf = sockinfo.write_buffer.data(); + buffSend.len = write_size; + + //We need a new overlapped struct for EACH overlapped operation. + //we reuse them over and over. + io_context_info* p_io_info = sockinfo.get_pool_io_context(); + p_io_info->eOperation = IO_OP_WRITE; + + int nRet = WSASend(sock, &buffSend, 1, &dwSendNumBytes, dwFlags, &(p_io_info->overlapped), NULL); + if (SOCKET_ERROR == nRet) + { + int error = WSAGetLastError(); + if (error != ERROR_IO_PENDING) + { + //Fire the disconnect handler + sockinfo.disconnection_handler(*this); + return false; + } + } + + return true; +} + +//function used by worker thread(s) used to process io requests +int +io_service::process_io(void) +{ + BOOL bSuccess = FALSE; + int nRet = 0; + LPWSAOVERLAPPED pOverlapped = NULL; + sock_info* psock_info = NULL; + io_context_info* pio_info = NULL; + + DWORD dwRecvNumBytes = 0; + DWORD dwSendNumBytes = 0; + DWORD dwFlags = 0; + DWORD io_size = 0; + enIoOperation e_op; + int sock_error = 0; + + while (!m_should_stop) + { + // continually loop to service io completion packets + psock_info = NULL; + pOverlapped = NULL; + + bSuccess = GetQueuedCompletionStatus(m_completion_port, &io_size, (PDWORD_PTR)&psock_info, (LPOVERLAPPED *)&pOverlapped, INFINITE); + if (!bSuccess) + { + //client socket must have died. continue... + if (0 == io_size) + { + sock_error = WSAGetLastError(); + //Fire the disconnect handler if we can. + if(psock_info && sock_error != ERROR_CONNECTION_ABORTED) //Seems the psock_info structure is crap when ERROR_CONNECTION_ABORTED is captured. + psock_info->disconnection_handler(*this); + continue; + } + if (m_should_stop) + return 0; + } + + //get the base address of the struct holding lpOverlapped (the io_context_info) pointer. + if (pOverlapped) + pio_info = CONTAINING_RECORD(pOverlapped, io_context_info, overlapped); + else + pio_info = NULL; + + // Somebody used PostQueuedCompletionStatus to post an I/O packet with + // a NULL CompletionKey (or if we get one for any reason). It is time to exit. + if (!psock_info || !pOverlapped) + return 0; + + e_op = pio_info->eOperation; + + //Push it to the pool so another IO operation can use it again later + if(pio_info) + psock_info->return_pool_io_context(pio_info); + pio_info = NULL; + + // First check to see if an error has occurred on the socket and if so + // then close the socket and cleanup the SOCKET_INFORMATION structure associated with the socket + if(0 == io_size) + { + //Fire the disconnect handler. Check if stopped first because of some weird timing issues that may get us here. + if(bSuccess && false == m_should_stop && psock_info) + psock_info->disconnection_handler(*this); + continue; + } + + // determine what type of IO packet has completed by checking the io_context_info + // associated with this io operation. This will determine what action to take. + switch (e_op) + { + case IO_OP_READ: // a read operation has completed + //TODO Resize Read buffer to the size that we read? + psock_info->read_callback(io_size); + break; + + case IO_OP_WRITE: // a write operation has completed + //Lock our socket structure before updating the write_size + { + std::lock_guard socklock(psock_info->sock_info_mutex); + psock_info->write_size -= io_size; + } + psock_info->write_callback(io_size); + break; + } //switch + } //while + + return 0; +} + +} //! network +} //! cpp_redis diff --git a/sources/network/win_tcp_client.cpp b/sources/network/win_tcp_client.cpp new file mode 100644 index 00000000..15979da7 --- /dev/null +++ b/sources/network/win_tcp_client.cpp @@ -0,0 +1,238 @@ +#include + +#pragma warning(disable:4996) //Disable "The POSIX name for this item is deprecated" warnings for gethostbyname() +#include + +#include +#include + +namespace cpp_redis { + +namespace network { + +//! note that we call io_service::get_instance in the init list +//! +//! this will force force io_service instance creation +//! this is a workaround to handle static object destructions order +//! +//! that way, any object containing a tcp_client has an attribute (or through its attributes) +//! is guaranteed to be destructed before the io_service is destructed, even if it is global +tcp_client::tcp_client(const std::shared_ptr pIO) +: m_io_service(pIO?pIO:io_service::get_instance()) +, m_sock(-1) +, m_is_connected(false) +, m_receive_handler(nullptr) +, m_disconnection_handler(nullptr) +{ + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client created"); +} + +tcp_client::~tcp_client(void) { + disconnect(); + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client destroyed"); +} + +void +tcp_client::connect(const std::string& host, unsigned int port, + const disconnection_handler_t& disconnection_handler, + const receive_handler_t& receive_handler) +{ + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client attempts to connect"); + + if (m_is_connected) { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client is already connected"); + return throw cpp_redis::redis_error("Client already connected"); + } + + //! create the socket + int nZero = 0; + //Enable socket for overlapped i/o + m_sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); + if (m_sock < 0) + throw redis_error("Can't open a socket"); + + //Instruct the TCP stack to directly perform I/O using the buffer provided in our I/O call. + //The advantage is performance because we save a buffer copy between the TCP stack buffer + //and our user buffer for each I/O call. + //BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! + if (0 != setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char *)&nZero, sizeof(nZero))) { + return throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to disable buffering"); + } + + //! get the server's DNS entry + struct hostent *server = gethostbyname(host.c_str()); + if (!server) { + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not resolve DNS"); + throw redis_error("No such host: " + host); + } + + //! build the server's Internet address + struct sockaddr_in server_addr; + std::memset(&server_addr, 0, sizeof(server_addr)); + std::memcpy(&server_addr.sin_addr.s_addr, server->h_addr, server->h_length); + server_addr.sin_port = htons(port); + server_addr.sin_family = AF_INET; + + //! create a connection with the server + if (::connect(m_sock, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) { + throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); + } + + //! add socket to the io_service and set the disconnection & recv handlers + m_disconnection_handler = disconnection_handler; + m_receive_handler = receive_handler; + + u_long ulValue = 1; + if (0 != ioctlsocket(m_sock, FIONBIO, &ulValue)) //Set socket to non blocking. + throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to set socket to non-blocking"); + + m_io_service->track(m_sock, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); + m_is_connected = true; + + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client connected"); + + //! start async read + async_read(); +} + +void +tcp_client::disconnect(void) { + if (!m_is_connected) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client already disconnected"); + return ; + } + + m_is_connected = false; + m_io_service->untrack(m_sock); + + closesocket(m_sock); + m_sock = INVALID_SOCKET; + + clear_buffer(); + reset_state(); + + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client disconnected"); +} + +void +tcp_client::send(const std::string& buffer) { + send(std::vector{ buffer.begin(), buffer.end() }); +} + +void +tcp_client::send(const std::vector& buffer) { + if (!m_is_connected) { + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client is not connected"); + throw redis_error("Not connected"); + } + + if (!buffer.size()) { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client has nothing to send"); + return ; + } + + std::lock_guard lock(m_write_buffer_mutex); + + bool bytes_in_buffer = m_write_buffer.size() > 0; + + //! concat buffer + m_write_buffer.insert(m_write_buffer.end(), buffer.begin(), buffer.end()); + + //! if there were already bytes in buffer, simply return + //! otherwise async_write calls itself recursively until the write buffer is empty + if (bytes_in_buffer) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client is already processing an async_write"); + return ; + } + + async_write(); +} + +void +tcp_client::async_read(void) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client starts async_read"); + + m_io_service->async_read(m_sock, m_read_buffer, READ_SIZE, + [&](std::size_t length) + { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client received data"); + + if (m_receive_handler) + { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client calls receive_handler"); + + if (!m_receive_handler(*this, + { m_read_buffer.begin(), m_read_buffer.begin() + length })) + { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client has been asked for disconnection by receive_handler"); + disconnect(); + return; + } + } + + //! clear read buffer and re-issue async read to receive more incoming bytes + m_read_buffer.clear(); + + if (m_is_connected) + async_read(); + } + ); +} + +void +tcp_client::async_write(void) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client starts async_write"); + + m_io_service->async_write(m_sock, m_write_buffer, m_write_buffer.size(), + [&](std::size_t length) + { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client wrote data and cleans write_buffer"); + std::lock_guard lock(m_write_buffer_mutex); + + //Remove what has already been sent and see if we have any more to send + m_write_buffer.erase(m_write_buffer.begin(), m_write_buffer.begin() + length); + + //If we still have data to write the call ourselves recursivly until the buffer is completely sent + if (m_is_connected && m_write_buffer.size()) + async_write(); + } + ); +} + +bool +tcp_client::is_connected(void) { + return m_is_connected; +} + +void +tcp_client::io_service_disconnection_handler(network::io_service&) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client has been disconnected"); + + reset_state(); + + if (m_disconnection_handler) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client calls disconnection handler"); + m_disconnection_handler(*this); + } +} + +void +tcp_client::reset_state(void) { + if(m_sock != INVALID_SOCKET) + closesocket(m_sock); + m_is_connected = false; + m_sock = INVALID_SOCKET; + + clear_buffer(); +} + +void +tcp_client::clear_buffer(void) { + std::lock_guard lock(m_write_buffer_mutex); + m_write_buffer.clear(); + m_read_buffer.clear(); +} + +} //! network + +} //! cpp_redis diff --git a/sources/redis_client.cpp b/sources/redis_client.cpp index c44c70fd..f1dafe8f 100644 --- a/sources/redis_client.cpp +++ b/sources/redis_client.cpp @@ -1,9 +1,15 @@ #include #include +#ifdef _MSC_VER +#include +#else +#include +#endif + namespace cpp_redis { -redis_client::redis_client(void) { +redis_client::redis_client(const std::shared_ptr pIO) : m_client(pIO){ __CPP_REDIS_LOG(debug, "cpp_redis::redis_client created"); } diff --git a/sources/redis_subscriber.cpp b/sources/redis_subscriber.cpp index 76f777e5..74cb90ad 100644 --- a/sources/redis_subscriber.cpp +++ b/sources/redis_subscriber.cpp @@ -4,7 +4,8 @@ namespace cpp_redis { -redis_subscriber::redis_subscriber(void) { +redis_subscriber::redis_subscriber(const std::shared_ptr pIO) : + m_client(pIO) { __CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber created"); } @@ -124,8 +125,8 @@ redis_subscriber::handle_subscribe_reply(const std::vector& reply) { const auto& message = reply[2]; if (!title.is_string() - || !channel.is_string() - || !message.is_string()) + || !channel.is_string() + || !message.is_string()) return ; if (title.as_string() != "message") diff --git a/sources/reply.cpp b/sources/reply.cpp index cc674e33..36944ad8 100644 --- a/sources/reply.cpp +++ b/sources/reply.cpp @@ -1,5 +1,5 @@ -#include -#include +#include "cpp_redis/reply.hpp" +#include "cpp_redis/redis_error.hpp" namespace cpp_redis {