Skip to content
This repository was archived by the owner on Apr 6, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,24 @@
*.a
*.lib

build
deps

# Executables
*.exe
*.out
*.app
*.sln
*.vcxproj
*.suo
*.user
*.filters
*.VC.db
*.opendb
.gitignore
cmake_install.cmake

build
deps
CMakeCache.txt
CMakeFiles/
Release/
Debug/
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ install:
- if [ "$CXX" = "g++" ]; then export CXX="g++-4.8" CC="gcc-4.8"; fi
- ./install_deps.sh

script: mkdir build && cd build && cmake .. -DBUILD_TESTS=true -DBUILD_EXAMPLES=true && make && ./target/cpp_redis_tests
script: mkdir build && cd build && cmake .. -DBUILD_TESTS=true -DBUILD_EXAMPLES=true && make && ./bin/cpp_redis_tests
49 changes: 38 additions & 11 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
cmake_minimum_required(VERSION 2.8.7)
set(CMAKE_MACOSX_RPATH 1)


###
# verbose make
###
Expand All @@ -20,7 +21,14 @@ project(${PROJECT} CXX)
###
# compilation options
###
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3")
IF (WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3 /O2")
add_definitions(-D_UNICODE)
add_definitions(-DUNICODE)
add_definitions(-DWIN32_LEAN_AND_MEAN)
ELSE ()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -W -Wall -Wextra -O3")
ENDIF (WIN32)


###
Expand All @@ -47,23 +55,40 @@ link_directories(${GTEST_LIBS})
###
# sources
###
set(DIRS "sources" "sources/network" "sources/builders")
foreach(dir ${DIRS})
set(SRC_DIRS "sources" "sources/network" "sources/builders")

IF (WIN32)
set(SRC_DIRS ${SRC_DIRS} "sources/network/windows")
ELSE ()
set(SRC_DIRS ${SRC_DIRS} "sources/network/unix")
ENDIF (WIN32)

foreach(dir ${SRC_DIRS})
# get directory sources
file(GLOB s_${dir} "${dir}/*.cpp")
# set sources
set(SOURCES ${SOURCES} ${s_${dir}})
endforeach()


###
# outputs
###
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)


