Skip to content
This repository was archived by the owner on Apr 6, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,25 @@ ELSE ()
target_link_libraries(${PROJECT} pthread)
ENDIF (WIN32)

# __CPP_REDIS_READ_SIZE
IF (READ_SIZE)
set_target_properties(${PROJECT}
PROPERTIES
COMPILE_DEFINITIONS "__CPP_REDIS_READ_SIZE=${READ_SIZE}")
set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_READ_SIZE=${READ_SIZE}")
ENDIF (READ_SIZE)

IF (NO_LOGGING)
set_target_properties(${PROJECT}
PROPERTIES
COMPILE_DEFINITIONS "__CPP_REDIS_NO_LOGGING=${NO_LOGGING}")
ENDIF (NO_LOGGING)
# __CPP_REDIS_LOGGING_ENABLED
IF (LOGGING_ENABLED)
set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_LOGGING_ENABLED=${LOGGING_ENABLED}")
ENDIF (LOGGING_ENABLED)

# _CPP_REDIS_MAX_NB_FDS
IF (MAX_NB_FDS)
set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "_CPP_REDIS_MAX_NB_FDS=${MAX_NB_FDS}")
ENDIF (MAX_NB_FDS)

# __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS
IF (DEFAULT_NB_IO_SERVICE_WORKERS)
set_target_properties(${PROJECT} PROPERTIES COMPILE_DEFINITIONS "__CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS=${DEFAULT_NB_IO_SERVICE_WORKERS}")
ENDIF (DEFAULT_NB_IO_SERVICE_WORKERS)


###
Expand Down
9 changes: 4 additions & 5 deletions includes/cpp_redis/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ void warn(const std::string& msg, const std::string& file, unsigned int line);
void error(const std::string& msg, const std::string& file, unsigned int line);

//! convenience macro to log with file and line information
//! if __CPP_REDIS_NO_LOGGING, all logging related lines are removed from source code
#ifdef __CPP_REDIS_NO_LOGGING
#define __CPP_REDIS_LOG(level, msg)
#else
#ifdef __CPP_REDIS_LOGGING_ENABLED
#define __CPP_REDIS_LOG(level, msg) cpp_redis::level(msg, __FILE__, __LINE__);
#endif /* __CPP_REDIS_NO_LOGGING */
#else
#define __CPP_REDIS_LOG(level, msg)
#endif /* __CPP_REDIS_LOGGING_ENABLED */

} //! cpp_redis
71 changes: 71 additions & 0 deletions includes/cpp_redis/network/io_service.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#pragma once

#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

#include <cpp_redis/network/socket.hpp>

#ifndef __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS
#define __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS 16
#endif /* __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS */

namespace cpp_redis {

namespace network {

class io_service {
public:
//! get default global instance
static const std::shared_ptr<network::io_service>& get_global_instance(void);
//! set default global instance
static void set_global_instance(const std::shared_ptr<network::io_service>& instance);

public:
//! ctor & dtor
io_service(size_t nb_workers);
virtual ~io_service(void) = default;

//! copy ctor & assignment operator
io_service(const io_service&) = default;
io_service& operator=(const io_service&) = default;

public:
//! disconnection handler declaration
typedef std::function<void(io_service&)> disconnection_handler_t;

//! add or remove a given fd from the io service
//! untrack should never be called from inside a callback
virtual void track(_sock_t sock, const disconnection_handler_t& handler) = 0;
virtual void untrack(_sock_t sock) = 0;

//! asynchronously read read_size bytes and append them to the given buffer
//! on completion, call the read_callback to notify of the success or failure of the operation
//! return false if another async_read operation is in progress or fd is not registered
typedef std::function<void(std::size_t)> read_callback_t;
virtual bool async_read(_sock_t sock, std::vector<char>& buffer, std::size_t read_size, const read_callback_t& callback) = 0;

//! asynchronously write write_size bytes from buffer to the specified fd
//! on completion, call the write_callback to notify of the success or failure of the operation
//! return false if another async_write operation is in progress or fd is not registered
typedef std::function<void(std::size_t)> write_callback_t;
virtual bool async_write(_sock_t sock, const std::vector<char>& buffer, std::size_t write_size, const write_callback_t& callback) = 0;

public:
size_t get_nb_workers(void) const;

private:
//! listen for incoming events and notify
virtual void process_io(void) = 0;

private:
size_t m_nb_workers;
};

//! multi-platform instance builder
std::shared_ptr<network::io_service> create_io_service(size_t nb_workers = __CPP_REDIS_DEFAULT_NB_IO_SERVICE_WORKERS);

} //! network

} //! cpp_redis
7 changes: 1 addition & 6 deletions includes/cpp_redis/network/redis_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@
#include <string>
#include <vector>

