Skip to content

Commit

Permalink
Pipeline support on each iteration of sending data over the socket
Browse files Browse the repository at this point in the history
  • Loading branch information
hamidr committed Nov 24, 2016
1 parent 1186cd6 commit 25c3522
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 27 deletions.
29 changes: 22 additions & 7 deletions includes/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,33 @@ namespace async_redis {
}

void send(const string& command, const reply_cb_t& reply_cb) {
//accumilate requests and then send them
buffer_write_ += command;
req_queue_.emplace(reply_cb, nullptr);

socket_->async_write(command, [this, reply_cb]() {
req_queue_.emplace(reply_cb, nullptr);
if (!write_queued_)
socket_->async_write(buffer_write_, std::bind(&connection::request_recieved, this, std::placeholders::_1));

if (req_queue_.size() == 1)
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
});
if (req_queue_.size() == 1)
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));

write_queued_ = true;
}

private:
void request_recieved(ssize_t sent_chunk_len) {
write_queued_ = false;
buffer_write_.erase(0, sent_chunk_len);

if (buffer_write_.size())
socket_->async_write(buffer_write_, std::bind(&connection::request_recieved, this, std::placeholders::_1));
}

void reply_received(int len) {
ssize_t acc = 0;

while (acc < len && req_queue_.size()) {
while (acc < len && req_queue_.size())
{
auto& request = req_queue_.front();

auto &cb = std::get<0>(request);
Expand All @@ -67,11 +80,13 @@ namespace async_redis {
}
}

if (req_queue_.size() != 0)
if (req_queue_.size())
socket_->async_read(data_, max_data_size, std::bind(&connection::reply_received, this, std::placeholders::_1));
}

private:
string buffer_write_;
bool write_queued_ = false;
std::unique_ptr<SocketType> socket_;
InputOutputHandler& event_loop_;
std::queue<std::tuple<reply_cb_t, parser_t>> req_queue_;
Expand Down
31 changes: 16 additions & 15 deletions includes/network/async_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ namespace async_redis {
{
public:
using socket_identifier_t = typename InputOutputHanler::socket_identifier_t;
using recv_cb_t = std::function<void (int)>;
using ready_cb_t = std::function<void ()>;
using recv_cb_t = std::function<void (ssize_t)>;
using ready_cb_t = std::function<void (ssize_t)>;
using connect_handler_t = std::function<void (bool)>;

async_socket(InputOutputHanler& io)
Expand All @@ -45,21 +45,18 @@ namespace async_redis {
}

~async_socket() {
if (is_connected_) {
close();
io_.unwatch(id_);
}
}

inline int send(const string& data) {
inline ssize_t send(const string& data) {
return ::send(fd_, data.data(), data.size(), 0);
}

inline int send(const char *data, size_t len) {
inline ssize_t send(const char *data, size_t len) {
return ::send(fd_, data, len, 0);
}

inline int receive(char *data, size_t len) {
inline ssize_t receive(char *data, size_t len) {
return ::recv(fd_, data, len, 0);
}

Expand All @@ -72,6 +69,12 @@ namespace async_redis {
}

bool close() {
if (!is_connected_)
return true;

if(id_)
io_.unwatch(id_);

auto res = ::close(fd_) == 0;
is_connected_ = false;
fd_ = -1;
Expand All @@ -80,12 +83,11 @@ namespace async_redis {

void async_write(const string& data, const ready_cb_t& cb)
{
if (!is_connected())
if (!is_connected() || data.size() == 0)
return;

return io_.async_write(id_, [this, data, cb]() {
send(data);
cb();
return io_.async_write(id_, [this, &data, cb]() {
cb(send(data));
});
}

Expand All @@ -96,10 +98,9 @@ namespace async_redis {

return io_.async_read(id_, [&, buffer, max_len, cb]() {
auto l = receive(buffer, max_len);
if (l == 0) {
io_.unwatch(id_);
if (l == 0)
close();
}

cb(l);
});
}
Expand Down
2 changes: 1 addition & 1 deletion includes/network/tcp_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace tcp_server {
fflush(stdout);

if (command == "close") {
socket->async_write("good bye!", [this, &socket]() {
socket->async_write("good bye!", [this, &socket](ssize_t l) {
loop_.async_timeout(1, [this, &socket]() {
conns_.erase(socket);
});
Expand Down
8 changes: 8 additions & 0 deletions includes/redis_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ namespace async_redis {
send({"decr", field}, reply);
}

void ping(reply_cb_t reply) {
send({"ping"}, reply);
}

void select(uint catalog, reply_cb_t reply) {
send({"select", std::to_string(catalog)}, reply);
}

//just to cause error!
void err(reply_cb_t reply) {
send({"set 1"}, reply);
Expand Down
24 changes: 20 additions & 4 deletions test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,26 @@ int main(int argc, char** args)
return;
}

client.set("h1", "wwww", [&](parser_t paresed) {
std::cout << paresed->to_string() << std::endl;
client.get("h1", [&](parser_t p) {
std::cout << p->to_string() << std::endl;
client.select(0, []{});

client.set("h2", "wwww2", [&](parser_t p) {
//std::cout << "set h2 " << p->to_string() << std::endl << std::endl;
});
client.ping([&](parser_t p) {
//std::cout << "ping "<< p->to_string() << std::endl << std::endl;
});
client.get("h3", [&](parser_t p) {
//std::cout << "get h3 "<< p->to_string() << std::endl << std::endl;
});
client.ping([&](parser_t p) {
//std::cout << "ping "<<p->to_string() << std::endl << std::endl;
});


client.set("h4", "wwww", [&](parser_t paresed) {
//std::cout << "h4 www "<<paresed->to_string() << std::endl << std::endl;
client.get("h5", [&](parser_t p) {
//std::cout << "get h5 " <<p->to_string() << std::endl << std::endl;

client.set("wtff", "hello", [&](parser_t paresed) {
client.get("wtff", [](parser_t p2) {
Expand Down

0 comments on commit 25c3522

Please sign in to comment.