From 1307a6556e0b9e002e07f512ef81546a3a6ef024 Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Fri, 12 Jan 2018 16:26:43 +0800 Subject: [PATCH] clientlet: remove class clientlet, add new class thread_access_checker Conflicts: include/dsn/cpp/serverlet.h include/dsn/service_api_cpp.h include/dsn/tool/cli/cli.client.h src/core/tests/async_call.cpp src/core/tests/rpc.cpp src/dist/replication/lib/replica.h src/dist/replication/lib/replica_chkpt.cpp src/dist/replication/lib/replica_stub.cpp src/dist/replication/zookeeper/lock_types.h --- include/dsn/cpp/pipeline.h | 2 +- include/dsn/cpp/rpc_holder.h | 1 + include/dsn/cpp/serverlet.h | 4 +- include/dsn/dist/failure_detector/fd.client.h | 4 +- .../failure_detector/fd.code.definition.h | 1 - .../dist/replication/replication_ddl_client.h | 1 + include/dsn/service_api_cpp.h | 1 - .../clientlet.h => tool-api/async_calls.h} | 24 -------- include/dsn/tool-api/thread_access_checker.h | 47 ++++++++++++++++ include/dsn/tool/cli/cli.client.h | 1 + src/apps/echo/echo.app.example.h | 4 +- src/apps/echo/echo.client.h | 3 +- src/apps/nfs/nfs_client.h | 3 +- src/apps/nfs/nfs_server_impl.cpp | 4 +- src/apps/nfs_test/nfs_test.app.example.h | 5 +- src/apps/skv/simple_kv.app.example.h | 4 +- src/apps/skv/simple_kv.client.2.h | 3 +- src/apps/skv/simple_kv.client.perf.h | 8 +-- .../core/{clientlet.cpp => async_calls.cpp} | 21 +------ src/core/core/partition_resolver_simple.cpp | 1 + src/core/core/rpc_engine.cpp | 2 +- src/core/core/thread_access_checker.cpp | 46 ++++++++++++++++ src/core/perf.tests/aio.cpp | 2 + src/core/perf.tests/rpc.cpp | 8 ++- src/core/perf.tests/task_queue.cpp | 2 + src/core/tests/aio.cpp | 4 +- .../tests/{clientlet.cpp => async_call.cpp} | 55 ++++++++++--------- src/core/tests/corrupt_message.cpp | 2 + src/core/tests/rpc.cpp | 3 +- .../failure_detector/failure_detector.cpp | 24 ++++---- .../failure_detector_multimaster.cpp | 2 +- src/dist/replication/lib/mutation_log.cpp | 1 + src/dist/replication/lib/mutation_log.h | 2 +- src/dist/replication/lib/replica.cpp | 4 +- src/dist/replication/lib/replica.h | 4 ++ src/dist/replication/lib/replica_2pc.cpp | 8 +-- src/dist/replication/lib/replica_backup.cpp | 9 +-- src/dist/replication/lib/replica_check.cpp | 6 +- src/dist/replication/lib/replica_chkpt.cpp | 12 ++-- src/dist/replication/lib/replica_config.cpp | 6 +- src/dist/replication/lib/replica_init.cpp | 4 +- src/dist/replication/lib/replica_learn.cpp | 14 ++--- src/dist/replication/lib/replica_stub.cpp | 22 ++++---- .../distributed_lock_service_simple.cpp | 4 +- .../distributed_lock_service_simple.h | 2 +- .../meta_server_failure_detector.cpp | 10 ++-- .../replication/meta_server/meta_service.cpp | 6 +- .../meta_server/meta_state_service_simple.cpp | 1 + .../meta_server/meta_state_service_simple.h | 2 +- .../replication/meta_server/server_state.cpp | 4 +- .../unit_test/meta_service_test_app.h | 1 + src/dist/replication/test/simple_kv/client.h | 2 +- .../test/simple_kv/simple_kv.client.h | 4 +- .../distributed_lock_service_zookeeper.cpp | 5 +- .../distributed_lock_service_zookeeper.h | 4 +- .../replication/zookeeper/lock_struct.cpp | 33 ++++++----- src/dist/replication/zookeeper/lock_struct.h | 5 +- src/dist/replication/zookeeper/lock_types.h | 1 - .../meta_state_service_zookeeper.cpp | 1 + .../replication/zookeeper/zookeeper_session.h | 1 - 60 files changed, 282 insertions(+), 188 deletions(-) rename include/dsn/{cpp/clientlet.h => tool-api/async_calls.h} (94%) create mode 100644 include/dsn/tool-api/thread_access_checker.h rename src/core/core/{clientlet.cpp => async_calls.cpp} (82%) create mode 100644 src/core/core/thread_access_checker.cpp rename src/core/tests/{clientlet.cpp => async_call.cpp} (86%) diff --git a/include/dsn/cpp/pipeline.h b/include/dsn/cpp/pipeline.h index 1ccd03e5b0..0ded13e0e9 100644 --- a/include/dsn/cpp/pipeline.h +++ b/include/dsn/cpp/pipeline.h @@ -28,7 +28,7 @@ #include #include -#include +#include #include #include diff --git a/include/dsn/cpp/rpc_holder.h b/include/dsn/cpp/rpc_holder.h index b7529d7de2..7a2800a086 100644 --- a/include/dsn/cpp/rpc_holder.h +++ b/include/dsn/cpp/rpc_holder.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include diff --git a/include/dsn/cpp/serverlet.h b/include/dsn/cpp/serverlet.h index 8d42b4e08e..f9f649aa4f 100644 --- a/include/dsn/cpp/serverlet.h +++ b/include/dsn/cpp/serverlet.h @@ -25,9 +25,9 @@ */ #pragma once -#include #include #include +#include namespace dsn { /*! @@ -104,7 +104,7 @@ class rpc_replier }; template // where T : serverlet -class serverlet : public virtual clientlet +class serverlet { public: explicit serverlet(const char *nm); diff --git a/include/dsn/dist/failure_detector/fd.client.h b/include/dsn/dist/failure_detector/fd.client.h index 154ea88e0a..6255e58b9d 100644 --- a/include/dsn/dist/failure_detector/fd.client.h +++ b/include/dsn/dist/failure_detector/fd.client.h @@ -37,11 +37,11 @@ #include "fd.code.definition.h" #include #include -#include +#include namespace dsn { namespace fd { -class failure_detector_client : public virtual ::dsn::clientlet +class failure_detector_client { public: failure_detector_client(::dsn::rpc_address server) { _server = server; } diff --git a/include/dsn/dist/failure_detector/fd.code.definition.h b/include/dsn/dist/failure_detector/fd.code.definition.h index 2395563727..982a9ccca8 100644 --- a/include/dsn/dist/failure_detector/fd.code.definition.h +++ b/include/dsn/dist/failure_detector/fd.code.definition.h @@ -34,7 +34,6 @@ */ #pragma once -#include #include namespace dsn { diff --git a/include/dsn/dist/replication/replication_ddl_client.h b/include/dsn/dist/replication/replication_ddl_client.h index 4af5d0dfcd..5d29b5ea6b 100644 --- a/include/dsn/dist/replication/replication_ddl_client.h +++ b/include/dsn/dist/replication/replication_ddl_client.h @@ -39,6 +39,7 @@ #include #include #include +#include namespace dsn { namespace replication { diff --git a/include/dsn/service_api_cpp.h b/include/dsn/service_api_cpp.h index d2406a4ad4..597ae6a8fb 100644 --- a/include/dsn/service_api_cpp.h +++ b/include/dsn/service_api_cpp.h @@ -41,7 +41,6 @@ #include #include #include -#include #include #include #include diff --git a/include/dsn/cpp/clientlet.h b/include/dsn/tool-api/async_calls.h similarity index 94% rename from include/dsn/cpp/clientlet.h rename to include/dsn/tool-api/async_calls.h index 923413eead..a1f0ec8167 100644 --- a/include/dsn/cpp/clientlet.h +++ b/include/dsn/tool-api/async_calls.h @@ -33,30 +33,6 @@ #include namespace dsn { -/* -clientlet is the base class for RPC service and client -there can be multiple clientlet in the system -*/ -class clientlet -{ -public: - clientlet(); - virtual ~clientlet(); - rpc_address primary_address() { return dsn_primary_address(); } - - static uint32_t random32(uint32_t min, uint32_t max) { return dsn_random32(min, max); } - static uint64_t random64(uint64_t min, uint64_t max) { return dsn_random64(min, max); } - static uint64_t now_ns() { return dsn_now_ns(); } - static uint64_t now_us() { return dsn_now_us(); } - static uint64_t now_ms() { return dsn_now_ms(); } - -protected: - void check_hashed_access(); - -private: - int _access_thread_id; - bool _access_thread_id_inited; -}; inline void empty_rpc_handler(error_code, dsn_message_t, dsn_message_t) {} diff --git a/include/dsn/tool-api/thread_access_checker.h b/include/dsn/tool-api/thread_access_checker.h new file mode 100644 index 0000000000..f5414ca494 --- /dev/null +++ b/include/dsn/tool-api/thread_access_checker.h @@ -0,0 +1,47 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#pragma once + +namespace dsn { + +/// +/// a simple class used to check if some code is accessed by only one thread. +/// please refer to @replica.h and @lock_struct.h for a sample usage +/// +class thread_access_checker +{ +public: + thread_access_checker(); + ~thread_access_checker(); + + void only_one_thread_access(); + +private: + // TODO: the implementation is not thread safe. use atomic variable to reimplement this + int _access_thread_id; + bool _access_thread_id_inited; +}; +} diff --git a/include/dsn/tool/cli/cli.client.h b/include/dsn/tool/cli/cli.client.h index 9636546a96..a46860b11b 100644 --- a/include/dsn/tool/cli/cli.client.h +++ b/include/dsn/tool/cli/cli.client.h @@ -36,6 +36,7 @@ #pragma once #include #include +#include #include #include diff --git a/src/apps/echo/echo.app.example.h b/src/apps/echo/echo.app.example.h index e8d451ac65..177ce53c54 100644 --- a/src/apps/echo/echo.app.example.h +++ b/src/apps/echo/echo.app.example.h @@ -63,7 +63,7 @@ class echo_server_app : public ::dsn::service_app }; // client app example -class echo_client_app : public ::dsn::service_app, public virtual ::dsn::clientlet +class echo_client_app : public ::dsn::service_app { public: echo_client_app(const service_app_info *info) : ::dsn::service_app(info) {} @@ -112,7 +112,7 @@ class echo_client_app : public ::dsn::service_app, public virtual ::dsn::clientl std::unique_ptr _echo_client; }; -class echo_perf_test_client_app : public ::dsn::service_app, public virtual ::dsn::clientlet +class echo_perf_test_client_app : public ::dsn::service_app { public: echo_perf_test_client_app(const service_app_info *info) : ::dsn::service_app(info) diff --git a/src/apps/echo/echo.client.h b/src/apps/echo/echo.client.h index a984fea5ae..5587b86b44 100644 --- a/src/apps/echo/echo.client.h +++ b/src/apps/echo/echo.client.h @@ -36,10 +36,11 @@ #include "echo.code.definition.h" #include #include +#include namespace dsn { namespace example { -class echo_client : public virtual ::dsn::clientlet +class echo_client { public: echo_client(::dsn::rpc_address server) { _server = server; } diff --git a/src/apps/nfs/nfs_client.h b/src/apps/nfs/nfs_client.h index 93d5d16ece..c7523480f7 100644 --- a/src/apps/nfs/nfs_client.h +++ b/src/apps/nfs/nfs_client.h @@ -34,12 +34,13 @@ */ #pragma once #include +#include #include #include namespace dsn { namespace service { -class nfs_client : public virtual ::dsn::clientlet +class nfs_client { public: nfs_client(::dsn::rpc_address server) { _server = server; } diff --git a/src/apps/nfs/nfs_server_impl.cpp b/src/apps/nfs/nfs_server_impl.cpp index a13721acc5..2f1817999c 100644 --- a/src/apps/nfs/nfs_server_impl.cpp +++ b/src/apps/nfs/nfs_server_impl.cpp @@ -32,10 +32,12 @@ * xxxx-xx-xx, author, first version * xxxx-xx-xx, author, fix bug about xxx */ -#include "nfs_server_impl.h" #include #include #include +#include + +#include "nfs_server_impl.h" namespace dsn { namespace service { diff --git a/src/apps/nfs_test/nfs_test.app.example.h b/src/apps/nfs_test/nfs_test.app.example.h index 9fc46db6b6..ff89311a21 100644 --- a/src/apps/nfs_test/nfs_test.app.example.h +++ b/src/apps/nfs_test/nfs_test.app.example.h @@ -34,6 +34,7 @@ */ #pragma once #include +#include #include #include @@ -42,7 +43,7 @@ namespace replication { namespace application { // server app example -class nfs_server_app : public ::dsn::service_app, public virtual ::dsn::clientlet +class nfs_server_app : public ::dsn::service_app { public: nfs_server_app(const service_app_info *info) : ::dsn::service_app(info) {} @@ -57,7 +58,7 @@ class nfs_server_app : public ::dsn::service_app, public virtual ::dsn::clientle }; // client app example -class nfs_client_app : public ::dsn::service_app, public virtual ::dsn::clientlet +class nfs_client_app : public ::dsn::service_app { public: nfs_client_app(const service_app_info *info) : ::dsn::service_app(info) diff --git a/src/apps/skv/simple_kv.app.example.h b/src/apps/skv/simple_kv.app.example.h index 2987afe96b..ad1ddf6d0c 100644 --- a/src/apps/skv/simple_kv.app.example.h +++ b/src/apps/skv/simple_kv.app.example.h @@ -44,7 +44,7 @@ namespace dsn { namespace replication { namespace application { // client app example -class simple_kv_client_app : public ::dsn::service_app, public virtual ::dsn::clientlet +class simple_kv_client_app : public ::dsn::service_app { public: simple_kv_client_app(const service_app_info *info) : ::dsn::service_app(info) {} @@ -127,7 +127,7 @@ class simple_kv_client_app : public ::dsn::service_app, public virtual ::dsn::cl dsn::task_tracker _tracker; }; -class simple_kv_perf_test_client_app : public ::dsn::service_app, public virtual ::dsn::clientlet +class simple_kv_perf_test_client_app : public ::dsn::service_app { public: simple_kv_perf_test_client_app(const service_app_info *info) : ::dsn::service_app(info) {} diff --git a/src/apps/skv/simple_kv.client.2.h b/src/apps/skv/simple_kv.client.2.h index 92679db91c..88786c5156 100644 --- a/src/apps/skv/simple_kv.client.2.h +++ b/src/apps/skv/simple_kv.client.2.h @@ -3,11 +3,12 @@ #include "simple_kv.types.h" #include #include +#include namespace dsn { namespace replication { namespace application { -class simple_kv_client2 : public virtual ::dsn::clientlet +class simple_kv_client2 { public: simple_kv_client2(::dsn::rpc_address server) { _server = server; } diff --git a/src/apps/skv/simple_kv.client.perf.h b/src/apps/skv/simple_kv.client.perf.h index 6e1f19a6f2..f0e0cd539f 100644 --- a/src/apps/skv/simple_kv.client.perf.h +++ b/src/apps/skv/simple_kv.client.perf.h @@ -63,7 +63,7 @@ class simple_kv_perf_test_client : public simple_kv_client2, void send_one_read(int payload_bytes, int key_space_size) { - auto rs = random64(0, 10000000) % key_space_size; + auto rs = dsn_random64(0, 10000000) % key_space_size; std::stringstream ss; ss << "key." << rs << "." << std::string(payload_bytes, 'x'); @@ -78,7 +78,7 @@ class simple_kv_perf_test_client : public simple_kv_client2, void send_one_write(int payload_bytes, int key_space_size) { - auto rs = random64(0, 10000000) % key_space_size; + auto rs = dsn_random64(0, 10000000) % key_space_size; std::stringstream ss; ss << "key." << rs; @@ -94,7 +94,7 @@ class simple_kv_perf_test_client : public simple_kv_client2, void send_one_append(int payload_bytes, int key_space_size) { - auto rs = random64(0, 10000000) % key_space_size; + auto rs = dsn_random64(0, 10000000) % key_space_size; std::stringstream ss; ss << "key." << rs; kv_pair req = {ss.str(), std::string(payload_bytes, 'x')}; @@ -110,4 +110,4 @@ class simple_kv_perf_test_client : public simple_kv_client2, }; } } -} \ No newline at end of file +} diff --git a/src/core/core/clientlet.cpp b/src/core/core/async_calls.cpp similarity index 82% rename from src/core/core/clientlet.cpp rename to src/core/core/async_calls.cpp index c4d584cc0f..72bc441a98 100644 --- a/src/core/core/clientlet.cpp +++ b/src/core/core/async_calls.cpp @@ -33,30 +33,14 @@ * xxxx-xx-xx, author, fix bug about xxx */ -#include #include #include #include +#include #include #include namespace dsn { - -clientlet::clientlet() { _access_thread_id_inited = false; } - -clientlet::~clientlet() { _access_thread_id_inited = false; } - -void clientlet::check_hashed_access() -{ - if (_access_thread_id_inited) { - dassert(::dsn::utils::get_current_tid() == _access_thread_id, - "the service is assumed to be accessed by one thread only!"); - } else { - _access_thread_id = ::dsn::utils::get_current_tid(); - _access_thread_id_inited = true; - } -} - namespace file { void copy_remote_files_impl(::dsn::rpc_address remote, @@ -83,5 +67,4 @@ void copy_remote_files_impl(::dsn::rpc_address remote, } } } - -} // end namespace dsn::service +} diff --git a/src/core/core/partition_resolver_simple.cpp b/src/core/core/partition_resolver_simple.cpp index 362f2251ca..845658ff21 100644 --- a/src/core/core/partition_resolver_simple.cpp +++ b/src/core/core/partition_resolver_simple.cpp @@ -35,6 +35,7 @@ #include "partition_resolver_simple.h" #include +#include namespace dsn { namespace dist { diff --git a/src/core/core/rpc_engine.cpp b/src/core/core/rpc_engine.cpp index e4c61213eb..7abf5ad582 100644 --- a/src/core/core/rpc_engine.cpp +++ b/src/core/core/rpc_engine.cpp @@ -50,8 +50,8 @@ #include #include #include +#include #include -#include #include namespace dsn { diff --git a/src/core/core/thread_access_checker.cpp b/src/core/core/thread_access_checker.cpp new file mode 100644 index 0000000000..99663242fb --- /dev/null +++ b/src/core/core/thread_access_checker.cpp @@ -0,0 +1,46 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include +#include +#include + +namespace dsn { + +thread_access_checker::thread_access_checker() { _access_thread_id_inited = false; } + +thread_access_checker::~thread_access_checker() { _access_thread_id_inited = false; } + +void thread_access_checker::only_one_thread_access() +{ + if (_access_thread_id_inited) { + dassert(::dsn::utils::get_current_tid() == _access_thread_id, + "the service is assumed to be accessed by one thread only!"); + } else { + _access_thread_id = ::dsn::utils::get_current_tid(); + _access_thread_id_inited = true; + } +} +} diff --git a/src/core/perf.tests/aio.cpp b/src/core/perf.tests/aio.cpp index d82512a9ee..02b82b5b9a 100644 --- a/src/core/perf.tests/aio.cpp +++ b/src/core/perf.tests/aio.cpp @@ -35,6 +35,8 @@ #include #include #include +#include + #include #include #include diff --git a/src/core/perf.tests/rpc.cpp b/src/core/perf.tests/rpc.cpp index b307d114db..0c6fc67348 100644 --- a/src/core/perf.tests/rpc.cpp +++ b/src/core/perf.tests/rpc.cpp @@ -33,10 +33,14 @@ * 2016-01-05, Tianyi Wang, first version */ #include -#include +#include + #include -#include "test_utils.h" + #include +#include + +#include "test_utils.h" TEST(core, rpc_perf_test) { diff --git a/src/core/perf.tests/task_queue.cpp b/src/core/perf.tests/task_queue.cpp index c3cbbf5b4d..7935dd1c3e 100644 --- a/src/core/perf.tests/task_queue.cpp +++ b/src/core/perf.tests/task_queue.cpp @@ -38,6 +38,8 @@ #include #include #include +#include + #include #include "test_utils.h" #include diff --git a/src/core/tests/aio.cpp b/src/core/tests/aio.cpp index 9ca72ec2f8..c3a2aff35e 100644 --- a/src/core/tests/aio.cpp +++ b/src/core/tests/aio.cpp @@ -34,9 +34,11 @@ */ #include +#include #include -#include #include + +#include #include "test_utils.h" using namespace ::dsn; diff --git a/src/core/tests/clientlet.cpp b/src/core/tests/async_call.cpp similarity index 86% rename from src/core/tests/clientlet.cpp rename to src/core/tests/async_call.cpp index a54a1ef5e2..6aaee664bc 100644 --- a/src/core/tests/clientlet.cpp +++ b/src/core/tests/async_call.cpp @@ -34,36 +34,40 @@ */ #include -#include +#include +#include #include -#include + +#include #include #include + #include "test_utils.h" DEFINE_TASK_CODE(LPC_TEST_CLIENTLET, TASK_PRIORITY_COMMON, THREAD_POOL_TEST_SERVER) using namespace dsn; int global_value; -class test_clientlet : public clientlet +class tracker_class { public: std::string str; int number; dsn::task_tracker _tracker; + dsn::thread_access_checker _checker; public: - test_clientlet() : clientlet(), str("before called"), number(0) { global_value = 0; } + tracker_class() : str("before called"), number(0), _tracker(1) { global_value = 0; } void callback_function1() { - check_hashed_access(); + _checker.only_one_thread_access(); str = "after called"; ++global_value; } void callback_function2() { - check_hashed_access(); + _checker.only_one_thread_access(); number = 0; for (int i = 0; i < 1000; ++i) number += i; @@ -73,52 +77,52 @@ class test_clientlet : public clientlet void callback_function3() { ++global_value; } }; -TEST(dev_cpp, clientlet_task) +TEST(async_call, task_call) { /* normal lpc*/ - test_clientlet *cl = new test_clientlet(); + tracker_class *tc = new tracker_class(); task_ptr t = - tasking::enqueue(LPC_TEST_CLIENTLET, &cl->_tracker, [cl] { cl->callback_function1(); }); + tasking::enqueue(LPC_TEST_CLIENTLET, &tc->_tracker, [tc] { tc->callback_function1(); }); EXPECT_TRUE(t != nullptr); t->wait(); - EXPECT_TRUE(cl->str == "after called"); - delete cl; + EXPECT_TRUE(tc->str == "after called"); + delete tc; /* task tracking */ - cl = new test_clientlet(); + tc = new tracker_class(); std::vector test_tasks; t = tasking::enqueue(LPC_TEST_CLIENTLET, - &cl->_tracker, - [=] { cl->callback_function1(); }, + &tc->_tracker, + [=] { tc->callback_function1(); }, 0, std::chrono::seconds(30)); test_tasks.push_back(t); t = tasking::enqueue(LPC_TEST_CLIENTLET, - &cl->_tracker, - [cl] { cl->callback_function1(); }, + &tc->_tracker, + [tc] { tc->callback_function1(); }, 0, std::chrono::seconds(30)); test_tasks.push_back(t); t = tasking::enqueue_timer(LPC_TEST_CLIENTLET, - &cl->_tracker, - [cl] { cl->callback_function1(); }, + &tc->_tracker, + [tc] { tc->callback_function1(); }, std::chrono::seconds(20), 0, std::chrono::seconds(30)); test_tasks.push_back(t); - delete cl; + delete tc; for (unsigned int i = 0; i != test_tasks.size(); ++i) EXPECT_FALSE(test_tasks[i]->cancel(true)); } -TEST(dev_cpp, clientlet_rpc) +TEST(async_call, rpc_call) { rpc_address addr("localhost", 20101); rpc_address addr2("localhost", TEST_PORT_END); rpc_address addr3("localhost", 32767); - test_clientlet *cl = new test_clientlet(); + tracker_class *tc = new tracker_class(); rpc::call_one_way_typed(addr, RPC_TEST_STRING_COMMAND, std::string("expect_no_reply"), 0); std::vector task_vec; const char *command = "echo hello world"; @@ -127,7 +131,7 @@ TEST(dev_cpp, clientlet_rpc) auto t = rpc::call(addr3, RPC_TEST_STRING_COMMAND, *str_command, - &cl->_tracker, + &tc->_tracker, [str_command](error_code ec, std::string &&resp) { if (ERR_OK == ec) EXPECT_TRUE(str_command->substr(5) == resp); @@ -136,11 +140,13 @@ TEST(dev_cpp, clientlet_rpc) t = rpc::call(addr2, RPC_TEST_STRING_COMMAND, std::string(command), - &cl->_tracker, + &tc->_tracker, [](error_code ec, std::string &&resp) { EXPECT_TRUE(ec == ERR_OK); }); task_vec.push_back(t); for (int i = 0; i != task_vec.size(); ++i) task_vec[i]->wait(); + + delete tc; } class simple_task : public dsn::raw_task @@ -196,8 +202,7 @@ bool spin_wait(const std::function &pred, int wait_times) } return pred(); } - -TEST(dev_cpp, task_destructor) +TEST(async_call, task_destructor) { { task_ptr t(new simple_task(LPC_TEST_CLIENTLET, nullptr)); diff --git a/src/core/tests/corrupt_message.cpp b/src/core/tests/corrupt_message.cpp index d51526631b..b6de008899 100644 --- a/src/core/tests/corrupt_message.cpp +++ b/src/core/tests/corrupt_message.cpp @@ -3,6 +3,8 @@ #include #include +#include + #include #include diff --git a/src/core/tests/rpc.cpp b/src/core/tests/rpc.cpp index a4ce144641..2d70090340 100644 --- a/src/core/tests/rpc.cpp +++ b/src/core/tests/rpc.cpp @@ -44,7 +44,8 @@ #include #include -#include +#include + #include "test_utils.h" typedef std::function rpc_reply_handler; diff --git a/src/dist/failure_detector/failure_detector.cpp b/src/dist/failure_detector/failure_detector.cpp index 2462c398d1..bd8d37bd2c 100644 --- a/src/dist/failure_detector/failure_detector.cpp +++ b/src/dist/failure_detector/failure_detector.cpp @@ -118,7 +118,7 @@ error_code failure_detector::stop() void failure_detector::register_master(::dsn::rpc_address target) { bool setup_timer = false; - uint64_t now = now_ms(); + uint64_t now = dsn_now_ms(); zauto_lock l(_lock); @@ -143,7 +143,7 @@ void failure_detector::register_master(::dsn::rpc_address target) ret.first->second.send_beacon_timer = tasking::enqueue_timer(LPC_BEACON_SEND, &_tracker, - [this, target]() { this->send_beacon(target, now_ms()); }, + [this, target]() { this->send_beacon(target, dsn_now_ms()); }, std::chrono::milliseconds(_beacon_interval_milliseconds), 0, std::chrono::milliseconds(1)); @@ -171,7 +171,7 @@ bool failure_detector::switch_master(::dsn::rpc_address from, it->second.send_beacon_timer = tasking::enqueue_timer(LPC_BEACON_SEND, &_tracker, - [this, to]() { this->send_beacon(to, now_ms()); }, + [this, to]() { this->send_beacon(to, dsn_now_ms()); }, std::chrono::milliseconds(_beacon_interval_milliseconds), 0, std::chrono::milliseconds(delay_milliseconds)); @@ -218,7 +218,7 @@ void failure_detector::check_all_records() } std::vector<::dsn::rpc_address> expire; - uint64_t now = now_ms(); + uint64_t now = dsn_now_ms(); { zauto_lock l(_lock); @@ -256,7 +256,7 @@ void failure_detector::check_all_records() // process recv record, for server expire.clear(); - now = now_ms(); + now = dsn_now_ms(); { zauto_lock l(_lock); @@ -297,13 +297,13 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon { ack.time = beacon.time; ack.this_node = beacon.to_addr; - ack.primary_node = primary_address(); + ack.primary_node = dsn_primary_address(); ack.is_master = true; ack.allowed = true; zauto_lock l(_lock); - uint64_t now = now_ms(); + uint64_t now = dsn_now_ms(); auto node = beacon.from_addr; worker_map::iterator itr = _workers.find(node); @@ -354,7 +354,7 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack */ uint64_t beacon_send_time = ack.time; auto node = ack.this_node; - uint64_t now = now_ms(); + uint64_t now = dsn_now_ms(); if (err != ERR_OK) { dwarn("ping master(%s) failed, timeout_ms = %u, err = %s", @@ -370,7 +370,7 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack dwarn("received beacon ack without corresponding master, ignore it, " "remote_master[%s], local_worker[%s]", node.to_string(), - primary_address().to_string()); + dsn_primary_address().to_string()); return false; } @@ -379,7 +379,7 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack dwarn("worker rejected, stop sending beacon message, " "remote_master[%s], local_worker[%s]", node.to_string(), - primary_address().to_string()); + dsn_primary_address().to_string()); record.rejected = true; record.send_beacon_timer->cancel(true); return false; @@ -442,7 +442,7 @@ bool failure_detector::is_master_connected(::dsn::rpc_address node) const void failure_detector::register_worker(::dsn::rpc_address target, bool is_connected) { - uint64_t now = now_ms(); + uint64_t now = dsn_now_ms(); /* * callers should use the fd::_lock necessarily @@ -500,7 +500,7 @@ void failure_detector::send_beacon(::dsn::rpc_address target, uint64_t time) { beacon_msg beacon; beacon.time = time; - beacon.from_addr = primary_address(); + beacon.from_addr = dsn_primary_address(); beacon.to_addr = target; beacon.__set_start_time(static_cast(dsn_runtime_init_time_ms())); diff --git a/src/dist/failure_detector_multimaster/failure_detector_multimaster.cpp b/src/dist/failure_detector_multimaster/failure_detector_multimaster.cpp index 90f8c02328..f4b24a9910 100644 --- a/src/dist/failure_detector_multimaster/failure_detector_multimaster.cpp +++ b/src/dist/failure_detector_multimaster/failure_detector_multimaster.cpp @@ -52,7 +52,7 @@ slave_failure_detector_with_multimaster::slave_failure_detector_with_multimaster } _meta_servers.group_address()->set_leader( - meta_servers[random32(0, (uint32_t)meta_servers.size() - 1)]); + meta_servers[dsn_random32(0, (uint32_t)meta_servers.size() - 1)]); // ATTENTION: here we disable dsn_group_set_update_leader_automatically to avoid // failure detecting logic is affected by rpc failure or rpc forwarding. diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index c5745dd246..1b58c1df67 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -40,6 +40,7 @@ #include "replica.h" #include #include +#include namespace dsn { namespace replication { diff --git a/src/dist/replication/lib/mutation_log.h b/src/dist/replication/lib/mutation_log.h index b24e128d5d..f53fbf45da 100644 --- a/src/dist/replication/lib/mutation_log.h +++ b/src/dist/replication/lib/mutation_log.h @@ -120,7 +120,7 @@ class log_block /* : public ::dsn::transient_object*/ // this class is thread safe // class replica; -class mutation_log : public ref_counter, public virtual clientlet +class mutation_log : public ref_counter { public: // return true when the mutation's offset is not less than diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 29c0b38104..e6e8a8124a 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -79,7 +79,7 @@ replica::replica( void replica::update_last_checkpoint_generate_time() { - _last_checkpoint_generate_time_ms = now_ms(); + _last_checkpoint_generate_time_ms = dsn_now_ms(); uint64_t max_interval_ms = _options->checkpoint_max_interval_hours * 3600000UL; // use random trigger time to avoid flush peek _next_checkpoint_interval_trigger_time_ms = @@ -105,7 +105,7 @@ void replica::init_state() _config.pid.set_partition_index(0); _config.status = partition_status::PS_INACTIVE; _primary_states.membership.ballot = 0; - _create_time_ms = now_ms(); + _create_time_ms = dsn_now_ms(); _last_config_change_time_ms = _create_time_ms; update_last_checkpoint_generate_time(); _private_log = nullptr; diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index c2e9896d7f..3fe6b9461e 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -43,7 +43,9 @@ // #include +#include #include + #include #include @@ -381,6 +383,8 @@ class replica : public serverlet, public ref_counter, public replica_ba perf_counter_wrapper _counter_private_log_size; dsn::task_tracker _tracker; + // the thread access checker + dsn::thread_access_checker _checker; }; typedef dsn::ref_ptr replica_ptr; } diff --git a/src/dist/replication/lib/replica_2pc.cpp b/src/dist/replication/lib/replica_2pc.cpp index 58a969746e..78ead9b5e1 100644 --- a/src/dist/replication/lib/replica_2pc.cpp +++ b/src/dist/replication/lib/replica_2pc.cpp @@ -44,7 +44,7 @@ namespace replication { void replica::on_client_write(task_code code, dsn_message_t request) { - check_hashed_access(); + _checker.only_one_thread_access(); if (partition_status::PS_PRIMARY != status()) { response_client_message(false, request, ERR_INVALID_STATE); @@ -223,7 +223,7 @@ void replica::do_possible_commit_on_primary(mutation_ptr &mu) void replica::on_prepare(dsn_message_t request) { - check_hashed_access(); + _checker.only_one_thread_access(); replica_configuration rconfig; mutation_ptr mu; @@ -378,7 +378,7 @@ void replica::on_prepare(dsn_message_t request) void replica::on_append_log_completed(mutation_ptr &mu, error_code err, size_t size) { - check_hashed_access(); + _checker.only_one_thread_access(); dinfo("%s: append shared log completed for mutation %s, size = %u, err = %s", name(), @@ -437,7 +437,7 @@ void replica::on_prepare_reply(std::pair p dsn_message_t request, dsn_message_t reply) { - check_hashed_access(); + _checker.only_one_thread_access(); mutation_ptr mu = pr.first; partition_status::type target_status = pr.second; diff --git a/src/dist/replication/lib/replica_backup.cpp b/src/dist/replication/lib/replica_backup.cpp index 260887854f..6ffdad8d5b 100644 --- a/src/dist/replication/lib/replica_backup.cpp +++ b/src/dist/replication/lib/replica_backup.cpp @@ -3,18 +3,19 @@ #include #include +#include "dist/replication/client_lib/block_service_manager.h" + #include "replica.h" #include "mutation.h" #include "mutation_log.h" #include "replica_stub.h" -#include "../client_lib/block_service_manager.h" namespace dsn { namespace replication { void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response) { - check_hashed_access(); + _checker.only_one_thread_access(); const std::string &policy_name = request.policy.policy_name; auto backup_id = request.backup_id; @@ -441,7 +442,7 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context) // - may trigger async checkpoint and invoke wait_async_checkpoint_for_backup() void replica::trigger_async_checkpoint_for_backup(cold_backup_context_ptr backup_context) { - check_hashed_access(); + _checker.only_one_thread_access(); if (backup_context->status() != ColdBackupCheckpointing) { ddebug("%s: ignore triggering async checkpoint because backup_status = %s", @@ -516,7 +517,7 @@ void replica::trigger_async_checkpoint_for_backup(cold_backup_context_ptr backup // - may schedule local_create_backup_checkpoint if async checkpoint completed void replica::wait_async_checkpoint_for_backup(cold_backup_context_ptr backup_context) { - check_hashed_access(); + _checker.only_one_thread_access(); if (backup_context->status() != ColdBackupCheckpointing) { ddebug("%s: ignore waiting async checkpoint because backup_status = %s", diff --git a/src/dist/replication/lib/replica_check.cpp b/src/dist/replication/lib/replica_check.cpp index 099dada6dd..41703ebd11 100644 --- a/src/dist/replication/lib/replica_check.cpp +++ b/src/dist/replication/lib/replica_check.cpp @@ -44,7 +44,7 @@ namespace replication { void replica::init_group_check() { - check_hashed_access(); + _checker.only_one_thread_access(); ddebug("%s: init group check", name()); @@ -131,7 +131,7 @@ void replica::broadcast_group_check() void replica::on_group_check(const group_check_request &request, /*out*/ group_check_response &response) { - check_hashed_access(); + _checker.only_one_thread_access(); ddebug("%s: process group check, primary = %s, ballot = %" PRId64 ", status = %s, last_committed_decree = %" PRId64, @@ -190,7 +190,7 @@ void replica::on_group_check_reply(error_code err, const std::shared_ptr &req, const std::shared_ptr &resp) { - check_hashed_access(); + _checker.only_one_thread_access(); if (partition_status::PS_PRIMARY != status() || req->config.ballot < get_ballot()) { return; diff --git a/src/dist/replication/lib/replica_chkpt.cpp b/src/dist/replication/lib/replica_chkpt.cpp index 9420898573..af1d866a7c 100644 --- a/src/dist/replication/lib/replica_chkpt.cpp +++ b/src/dist/replication/lib/replica_chkpt.cpp @@ -47,9 +47,9 @@ namespace replication { // run in replica thread void replica::on_checkpoint_timer() { - check_hashed_access(); + _checker.only_one_thread_access(); - if (now_ms() > _next_checkpoint_interval_trigger_time_ms) { + if (dsn_now_ms() > _next_checkpoint_interval_trigger_time_ms) { // we trigger emergency checkpoint if no checkpoint generated for a long time ddebug("%s: trigger emergency checkpoint by checkpoint_max_interval_hours, " "config_interval = %dh (%" PRIu64 "ms), random_interval = %" PRIu64 "ms", @@ -120,7 +120,7 @@ void replica::init_checkpoint(bool is_emergency) void replica::on_copy_checkpoint(const replica_configuration &request, /*out*/ learn_response &response) { - check_hashed_access(); + _checker.only_one_thread_access(); if (request.ballot > get_ballot()) { if (!update_local_configuration(request)) { @@ -163,7 +163,7 @@ void replica::on_copy_checkpoint_ack(error_code err, const std::shared_ptr &req, const std::shared_ptr &resp) { - check_hashed_access(); + _checker.only_one_thread_access(); if (partition_status::PS_PRIMARY != status()) { _primary_states.checkpoint_task = nullptr; @@ -217,7 +217,7 @@ void replica::on_copy_checkpoint_file_completed(error_code err, std::shared_ptr resp, const std::string &chk_dir) { - check_hashed_access(); + _checker.only_one_thread_access(); if (ERR_OK != err) { dwarn("copy checkpoint failed, err(%s), remote_addr(%s)", @@ -352,7 +352,7 @@ void replica::catch_up_with_private_logs(partition_status::type s) void replica::on_checkpoint_completed(error_code err) { - check_hashed_access(); + _checker.only_one_thread_access(); // closing or wrong timing if (partition_status::PS_SECONDARY != status() || err == ERR_WRONG_TIMING) { diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 01725c4f3b..8937ef8db9 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -45,7 +45,7 @@ namespace replication { void replica::on_config_proposal(configuration_update_request &proposal) { - check_hashed_access(); + _checker.only_one_thread_access(); ddebug("%s: process config proposal %s for %s", name(), @@ -423,7 +423,7 @@ void replica::on_update_configuration_on_meta_server_reply( dsn_message_t response, std::shared_ptr req) { - check_hashed_access(); + _checker.only_one_thread_access(); if (partition_status::PS_INACTIVE != status() || _stub->is_connected() == false) { _primary_states.reconfiguration_task = nullptr; @@ -706,7 +706,7 @@ bool replica::update_local_configuration(const replica_configuration &config, result.to_string()); } } - _last_config_change_time_ms = now_ms(); + _last_config_change_time_ms = dsn_now_ms(); dassert(max_prepared_decree() >= last_committed_decree(), "%" PRId64 " VS %" PRId64 "", max_prepared_decree(), diff --git a/src/dist/replication/lib/replica_init.cpp b/src/dist/replication/lib/replica_init.cpp index 8789e984aa..eb7d4a1909 100644 --- a/src/dist/replication/lib/replica_init.cpp +++ b/src/dist/replication/lib/replica_init.cpp @@ -242,7 +242,7 @@ error_code replica::init_app_and_prepare_list(bool create_new) std::map replay_condition; replay_condition[_config.pid] = _app->last_committed_decree(); - uint64_t start_time = now_ms(); + uint64_t start_time = dsn_now_ms(); err = _private_log->open( [this](int log_length, mutation_ptr &mu) { return replay_mutation(mu, true); }, [this](error_code err) { @@ -253,7 +253,7 @@ error_code replica::init_app_and_prepare_list(bool create_new) }, replay_condition); - uint64_t finish_time = now_ms(); + uint64_t finish_time = dsn_now_ms(); if (err == ERR_OK) { ddebug("%s: replay private log succeed, durable = %" PRId64 diff --git a/src/dist/replication/lib/replica_learn.cpp b/src/dist/replication/lib/replica_learn.cpp index bbe72b5f3d..f3f362afd3 100644 --- a/src/dist/replication/lib/replica_learn.cpp +++ b/src/dist/replication/lib/replica_learn.cpp @@ -46,7 +46,7 @@ namespace replication { void replica::init_learn(uint64_t signature) { - check_hashed_access(); + _checker.only_one_thread_access(); if (status() != partition_status::PS_POTENTIAL_SECONDARY) { dwarn( @@ -230,7 +230,7 @@ void replica::init_learn(uint64_t signature) void replica::on_learn(dsn_message_t msg, const learn_request &request) { - check_hashed_access(); + _checker.only_one_thread_access(); learn_response response; if (partition_status::PS_PRIMARY != status()) { @@ -502,7 +502,7 @@ void replica::on_learn(dsn_message_t msg, const learn_request &request) void replica::on_learn_reply(error_code err, learn_request &&req, learn_response &&resp) { - check_hashed_access(); + _checker.only_one_thread_access(); dassert(partition_status::PS_POTENTIAL_SECONDARY == status(), "invalid partition status, status = %s", @@ -1116,7 +1116,7 @@ void replica::on_copy_remote_state_completed(error_code err, void replica::on_learn_remote_state_completed(error_code err) { - check_hashed_access(); + _checker.only_one_thread_access(); if (partition_status::PS_POTENTIAL_SECONDARY != status()) { dwarn("%s: on_learn_remote_state_completed[%016" PRIx64 @@ -1157,7 +1157,7 @@ void replica::on_learn_remote_state_completed(error_code err) void replica::handle_learning_error(error_code err, bool is_local_error) { - check_hashed_access(); + _checker.only_one_thread_access(); derror("%s: handle_learning_error[%016" PRIx64 "]: learnee = %s, learn_duration = %" PRIu64 " ms, err = %s, %s", @@ -1246,7 +1246,7 @@ void replica::notify_learn_completion() void replica::on_learn_completion_notification(const group_check_response &report, /*out*/ learn_notify_response &response) { - check_hashed_access(); + _checker.only_one_thread_access(); ddebug("%s: on_learn_completion_notification[%016" PRIx64 "]: learner = %s, learning_status = %s", @@ -1291,7 +1291,7 @@ void replica::on_learn_completion_notification_reply(error_code err, group_check_response &&report, learn_notify_response &&resp) { - check_hashed_access(); + _checker.only_one_thread_access(); dassert(partition_status::PS_POTENTIAL_SECONDARY == status(), "invalid partition_status, status = %s", diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 01a53f1497..2720410ce5 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -269,7 +269,7 @@ void replica_stub::initialize(bool clear /* = false*/) void replica_stub::initialize(const replication_options &opts, bool clear /* = false*/) { - _primary_address = primary_address(); + _primary_address = dsn_primary_address(); ddebug("primary_address = %s", _primary_address.to_string()); set_options(opts); @@ -365,7 +365,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f ddebug("%s@%s: load replica '%s' success, = <%" PRId64 ", %" PRId64 ">, last_prepared_decree = %" PRId64, r->get_gpid().to_string(), - primary_address().to_string(), + dsn_primary_address().to_string(), dir.c_str(), r->last_durable_decree(), r->last_committed_decree(), @@ -530,13 +530,13 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f // gc if (false == _options.gc_disabled) { - _gc_timer_task = - tasking::enqueue_timer(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS, - &_tracker, - [this] { on_gc(); }, - std::chrono::milliseconds(_options.gc_interval_ms), - 0, - std::chrono::milliseconds(random32(0, _options.gc_interval_ms))); + _gc_timer_task = tasking::enqueue_timer( + LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS, + &_tracker, + [this] { on_gc(); }, + std::chrono::milliseconds(_options.gc_interval_ms), + 0, + std::chrono::milliseconds(dsn_random32(0, _options.gc_interval_ms))); } // disk stat @@ -558,7 +558,7 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f } if (_options.delay_for_fd_timeout_on_start) { - uint64_t now_time_ms = now_ms(); + uint64_t now_time_ms = dsn_now_ms(); uint64_t delay_time_ms = (_options.fd_grace_seconds + 3) * 1000; // for more 3 seconds than grace seconds if (now_time_ms < dsn_runtime_init_time_ms() + delay_time_ms) { @@ -1194,7 +1194,7 @@ void replica_stub::on_node_query_reply_scatter2(replica_stub_ptr this_, gpid id) replica_ptr replica = get_replica(id); if (replica != nullptr && replica->status() != partition_status::PS_POTENTIAL_SECONDARY) { if (replica->status() == partition_status::PS_INACTIVE && - now_ms() - replica->create_time_milliseconds() < + dsn_now_ms() - replica->create_time_milliseconds() < _options.gc_memory_replica_interval_ms) { ddebug("%s: replica not exists on meta server, wait to close", replica->name()); return; diff --git a/src/dist/replication/meta_server/distributed_lock_service_simple.cpp b/src/dist/replication/meta_server/distributed_lock_service_simple.cpp index cf707f00fc..9700e81532 100644 --- a/src/dist/replication/meta_server/distributed_lock_service_simple.cpp +++ b/src/dist/replication/meta_server/distributed_lock_service_simple.cpp @@ -33,12 +33,14 @@ * xxxx-xx-xx, author, fix bug about xxx */ +#include + #include "dist/replication/client_lib/replication_common.h" #include "distributed_lock_service_simple.h" namespace dsn { namespace dist { -DEFINE_TASK_CODE(LPC_DIST_LOCK_SVC_RANDOM_EXPIRE, TASK_PRIORITY_COMMON, THREAD_POOL_META_SERVER); +DEFINE_TASK_CODE(LPC_DIST_LOCK_SVC_RANDOM_EXPIRE, TASK_PRIORITY_COMMON, THREAD_POOL_META_SERVER) static void __lock_cb_bind_and_enqueue(task_ptr lock_task, error_code err, diff --git a/src/dist/replication/meta_server/distributed_lock_service_simple.h b/src/dist/replication/meta_server/distributed_lock_service_simple.h index 5386433711..1016eac1b7 100644 --- a/src/dist/replication/meta_server/distributed_lock_service_simple.h +++ b/src/dist/replication/meta_server/distributed_lock_service_simple.h @@ -41,7 +41,7 @@ using namespace ::dsn::service; namespace dsn { namespace dist { -class distributed_lock_service_simple : public distributed_lock_service, public clientlet +class distributed_lock_service_simple : public distributed_lock_service { public: virtual ~distributed_lock_service_simple() { _tracker.cancel_outstanding_tasks(); } diff --git a/src/dist/replication/meta_server/meta_server_failure_detector.cpp b/src/dist/replication/meta_server/meta_server_failure_detector.cpp index 5d854960f0..52c3745f1f 100644 --- a/src/dist/replication/meta_server/meta_server_failure_detector.cpp +++ b/src/dist/replication/meta_server/meta_server_failure_detector.cpp @@ -89,7 +89,7 @@ bool meta_server_failure_detector::get_leader(rpc_address *leader) } if (_is_leader.load()) { - *leader = primary_address(); + *leader = dsn_primary_address(); return true; } else if (_lock_svc == nullptr) { leader->set_invalid(); @@ -99,7 +99,7 @@ bool meta_server_failure_detector::get_leader(rpc_address *leader) uint64_t version; error_code err = _lock_svc->query_cache(_primary_lock_id, lock_owner, version); if (err == dsn::ERR_OK && leader->from_string_ipv4(lock_owner.c_str())) { - return (*leader) == primary_address(); + return (*leader) == dsn_primary_address(); } else { dwarn("query leader from cache got error(%s)", err.to_string()); leader->set_invalid(); @@ -119,7 +119,7 @@ void meta_server_failure_detector::acquire_leader_lock() error_code err; auto tasks = _lock_svc->lock( _primary_lock_id, - primary_address().to_std_string(), + dsn_primary_address().to_std_string(), // lock granted LPC_META_SERVER_LEADER_LOCK_CALLBACK, [this, &err](error_code ec, const std::string &owner, uint64_t version) { @@ -180,10 +180,10 @@ void meta_server_failure_detector::leader_initialize(const std::string &lock_ser dassert(addr.from_string_ipv4(lock_service_owner.c_str()), "parse %s to rpc_address failed", lock_service_owner.c_str()); - dassert(addr == primary_address(), + dassert(addr == dsn_primary_address(), "acquire leader return success, but owner not match: %s vs %s", addr.to_string(), - primary_address().to_string()); + dsn_primary_address().to_string()); _is_leader.store(true); _election_moment.store(dsn_now_ms()); } diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index d79b706878..4df13b056e 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -231,7 +231,7 @@ error_code meta_service::start() _failure_detector->acquire_leader_lock(); dassert(_failure_detector->get_leader(nullptr), "must be primary at this point"); ddebug("%s got the primary lock, start to recover server state from remote storage", - primary_address().to_string()); + dsn_primary_address().to_string()); // initialize the load balancer server_load_balancer *balancer = utils::factory_store::create( @@ -449,7 +449,7 @@ void meta_service::on_query_cluster_info(dsn_message_t req) response.values.push_back(oss.str()); response.keys.push_back("primary_meta_server"); - response.values.push_back(primary_address().to_std_string()); + response.values.push_back(dsn_primary_address().to_std_string()); std::string zk_hosts = dsn_config_get_value_string("zookeeper", "hosts_list", "", "zookeeper_hosts"); zk_hosts.erase(std::remove_if(zk_hosts.begin(), zk_hosts.end(), ::isspace), zk_hosts.end()); @@ -595,7 +595,7 @@ void meta_service::on_start_recovery(dsn_message_t req) zauto_write_lock l(_meta_lock); if (_started.load()) { ddebug("service(%s) is already started, ignore the recovery request", - primary_address().to_string()); + dsn_primary_address().to_string()); response.err = ERR_SERVICE_ALREADY_RUNNING; } else { configuration_recovery_request request; diff --git a/src/dist/replication/meta_server/meta_state_service_simple.cpp b/src/dist/replication/meta_server/meta_state_service_simple.cpp index beaf0081f9..729a7e80b8 100644 --- a/src/dist/replication/meta_server/meta_state_service_simple.cpp +++ b/src/dist/replication/meta_server/meta_state_service_simple.cpp @@ -36,6 +36,7 @@ #include "meta_state_service_simple.h" #include +#include #include #include diff --git a/src/dist/replication/meta_server/meta_state_service_simple.h b/src/dist/replication/meta_server/meta_state_service_simple.h index 074f4cb4e4..bf86564b6f 100644 --- a/src/dist/replication/meta_server/meta_state_service_simple.h +++ b/src/dist/replication/meta_server/meta_state_service_simple.h @@ -46,7 +46,7 @@ DEFINE_TASK_CODE_AIO(LPC_META_STATE_SERVICE_SIMPLE_INTERNAL, TASK_PRIORITY_HIGH, THREAD_POOL_DEFAULT); -class meta_state_service_simple : public meta_state_service, public clientlet +class meta_state_service_simple : public meta_state_service { public: explicit meta_state_service_simple() diff --git a/src/dist/replication/meta_server/server_state.cpp b/src/dist/replication/meta_server/server_state.cpp index 1e343ebde9..ad81e4522f 100644 --- a/src/dist/replication/meta_server/server_state.cpp +++ b/src/dist/replication/meta_server/server_state.cpp @@ -35,9 +35,9 @@ */ #include -#include #include #include +#include #include #include #include @@ -2118,7 +2118,7 @@ server_state::sync_apps_from_replica_nodes(const std::vector & ddebug("send query app and replica request to node(%s)", replica_nodes[i].to_string()); query_app_info_request app_query; - app_query.meta_server = _meta_svc->primary_address(); + app_query.meta_server = dsn_primary_address(); rpc::call(replica_nodes[i], RPC_QUERY_APP_INFO, diff --git a/src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h b/src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h index 0b6222f0fc..eb0f7f12ab 100644 --- a/src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h +++ b/src/dist/replication/test/meta_test/unit_test/meta_service_test_app.h @@ -2,6 +2,7 @@ #define META_SERVICE_TEST_APP_H #include +#include #include #include #include "dist/replication/meta_server/server_state.h" diff --git a/src/dist/replication/test/simple_kv/client.h b/src/dist/replication/test/simple_kv/client.h index a75dbb7b4a..75cfe16de1 100644 --- a/src/dist/replication/test/simple_kv/client.h +++ b/src/dist/replication/test/simple_kv/client.h @@ -41,7 +41,7 @@ namespace dsn { namespace replication { namespace test { -class simple_kv_client_app : public ::dsn::service_app, public virtual ::dsn::clientlet +class simple_kv_client_app : public ::dsn::service_app { public: simple_kv_client_app(const service_app_info *info); diff --git a/src/dist/replication/test/simple_kv/simple_kv.client.h b/src/dist/replication/test/simple_kv/simple_kv.client.h index a67d8a8af8..3b0c48e881 100644 --- a/src/dist/replication/test/simple_kv/simple_kv.client.h +++ b/src/dist/replication/test/simple_kv/simple_kv.client.h @@ -27,6 +27,8 @@ #include #include #include +#include + #include "simple_kv.code.definition.h" #include "simple_kv.types.h" #include "case.h" @@ -34,7 +36,7 @@ namespace dsn { namespace replication { namespace test { -class simple_kv_client : public virtual ::dsn::clientlet +class simple_kv_client { public: simple_kv_client(::dsn::rpc_address server) { _server = server; } diff --git a/src/dist/replication/zookeeper/distributed_lock_service_zookeeper.cpp b/src/dist/replication/zookeeper/distributed_lock_service_zookeeper.cpp index b4acf3b69b..0b3998750f 100644 --- a/src/dist/replication/zookeeper/distributed_lock_service_zookeeper.cpp +++ b/src/dist/replication/zookeeper/distributed_lock_service_zookeeper.cpp @@ -31,7 +31,9 @@ * Revision history: * 2015-12-04, @shengofsun (sunweijie@xiaomi.com) */ +#include #include + #include #include #include @@ -50,8 +52,7 @@ namespace dist { std::string distributed_lock_service_zookeeper::LOCK_NODE_PREFIX = "LOCKNODE"; -distributed_lock_service_zookeeper::distributed_lock_service_zookeeper() - : clientlet(), ref_counter() +distributed_lock_service_zookeeper::distributed_lock_service_zookeeper() : ref_counter() { _first_call = true; } diff --git a/src/dist/replication/zookeeper/distributed_lock_service_zookeeper.h b/src/dist/replication/zookeeper/distributed_lock_service_zookeeper.h index 03f2efc462..2e3855f22b 100644 --- a/src/dist/replication/zookeeper/distributed_lock_service_zookeeper.h +++ b/src/dist/replication/zookeeper/distributed_lock_service_zookeeper.h @@ -41,9 +41,7 @@ namespace dsn { namespace dist { class zookeeper_session; -class distributed_lock_service_zookeeper : public distributed_lock_service, - public clientlet, - public ref_counter +class distributed_lock_service_zookeeper : public distributed_lock_service, public ref_counter { public: explicit distributed_lock_service_zookeeper(); diff --git a/src/dist/replication/zookeeper/lock_struct.cpp b/src/dist/replication/zookeeper/lock_struct.cpp index e2ada4bb86..7ade218f48 100644 --- a/src/dist/replication/zookeeper/lock_struct.cpp +++ b/src/dist/replication/zookeeper/lock_struct.cpp @@ -40,6 +40,9 @@ #include #include +#include +#include + #include "distributed_lock_service_zookeeper.h" #include "lock_struct.h" #include "lock_types.h" @@ -91,7 +94,7 @@ static bool is_zookeeper_timeout(int zookeeper_error) #define REMOVE_FOR_UNLOCK true #define REMOVE_FOR_CANCEL false -lock_struct::lock_struct(lock_srv_ptr srv) : clientlet(), ref_counter() +lock_struct::lock_struct(lock_srv_ptr srv) : ref_counter() { _dist_lock_service = srv; clear(); @@ -123,7 +126,7 @@ void lock_struct::clear() void lock_struct::remove_lock() { - check_hashed_access(); + _checker.only_one_thread_access(); if (_dist_lock_service != nullptr) { _dist_lock_service->erase(std::make_pair(_lock_id, _myself._node_value)); @@ -175,7 +178,7 @@ void lock_struct::my_lock_removed(lock_struct_ptr _this, int zoo_event) { static const lock_state allow_state[] = { lock_state::locked, lock_state::unlocking, lock_state::expired}; - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); __check_code(_this->_state, allow_state, 3, string_state(_this->_state)); if (_this->_state == lock_state::unlocking || _this->_state == lock_state::expired) { @@ -189,7 +192,7 @@ void lock_struct::owner_change(lock_struct_ptr _this, int zoo_event) { static const lock_state allow_state[] = { lock_state::uninitialized, lock_state::pending, lock_state::cancelled, lock_state::expired}; - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); __check_code(_this->_state, allow_state, 3, string_state(_this->_state)); if (_this->_state == lock_state::uninitialized) { @@ -222,7 +225,7 @@ void lock_struct::after_remove_duplicated_locknode(lock_struct_ptr _this, }; static const int allow_state[] = { lock_state::pending, lock_state::cancelled, lock_state::expired, lock_state::locked}; - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); __check_code(ec, allow_ec, 3, zerror(ec)); __check_code(_this->_state, allow_state, 4, string_state(_this->_state)); @@ -290,7 +293,7 @@ void lock_struct::after_get_lock_owner(lock_struct_ptr _this, }; static const int allow_state[] = { lock_state::pending, lock_state::cancelled, lock_state::expired}; - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); __check_code(ec, allow_ec, 3, zerror(ec)); __check_code(_this->_state, allow_state, 3, string_state(_this->_state)); @@ -345,7 +348,7 @@ void lock_struct::after_self_check(lock_struct_ptr _this, }; static const lock_state allow_state[] = { lock_state::locked, lock_state::unlocking, lock_state::expired}; - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); __check_code(ec, allow_ec, 3, zerror(ec)); __check_code(_this->_state, allow_state, 3, string_state(_this->_state)); @@ -433,7 +436,7 @@ void lock_struct::after_get_lockdir_nodes(lock_struct_ptr _this, static const int allow_state[] = { lock_state::pending, lock_state::cancelled, lock_state::expired}; - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); __check_code(ec, allow_ec, 2, zerror(ec)); __check_code(_this->_state, allow_state, 3, string_state(_this->_state)); @@ -545,7 +548,7 @@ void lock_struct::after_create_locknode(lock_struct_ptr _this, static const int allow_state[] = { lock_state::pending, lock_state::cancelled, lock_state::expired}; - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); __check_code(ec, allow_ec, 2, zerror(ec)); __check_code(_this->_state, allow_state, 3, string_state(_this->_state)); @@ -605,7 +608,7 @@ void lock_struct::create_locknode() /*static*/ void lock_struct::after_create_lockdir(lock_struct_ptr _this, int ec) { - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); static const int allow_ec[] = { ZOK, ZNODEEXISTS, // succeed state @@ -635,7 +638,7 @@ void lock_struct::try_lock(lock_struct_ptr _this, lock_future_ptr lock_callback, lock_future_ptr expire_callback) { - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); if (_this->_state != lock_state::uninitialized) { lock_callback->enqueue_with(ERR_RECURSIVE_LOCK, "", -1); @@ -678,7 +681,7 @@ void lock_struct::after_remove_my_locknode(lock_struct_ptr _this, int ec, bool r }; static const int allow_state[] = { lock_state::cancelled, lock_state::unlocking, lock_state::expired}; - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); __check_code(ec, allow_ec, 3, zerror(ec)); __check_code(_this->_state, allow_state, 3, string_state(_this->_state)); @@ -746,7 +749,7 @@ void lock_struct::remove_my_locknode(std::string &&znode_path, /*static*/ void lock_struct::cancel_pending_lock(lock_struct_ptr _this, lock_future_ptr cancel_callback) { - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); if (_this->_state != lock_state::uninitialized && _this->_state != lock_state::pending && _this->_state != lock_state::cancelled) { cancel_callback->enqueue_with(ERR_INVALID_PARAMETERS, "", _this->_owner._sequence_id); @@ -770,7 +773,7 @@ void lock_struct::cancel_pending_lock(lock_struct_ptr _this, lock_future_ptr can /*static*/ void lock_struct::unlock(lock_struct_ptr _this, error_code_future_ptr unlock_callback) { - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); if (_this->_state != lock_state::locked && _this->_state != lock_state::unlocking) { ddebug("lock(%s) myself(%s) seqid(%lld) state(%s), just return", _this->_lock_id.c_str(), @@ -791,7 +794,7 @@ void lock_struct::unlock(lock_struct_ptr _this, error_code_future_ptr unlock_cal /*static*/ void lock_struct::lock_expired(lock_struct_ptr _this) { - _this->check_hashed_access(); + _this->_checker.only_one_thread_access(); _this->on_expire(); } } diff --git a/src/dist/replication/zookeeper/lock_struct.h b/src/dist/replication/zookeeper/lock_struct.h index a712ed3fef..26b8165ad8 100644 --- a/src/dist/replication/zookeeper/lock_struct.h +++ b/src/dist/replication/zookeeper/lock_struct.h @@ -33,6 +33,7 @@ */ #pragma once +#include #include #include @@ -59,7 +60,7 @@ struct zoolock_pair int64_t _sequence_id; }; -class lock_struct : public clientlet, public ref_counter +class lock_struct : public ref_counter { public: lock_struct(lock_srv_ptr srv); @@ -118,6 +119,8 @@ class lock_struct : public clientlet, public ref_counter int _hash; lock_srv_ptr _dist_lock_service; + + thread_access_checker _checker; }; } } diff --git a/src/dist/replication/zookeeper/lock_types.h b/src/dist/replication/zookeeper/lock_types.h index d946932615..a47263404c 100644 --- a/src/dist/replication/zookeeper/lock_types.h +++ b/src/dist/replication/zookeeper/lock_types.h @@ -35,7 +35,6 @@ #include #include -#include #include namespace dsn { diff --git a/src/dist/replication/zookeeper/meta_state_service_zookeeper.cpp b/src/dist/replication/zookeeper/meta_state_service_zookeeper.cpp index 8faf7a4011..01e5277080 100644 --- a/src/dist/replication/zookeeper/meta_state_service_zookeeper.cpp +++ b/src/dist/replication/zookeeper/meta_state_service_zookeeper.cpp @@ -31,6 +31,7 @@ * Revision history: * 2015-12-04, @shengofsun (sunweijie@xiaomi.com) */ +#include #include #include diff --git a/src/dist/replication/zookeeper/zookeeper_session.h b/src/dist/replication/zookeeper/zookeeper_session.h index b11f150871..43aca88053 100644 --- a/src/dist/replication/zookeeper/zookeeper_session.h +++ b/src/dist/replication/zookeeper/zookeeper_session.h @@ -33,7 +33,6 @@ */ #include -#include #include #include