Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/speed_test_async_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ int main(int argc, char **argv) {
using next_layer_t = socket_t;
using Buffer = boost::asio::streambuf;
using Iterator = typename r::to_iterator<Buffer>::iterator_t;
using policy_t = r::parsing_policy::drop_result;
//using policy_t = r::parsing_policy::keep_result;
//using policy_t = r::parsing_policy::drop_result;
using policy_t = r::parsing_policy::keep_result;

if (argc < 2) {
std::cout << "Usage : " << argv[0] << " ip:port \n";
Expand Down
87 changes: 54 additions & 33 deletions include/bredis/impl/async_op.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,52 @@

namespace bredis {

template <typename Iterator, typename Policy> struct result_handler_t;
struct base_result_visitor_t : public boost::static_visitor<std::size_t> {
boost::system::error_code &error_code_;

base_result_visitor_t(boost::system::error_code &error_code)
: error_code_{error_code} {}

std::size_t operator()(const not_enough_data_t &) const { std::abort(); }

std::size_t operator()(const protocol_error_t &value) const {
error_code_ = value.code;
return 0;
}
};

template <typename Iterator, typename Policy> struct result_visitor_t;

template <typename Iterator>
struct result_handler_t<Iterator, parsing_policy::drop_result> {
struct result_visitor_t<Iterator, parsing_policy::drop_result>
: public base_result_visitor_t {
using base_t = base_result_visitor_t;
using policy_t = parsing_policy::drop_result;
using positive_result_t = parse_result_mapper_t<Iterator, policy_t>;

std::size_t replies_count;
positive_result_t &result;
size_t cumulative_consumption;
size_t count;
positive_result_t result;

result_handler_t(std::size_t replies_count_)
: replies_count{replies_count_},
cumulative_consumption{0}, count{0}, result{0} {}
static positive_result_t construct() { return positive_result_t{0}; }

result_visitor_t(boost::system::error_code &error_code,
std::size_t replies_count_, positive_result_t &result_)
: base_t{error_code}, replies_count{replies_count_}, result{result_},
cumulative_consumption{0}, count{0} {}

void init() {
// NO-OP;
}

bool on_result(positive_result_t &&parse_result) {
using base_t::operator();

size_t operator()(const positive_result_t &parse_result) {
++count;
cumulative_consumption += parse_result.consumed;
return count < replies_count;
// return parse_result.consumed;
return count < replies_count ? parse_result.consumed : 0;
}

void complete_result() {
Expand All @@ -49,26 +71,34 @@ struct result_handler_t<Iterator, parsing_policy::drop_result> {
};

template <typename Iterator>
struct result_handler_t<Iterator, parsing_policy::keep_result> {
struct result_visitor_t<Iterator, parsing_policy::keep_result>
: public base_result_visitor_t {
using base_t = base_result_visitor_t;
using policy_t = parsing_policy::keep_result;
using positive_result_t = parse_result_mapper_t<Iterator, policy_t>;

std::size_t replies_count;
positive_result_t &result;
size_t cumulative_consumption;
size_t count;
positive_result_t result;
markers::array_holder_t<Iterator> tmp_results;

result_handler_t(std::size_t replies_count_)
: replies_count{replies_count_}, cumulative_consumption{0}, count{0} {}
static positive_result_t construct() { return positive_result_t{}; }

result_visitor_t(boost::system::error_code &error_code,
std::size_t replies_count_, positive_result_t &result_)
: base_t{error_code}, replies_count{replies_count_}, result{result_},
cumulative_consumption{0}, count{0} {}

void init() { tmp_results.elements.reserve(replies_count); }

bool on_result(positive_result_t &&parse_result) {
using base_t::operator();

size_t operator()(const positive_result_t &parse_result) {
tmp_results.elements.emplace_back(std::move(parse_result.result));
++count;
cumulative_consumption += parse_result.consumed;
return count < replies_count;
return count < replies_count ? parse_result.consumed : 0;
}

void complete_result() {
Expand All @@ -89,45 +119,36 @@ template <typename DynamicBuffer, typename Policy> struct async_read_op_impl {
async_read_op_impl(DynamicBuffer &rx_buff, std::size_t replies_count)
: rx_buff_{rx_buff}, replies_count_{replies_count} {}
using Iterator = typename to_iterator<DynamicBuffer>::iterator_t;
using ResultHandler = result_handler_t<Iterator, Policy>;
using ResultVisitor = result_visitor_t<Iterator, Policy>;
using positive_result_t = parse_result_mapper_t<Iterator, Policy>;

positive_result_t op(boost::system::error_code &error_code,
std::size_t /*bytes_transferred*/) {

ResultHandler result_handler(replies_count_);
auto result = ResultVisitor::construct();

if (!error_code) {
auto const_buff = rx_buff_.data();
auto begin = Iterator::begin(const_buff);
auto end = Iterator::end(const_buff);

result_handler.init();
ResultVisitor visitor(error_code, replies_count_, result);
visitor.init();

bool continue_parsing;
std::size_t consumed{0};
do {
using boost::get;
begin += consumed;
auto parse_result =
Protocol::parse<Iterator, Policy>(begin, end);
auto *parse_error = boost::get<protocol_error_t>(&parse_result);
if (parse_error) {
error_code = parse_error->code;
continue_parsing = false;
} else {
auto &positive_result =
get<positive_result_t>(parse_result);
begin += positive_result.consumed;
continue_parsing =
result_handler.on_result(std::move(positive_result));
}
} while (continue_parsing);
consumed = boost::apply_visitor(visitor, parse_result);
} while (consumed);

/* check again, as protocol error might be met */
if (!error_code) {
result_handler.complete_result();
visitor.complete_result();
}
}
return result_handler.result;
return result;
}
};

Expand Down
22 changes: 11 additions & 11 deletions include/bredis/impl/common.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ template <typename Iterator> class MatchResult {
}
};

class command_serializer_visitor : public boost::static_visitor<std::string> {
class command_serializer_visitor : public boost::static_visitor<void> {
private:
std::ostream &out_;

public:
std::string operator()(const single_command_t &value) const {
std::stringstream out;
out.imbue(std::locale::classic());
Protocol::serialize(out, value);
return out.str();
command_serializer_visitor(std::ostream &out) : out_{out} {}
void operator()(const single_command_t &value) const {
out_.imbue(std::locale::classic());
Protocol::serialize(out_, value);
}

std::string operator()(const command_container_t &value) const {
std::stringstream out;
out.imbue(std::locale::classic());
void operator()(const command_container_t &value) const {
out_.imbue(std::locale::classic());
for (const auto &cmd : value) {
Protocol::serialize(out, cmd);
Protocol::serialize(out_, cmd);
}
return out.str();
}
};

Expand Down
10 changes: 5 additions & 5 deletions include/bredis/impl/connection.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ Connection<NextLayer>::async_write(DynamicBuffer &tx_buff,
typename asio::handler_type<WriteCallback, Signature>::type;

std::ostream os(&tx_buff);
auto string = boost::apply_visitor(command_serializer_visitor(), command);
os.write(string.c_str(), string.size());
boost::apply_visitor(command_serializer_visitor(os), command);

real_handler_t handler(std::forward<WriteCallback>(write_callback));
return async_write(stream_, tx_buff, handler);
Expand Down Expand Up @@ -72,9 +71,10 @@ template <typename NextLayer>
void Connection<NextLayer>::write(const command_wrapper_t &command,
boost::system::error_code &ec) {
namespace asio = boost::asio;
auto str = boost::apply_visitor(command_serializer_visitor(), command);
auto const output_buf = asio::buffer(str.c_str(), str.size());
asio::write(stream_, output_buf, ec);
asio::streambuf tx_buff;
std::ostream os(&tx_buff);
boost::apply_visitor(command_serializer_visitor(os), command);
asio::write(stream_, tx_buff, ec);
}

template <typename NextLayer>
Expand Down
5 changes: 3 additions & 2 deletions t/11-multi-ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ TEST_CASE("ping", "[connection]") {

std::chrono::nanoseconds sleep_delay(1);

auto count = 1000;
size_t count = 1000;
r::single_command_t ping_cmd("ping");
r::command_container_t ping_cmds_container;
for (auto i = 0; i < count; ++i) {
ping_cmds_container.reserve(count);
for (size_t i = 0; i < count; ++i) {
ping_cmds_container.push_back(ping_cmd);
}
r::command_wrapper_t cmd(ping_cmds_container);
Expand Down