Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc message: remove dsn_message_t #168

Merged
merged 1 commit into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 70 files
+0 −1 include/dsn/c/api_common.h
+9 −217 include/dsn/c/api_layer1.h
+8 −5 include/dsn/c/api_task.h
+7 −6 include/dsn/cpp/message_utils.h
+16 −15 include/dsn/cpp/rpc_holder.h
+9 −9 include/dsn/cpp/rpc_stream.h
+5 −5 include/dsn/cpp/serialization.h
+18 −19 include/dsn/cpp/serverlet.h
+1 −1 include/dsn/cpp/service_app.h
+3 −3 include/dsn/dist/replication/mutation_log_tool.h
+3 −3 include/dsn/dist/replication/replication_app_base.h
+10 −10 include/dsn/dist/replication/replication_ddl_client.h
+1 −1 include/dsn/dist/replication/replication_service_app.h
+9 −9 include/dsn/dist/replication/storage_serverlet.h
+8 −8 include/dsn/tool-api/async_calls.h
+1 −1 include/dsn/tool-api/command_manager.h
+1 −1 include/dsn/tool-api/global_config.h
+3 −3 include/dsn/tool-api/http_server.h
+30 −12 include/dsn/tool-api/rpc_message.h
+2 −0 include/dsn/tool-api/task.h
+11 −96 include/dsn/tool-api/task_spec.h
+131 −0 include/dsn/tool-api/threadpool_spec.h
+1 −1 src/apps/skv/simple_kv.server.h
+5 −5 src/core/core/command_manager.cpp
+5 −5 src/core/core/message_utils.cpp
+4 −4 src/core/core/partition_resolver_simple.cpp
+3 −3 src/core/core/partition_resolver_simple.h
+4 −4 src/core/core/rpc_engine.cpp
+20 −150 src/core/core/rpc_message.cpp
+5 −5 src/core/core/service_api_c.cpp
+1 −1 src/core/tests/async_call.cpp
+4 −4 src/core/tests/message_utils_test.cpp
+6 −3 src/core/tests/netprovider.cpp
+40 −37 src/core/tests/rpc.cpp
+4 −3 src/core/tests/rpc_holder_test.cpp
+1 −66 src/core/tests/rpc_message.cpp
+1 −1 src/core/tests/test_utils.h
+5 −6 src/core/tools/http/http_server.cpp
+5 −4 src/dist/replication/ddl_lib/replication_ddl_client.cpp
+15 −13 src/dist/replication/lib/mutation.cpp
+10 −11 src/dist/replication/lib/mutation.h
+4 −4 src/dist/replication/lib/replica.cpp
+9 −9 src/dist/replication/lib/replica.h
+10 −10 src/dist/replication/lib/replica_2pc.cpp
+8 −8 src/dist/replication/lib/replica_config.cpp
+4 −4 src/dist/replication/lib/replica_learn.cpp
+35 −31 src/dist/replication/lib/replica_restore.cpp
+22 −19 src/dist/replication/lib/replica_stub.cpp
+6 −6 src/dist/replication/lib/replica_stub.h
+9 −8 src/dist/replication/lib/replication_app_base.cpp
+1 −1 src/dist/replication/lib/replication_service_app.cpp
+13 −12 src/dist/replication/meta_server/meta_backup_service.cpp
+5 −5 src/dist/replication/meta_server/meta_backup_service.h
+1 −1 src/dist/replication/meta_server/meta_data.cpp
+2 −2 src/dist/replication/meta_server/meta_data.h
+32 −34 src/dist/replication/meta_server/meta_service.cpp
+28 −25 src/dist/replication/meta_server/meta_service.h
+15 −15 src/dist/replication/meta_server/server_state.cpp
+10 −10 src/dist/replication/meta_server/server_state.h
+8 −7 src/dist/replication/meta_server/server_state_restore.cpp
+4 −4 src/dist/replication/test/meta_test/unit_test/backup_test.cpp
+2 −2 src/dist/replication/test/meta_test/unit_test/data_definition_test.cpp
+14 −14 src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h
+8 −8 src/dist/replication/test/meta_test/unit_test/server_state_test.cpp
+33 −33 src/dist/replication/test/meta_test/unit_test/simple_lb_cure_test.cpp
+12 −11 src/dist/replication/test/meta_test/unit_test/update_configuration_test.cpp
+1 −1 src/dist/replication/test/simple_kv/client.cpp
+1 −1 src/dist/replication/test/simple_kv/simple_kv.server.h
+5 −5 src/dist/replication/tool_lib/mutation_log_tool.cpp
+6 −6 src/tests/dsn/failure_detector.cpp
1 change: 0 additions & 1 deletion src/base/pegasus_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,3 @@ inline rocksdb::Slice to_rocksdb_slice(dsn::string_view s) { return {s.data(), s

} // namespace utils
} // namespace pegasus