###
# executable
###
add_library(${PROJECT} SHARED ${SOURCES})
target_link_libraries(${PROJECT} pthread)
set_target_properties(${PROJECT}
PROPERTIES
LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target")
add_library(${PROJECT} STATIC ${SOURCES})

IF (WIN32)
target_link_libraries(${PROJECT} ws2_32)
ELSE ()
target_link_libraries(${PROJECT} pthread)
ENDIF (WIN32)

IF (READ_SIZE)
set_target_properties(${PROJECT}
Expand All @@ -74,14 +99,16 @@ ENDIF (READ_SIZE)
IF (NO_LOGGING)
set_target_properties(${PROJECT}
PROPERTIES
COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}")
COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}")
ENDIF (NO_LOGGING)


###
# install
###
install (TARGETS ${PROJECT} DESTINATION lib)
install (DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include)
install (DIRECTORY ${CMAKE_BINARY_DIR}/lib/ DESTINATION lib USE_SOURCE_PERMISSIONS)
install (DIRECTORY ${CMAKE_BINARY_DIR}/bin/ DESTINATION bin USE_SOURCE_PERMISSIONS)
install (DIRECTORY ${CPP_REDIS_INCLUDES}/ DESTINATION include USE_SOURCE_PERMISSIONS)


###
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ A [Wiki](https://github.com/Cylix/cpp_redis/wiki) is available and provides docu
Some examples are provided in this repository:
* [redis_client.cpp](examples/redis_client.cpp) shows how to use the redis client class.
* [redis_subscriber.cpp](examples/redis_subscriber.cpp) shows how to use the redis subscriber class.
* [logger.cpp](examples/logger.cpp) shows how to setup a logger for cpp_redis.

These examples can also be found inside the [Wiki](https://github.com/Cylix/cpp_redis/wiki/Examples).

Expand Down
13 changes: 3 additions & 10 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
###
# compilation options
###
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
IF (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
ENDIF (NOT WIN32)


###
Expand All @@ -16,18 +18,9 @@ include_directories(${PROJECT_SOURCE_DIR}/includes
###
add_executable(redis_client redis_client.cpp)
target_link_libraries(redis_client cpp_redis)
set_target_properties(redis_client
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target")

add_executable(redis_subscriber redis_subscriber.cpp)
target_link_libraries(redis_subscriber cpp_redis)
set_target_properties(redis_subscriber
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target")

add_executable(logger logger.cpp)
target_link_libraries(logger cpp_redis)
set_target_properties(logger
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target")
35 changes: 35 additions & 0 deletions examples/redis_subscriber.cpp.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include <cpp_redis/cpp_redis>

#include <signal.h>
#include <iostream>

volatile std::atomic_bool should_exit(false);
cpp_redis::redis_subscriber sub;

void
sigint_handler(int) {
std::cout << "disconnected (sigint handler)" << std::endl;
sub.disconnect();
should_exit = true;
}

int
main(void) {
sub.connect("127.0.0.1", 6379, [](cpp_redis::redis_subscriber&) {
std::cout << "sub disconnected (disconnection handler)" << std::endl;
should_exit = true;
});

sub.subscribe("some_chan", [] (const std::string& chan, const std::string& msg) {
std::cout << "MESSAGE " << chan << ": " << msg << std::endl;
});
sub.psubscribe("*", [] (const std::string& chan, const std::string& msg) {
std::cout << "PMESSAGE " << chan << ": " << msg << std::endl;
});
sub.commit();

signal(SIGINT, &sigint_handler);
while (not should_exit);

return 0;
}
9 changes: 7 additions & 2 deletions includes/cpp_redis/network/redis_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
#include <vector>
#include <functional>

#include <cpp_redis/network/tcp_client.hpp>
#ifdef _MSC_VER
# include <cpp_redis/network/windows/tcp_client.hpp>
#else
# include <cpp_redis/network/unix/tcp_client.hpp>
#endif /* _MSC_VER */

#include <cpp_redis/builders/reply_builder.hpp>

namespace cpp_redis {
Expand All @@ -15,7 +20,7 @@ namespace network {
class redis_connection {
public:
//! ctor & dtor
redis_connection(void);
redis_connection(const std::shared_ptr<io_service>& IO);
~redis_connection(void);

//! copy ctor & assignment operator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <atomic>
#include <stdexcept>

#include <cpp_redis/network/io_service.hpp>
#include <cpp_redis/network/unix/io_service.hpp>
#include <cpp_redis/redis_error.hpp>

#ifndef __CPP_REDIS_READ_SIZE
Expand All @@ -23,7 +23,7 @@ namespace network {
class tcp_client {
public:
//! ctor & dtor
tcp_client(void);
tcp_client(const std::shared_ptr<io_service>& IO = nullptr);
~tcp_client(void);

//! assignment operator & copy ctor
Expand Down
149 changes: 149 additions & 0 deletions includes/cpp_redis/network/windows/io_service.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#pragma once

#include <thread>
#include <atomic>
#include <unordered_map>
#include <vector>
#include <mutex>

#include <WinSock2.h>

#define MAX_BUFF_SIZE __CPP_REDIS_READ_SIZE
#define MAX_WORKER_THREADS 16

namespace cpp_redis {

namespace network {

typedef enum _enIoOperation {
//IO_OP_ACCEPT,
IO_OP_READ,
IO_OP_WRITE
} enIoOperation;


class io_service {
public:
//! instance getter (singleton pattern)
static const std::shared_ptr<io_service>& get_instance(void);
io_service(size_t max_worker_threads = MAX_WORKER_THREADS);
~io_service(void);

void shutdown();

private:
//! copy ctor & assignment operator
io_service(const io_service&) = delete;
io_service& operator=(const io_service&) = delete;

public:
//! disconnection handler declaration
typedef std::function<void(io_service&)> disconnection_handler_t;

//! add or remove a given socket from the io service
//! untrack should never be called from inside a callback
void track(SOCKET sock, const disconnection_handler_t& handler);
void untrack(SOCKET sock);

//! asynchronously read read_size bytes and append them to the given buffer
//! on completion, call the read_callback to notify of the success or failure of the operation
//! return false if another async_read operation is in progress or socket is not registered
typedef std::function<void(std::size_t)> read_callback_t;
bool async_read(SOCKET socket, std::vector<char>& buffer, std::size_t read_size, const read_callback_t& callback);

//! asynchronously write write_size bytes from buffer to the specified fd
//!on completion, call the write_callback to notify of the success or failure of the operation
//! return false if another async_write operation is in progress or socket is not registered
typedef std::function<void(std::size_t)> write_callback_t;
bool async_write(SOCKET socket, const std::vector<char>& buffer, std::size_t write_size, const write_callback_t& callback);

private:

struct io_context_info : OVERLAPPED {
WSAOVERLAPPED overlapped;
enIoOperation eOperation;
};

//! simple struct to keep track of ongoing operations on a given sockeet
class sock_info
{
public:
sock_info(void) = default;
virtual ~sock_info(void)
{
std::lock_guard<std::recursive_mutex> socklock(sock_info_mutex);
for (auto it = io_contexts_pool.begin(); it != io_contexts_pool.end(); it++)
delete *it;

io_contexts_pool.clear();
}

SOCKET hsock;
std::size_t sent_bytes;

//Must protect the members of our structure from access by multiple threads during IO Completion
std::recursive_mutex sock_info_mutex;

//We keep a simple vector of io_context_info structs to reuse for overlapped WSARecv and WSASend operations
//Since each must have its OWN struct if we issue them at the same time.
//othewise things get tangled up and borked.
std::vector<io_context_info*> io_contexts_pool;

disconnection_handler_t disconnection_handler;

std::vector<char>* read_buffer;
read_callback_t read_callback;

std::vector<char> write_buffer;
std::size_t write_size;
write_callback_t write_callback;

io_context_info* get_pool_io_context() {
io_context_info* pInfo = NULL;
std::lock_guard<std::recursive_mutex> socklock(sock_info_mutex);
if (!io_contexts_pool.empty())
{
pInfo = io_contexts_pool.back();
io_contexts_pool.pop_back();
}
if (!pInfo)
pInfo = new io_context_info();
//MUST clear the overlapped structure between IO calls!
memset(&pInfo->overlapped, 0, sizeof(OVERLAPPED));
return pInfo;
}

void return_pool_io_context(io_context_info* p_io) {
std::lock_guard<std::recursive_mutex> socklock(sock_info_mutex);
io_contexts_pool.push_back(p_io);
}
};

typedef std::function<void()> callback_t;

//! wait for incoming events and notify
int process_io(void);

HANDLE m_completion_port;
unsigned int m_worker_thread_pool_size;
std::vector<std::thread> m_worker_threads; //vector containing all the threads we start to service our i/o requests

private:
//! whether the worker should terminate or not
std::atomic_bool m_should_stop;

//! tracked sockets
std::unordered_map<SOCKET, sock_info> m_sockets;

//! mutex to protect m_notify_socket access against race condition
//!
//! specific mutex for untrack: we dont want someone to untrack a socket while we process it
//! this behavior could cause some issues when executing callbacks in another thread
//! for example, obj is destroyed, in its dtor it untracks the socket, but at the same time
//! a callback is executed from within another thread: the untrack mutex avoid this without being costly
std::recursive_mutex m_socket_mutex;
};

} //! network

} //! cpp_redis
Loading