Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' into fd_test
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong committed May 23, 2019
2 parents 4198108 + 0ebf109 commit 1f8da82
Show file tree
Hide file tree
Showing 113 changed files with 5,155 additions and 918 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.kdev4/
.zk_install/
.idea
.vscode/

gcov_report/
bin/Linux/thrift
Expand Down
5 changes: 3 additions & 2 deletions bin/dsn.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ function(dsn_add_project)
set(MY_PROJ_SRC "")
endif()
set(TEMP_SRC "")
# We restrict the file suffix to keep our codes consitent.
# We restrict the file suffix to keep our codes consistent.
file(${MY_SRC_SEARCH_MODE} TEMP_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/*.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/*.c"
Expand Down Expand Up @@ -294,7 +294,8 @@ function(dsn_setup_system_libs)
set(DSN_SYSTEM_LIBS ${DSN_SYSTEM_LIBS} ${OPENSSL_CRYPTO_LIBRARY})

if(ENABLE_GPERF)
set(DSN_SYSTEM_LIBS ${DSN_SYSTEM_LIBS} tcmalloc)
set(DSN_SYSTEM_LIBS ${DSN_SYSTEM_LIBS} tcmalloc_and_profiler)
add_definitions(-DDSN_ENABLE_GPERF)
endif()

set(DSN_SYSTEM_LIBS
Expand Down
9 changes: 6 additions & 3 deletions include/dsn/cpp/json_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <cctype>

#include <rapidjson/ostreamwrapper.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/writer.h>
#include <rapidjson/document.h>

Expand Down Expand Up @@ -200,14 +201,16 @@ namespace json {

typedef rapidjson::GenericValue<rapidjson::UTF8<>> JsonObject;
typedef rapidjson::Writer<rapidjson::OStreamWrapper> JsonWriter;
typedef rapidjson::PrettyWriter<rapidjson::OStreamWrapper> PrettyJsonWriter;

template <typename>
class json_forwarder;

// json serialization for string types.
// please notice when we call rapidjson::Writer::String, with 3rd parameter with "true",
// which means that we will COPY string to writer
inline void json_encode(JsonWriter &out, const std::string &str)
template <typename Writer>
void json_encode(Writer &out, const std::string &str)
{
out.String(str.c_str(), str.length(), true);
}
Expand Down Expand Up @@ -599,5 +602,5 @@ NON_MEMBER_JSON_SERIALIZATION(dsn::app_info,
expire_second,
create_second,
drop_second)
}
}
} // namespace json
} // namespace dsn
1 change: 1 addition & 0 deletions include/dsn/cpp/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ struct base : environment

// Await for all running tasks to complete.
void wait_all() { __conf.tracker->wait_outstanding_tasks(); }
void cancel_all() { __conf.tracker->cancel_outstanding_tasks(); }

/// === Pipeline Declaration === ///
/// Declaration of pipeline is not thread-safe.
Expand Down
83 changes: 58 additions & 25 deletions include/dsn/cpp/rpc_holder.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@
#include <dsn/tool-api/task_tracker.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/partition_resolver.h>

namespace dsn {

using literals::chrono_literals::operator"" _ms;

//
// rpc_holder is mainly designed for RAII of dsn::message_ex*.
// rpc_holder is mainly designed for RAII of message_ex*.
// Since the request message will be automatically released after the rpc ends,
// it will become inaccessible when you use it in an async call (probably via tasking::enqueue).
// So in rpc_holder we hold another reference of the message, preventing it to be deleted.
Expand Down Expand Up @@ -78,15 +79,15 @@ class rpc_holder
using response_type = TResponse;

public:
explicit rpc_holder(dsn::message_ex *req = nullptr)
explicit rpc_holder(message_ex *req = nullptr)
{
if (req != nullptr) {
_i = std::make_shared<internal>(req);
}
}

rpc_holder(std::unique_ptr<TRequest> req,
dsn::task_code code,
task_code code,
std::chrono::milliseconds timeout = 0_ms,
uint64_t partition_hash = 0)
: _i(new internal(req, code, timeout, partition_hash))
Expand Down Expand Up @@ -121,7 +122,7 @@ class rpc_holder
return _i->thrift_response;
}

dsn::message_ex *dsn_request() const
message_ex *dsn_request() const
{
dassert(_i, "rpc_holder is uninitialized");
return _i->dsn_request;
Expand All @@ -130,21 +131,21 @@ class rpc_holder
// the remote address where reveice request from and send response to.
rpc_address remote_address() const { return dsn_request()->header->from_address; }

// TCallback = void(dsn::error_code)
// TCallback = void(error_code)
// NOTE that the `error_code` is not the error carried by response. Users should
// check the responded error themselves.
template <typename TCallback>
task_ptr call(::dsn::rpc_address server,
dsn::task_tracker *tracker,
task_ptr call(const rpc_address &server,
task_tracker *tracker,
TCallback &&callback,
int reply_thread_hash = 0)
{
// ensures that TCallback receives exactly one argument, which must be a dsn::error_code.
// ensures that TCallback receives exactly one argument, which must be a error_code.
static_assert(function_traits<TCallback>::arity == 1,
"TCallback must receive exactly one argument");
static_assert(std::is_same<typename function_traits<TCallback>::template arg_t<0>,
dsn::error_code>::value,
"the first argument of TCallback must be dsn::error_code");
static_assert(
std::is_same<typename function_traits<TCallback>::template arg_t<0>, error_code>::value,
"the first argument of TCallback must be error_code");

if (dsn_unlikely(_mail_box != nullptr)) {
_mail_box->emplace_back(*this);
Expand All @@ -155,9 +156,9 @@ class rpc_holder
dsn_request(),
tracker,
[ cb_fwd = std::forward<TCallback>(callback),
rpc = *this ](error_code err, dsn::message_ex * req, dsn::message_ex * resp) mutable {
rpc = *this ](error_code err, message_ex * req, message_ex * resp) mutable {
if (err == ERR_OK) {
::dsn::unmarshall(resp, rpc.response());
unmarshall(resp, rpc.response());
}
cb_fwd(err);
},
Expand All @@ -166,10 +167,42 @@ class rpc_holder
return t;
}

template <typename TCallback>
task_ptr call(replication::partition_resolver_ptr &resolver,
task_tracker *tracker,
TCallback &&callback,
int reply_thread_hash = 0)
{
static_assert(function_traits<TCallback>::arity == 1,
"TCallback must receive exactly one argument");
static_assert(
std::is_same<typename function_traits<TCallback>::template arg_t<0>, error_code>::value,
"the first argument of TCallback must be error_code");

if (dsn_unlikely(_mail_box != nullptr)) {
_mail_box->emplace_back(*this);
return nullptr;
}

rpc_response_task_ptr t = rpc::create_rpc_response_task(
dsn_request(),
tracker,
[ cb_fwd = std::forward<TCallback>(callback),
rpc = *this ](error_code err, message_ex * req, message_ex * resp) mutable {
if (err == ERR_OK) {
unmarshall(resp, rpc.response());
}
cb_fwd(err);
},
reply_thread_hash);
resolver->call_task(t);
return t;
}

// Returns an rpc_holder that will reply the request after its lifetime ends.
// By default rpc_holder never replies.
// SEE: serverlet<T>::register_rpc_handler_with_rpc_holder
static inline rpc_holder auto_reply(dsn::message_ex *req)
static inline rpc_holder auto_reply(message_ex *req)
{
rpc_holder rpc(req);
rpc._i->auto_reply = true;
Expand Down Expand Up @@ -206,28 +239,28 @@ class rpc_holder

struct internal
{
explicit internal(dsn::message_ex *req)
explicit internal(message_ex *req)
: dsn_request(req), thrift_request(make_unique<TRequest>()), auto_reply(false)
{
// we must hold one reference for the request, or rdsn will delete it after
// the rpc call ends.
dsn_request->add_ref();
dsn::unmarshall(req, *thrift_request);
unmarshall(req, *thrift_request);
}

internal(std::unique_ptr<TRequest> &req,
dsn::task_code code,
task_code code,
std::chrono::milliseconds timeout,
uint64_t partition_hash)
: thrift_request(std::move(req)), auto_reply(false)
{
dassert(thrift_request != nullptr, "req should not be null");

// leave thread_hash to 0
dsn_request = dsn::message_ex::create_request(
dsn_request = message_ex::create_request(
code, static_cast<int>(timeout.count()), 0, partition_hash);
dsn_request->add_ref();
dsn::marshall(dsn_request, *thrift_request);
marshall(dsn_request, *thrift_request);
}

void reply()
Expand All @@ -240,8 +273,8 @@ class rpc_holder
return;
}

dsn::message_ex *dsn_response = dsn_request->create_response();
::dsn::marshall(dsn_response, thrift_response);
message_ex *dsn_response = dsn_request->create_response();
marshall(dsn_response, thrift_response);
dsn_rpc_reply(dsn_response);
}

Expand All @@ -253,7 +286,7 @@ class rpc_holder
dsn_request->release_ref();
}

dsn::message_ex *dsn_request;
message_ex *dsn_request;
std::unique_ptr<TRequest> thrift_request;
TResponse thrift_response;

Expand Down Expand Up @@ -285,12 +318,12 @@ struct is_rpc_holder<rpc_holder<TRequest, TResponse>> : public std::true_type
namespace rpc {

// call an RPC specified by rpc_holder.
// TCallback = void(dsn::error_code)
// TCallback = void(error_code)

template <typename TCallback, typename TRpcHolder>
task_ptr call(::dsn::rpc_address server,
task_ptr call(rpc_address server,
TRpcHolder rpc,
dsn::task_tracker *tracker,
task_tracker *tracker,
TCallback &&callback,
int reply_thread_hash = 0)
{
Expand Down
12 changes: 0 additions & 12 deletions include/dsn/cpp/rpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,11 @@ typedef ::dsn::ref_ptr<rpc_read_stream> rpc_read_stream_ptr;
class rpc_write_stream : public binary_writer
{
public:
// for response
rpc_write_stream(message_ex *msg)
: _msg(msg), _last_write_next_committed(true), _last_write_next_total_size(0)
{
}

// for request
rpc_write_stream(task_code code,
int timeout_ms = 0,
int thread_hash = 0,
uint64_t partition_hash = 0)
: _msg(message_ex::create_request(code, timeout_ms, thread_hash, partition_hash)),
_last_write_next_committed(true),
_last_write_next_total_size(0)
{
}

// write buffer for rpc_write_stream is allocated from
// a per-thread pool, and it is expected that
// the per-thread pool cannot allocated two outstanding
Expand Down
14 changes: 12 additions & 2 deletions include/dsn/cpp/serialization_helper/dsn.layer2_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions include/dsn/dist/failure_detector/failure_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@
*/
#pragma once

#include <dsn/tool-api/zlocks.h>
#include <dsn/dist/failure_detector/fd.client.h>
#include <dsn/dist/failure_detector/fd.server.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/tool-api/zlocks.h>

namespace dsn {
namespace fd {
Expand Down Expand Up @@ -93,12 +93,15 @@ class failure_detector : public failure_detector_service,
{
public:
failure_detector();
virtual ~failure_detector() {}
virtual ~failure_detector() { unregister_ctrl_commands(); }

virtual void on_ping(const beacon_msg &beacon, ::dsn::rpc_replier<beacon_ack> &reply);

virtual void end_ping(::dsn::error_code err, const beacon_ack &ack, void *context);

virtual void register_ctrl_commands();
virtual void unregister_ctrl_commands();

public:
error_code start(uint32_t check_interval_seconds,
uint32_t beacon_interval_seconds,
Expand Down Expand Up @@ -136,6 +139,10 @@ class failure_detector : public failure_detector_service,

bool remove_from_allow_list(::dsn::rpc_address node);

void set_allow_list(const std::vector<std::string> &replica_addrs);

std::string get_allow_list(const std::vector<std::string> &args) const;

int worker_count() const { return static_cast<int>(_workers.size()); }

int master_count() const { return static_cast<int>(_masters.size()); }
Expand Down Expand Up @@ -214,6 +221,8 @@ class failure_detector : public failure_detector_service,

perf_counter_wrapper _recent_beacon_fail_count;

dsn_handle_t _get_allow_list = nullptr;

protected:
mutable zlock _lock;
dsn::task_tracker _tracker;
Expand Down
Loading

0 comments on commit 1f8da82

Please sign in to comment.