Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for homestore based replication to run correctly #58

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

class NuRaftMesgConan(ConanFile):
name = "nuraft_mesg"
version = "2.0.5"
version = "2.1.1"

homepage = "https://github.com/eBay/nuraft_mesg"
description = "A gRPC service for NuRAFT"
Expand Down Expand Up @@ -58,13 +58,13 @@ def build_requirements(self):
self.build_requires("jungle/cci.20221201")

def requirements(self):
self.requires("sisl/[~=10, include_prerelease=True]@oss/master")
self.requires("sisl/[~=11, include_prerelease=True]@oss/master")
self.requires("nuraft/2.3.0")

self.requires("boost/1.82.0")
self.requires("flatbuffers/23.5.26")
self.requires("openssl/3.1.1")
self.requires("lz4/1.9.4")
self.requires("openssl/3.1.3")
#self.requires("lz4/1.9.4")

def validate(self):
if self.info.settings.compiler.cppstd:
Expand Down
3 changes: 1 addition & 2 deletions include/nuraft_mesg/nuraft_mesg.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ namespace nuraft_mesg {
class mesg_state_mgr;

// called by the server after it receives the request
using data_service_request_handler_t =
std::function< void(sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >;
using data_service_request_handler_t = std::function< void(boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) >;

class MessagingApplication {
public:
Expand Down
6 changes: 3 additions & 3 deletions src/lib/common_lib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace nuraft_mesg {
[[maybe_unused]] static void serialize_to_byte_buffer(grpc::ByteBuffer& cli_byte_buf, io_blob_list_t const& cli_buf) {
folly::small_vector< grpc::Slice, 4 > slices;
for (auto const& blob : cli_buf) {
slices.emplace_back(blob.bytes, blob.size, grpc::Slice::STATIC_SLICE);
slices.emplace_back(blob.cbytes(), blob.size(), grpc::Slice::STATIC_SLICE);
}
cli_byte_buf.Clear();
grpc::ByteBuffer tmp(slices.data(), cli_buf.size());
Expand All @@ -29,8 +29,8 @@ namespace nuraft_mesg {
grpc::Slice slice;
auto status = cli_byte_buf.TrySingleSlice(&slice);
if (!status.ok()) { return status; }
cli_buf.bytes = const_cast< uint8_t* >(slice.begin());
cli_buf.size = slice.size();
cli_buf.set_bytes(slice.begin());
cli_buf.set_size(slice.size());
return status;
}

Expand Down
8 changes: 1 addition & 7 deletions src/lib/data_service_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,7 @@ bool data_service_grpc::bind(std::string const& request_name, group_id_t const&
// This is an async call, hence the "return false". The user should invoke rpc_data->send_response to finish the
// call
auto generic_handler_cb = [request_cb](boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
sisl::io_blob svr_buf;
if (auto status = deserialize_from_byte_buffer(rpc_data->request(), svr_buf); !status.ok()) {
LOGE(, "ByteBuffer DumpToSingleSlice failed, {}", status.error_message());
rpc_data->set_status(status);
return true; // respond immediately
}
request_cb(svr_buf, rpc_data);
request_cb(rpc_data);
return false;
};
auto lk = std::unique_lock< data_lock_type >(_req_lock);
Expand Down
5 changes: 3 additions & 2 deletions src/lib/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ msg_service::msg_service(std::shared_ptr< ManagerImpl > const& manager, group_id
_data_service_enabled(enable_data_service),
_default_group_type(default_group_type),
_manager(manager),
_service_address(service_address) {}
_service_address(service_address),
handler_thread_pool_{4} {}

msg_service::~msg_service() = default;
msg_service::~msg_service() { handler_thread_pool_.join(); }

void msg_service::associate(::sisl::GrpcServer* server) {
RELEASE_ASSERT(server, "NULL server!");
Expand Down
4 changes: 4 additions & 0 deletions src/lib/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <sisl/grpc/rpc_server.hpp>
#include <sisl/metrics/metrics.hpp>

#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>

#include "grpc_server.hpp"
#include "manager_impl.hpp"
#include "data_service_grpc.hpp"
Expand Down Expand Up @@ -50,6 +53,7 @@ class msg_service : public nuraft::raft_server_handler, public std::enable_share
protected:
folly::ConcurrentHashMap< group_id_t, grpc_server_wrapper > _raft_servers;
peer_id_t const _service_address;
boost::asio::thread_pool handler_thread_pool_;

public:
// Each serialization implementation must provide this static
Expand Down
38 changes: 27 additions & 11 deletions src/proto/proto_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ static RCResponse* fromRCResponse(nuraft::resp_msg& rcmsg) {

class proto_service : public msg_service {
::grpc::Status step(nuraft::raft_server& server, const RaftMessage& request, RaftMessage& reply);
std::mutex _raft_servers_mutex;

public:
using msg_service::msg_service;
Expand Down Expand Up @@ -127,17 +128,32 @@ bool proto_service::raftStep(const sisl::AsyncRpcDataPtr< Messaging, RaftGroupMs

// Setup our response and process the request.
response.set_group_id(group_id);
if (auto it = _raft_servers.find(gid); _raft_servers.end() != it) {
if (it->second.m_metrics) COUNTER_INCREMENT(*it->second.m_metrics, group_steps, 1);
try {
rpc_data->set_status(step(*it->second.m_server->raft_server(), request.msg(), *response.mutable_msg()));
return true;
} catch (std::runtime_error& rte) { LOGE("Caught exception during step(): {}", rte.what()); }
} else {
LOGD("Missing [group={}]", group_id);
}
rpc_data->set_status(::grpc::Status(::grpc::NOT_FOUND, fmt::format("Missing RAFT group {}", group_id)));
return true;

boost::asio::post(handler_thread_pool_, [this, rpc_data]() {
auto gid = boost::uuids::string_generator()(rpc_data->response().group_id());
auto& request = rpc_data->request();
auto& response = rpc_data->response();
auto const& group_id = request.group_id();
{
std::lock_guard< std::mutex > lock(_raft_servers_mutex);
if (auto it = _raft_servers.find(gid); _raft_servers.end() != it) {
if (it->second.m_metrics) COUNTER_INCREMENT(*it->second.m_metrics, group_steps, 1);
try {
rpc_data->set_status(
step(*it->second.m_server->raft_server(), request.msg(), *response.mutable_msg()));
} catch (std::runtime_error& rte) {
LOGE("Caught exception during step(): {}", rte.what());
rpc_data->set_status(
::grpc::Status(::grpc::NOT_FOUND, fmt::format("Missing RAFT group {}", group_id)));
}
} else {
LOGD("Missing [group={}]", group_id);
rpc_data->set_status(::grpc::Status(::grpc::NOT_FOUND, fmt::format("Missing RAFT group {}", group_id)));
}
}
rpc_data->send_response();
});
return false;
}

std::shared_ptr< msg_service > msg_service::create(std::shared_ptr< ManagerImpl > const& manager,
Expand Down
19 changes: 10 additions & 9 deletions src/tests/test_state_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,23 @@ test_state_mgr::data_service_request_unidirectional(nuraft_mesg::destination_t c
bool test_state_mgr::register_data_service_apis(nuraft_mesg::Manager* messaging) {
return messaging->bind_data_service_request(
SEND_DATA, _group_id,
[this](sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
[this](boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
rpc_data->set_comp_cb([this](boost::intrusive_ptr< sisl::GenericRpcData >&) { server_counter++; });
verify_data(incoming_buf);
m_repl_svc_ctx->send_data_service_response(nuraft_mesg::io_blob_list_t{incoming_buf}, rpc_data);
verify_data(rpc_data->request_blob());
m_repl_svc_ctx->send_data_service_response(nuraft_mesg::io_blob_list_t{rpc_data->request_blob()},
rpc_data);
}) &&
messaging->bind_data_service_request(
REQUEST_DATA, _group_id,
[this](sisl::io_blob const& incoming_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
REQUEST_DATA, _group_id, [this](boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) {
rpc_data->set_comp_cb([this](boost::intrusive_ptr< sisl::GenericRpcData >&) { server_counter++; });
m_repl_svc_ctx->send_data_service_response(nuraft_mesg::io_blob_list_t{incoming_buf}, rpc_data);
m_repl_svc_ctx->send_data_service_response(nuraft_mesg::io_blob_list_t{rpc_data->request_blob()},
rpc_data);
});
}

void test_state_mgr::verify_data(sisl::io_blob const& buf) {
for (size_t read_sz{0}; read_sz < buf.size; read_sz += sizeof(uint32_t)) {
uint32_t const data{*reinterpret_cast< uint32_t* >(buf.bytes + read_sz)};
for (size_t read_sz{0}; read_sz < buf.size(); read_sz += sizeof(uint32_t)) {
uint32_t const data{*reinterpret_cast< uint32_t* >(const_cast< uint8_t* >(buf.cbytes()) + read_sz)};
EXPECT_EQ(data, data_vec[read_sz / sizeof(uint32_t)]);
}
}
Expand All @@ -217,7 +218,7 @@ void test_state_mgr::fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf) {
static int const data_size{8};
for (int i = 0; i < data_size; i++) {
cli_buf.emplace_back(sizeof(uint32_t));
uint32_t* const write_buf{reinterpret_cast< uint32_t* >(cli_buf[i].bytes)};
uint32_t* const write_buf{reinterpret_cast< uint32_t* >(cli_buf[i].bytes())};
data_vec.emplace_back(get_random_num());
*write_buf = data_vec.back();
}
Expand Down
Loading