Skip to content
This repository was archived by the owner on Apr 6, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,24 @@
*.a
*.lib

build
deps

# Executables
*.exe
*.out
*.app
*.sln
*.vcxproj
*.suo
*.user
*.filters
*.VC.db
*.opendb
.gitignore
cmake_install.cmake

build
deps
CMakeCache.txt
CMakeFiles/
Release/
Debug/
46 changes: 46 additions & 0 deletions .gitignore.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Compiled Object files
*.slo
*.lo
*.o
*.obj

# Precompiled Headers
*.gch
*.pch

# Compiled Dynamic libraries
*.so
*.dylib
*.dll

# Fortran module files
*.mod

# Compiled Static libraries
*.lai
*.la
*.a
*.lib

build
deps

# Executables
*.exe
*.out
*.app
*.sln
*.vcxproj
*.suo
*.user
*.filters
*.VC.db
*.opendb
.gitignore
cmake_install.cmake

rts-redis-test/
CMakeCache.txt
CMakeFiles/
Release/
Debug/
11 changes: 3 additions & 8 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,12 @@ set_target_properties(${PROJECT}
PROPERTIES
LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/target")

IF (READ_SIZE)
IF (CPP_REDIS_READ_SIZE)
set_target_properties(${PROJECT}
PROPERTIES
COMPILE_DEFINITIONS "__CPP_REDIS_READ_SIZE=${READ_SIZE}")
ENDIF (READ_SIZE)
COMPILE_DEFINITIONS "CPP_REDIS_READ_SIZE=${CPP_REDIS_READ_SIZE}")
ENDIF (CPP_REDIS_READ_SIZE)

IF (NO_LOGGING)
set_target_properties(${PROJECT}
PROPERTIES
COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}")
ENDIF (NO_LOGGING)