24 changes: 12 additions & 12 deletions src/client_lib/pegasus_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void pegasus_client_impl::async_set(const std::string &hash_key,

// wrap the user defined callback function, generate a new callback function.
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -199,7 +199,7 @@ void pegasus_client_impl::async_multi_set(const std::string &hash_key,
auto partition_hash = pegasus_key_hash(tmp_key);
// wrap the user-defined-callback-function, generate a new callback function.
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -261,7 +261,7 @@ void pegasus_client_impl::async_get(const std::string &hash_key,
pegasus_generate_key(req, hash_key, sort_key);
auto partition_hash = pegasus_key_hash(req);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -352,7 +352,7 @@ void pegasus_client_impl::async_multi_get(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -453,7 +453,7 @@ void pegasus_client_impl::async_multi_get(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -533,7 +533,7 @@ void pegasus_client_impl::async_multi_get_sortkeys(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -648,7 +648,7 @@ void pegasus_client_impl::async_del(const std::string &hash_key,
auto partition_hash = pegasus_key_hash(req);

auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -730,7 +730,7 @@ void pegasus_client_impl::async_multi_del(const std::string &hash_key,
auto partition_hash = pegasus_key_hash(tmp_key);

auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -799,7 +799,7 @@ void pegasus_client_impl::async_incr(const std::string &hash_key,
auto partition_hash = pegasus_key_hash(req.key);

auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -904,7 +904,7 @@ void pegasus_client_impl::async_check_and_set(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -1021,7 +1021,7 @@ void pegasus_client_impl::async_check_and_mutate(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
Expand Down Expand Up @@ -1192,7 +1192,7 @@ void pegasus_client_impl::async_get_unordered_scanners(
}

auto new_callback = [ user_callback = std::move(callback), max_split_count, options, this ](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
std::vector<pegasus_scanner *> scanners;
configuration_query_by_index_response response;
Expand Down
2 changes: 1 addition & 1 deletion src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ class pegasus_client_impl : public pegasus_client
void _async_next_internal();
void _start_scan();
void _next_batch();
void _on_scan_response(::dsn::error_code, dsn_message_t, dsn_message_t);
void _on_scan_response(::dsn::error_code, dsn::message_ex *, dsn::message_ex *);
void _split_reset();

private:
Expand Down
25 changes: 13 additions & 12 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ void pegasus_client_impl::pegasus_scanner_impl::_next_batch()
dassert(!_rpc_started, "");
_rpc_started = true;
_client->scan(req,
[this](::dsn::error_code err, dsn_message_t req, dsn_message_t resp) mutable {
_on_scan_response(err, req, resp);
},
[this](::dsn::error_code err,
dsn::message_ex *req,
dsn::message_ex *resp) mutable { _on_scan_response(err, req, resp); },
std::chrono::milliseconds(_options.timeout_ms),
0,
_hash);
Expand Down Expand Up @@ -200,18 +200,19 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()

dassert(!_rpc_started, "");
_rpc_started = true;
_client->get_scanner(req,
[this](::dsn::error_code err,
dsn_message_t req,
dsn_message_t resp) mutable { _on_scan_response(err, req, resp); },
std::chrono::milliseconds(_options.timeout_ms),
0,
_hash);
_client->get_scanner(
req,
[this](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) mutable {
_on_scan_response(err, req, resp);
},
std::chrono::milliseconds(_options.timeout_ms),
0,
_hash);
}

void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_code err,
dsn_message_t req,
dsn_message_t resp)
dsn::message_ex *req,
dsn::message_ex *resp)
{
dassert(_rpc_started, "");
_rpc_started = false;
Expand Down
4 changes: 3 additions & 1 deletion src/include/pegasus/error_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ PEGASUS_ERR_CODE(PERR_INVALID_VALUE, -203, "value can't be empty");
PEGASUS_ERR_CODE(PERR_INVALID_PAR_COUNT, -204, "partition count must be a power of 2");
PEGASUS_ERR_CODE(PERR_INVALID_REP_COUNT, -205, "replication count must be 3");
PEGASUS_ERR_CODE(PERR_INVALID_SPLIT_COUNT, -206, "split count must be greater than 0");
PEGASUS_ERR_CODE(PERR_GEO_DECODE_VALUE_ERROR, -207, "decode latitude and longitude from value error");
PEGASUS_ERR_CODE(PERR_GEO_DECODE_VALUE_ERROR,
-207,
"decode latitude and longitude from value error");
PEGASUS_ERR_CODE(PERR_GEO_INVALID_LATLNG_ERROR, -208, "latitude or longitude is invalid");

// SERVER ERROR
Expand Down
2 changes: 1 addition & 1 deletion src/include/rrdb/rrdb.server.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class rrdb_service : public replication::replication_app_base,
public:
rrdb_service(replication::replica *r) : replication::replication_app_base(r) {}
virtual ~rrdb_service() {}
virtual int on_request(dsn_message_t request) override
virtual int on_request(dsn::message_ex *request) override
{
handle_request(request);
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/redis_protocol/proxy/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class proxy_app : public ::dsn::service_app
return ::dsn::ERR_INVALID_PARAMETERS;
}

proxy_session::factory f = [](proxy_stub *p, dsn_message_t m) {
proxy_session::factory f = [](proxy_stub *p, dsn::message_ex *m) {
return std::make_shared<redis_parser>(p, m);
};
_proxy = dsn::make_unique<proxy_stub>(
Expand Down
23 changes: 10 additions & 13 deletions src/redis_protocol/proxy_lib/proxy_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ proxy_stub::proxy_stub(const proxy_session::factory &f,
open_service();
}

void proxy_stub::on_rpc_request(dsn_message_t request)
void proxy_stub::on_rpc_request(dsn::message_ex *request)
{
::dsn::rpc_address source = dsn_msg_from_address(request);
::dsn::rpc_address source = request->header->from_address;
std::shared_ptr<proxy_session> ps;
{
::dsn::service::zauto_read_lock l(_lock);
Expand All @@ -68,9 +68,9 @@ void proxy_stub::on_rpc_request(dsn_message_t request)
ps->on_recv_request(request);
}

void proxy_stub::on_recv_remove_session_request(dsn_message_t request)
void proxy_stub::on_recv_remove_session_request(dsn::message_ex *request)
{
::dsn::rpc_address source = dsn_msg_from_address(request);
::dsn::rpc_address source = request->header->from_address;
std::shared_ptr<proxy_session> ps = remove_session(source);
if (ps != nullptr) {
ps->on_remove_session();
Expand All @@ -91,25 +91,25 @@ std::shared_ptr<proxy_session> proxy_stub::remove_session(dsn::rpc_address remot
return result;
}

proxy_session::proxy_session(proxy_stub *op, dsn_message_t first_msg)
proxy_session::proxy_session(proxy_stub *op, dsn::message_ex *first_msg)
: stub(op), is_session_reset(false), backup_one_request(first_msg)
{
dassert(first_msg != nullptr, "null msg when create session");
dsn_msg_add_ref(backup_one_request);
backup_one_request->add_ref();

remote_address = dsn_msg_from_address(backup_one_request);
remote_address = backup_one_request->header->from_address;
dassert(remote_address.type() == HOST_TYPE_IPV4,
"invalid rpc_address type, type = %d",
(int)remote_address.type());
}

proxy_session::~proxy_session()
{
dsn_msg_release_ref(backup_one_request);
backup_one_request->release_ref();
ddebug("proxy session %s destroyed", remote_address.to_string());
}

void proxy_session::on_recv_request(dsn_message_t msg)
void proxy_session::on_recv_request(dsn::message_ex *msg)
{
// NOTICE:
// 1. in the implementation of "parse", the msg may add_ref & release_ref.
Expand All @@ -131,9 +131,6 @@ void proxy_session::on_recv_request(dsn_message_t msg)

void proxy_session::on_remove_session() { is_session_reset.store(true); }

dsn_message_t proxy_session::create_response()
{
return dsn_msg_create_response(backup_one_request);
}
dsn::message_ex *proxy_session::create_response() { return backup_one_request->create_response(); }
}
} // namespace
18 changes: 9 additions & 9 deletions src/redis_protocol/proxy_lib/proxy_layer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class proxy_stub;
class proxy_session : public std::enable_shared_from_this<proxy_session>
{
public:
typedef std::function<std::shared_ptr<proxy_session>(proxy_stub *p, dsn_message_t first_msg)>
typedef std::function<std::shared_ptr<proxy_session>(proxy_stub *p, dsn::message_ex *first_msg)>
factory;
proxy_session(proxy_stub *p, dsn_message_t first_msg);
proxy_session(proxy_stub *p, dsn::message_ex *first_msg);
virtual ~proxy_session();

// on_recv_request & on_remove_session are called by proxy_stub when messages are got from
Expand All @@ -35,21 +35,21 @@ class proxy_session : public std::enable_shared_from_this<proxy_session>
//
// however, during the running of on_recv_request, an "on_remove_session" may be called,
// the proxy_session and its derived class may need to do some synchronization on this.
void on_recv_request(dsn_message_t msg);
void on_recv_request(dsn::message_ex *msg);
void on_remove_session();

protected:
// return if parse ok
virtual bool parse(dsn_message_t msg) = 0;
dsn_message_t create_response();
virtual bool parse(dsn::message_ex *msg) = 0;
dsn::message_ex *create_response();

protected:
proxy_stub *stub;
std::atomic_bool is_session_reset;

// when get message from raw parser, request & response of "dsn_message_t" are not in couple.
// when get message from raw parser, request & response of "dsn::message_ex*" are not in couple.
// we need to backup one request to create a response struct.
dsn_message_t backup_one_request;
dsn::message_ex *backup_one_request;
// the client address for which this session served
dsn::rpc_address remote_address;
};
Expand Down Expand Up @@ -81,8 +81,8 @@ class proxy_stub : public ::dsn::serverlet<proxy_stub>
std::shared_ptr<proxy_session> remove_session(dsn::rpc_address remote_address);

private:
void on_rpc_request(dsn_message_t request);
void on_recv_remove_session_request(dsn_message_t);
void on_rpc_request(dsn::message_ex *request);
void on_recv_remove_session_request(dsn::message_ex *);

::dsn::service::zrwlock_nr _lock;
std::unordered_map<::dsn::rpc_address, std::shared_ptr<proxy_session>> _sessions;
Expand Down
Loading