#ifdef _MSC_VER
#include <cpp_redis/network/windows/tcp_client.hpp>
#else
#include <cpp_redis/network/unix/tcp_client.hpp>
#endif /* _MSC_VER */

#include <cpp_redis/builders/reply_builder.hpp>
#include <cpp_redis/network/tcp_client.hpp>

namespace cpp_redis {

Expand Down
20 changes: 20 additions & 0 deletions includes/cpp_redis/network/socket.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

namespace cpp_redis {

namespace network {

#ifdef _WIN32
#include <WinSock2.h>
typedef SOCKET _sock_t;
#else
typedef int _sock_t;
#endif /* _WIN32 */

#ifndef INVALID_SOCKET
#define INVALID_SOCKET -1
#endif /* INVALID_SOCKET */

} //! network

} //! cpp_redis
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#pragma once

#include <atomic>
#include <list>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <cpp_redis/network/unix/io_service.hpp>
#include <cpp_redis/network/io_service.hpp>
#include <cpp_redis/network/socket.hpp>
#include <cpp_redis/redis_error.hpp>

#ifndef __CPP_REDIS_READ_SIZE
Expand Down Expand Up @@ -56,20 +58,21 @@ class tcp_client {
void reset_state(void);
void clear_buffer(void);

void setup_socket(void);

private:
//! io service instance
std::shared_ptr<io_service> m_io_service;

//! socket fd
int m_fd;
_sock_t m_sock;

//! is connected
std::atomic_bool m_is_connected;

//! buffers
static const unsigned int READ_SIZE = __CPP_REDIS_READ_SIZE;
std::vector<char> m_read_buffer;
std::vector<char> m_write_buffer;
std::list<std::vector<char>> m_write_buffer;

//! handlers
receive_handler_t m_receive_handler;
Expand Down
48 changes: 19 additions & 29 deletions includes/cpp_redis/network/unix/io_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,46 +11,34 @@
#include <sys/socket.h>
#include <unistd.h>

#include <cpp_redis/network/io_service.hpp>

#ifndef _CPP_REDIS_MAX_NB_FDS
#define _CPP_REDIS_MAX_NB_FDS 1024
#endif /* _CPP_REDIS_MAX_NB_FDS */

namespace cpp_redis {

namespace network {

class io_service {
public:
//! instance getter (singleton pattern)
static const std::shared_ptr<io_service>& get_instance(void);
namespace unix {

//! dtor
class io_service : public network::io_service {
public:
//! ctor & dtor
io_service(size_t nb_workers);
~io_service(void);

private:
//! ctor
io_service(void);

//! copy ctor & assignment operator
io_service(const io_service&) = delete;
io_service& operator=(const io_service&) = delete;

public:
//! disconnection handler declaration
typedef std::function<void(io_service&)> disconnection_handler_t;

//! add or remove a given fd from the io service
//! untrack should never be called from inside a callback
void track(int fd, const disconnection_handler_t& handler);
void untrack(int fd);

//! asynchronously read read_size bytes and append them to the given buffer
//! on completion, call the read_callback to notify of the success or failure of the operation
//! return false if another async_read operation is in progress or fd is not registered
typedef std::function<void(std::size_t)> read_callback_t;
bool async_read(int fd, std::vector<char>& buffer, std::size_t read_size, const read_callback_t& callback);

//! asynchronously write write_size bytes from buffer to the specified fd
//!on completion, call the write_callback to notify of the success or failure of the operation
//! return false if another async_write operation is in progress or fd is not registered
typedef std::function<void(std::size_t)> write_callback_t;
bool async_write(int fd, const std::vector<char>& buffer, std::size_t write_size, const write_callback_t& callback);
void track(_sock_t fd, const disconnection_handler_t& handler) override;
void untrack(_sock_t fd) override;

bool async_read(_sock_t fd, std::vector<char>& buffer, std::size_t read_size, const read_callback_t& callback) override;
bool async_write(_sock_t fd, const std::vector<char>& buffer, std::size_t write_size, const write_callback_t& callback) override;

private:
//! simple struct to keep track of ongoing operations on a given fd
Expand All @@ -73,7 +61,7 @@ class io_service {

private:
//! listen for incoming events and notify
void listen(void);
void process_io(void) override;

//! notify the poll call so that it can wake up to process new events
void notify_poll(void);
Expand Down Expand Up @@ -108,6 +96,8 @@ class io_service {
std::recursive_mutex m_fds_mutex;
};

} //! unix

} //! network

} //! cpp_redis
Loading