###
# install
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ A [Wiki](https://github.com/Cylix/cpp_redis/wiki) is available and provides docu
Some examples are provided in this repository:
* [redis_client.cpp](examples/redis_client.cpp) shows how to use the redis client class.
* [redis_subscriber.cpp](examples/redis_subscriber.cpp) shows how to use the redis subscriber class.
* [logger.cpp](examples/logger.cpp) shows how to setup a logger for cpp_redis.

These examples can also be found inside the [Wiki](https://github.com/Cylix/cpp_redis/wiki/Examples).

Expand Down
35 changes: 35 additions & 0 deletions examples/redis_subscriber.cpp.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include <cpp_redis/cpp_redis>

#include <signal.h>
#include <iostream>

volatile std::atomic_bool should_exit(false);
cpp_redis::redis_subscriber sub;

void
sigint_handler(int) {
std::cout << "disconnected (sigint handler)" << std::endl;
sub.disconnect();
should_exit = true;
}

int
main(void) {
sub.connect("127.0.0.1", 6379, [](cpp_redis::redis_subscriber&) {
std::cout << "sub disconnected (disconnection handler)" << std::endl;
should_exit = true;
});

sub.subscribe("some_chan", [] (const std::string& chan, const std::string& msg) {
std::cout << "MESSAGE " << chan << ": " << msg << std::endl;
});
sub.psubscribe("*", [] (const std::string& chan, const std::string& msg) {
std::cout << "PMESSAGE " << chan << ": " << msg << std::endl;
});
sub.commit();

signal(SIGINT, &sigint_handler);
while (not should_exit);

return 0;
}
7 changes: 6 additions & 1 deletion includes/cpp_redis/network/redis_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
#include <vector>
#include <functional>

#ifdef _MSC_VER
#include <cpp_redis/network/win_tcp_client.hpp>
#else
#include <cpp_redis/network/tcp_client.hpp>
#endif

#include <cpp_redis/builders/reply_builder.hpp>

namespace cpp_redis {
Expand All @@ -15,7 +20,7 @@ namespace network {
class redis_connection {
public:
//! ctor & dtor
redis_connection(void);
redis_connection::redis_connection(const std::shared_ptr<io_service> pIO);
~redis_connection(void);

//! copy ctor & assignment operator
Expand Down
149 changes: 149 additions & 0 deletions includes/cpp_redis/network/win_io_service.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#pragma once

#include <thread>
#include <atomic>
#include <unordered_map>
#include <vector>
#include <mutex>

#include <WinSock2.h>

#define MAX_BUFF_SIZE CPP_REDIS_READ_SIZE
#define MAX_WORKER_THREADS 16

namespace cpp_redis {

namespace network {

typedef enum _enIoOperation {
//IO_OP_ACCEPT,
IO_OP_READ,
IO_OP_WRITE
} enIoOperation;


class io_service {
public:
//! instance getter (singleton pattern)
static const std::shared_ptr<io_service>& get_instance(void);
io_service(size_t max_worker_threads = MAX_WORKER_THREADS);
~io_service(void);

void shutdown();

private:
//! copy ctor & assignment operator
io_service(const io_service&) = delete;
io_service& operator=(const io_service&) = delete;

public:
//! disconnection handler declaration
typedef std::function<void(io_service&)> disconnection_handler_t;

//! add or remove a given socket from the io service
//! untrack should never be called from inside a callback
void track(SOCKET sock, const disconnection_handler_t& handler);
void untrack(SOCKET sock);

//! asynchronously read read_size bytes and append them to the given buffer
//! on completion, call the read_callback to notify of the success or failure of the operation
//! return false if another async_read operation is in progress or socket is not registered
typedef std::function<void(std::size_t)> read_callback_t;
bool async_read(SOCKET socket, std::vector<char>& buffer, std::size_t read_size, const read_callback_t& callback);

//! asynchronously write write_size bytes from buffer to the specified fd
//!on completion, call the write_callback to notify of the success or failure of the operation
//! return false if another async_write operation is in progress or socket is not registered
typedef std::function<void(std::size_t)> write_callback_t;
bool async_write(SOCKET socket, const std::vector<char>& buffer, std::size_t write_size, const write_callback_t& callback);

private:

struct io_context_info : OVERLAPPED {
WSAOVERLAPPED overlapped;
enIoOperation eOperation;
};

//! simple struct to keep track of ongoing operations on a given sockeet
class sock_info
{
public:
sock_info(void) = default;
virtual ~sock_info(void)
{
std::lock_guard<std::recursive_mutex> socklock(sock_info_mutex);
for (auto it = io_contexts_pool.begin(); it != io_contexts_pool.end(); it++)
delete *it;

io_contexts_pool.clear();
}

SOCKET hsock;
std::size_t sent_bytes;

//Must protect the members of our structure from access by multiple threads during IO Completion
std::recursive_mutex sock_info_mutex;

//We keep a simple vector of io_context_info structs to reuse for overlapped WSARecv and WSASend operations
//Since each must have its OWN struct if we issue them at the same time.
//othewise things get tangled up and borked.
std::vector<io_context_info*> io_contexts_pool;

disconnection_handler_t disconnection_handler;

std::vector<char>* read_buffer;
read_callback_t read_callback;

std::vector<char> write_buffer;
std::size_t write_size;
write_callback_t write_callback;

io_context_info* get_pool_io_context() {
io_context_info* pInfo = NULL;
std::lock_guard<std::recursive_mutex> socklock(sock_info_mutex);
if (!io_contexts_pool.empty())
{
pInfo = io_contexts_pool.back();
io_contexts_pool.pop_back();
}
if (!pInfo)
pInfo = new io_context_info();
//MUST clear the overlapped structure between IO calls!
memset(&pInfo->overlapped, 0, sizeof(OVERLAPPED));
return pInfo;
}

void return_pool_io_context(io_context_info* p_io) {
std::lock_guard<std::recursive_mutex> socklock(sock_info_mutex);
io_contexts_pool.push_back(p_io);
}
};

typedef std::function<void()> callback_t;

//! wait for incoming events and notify
int process_io(void);

HANDLE m_completion_port;
unsigned int m_worker_thread_pool_size;
std::vector<std::thread> m_worker_threads; //vector containing all the threads we start to service our i/o requests

private:
//! whether the worker should terminate or not
std::atomic_bool m_should_stop;

//! tracked sockets
std::unordered_map<SOCKET, sock_info> m_sockets;

//! mutex to protect m_notify_socket access against race condition
//!
//! specific mutex for untrack: we dont want someone to untrack a socket while we process it
//! this behavior could cause some issues when executing callbacks in another thread
//! for example, obj is destroyed, in its dtor it untracks the socket, but at the same time
//! a callback is executed from within another thread: the untrack mutex avoid this without being costly
std::recursive_mutex m_socket_mutex;
};

} //! network

} //! cpp_redis
84 changes: 84 additions & 0 deletions includes/cpp_redis/network/win_tcp_client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#pragma once

#include <string>
#include <vector>
#include <mutex>
#include <thread>
#include <atomic>
#include <stdexcept>

#include <cpp_redis/network/win_io_service.hpp>
#include <cpp_redis/redis_error.hpp>

#ifndef __CPP_REDIS_READ_SIZE
# define __CPP_REDIS_READ_SIZE 4096
#endif /* __CPP_REDIS_READ_SIZE */

namespace cpp_redis {

namespace network {

//! tcp_client
//! async tcp client based on boost asio
class tcp_client {
public:
//! ctor & dtor
tcp_client(const std::shared_ptr<io_service> pIO=NULL);
~tcp_client(void);

//! assignment operator & copy ctor
tcp_client(const tcp_client&) = delete;
tcp_client& operator=(const tcp_client&) = delete;

//! returns whether the client is connected or not
bool is_connected(void);

//! handle connection & disconnection
typedef std::function<void(tcp_client&)> disconnection_handler_t;
typedef std::function<bool(tcp_client&, const std::vector<char>& buffer)> receive_handler_t;
void connect(const std::string& host, unsigned int port,
const disconnection_handler_t& disconnection_handler = nullptr,
const receive_handler_t& receive_handler = nullptr);
void disconnect(void);

//! send data
void send(const std::string& buffer);
void send(const std::vector<char>& buffer);

private:
//! make async read and write operations
void async_read(void);
void async_write(void);

//! io service callback
void io_service_disconnection_handler(io_service&);

void reset_state(void);
void clear_buffer(void);

private:
//! io service instance
const std::shared_ptr<network::io_service> m_io_service;

//! socket
SOCKET m_sock;

//! is connected
std::atomic_bool m_is_connected;

//! buffers
static const unsigned int READ_SIZE = __CPP_REDIS_READ_SIZE;
std::vector<char> m_read_buffer;
std::vector<char> m_write_buffer;

//! handlers
receive_handler_t m_receive_handler;
disconnection_handler_t m_disconnection_handler;

//! thread safety
std::mutex m_write_buffer_mutex;
};

} //! network

} //! cpp_redis
Loading