diff --git a/.gitignore b/.gitignore index 6854a22f..62e9524d 100644 --- a/.gitignore +++ b/.gitignore @@ -22,10 +22,24 @@ *.a *.lib +build +deps + # Executables *.exe *.out *.app +*.sln +*.vcxproj +*.suo +*.user +*.filters +*.VC.db +*.opendb +.gitignore +cmake_install.cmake -build -deps +CMakeCache.txt +CMakeFiles/ +Release/ +Debug/ \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 32f90934..f4f6adfd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,4 +23,4 @@ install: - if [ "$CXX" = "g++" ]; then export CXX="g++-4.8" CC="gcc-4.8"; fi - ./install_deps.sh -script: mkdir build && cd build && cmake .. -DBUILD_TESTS=true -DBUILD_EXAMPLES=true && make && ./target/cpp_redis_tests +script: mkdir build && cd build && cmake .. -DBUILD_TESTS=true -DBUILD_EXAMPLES=true && make && ./bin/cpp_redis_tests diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a188713..db8f6bc2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,7 @@ cmake_minimum_required(VERSION 2.8.7) set(CMAKE_MACOSX_RPATH 1) + ### # verbose make ### @@ -20,7 +21,14 @@ project(${PROJECT} CXX) ### # compilation options ### -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3") +IF (WIN32) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2") + add_definitions(-D_UNICODE) + add_definitions(-DUNICODE) + add_definitions(-DWIN32_LEAN_AND_MEAN) +ELSE () + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3") +ENDIF (WIN32) ### @@ -47,8 +55,15 @@ link_directories(${GTEST_LIBS}) ### # sources ### -set(DIRS "sources" "sources/network" "sources/builders") -foreach(dir ${DIRS}) +set(SRC_DIRS "sources" "sources/network" "sources/builders") + +IF (WIN32) + set(SRC_DIRS ${SRC_DIRS} "sources/network/windows") +ELSE () + set(SRC_DIRS ${SRC_DIRS} "sources/network/unix") +ENDIF (WIN32) + +foreach(dir ${SRC_DIRS}) # get directory sources file(GLOB s_${dir} "${dir}/*.cpp") # set sources @@ -56,14 +71,24 @@ foreach(dir ${DIRS}) endforeach() +### +# outputs +### +set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) +set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) +set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) + + ### # executable ### -add_library(${PROJECT} SHARED ${SOURCES}) -target_link_libraries(${PROJECT} pthread) -set_target_properties(${PROJECT} - PROPERTIES - LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") +add_library(${PROJECT} STATIC ${SOURCES}) + +IF (WIN32) + target_link_libraries(${PROJECT} ws2_32) +ELSE () + target_link_libraries(${PROJECT} pthread) +ENDIF (WIN32) IF (READ_SIZE) set_target_properties(${PROJECT} @@ -74,14 +99,16 @@ ENDIF (READ_SIZE) IF (NO_LOGGING) set_target_properties(${PROJECT} PROPERTIES - COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}") + COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}") ENDIF (NO_LOGGING) + ### # install ### -install (TARGETS ${PROJECT} DESTINATION lib) -install (DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include) +install (DIRECTORY ${CMAKE_BINARY_DIR}/lib/ DESTINATION lib USE_SOURCE_PERMISSIONS) +install (DIRECTORY ${CMAKE_BINARY_DIR}/bin/ DESTINATION bin USE_SOURCE_PERMISSIONS) +install (DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include USE_SOURCE_PERMISSIONS) ### diff --git a/README.md b/README.md index db490a5c..f7d87845 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,6 @@ A [Wiki](https://github.com/Cylix/cpp_redis/wiki) is available and provides docu Some examples are provided in this repository: * [redis_client.cpp](examples/redis_client.cpp) shows how to use the redis client class. * [redis_subscriber.cpp](examples/redis_subscriber.cpp) shows how to use the redis subscriber class. -* [logger.cpp](examples/logger.cpp) shows how to setup a logger for cpp_redis. These examples can also be found inside the [Wiki](https://github.com/Cylix/cpp_redis/wiki/Examples). diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index cc906fb2..344d45ee 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,7 +1,9 @@ ### # compilation options ### -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +IF (NOT WIN32) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +ENDIF (NOT WIN32) ### @@ -16,18 +18,9 @@ include_directories(${PROJECT_SOURCE_DIR}/includes ### add_executable(redis_client redis_client.cpp) target_link_libraries(redis_client cpp_redis) -set_target_properties(redis_client - PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") add_executable(redis_subscriber redis_subscriber.cpp) target_link_libraries(redis_subscriber cpp_redis) -set_target_properties(redis_subscriber - PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") add_executable(logger logger.cpp) target_link_libraries(logger cpp_redis) -set_target_properties(logger - PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") diff --git a/examples/redis_subscriber.cpp.bak b/examples/redis_subscriber.cpp.bak new file mode 100644 index 00000000..ee40334d --- /dev/null +++ b/examples/redis_subscriber.cpp.bak @@ -0,0 +1,35 @@ +#include + +#include +#include + +volatile std::atomic_bool should_exit(false); +cpp_redis::redis_subscriber sub; + +void +sigint_handler(int) { + std::cout << "disconnected (sigint handler)" << std::endl; + sub.disconnect(); + should_exit = true; +} + +int +main(void) { + sub.connect("127.0.0.1", 6379, [](cpp_redis::redis_subscriber&) { + std::cout << "sub disconnected (disconnection handler)" << std::endl; + should_exit = true; + }); + + sub.subscribe("some_chan", [] (const std::string& chan, const std::string& msg) { + std::cout << "MESSAGE " << chan << ": " << msg << std::endl; + }); + sub.psubscribe("*", [] (const std::string& chan, const std::string& msg) { + std::cout << "PMESSAGE " << chan << ": " << msg << std::endl; + }); + sub.commit(); + + signal(SIGINT, &sigint_handler); + while (not should_exit); + + return 0; +} diff --git a/includes/cpp_redis/network/redis_connection.hpp b/includes/cpp_redis/network/redis_connection.hpp index 4b3a756c..bd79ccb3 100644 --- a/includes/cpp_redis/network/redis_connection.hpp +++ b/includes/cpp_redis/network/redis_connection.hpp @@ -5,7 +5,12 @@ #include #include -#include +#ifdef _MSC_VER +# include +#else +# include +#endif /* _MSC_VER */ + #include namespace cpp_redis { @@ -15,7 +20,7 @@ namespace network { class redis_connection { public: //! ctor & dtor - redis_connection(void); + redis_connection(const std::shared_ptr& IO); ~redis_connection(void); //! copy ctor & assignment operator diff --git a/includes/cpp_redis/network/io_service.hpp b/includes/cpp_redis/network/unix/io_service.hpp similarity index 100% rename from includes/cpp_redis/network/io_service.hpp rename to includes/cpp_redis/network/unix/io_service.hpp diff --git a/includes/cpp_redis/network/tcp_client.hpp b/includes/cpp_redis/network/unix/tcp_client.hpp similarity index 94% rename from includes/cpp_redis/network/tcp_client.hpp rename to includes/cpp_redis/network/unix/tcp_client.hpp index d74204fa..1514afc6 100644 --- a/includes/cpp_redis/network/tcp_client.hpp +++ b/includes/cpp_redis/network/unix/tcp_client.hpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #ifndef __CPP_REDIS_READ_SIZE @@ -23,7 +23,7 @@ namespace network { class tcp_client { public: //! ctor & dtor - tcp_client(void); + tcp_client(const std::shared_ptr& IO = nullptr); ~tcp_client(void); //! assignment operator & copy ctor diff --git a/includes/cpp_redis/network/windows/io_service.hpp b/includes/cpp_redis/network/windows/io_service.hpp new file mode 100644 index 00000000..0d2f3c5d --- /dev/null +++ b/includes/cpp_redis/network/windows/io_service.hpp @@ -0,0 +1,149 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +#define MAX_BUFF_SIZE __CPP_REDIS_READ_SIZE +#define MAX_WORKER_THREADS 16 + +namespace cpp_redis { + +namespace network { + +typedef enum _enIoOperation { + //IO_OP_ACCEPT, + IO_OP_READ, + IO_OP_WRITE + } enIoOperation; + + +class io_service { +public: + //! instance getter (singleton pattern) + static const std::shared_ptr& get_instance(void); + io_service(size_t max_worker_threads = MAX_WORKER_THREADS); + ~io_service(void); + + void shutdown(); + +private: + //! copy ctor & assignment operator + io_service(const io_service&) = delete; + io_service& operator=(const io_service&) = delete; + +public: + //! disconnection handler declaration + typedef std::function disconnection_handler_t; + + //! add or remove a given socket from the io service + //! untrack should never be called from inside a callback + void track(SOCKET sock, const disconnection_handler_t& handler); + void untrack(SOCKET sock); + + //! asynchronously read read_size bytes and append them to the given buffer + //! on completion, call the read_callback to notify of the success or failure of the operation + //! return false if another async_read operation is in progress or socket is not registered + typedef std::function read_callback_t; + bool async_read(SOCKET socket, std::vector& buffer, std::size_t read_size, const read_callback_t& callback); + + //! asynchronously write write_size bytes from buffer to the specified fd + //!on completion, call the write_callback to notify of the success or failure of the operation + //! return false if another async_write operation is in progress or socket is not registered + typedef std::function write_callback_t; + bool async_write(SOCKET socket, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback); + +private: + + struct io_context_info : OVERLAPPED { + WSAOVERLAPPED overlapped; + enIoOperation eOperation; + }; + + //! simple struct to keep track of ongoing operations on a given sockeet + class sock_info + { + public: + sock_info(void) = default; + virtual ~sock_info(void) + { + std::lock_guard socklock(sock_info_mutex); + for (auto it = io_contexts_pool.begin(); it != io_contexts_pool.end(); it++) + delete *it; + + io_contexts_pool.clear(); + } + + SOCKET hsock; + std::size_t sent_bytes; + + //Must protect the members of our structure from access by multiple threads during IO Completion + std::recursive_mutex sock_info_mutex; + + //We keep a simple vector of io_context_info structs to reuse for overlapped WSARecv and WSASend operations + //Since each must have its OWN struct if we issue them at the same time. + //othewise things get tangled up and borked. + std::vector io_contexts_pool; + + disconnection_handler_t disconnection_handler; + + std::vector* read_buffer; + read_callback_t read_callback; + + std::vector write_buffer; + std::size_t write_size; + write_callback_t write_callback; + + io_context_info* get_pool_io_context() { + io_context_info* pInfo = NULL; + std::lock_guard socklock(sock_info_mutex); + if (!io_contexts_pool.empty()) + { + pInfo = io_contexts_pool.back(); + io_contexts_pool.pop_back(); + } + if (!pInfo) + pInfo = new io_context_info(); + //MUST clear the overlapped structure between IO calls! + memset(&pInfo->overlapped, 0, sizeof(OVERLAPPED)); + return pInfo; + } + + void return_pool_io_context(io_context_info* p_io) { + std::lock_guard socklock(sock_info_mutex); + io_contexts_pool.push_back(p_io); + } + }; + +typedef std::function callback_t; + + //! wait for incoming events and notify + int process_io(void); + + HANDLE m_completion_port; + unsigned int m_worker_thread_pool_size; + std::vector m_worker_threads; //vector containing all the threads we start to service our i/o requests + +private: + //! whether the worker should terminate or not + std::atomic_bool m_should_stop; + + //! tracked sockets + std::unordered_map m_sockets; + + //! mutex to protect m_notify_socket access against race condition + //! + //! specific mutex for untrack: we dont want someone to untrack a socket while we process it + //! this behavior could cause some issues when executing callbacks in another thread + //! for example, obj is destroyed, in its dtor it untracks the socket, but at the same time + //! a callback is executed from within another thread: the untrack mutex avoid this without being costly + std::recursive_mutex m_socket_mutex; +}; + +} //! network + +} //! cpp_redis diff --git a/includes/cpp_redis/network/windows/tcp_client.hpp b/includes/cpp_redis/network/windows/tcp_client.hpp new file mode 100644 index 00000000..24e235d7 --- /dev/null +++ b/includes/cpp_redis/network/windows/tcp_client.hpp @@ -0,0 +1,85 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#ifndef __CPP_REDIS_READ_SIZE +# define __CPP_REDIS_READ_SIZE 4096 +#endif /* __CPP_REDIS_READ_SIZE */ + +namespace cpp_redis { + +namespace network { + +//! tcp_client +//! async tcp client based on boost asio +class tcp_client { +public: + //! ctor & dtor + tcp_client(const std::shared_ptr& IO = nullptr); + ~tcp_client(void); + + //! assignment operator & copy ctor + tcp_client(const tcp_client&) = delete; + tcp_client& operator=(const tcp_client&) = delete; + + //! returns whether the client is connected or not + bool is_connected(void); + + //! handle connection & disconnection + typedef std::function disconnection_handler_t; + typedef std::function& buffer)> receive_handler_t; + void connect(const std::string& host, unsigned int port, + const disconnection_handler_t& disconnection_handler = nullptr, + const receive_handler_t& receive_handler = nullptr); + void disconnect(void); + + //! send data + void send(const std::string& buffer); + void send(const std::vector& buffer); + +private: + //! make async read and write operations + void async_read(void); + void async_write(void); + + //! io service callback + void io_service_disconnection_handler(io_service&); + + void reset_state(void); + void clear_buffer(void); + +private: + //! io service instance + const std::shared_ptr m_io_service; + + //! socket + SOCKET m_sock; + + //! is connected + std::atomic_bool m_is_connected; + + //! buffers + static const unsigned int READ_SIZE = __CPP_REDIS_READ_SIZE; + std::vector m_read_buffer; + std::list> m_write_buffer; + + //! handlers + receive_handler_t m_receive_handler; + disconnection_handler_t m_disconnection_handler; + + //! thread safety + std::mutex m_write_buffer_mutex; +}; + +} //! network + +} //! cpp_redis diff --git a/includes/cpp_redis/redis_client.hpp b/includes/cpp_redis/redis_client.hpp index 8345b1a6..a633dcbf 100644 --- a/includes/cpp_redis/redis_client.hpp +++ b/includes/cpp_redis/redis_client.hpp @@ -15,7 +15,7 @@ namespace cpp_redis { class redis_client { public: //! ctor & dtor - redis_client(void); + redis_client(const std::shared_ptr& IO = nullptr); ~redis_client(void); //! copy ctor & assignment operator diff --git a/includes/cpp_redis/redis_subscriber.hpp b/includes/cpp_redis/redis_subscriber.hpp index 5c5e1ae1..f71a3a4f 100644 --- a/includes/cpp_redis/redis_subscriber.hpp +++ b/includes/cpp_redis/redis_subscriber.hpp @@ -12,7 +12,7 @@ namespace cpp_redis { class redis_subscriber { public: //! ctor & dtor - redis_subscriber(void); + redis_subscriber(const std::shared_ptr& IO = nullptr); ~redis_subscriber(void); //! copy ctor & assignment operator diff --git a/rts-redis-test/rts-redis-test.cpp b/rts-redis-test/rts-redis-test.cpp new file mode 100644 index 00000000..d39348d3 --- /dev/null +++ b/rts-redis-test/rts-redis-test.cpp @@ -0,0 +1,120 @@ +// rts-redis-test.cpp : Defines the entry point for the console application. +// +#include "cpp_redis/redis_client.hpp" +#include "cpp_redis/redis_subscriber.hpp" +#include "cpp_redis/network/win_io_service.hpp" +#include + +#include +#include + +cpp_redis::redis_client* client; +volatile std::atomic_bool should_exit(false); + +void +sigint_handler(int) { + std::cout << "disconnected (sigint handler)" << std::endl; + if(client) + client->disconnect(); + should_exit = true; +} + +int +main(void) { + + std::shared_ptr m_pIOService = std::shared_ptr(new cpp_redis::network::io_service(1)); + + //client = new cpp_redis::redis_client(m_pIOService); + + cpp_redis::redis_subscriber sub(m_pIOService); + + sub.connect("127.0.0.1", 6379, [](cpp_redis::redis_subscriber&) { + printf("sub disconnected (disconnection handler)\n"); + should_exit = true; + }); + + sub.subscribe("some_chan", [](const std::string& chan, const std::string& msg) { + // a read operation has completed + printf("MESSAGE chan %s: %s\n", chan.c_str(), msg.c_str()); + }); + sub.psubscribe("*", [](const std::string& chan, const std::string& msg) { + printf("PMESSAGE chan %s: %s\n", chan.c_str(), msg.c_str()); + }); + sub.commit(); + + /* + cpp_redis::redis_subscriber sub(m_pIOService); + //cpp_redis::redis_client client; + + client->connect(); + + std::atomic_bool callback_run(false); + sub.subscribe("/chan", [&](const std::string&, const std::string&) + { + callback_run = true; + }); + + sub.connect(); + sub.commit(); + client->publish("/chan", "hello"); + client->commit(); + Sleep(120000); + //std::this_thread::sleep_for(std::chrono::seconds(1)); + assert(callback_run); + */ + + /* + std::atomic_bool disconnection_handler_called(false); + + client->connect("127.0.0.1", 6379, [&](cpp_redis::redis_client&) + { + disconnection_handler_called = true; + } + ); + + client->send({ "QUIT" }); + client->sync_commit(); + std::this_thread::sleep_for(std::chrono::seconds(5)); + assert(true == disconnection_handler_called); + */ + + + /* + client->send({ "SET", "HELLO", "MultipleSend" }, + [&](cpp_redis::reply& reply) + { + if (!reply.is_string()) + printf("Reply is NOT a string!\n"); + if (!(reply.as_string() == "OK")) + printf("Reply expected OK!\n"); + else + printf("Reply received OK"); + } + ); + client->sync_commit(); + + client->send({ "GET", "HELLO" }, + [&](cpp_redis::reply& reply) + { + if (!reply.is_string()) + printf("Reply is NOT a string!\n"); + if (!(reply.as_string() == "MultipleSend")) + printf("Reply expected MultipleSend!\n"); + else + printf("Reply received MultipleSend\n"); + } + ); + client->sync_commit(); + */ + + signal(SIGINT, &sigint_handler); + while (!should_exit); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + client->disconnect(); + + m_pIOService->shutdown(); + m_pIOService.reset(); + + return 0; +} diff --git a/sources/builders/integer_builder.cpp b/sources/builders/integer_builder.cpp index 2915d7c3..c7494dbf 100644 --- a/sources/builders/integer_builder.cpp +++ b/sources/builders/integer_builder.cpp @@ -1,3 +1,5 @@ +#include + #include #include #include diff --git a/sources/network/redis_connection.cpp b/sources/network/redis_connection.cpp index 4c155209..2e832126 100644 --- a/sources/network/redis_connection.cpp +++ b/sources/network/redis_connection.cpp @@ -5,8 +5,9 @@ namespace cpp_redis { namespace network { -redis_connection::redis_connection(void) -: m_reply_callback(nullptr) +redis_connection::redis_connection(const std::shared_ptr& IO) +: m_client(IO) +, m_reply_callback(nullptr) , m_disconnection_handler(nullptr) { __CPP_REDIS_LOG(debug, "cpp_redis::network::redis_connection created"); @@ -85,7 +86,7 @@ redis_connection::tcp_client_receive_handler(network::tcp_client&, const std::ve __CPP_REDIS_LOG(debug, "cpp_redis::network::redis_connection receives packet, attempts to build reply"); m_builder << std::string(buffer.begin(), buffer.end()); } - catch (const redis_error& e) { + catch (const redis_error&) { __CPP_REDIS_LOG(error, "cpp_redis::network::redis_connection could not build reply (invalid format), disconnecting"); if (m_disconnection_handler) { diff --git a/sources/network/io_service.cpp b/sources/network/unix/io_service.cpp similarity index 99% rename from sources/network/io_service.cpp rename to sources/network/unix/io_service.cpp index 52ed2b91..ca0e7a12 100644 --- a/sources/network/io_service.cpp +++ b/sources/network/unix/io_service.cpp @@ -1,6 +1,6 @@ #include -#include +#include #include #include diff --git a/sources/network/tcp_client.cpp b/sources/network/unix/tcp_client.cpp similarity index 97% rename from sources/network/tcp_client.cpp rename to sources/network/unix/tcp_client.cpp index fdfb3e54..56493e3c 100644 --- a/sources/network/tcp_client.cpp +++ b/sources/network/unix/tcp_client.cpp @@ -3,7 +3,7 @@ #include #include -#include +#include namespace cpp_redis { @@ -16,8 +16,8 @@ namespace network { //! //! that way, any object containing a tcp_client has an attribute (or through its attributes) //! is guaranteed to be destructed before the io_service is destructed, even if it is global -tcp_client::tcp_client(void) -: m_io_service(io_service::get_instance()) +tcp_client::tcp_client(const std::shared_ptr& IO) +: m_io_service(IO ? IO : io_service::get_instance()) , m_fd(-1) , m_is_connected(false) , m_receive_handler(nullptr) diff --git a/sources/network/windows/io_service.cpp b/sources/network/windows/io_service.cpp new file mode 100644 index 00000000..fc0a3959 --- /dev/null +++ b/sources/network/windows/io_service.cpp @@ -0,0 +1,283 @@ +#include "cpp_redis/network/windows/io_service.hpp" +#include "cpp_redis/redis_error.hpp" + +namespace cpp_redis { + +namespace network { + +const std::shared_ptr& + io_service::get_instance(void) { + static std::shared_ptr instance = std::shared_ptr{ new io_service }; + return instance; +} + +io_service::io_service(size_t max_worker_threads) + : m_should_stop(false) +{ + //Determine the size of the thread pool dynamically. + //2 * number of processors in the system is our rule here. + SYSTEM_INFO info; + ::GetSystemInfo(&info); + m_worker_thread_pool_size = (info.dwNumberOfProcessors * 2); + + if (m_worker_thread_pool_size > max_worker_threads) + m_worker_thread_pool_size = max_worker_threads; + + WSADATA wsaData; + int nRet = 0; + if ((nRet = WSAStartup(0x202, &wsaData)) != 0) //Start winsock before any other socket calls. + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, WSAStartup() failure"); + + //Create completion port. Pass 0 for parameter 4 to allow as many threads as there are processors in the system + m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);; + if( INVALID_HANDLE_VALUE == m_completion_port) + throw cpp_redis::redis_error("Could not init cpp_redis::io_service, CreateIoCompletionPort() failure"); + + //Now startup worker thread pool which will service our async io requests + for (unsigned int i=0; isecond; + //Post for each of our worker threads. + int workers = m_worker_threads.size(); + for(int i=0; i lock(m_socket_mutex); + + //Add the socket to our map and return the allocated struct + auto& info = m_sockets[sock]; + + info.hsock = sock; + info.disconnection_handler = handler; + + //Associate the socket with our io completion port. + if( NULL == CreateIoCompletionPort((HANDLE)sock, m_completion_port, (DWORD_PTR)&m_sockets[sock], 0)) + { + throw cpp_redis::redis_error("Track() failed to create CreateIoCompletionPort"); + } +} + +void +io_service::untrack(SOCKET sock) +{ + std::lock_guard lock(m_socket_mutex); + m_sockets.erase(sock); +} + +bool +io_service::async_read(SOCKET sock, std::vector& buffer, std::size_t read_size, const read_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return false; + + //Resize the buffer to the correct size if we need it to be to hold the incoming data. + if (buffer.size() < read_size) + buffer.resize(read_size); + + auto& sockinfo = sock_it->second; + sockinfo.read_buffer = &buffer; + sockinfo.read_callback = callback; + + DWORD dwRecvNumBytes = 0; + DWORD dwFlags = 0; + WSABUF buffRecv; + + buffRecv.buf = sockinfo.read_buffer->data(); + buffRecv.len = read_size; + + //We need a new overlapped struct for EACH overlapped operation. + //we reuse them over and over. + io_context_info* p_io_info = sockinfo.get_pool_io_context(); + + int nRet = WSARecv(sock, &buffRecv, 1, &dwRecvNumBytes, &dwFlags, &(p_io_info->overlapped), NULL); + + if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) + { + //Fire the disconnect handler + sockinfo.disconnection_handler(*this); + return false; + } + + return true; +} + +bool +io_service::async_write(SOCKET sock, const std::vector& buffer, std::size_t write_size, const write_callback_t& callback) { + std::lock_guard lock(m_socket_mutex); + + auto sock_it = m_sockets.find(sock); + if (sock_it == m_sockets.end()) + return false; + + auto& sockinfo = sock_it->second; + sockinfo.write_buffer = buffer; + sockinfo.write_size = write_size; + sockinfo.write_callback = callback; + + WSABUF buffSend; + DWORD dwSendNumBytes = 0; + DWORD dwFlags = 0; + buffSend.buf = sockinfo.write_buffer.data(); + buffSend.len = write_size; + + //We need a new overlapped struct for EACH overlapped operation. + //we reuse them over and over. + io_context_info* p_io_info = sockinfo.get_pool_io_context(); + p_io_info->eOperation = IO_OP_WRITE; + + int nRet = WSASend(sock, &buffSend, 1, &dwSendNumBytes, dwFlags, &(p_io_info->overlapped), NULL); + if (SOCKET_ERROR == nRet) + { + int error = WSAGetLastError(); + if (error != ERROR_IO_PENDING) + { + //Fire the disconnect handler + sockinfo.disconnection_handler(*this); + return false; + } + } + + return true; +} + +//function used by worker thread(s) used to process io requests +int +io_service::process_io(void) +{ + BOOL bSuccess = FALSE; + int nRet = 0; + LPWSAOVERLAPPED pOverlapped = NULL; + sock_info* psock_info = NULL; + io_context_info* pio_info = NULL; + + DWORD dwRecvNumBytes = 0; + DWORD dwSendNumBytes = 0; + DWORD dwFlags = 0; + DWORD io_size = 0; + enIoOperation e_op; + int sock_error = 0; + + while (!m_should_stop) + { + // continually loop to service io completion packets + psock_info = NULL; + pOverlapped = NULL; + + bSuccess = GetQueuedCompletionStatus(m_completion_port, &io_size, (PDWORD_PTR)&psock_info, (LPOVERLAPPED *)&pOverlapped, INFINITE); + if (!bSuccess) + { + //client socket must have died. continue... + if (0 == io_size) + { + sock_error = WSAGetLastError(); + //Fire the disconnect handler if we can. + if(psock_info && sock_error != ERROR_CONNECTION_ABORTED) //Seems the psock_info structure is crap when ERROR_CONNECTION_ABORTED is captured. + psock_info->disconnection_handler(*this); + continue; + } + if (m_should_stop) + return 0; + } + + //get the base address of the struct holding lpOverlapped (the io_context_info) pointer. + if (pOverlapped) + pio_info = CONTAINING_RECORD(pOverlapped, io_context_info, overlapped); + else + pio_info = NULL; + + // Somebody used PostQueuedCompletionStatus to post an I/O packet with + // a NULL CompletionKey (or if we get one for any reason). It is time to exit. + if (!psock_info || !pOverlapped) + return 0; + + e_op = pio_info->eOperation; + + //Push it to the pool so another IO operation can use it again later + if(pio_info) + psock_info->return_pool_io_context(pio_info); + pio_info = NULL; + + // First check to see if an error has occurred on the socket and if so + // then close the socket and cleanup the SOCKET_INFORMATION structure associated with the socket + if(0 == io_size) + { + //Fire the disconnect handler. Check if stopped first because of some weird timing issues that may get us here. + if(bSuccess && false == m_should_stop && psock_info) + psock_info->disconnection_handler(*this); + continue; + } + + // determine what type of IO packet has completed by checking the io_context_info + // associated with this io operation. This will determine what action to take. + switch (e_op) + { + case IO_OP_READ: // a read operation has completed + //TODO Resize Read buffer to the size that we read? + { + std::lock_guard lock(m_socket_mutex); + if (m_sockets.find(psock_info->hsock) != m_sockets.end()) + psock_info->read_callback(io_size); + } + break; + + case IO_OP_WRITE: // a write operation has completed + //Lock our socket structure before updating the write_size + { + std::lock_guard socklock(psock_info->sock_info_mutex); + psock_info->write_size -= io_size; + } + { + std::lock_guard lock(m_socket_mutex); + if (m_sockets.find(psock_info->hsock) != m_sockets.end()) + psock_info->write_callback(io_size); + } + break; + } //switch + } //while + + return 0; +} + +} //! network +} //! cpp_redis diff --git a/sources/network/windows/tcp_client.cpp b/sources/network/windows/tcp_client.cpp new file mode 100644 index 00000000..d5e33c93 --- /dev/null +++ b/sources/network/windows/tcp_client.cpp @@ -0,0 +1,239 @@ +#include + +#pragma warning(disable:4996) //Disable "The POSIX name for this item is deprecated" warnings for gethostbyname() +#include + +#include +#include + +namespace cpp_redis { + +namespace network { + +//! note that we call io_service::get_instance in the init list +//! +//! this will force force io_service instance creation +//! this is a workaround to handle static object destructions order +//! +//! that way, any object containing a tcp_client has an attribute (or through its attributes) +//! is guaranteed to be destructed before the io_service is destructed, even if it is global +tcp_client::tcp_client(const std::shared_ptr& IO) +: m_io_service(IO ? IO : io_service::get_instance()) +, m_sock(-1) +, m_is_connected(false) +, m_receive_handler(nullptr) +, m_disconnection_handler(nullptr) +{ + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client created"); +} + +tcp_client::~tcp_client(void) { + disconnect(); + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client destroyed"); +} + +void +tcp_client::connect(const std::string& host, unsigned int port, + const disconnection_handler_t& disconnection_handler, + const receive_handler_t& receive_handler) +{ + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client attempts to connect"); + + if (m_is_connected) { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client is already connected"); + return throw cpp_redis::redis_error("Client already connected"); + } + + //! create the socket + int nZero = 0; + //Enable socket for overlapped i/o + m_sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); + if (m_sock < 0) + throw redis_error("Can't open a socket"); + + //Instruct the TCP stack to directly perform I/O using the buffer provided in our I/O call. + //The advantage is performance because we save a buffer copy between the TCP stack buffer + //and our user buffer for each I/O call. + //BUT we have to make sure we don't access the buffer once it's submitted for overlapped operation and before the overlapped operation completes! + if (0 != setsockopt(m_sock, SOL_SOCKET, SO_SNDBUF, (char *)&nZero, sizeof(nZero))) { + return throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to disable buffering"); + } + + //! get the server's DNS entry + struct hostent *server = gethostbyname(host.c_str()); + if (!server) { + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client could not resolve DNS"); + throw redis_error("No such host: " + host); + } + + //! build the server's Internet address + struct sockaddr_in server_addr; + std::memset(&server_addr, 0, sizeof(server_addr)); + std::memcpy(&server_addr.sin_addr.s_addr, server->h_addr, server->h_length); + server_addr.sin_port = htons(port); + server_addr.sin_family = AF_INET; + + //! create a connection with the server + if (::connect(m_sock, reinterpret_cast(&server_addr), sizeof(server_addr)) < 0) { + throw redis_error("Fail to connect to " + host + ":" + std::to_string(port)); + } + + //! add socket to the io_service and set the disconnection & recv handlers + m_disconnection_handler = disconnection_handler; + m_receive_handler = receive_handler; + + u_long ulValue = 1; + if (0 != ioctlsocket(m_sock, FIONBIO, &ulValue)) //Set socket to non blocking. + throw cpp_redis::redis_error("tcp_client::connect() setsockopt failed to set socket to non-blocking"); + + m_io_service->track(m_sock, std::bind(&tcp_client::io_service_disconnection_handler, this, std::placeholders::_1)); + m_is_connected = true; + + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client connected"); + + //! start async read + async_read(); +} + +void +tcp_client::disconnect(void) { + if (!m_is_connected) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client already disconnected"); + return ; + } + + m_is_connected = false; + m_io_service->untrack(m_sock); + + closesocket(m_sock); + m_sock = INVALID_SOCKET; + + clear_buffer(); + reset_state(); + + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client disconnected"); +} + +void +tcp_client::send(const std::string& buffer) { + send(std::vector{ buffer.begin(), buffer.end() }); +} + +void +tcp_client::send(const std::vector& buffer) { + if (!m_is_connected) { + __CPP_REDIS_LOG(error, "cpp_redis::network::tcp_client is not connected"); + throw redis_error("Not connected"); + } + + if (!buffer.size()) { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client has nothing to send"); + return ; + } + + std::lock_guard lock(m_write_buffer_mutex); + + //! concat buffer + m_write_buffer.push_back(buffer); + + //! if there were already bytes in buffer, simply return + //! otherwise async_write calls itself recursively until the write buffer is empty + if (m_write_buffer.size() > 1) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client is already processing an async_write"); + return ; + } + + async_write(); +} + +void +tcp_client::async_read(void) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client starts async_read"); + + m_io_service->async_read(m_sock, m_read_buffer, READ_SIZE, + [&](std::size_t length) + { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client received data"); + + if (m_receive_handler) + { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client calls receive_handler"); + + if (!m_receive_handler(*this, + { m_read_buffer.begin(), m_read_buffer.begin() + length })) + { + __CPP_REDIS_LOG(warn, "cpp_redis::network::tcp_client has been asked for disconnection by receive_handler"); + disconnect(); + return; + } + } + + //! clear read buffer and re-issue async read to receive more incoming bytes + m_read_buffer.clear(); + + if (m_is_connected) + async_read(); + } + ); +} + +void +tcp_client::async_write(void) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client starts async_write"); + + m_io_service->async_write(m_sock, m_write_buffer.front(), m_write_buffer.front().size(), + [&](std::size_t length) + { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client wrote data and cleans write_buffer"); + std::lock_guard lock(m_write_buffer_mutex); + + //Remove what has already been sent and see if we have any more to send + if (length >= m_write_buffer.front().size()) + m_write_buffer.pop_front(); + else + m_write_buffer.front().erase(m_write_buffer.front().begin(), m_write_buffer.front().begin() + length); + + //If we still have data to write the call ourselves recursivly until the buffer is completely sent + if (m_is_connected && m_write_buffer.size()) + async_write(); + } + ); +} + +bool +tcp_client::is_connected(void) { + return m_is_connected; +} + +void +tcp_client::io_service_disconnection_handler(network::io_service&) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client has been disconnected"); + + reset_state(); + + if (m_disconnection_handler) { + __CPP_REDIS_LOG(debug, "cpp_redis::network::tcp_client calls disconnection handler"); + m_disconnection_handler(*this); + } +} + +void +tcp_client::reset_state(void) { + if(m_sock != INVALID_SOCKET) + closesocket(m_sock); + m_is_connected = false; + m_sock = INVALID_SOCKET; + + clear_buffer(); +} + +void +tcp_client::clear_buffer(void) { + std::lock_guard lock(m_write_buffer_mutex); + m_write_buffer.clear(); + m_read_buffer.clear(); +} + +} //! network + +} //! cpp_redis diff --git a/sources/redis_client.cpp b/sources/redis_client.cpp index c44c70fd..08536905 100644 --- a/sources/redis_client.cpp +++ b/sources/redis_client.cpp @@ -3,7 +3,8 @@ namespace cpp_redis { -redis_client::redis_client(void) { +redis_client::redis_client(const std::shared_ptr& IO) +: m_client(IO) { __CPP_REDIS_LOG(debug, "cpp_redis::redis_client created"); } diff --git a/sources/redis_subscriber.cpp b/sources/redis_subscriber.cpp index 76f777e5..bc190ff2 100644 --- a/sources/redis_subscriber.cpp +++ b/sources/redis_subscriber.cpp @@ -4,7 +4,8 @@ namespace cpp_redis { -redis_subscriber::redis_subscriber(void) { +redis_subscriber::redis_subscriber(const std::shared_ptr& IO) : + m_client(IO) { __CPP_REDIS_LOG(debug, "cpp_redis::redis_subscriber created"); } @@ -124,8 +125,8 @@ redis_subscriber::handle_subscribe_reply(const std::vector& reply) { const auto& message = reply[2]; if (!title.is_string() - || !channel.is_string() - || !message.is_string()) + || !channel.is_string() + || !message.is_string()) return ; if (title.as_string() != "message") diff --git a/sources/reply.cpp b/sources/reply.cpp index cc674e33..36944ad8 100644 --- a/sources/reply.cpp +++ b/sources/reply.cpp @@ -1,5 +1,5 @@ -#include -#include +#include "cpp_redis/reply.hpp" +#include "cpp_redis/redis_error.hpp" namespace cpp_redis { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 02a67e0c..439d2e16 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,7 +8,9 @@ project(${PROJECT} CXX) ### # compilation options ### -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +IF (NOT WIN32) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +ENDIF (NOT WIN32) ### @@ -36,7 +38,11 @@ endforeach() # executable ### add_executable(${PROJECT} ${SOURCES}) -target_link_libraries(${PROJECT} cpp_redis gtest pthread) -set_target_properties(${PROJECT} - PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target") + +target_link_libraries(${PROJECT} cpp_redis gtest) + +IF (WIN32) + target_link_libraries(${PROJECT} ws2_32) +ELSE () + target_link_libraries(${PROJECT} pthread) +ENDIF (WIN32)