Skip to content

Commit

Permalink
issue=#1271 Add mock zk adapter for test (#1272)
Browse files Browse the repository at this point in the history
* issue=#1271 Add mock zk adapter for test
  • Loading branch information
Yang Ce authored and ajie committed Jun 9, 2017
1 parent bb9be4f commit 88efa12
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 49 deletions.
8 changes: 8 additions & 0 deletions src/master/master_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ DECLARE_string(tera_master_meta_table_path);
DECLARE_int32(tera_master_meta_retry_times);

DECLARE_bool(tera_zk_enabled);
DECLARE_bool(tera_mock_zk_enabled);

DECLARE_double(tera_master_workload_split_threshold);
DECLARE_int64(tera_master_split_tablet_size);
Expand Down Expand Up @@ -91,6 +92,7 @@ DECLARE_string(tera_zk_root_path);
DECLARE_string(tera_zk_addr_list);
DECLARE_string(tera_local_addr);
DECLARE_bool(tera_ins_enabled);
DECLARE_bool(tera_mock_ins_enabled);

DECLARE_int64(tera_sdk_perf_counter_log_interval);

Expand Down Expand Up @@ -171,6 +173,12 @@ bool MasterImpl::Init() {
} else if (FLAGS_tera_ins_enabled) {
LOG(INFO) << "ins mode" ;
zk_adapter_.reset(new InsMasterZkAdapter(this, local_addr_));
} else if (FLAGS_tera_mock_zk_enabled) {
LOG(INFO) << "mock zk mode" ;
zk_adapter_.reset(new MockMasterZkAdapter(this, local_addr_));
} else if (FLAGS_tera_mock_ins_enabled) {
LOG(INFO) << "mock ins mode" ;
zk_adapter_.reset(new MockInsMasterZkAdapter(this, local_addr_));
} else {
LOG(INFO) << "fake zk mode!";
zk_adapter_.reset(new FakeMasterZkAdapter(this, local_addr_));
Expand Down
58 changes: 36 additions & 22 deletions src/master/master_zk_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,28 +52,28 @@ class MasterZkAdapter : public MasterZkAdapterBase {
virtual bool UpdateRootTabletNode(const std::string& root_tablet_addr);

protected:
bool Setup();
void Reset();

bool LockMasterLock();
bool UnlockMasterLock();
bool CreateMasterNode();
bool DeleteMasterNode();

bool WatchRootTabletNode(bool* is_exist, std::string* root_tablet_addr);
bool WatchSafeModeMark(bool* is_safemode);
bool WatchTabletNodeList(std::map<std::string, std::string>* tabletnode_list);

void OnSafeModeMarkCreated();
void OnSafeModeMarkDeleted();
void OnMasterLockLost();
void OnTabletNodeListDeleted();
void OnRootTabletNodeDeleted();
void OnMasterNodeDeleted();
void OnTabletServerKickMarkCreated();
void OnTabletServerKickMarkDeleted();
void OnTabletServerStart(const std::string& ts_host);
void OnTabletServerExist(const std::string& ts_host);
virtual bool Setup();
virtual void Reset();

virtual bool LockMasterLock();
virtual bool UnlockMasterLock();
virtual bool CreateMasterNode();
virtual bool DeleteMasterNode();

virtual bool WatchRootTabletNode(bool* is_exist, std::string* root_tablet_addr);
virtual bool WatchSafeModeMark(bool* is_safemode);
virtual bool WatchTabletNodeList(std::map<std::string, std::string>* tabletnode_list);

virtual void OnSafeModeMarkCreated();
virtual void OnSafeModeMarkDeleted();
virtual void OnMasterLockLost();
virtual void OnTabletNodeListDeleted();
virtual void OnRootTabletNodeDeleted();
virtual void OnMasterNodeDeleted();
virtual void OnTabletServerKickMarkCreated();
virtual void OnTabletServerKickMarkDeleted();
virtual void OnTabletServerStart(const std::string& ts_host);
virtual void OnTabletServerExist(const std::string& ts_host);

virtual void OnChildrenChanged(const std::string& path,
const std::vector<std::string>& name_list,
Expand All @@ -92,6 +92,13 @@ class MasterZkAdapter : public MasterZkAdapterBase {
std::string server_addr_;
};

class MockMasterZkAdapter : public MasterZkAdapter {
public:
MockMasterZkAdapter(MasterImpl* master_impl, const std::string & server_addr) :
MasterZkAdapter(master_impl, server_addr) {}
virtual ~MockMasterZkAdapter() {}
};

/*
* This is not zookeeper!
* Just used on onebox for tasting tera briefly.
Expand Down Expand Up @@ -167,6 +174,13 @@ class InsMasterZkAdapter: public MasterZkAdapterBase {
galaxy::ins::sdk::InsSDK* ins_sdk_;
};

class MockInsMasterZkAdapter : public InsMasterZkAdapter {
public:
MockInsMasterZkAdapter(MasterImpl* master_impl, const std::string& server_addr) :
InsMasterZkAdapter(master_impl, server_addr) {}
virtual ~MockInsMasterZkAdapter() {}
};

} // namespace master
} // namespace tera

Expand Down
14 changes: 10 additions & 4 deletions src/sdk/sdk_zk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
DECLARE_string(tera_zk_lib_log_path);
DECLARE_string(tera_fake_zk_path_prefix);
DECLARE_bool(tera_zk_enabled);
DECLARE_bool(tera_mock_zk_enabled);
DECLARE_string(tera_zk_addr_list);
DECLARE_string(tera_zk_root_path);
DECLARE_bool(tera_ins_enabled);
DECLARE_string(tera_ins_root_path);
DECLARE_string(tera_ins_addr_list);
DECLARE_bool(tera_mock_ins_enabled);

namespace tera {
namespace sdk {
Expand Down Expand Up @@ -121,12 +123,16 @@ bool FakeZkClusterFinder::ReadNode(const std::string& name, std::string* value)
}

ClusterFinder* NewClusterFinder() {
if (FLAGS_tera_ins_enabled) {
if (FLAGS_tera_zk_enabled) {
return new sdk::ZkClusterFinder(FLAGS_tera_zk_root_path, FLAGS_tera_zk_addr_list);
} else if (FLAGS_tera_ins_enabled) {
return new sdk::InsClusterFinder(FLAGS_tera_ins_root_path, FLAGS_tera_ins_addr_list);
} else if (!FLAGS_tera_zk_enabled) {
return new sdk::FakeZkClusterFinder(FLAGS_tera_fake_zk_path_prefix);
} else if (FLAGS_tera_mock_zk_enabled) {
return new sdk::MockZkClusterFinder(FLAGS_tera_zk_root_path, FLAGS_tera_zk_addr_list);
} else if (FLAGS_tera_mock_ins_enabled) {
return new sdk::MockInsClusterFinder(FLAGS_tera_ins_root_path, FLAGS_tera_ins_addr_list);
} else {
return new sdk::ZkClusterFinder(FLAGS_tera_zk_root_path, FLAGS_tera_zk_addr_list);
return new sdk::FakeZkClusterFinder(FLAGS_tera_fake_zk_path_prefix);
}
}

Expand Down
30 changes: 25 additions & 5 deletions src/sdk/sdk_zk.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ class ClusterFinder
std::string RootTableAddr(bool update = false);
std::string ClusterId(); // cluster URI: <scheme>://<authority>/<path>

private:
protected:
virtual bool ReadNode(const std::string& path, std::string* value) = 0;
virtual std::string Name() = 0;
virtual std::string Authority() = 0;
virtual std::string Path() = 0;

private:
mutable Mutex mutex_;
std::string master_addr_;
std::string root_table_addr_;
Expand All @@ -35,35 +36,54 @@ class ClusterFinder
class ZkClusterFinder : public ClusterFinder {
public:
ZkClusterFinder(const std::string& zk_root_path, const std::string& zk_addr_list);
private:
protected:
virtual bool ReadNode(const std::string& path, std::string* value);
virtual std::string Name() { return "zk"; };
virtual std::string Authority() { return zk_addr_list_; }
virtual std::string Path() { return zk_root_path_; }
private:
std::string zk_root_path_;
std::string zk_addr_list_;
};

class MockZkClusterFinder : public ZkClusterFinder {
public:
MockZkClusterFinder(const std::string& zk_root_path, const std::string& zk_addr_list) :
ZkClusterFinder(zk_root_path, zk_addr_list) {}
protected:
virtual std::string Name() { return "mock zk"; }
};

class InsClusterFinder : public ClusterFinder {
public:
InsClusterFinder(const std::string& ins_root_path, const std::string& ins_addr_list);
private:
protected:
virtual bool ReadNode(const std::string& path, std::string* value);
virtual std::string Name() { return "ins"; };
virtual std::string Name() { return "ins"; }
virtual std::string Authority() { return ins_addr_list_; }
virtual std::string Path() { return ins_root_path_; }
private:
std::string ins_root_path_;
std::string ins_addr_list_;
};

class MockInsClusterFinder : public InsClusterFinder {
public:
MockInsClusterFinder(const std::string& ins_root_path, const std::string& ins_addr_list) :
InsClusterFinder(ins_root_path, ins_addr_list) {}
protected:
virtual std::string Name() { return "mock ins"; }
};

class FakeZkClusterFinder : public ClusterFinder {
public:
FakeZkClusterFinder(const std::string& fake_zk_path_prefix);
private:
protected:
virtual bool ReadNode(const std::string& path, std::string* value);
virtual std::string Name() { return "fakezk"; };
virtual std::string Authority() { return "localhost"; }
virtual std::string Path() { return fake_zk_path_prefix_; }
private:
std::string fake_zk_path_prefix_;
};

Expand Down
8 changes: 8 additions & 0 deletions src/tabletnode/tabletnode_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ DECLARE_int32(tera_tabletnode_impl_thread_min_num);
DECLARE_int32(tera_tabletnode_impl_thread_max_num);

DECLARE_bool(tera_zk_enabled);
DECLARE_bool(tera_mock_zk_enabled);

DECLARE_string(tera_master_meta_table_name);
DECLARE_int32(tera_tabletnode_retry_period);
Expand Down Expand Up @@ -82,6 +83,7 @@ DECLARE_int32(tera_tabletnode_gc_log_level);
DECLARE_string(tera_leveldb_env_type);
DECLARE_string(tera_local_addr);
DECLARE_bool(tera_ins_enabled);
DECLARE_bool(tera_mock_ins_enabled);

DECLARE_bool(tera_io_cache_path_vanish_allowed);
DECLARE_int64(tera_tabletnode_tcm_cache_size);
Expand Down Expand Up @@ -160,6 +162,12 @@ bool TabletNodeImpl::Init() {
} else if(FLAGS_tera_ins_enabled) {
LOG(INFO) << "ins mode!";
zk_adapter_.reset(new InsTabletNodeZkAdapter(this, local_addr_));
} else if (FLAGS_tera_mock_zk_enabled) {
LOG(INFO) << "mock zk mode!";
zk_adapter_.reset(new MockTabletNodeZkAdapter(this, local_addr_));
} else if (FLAGS_tera_mock_ins_enabled) {
LOG(INFO) << "mock ins mode!";
zk_adapter_.reset(new MockInsTabletNodeZkAdapter(this, local_addr_));
} else {
LOG(INFO) << "fake zk mode!";
zk_adapter_.reset(new FakeTabletNodeZkAdapter(this, local_addr_));
Expand Down
59 changes: 41 additions & 18 deletions src/tabletnode/tabletnode_zk_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ class TabletNodeZkAdapter : public TabletNodeZkAdapterBase {
virtual bool GetRootTableAddr(std::string* root_table_addr);

private:
bool Register(const std::string& session_id, int* zk_code);
bool Unregister(int* zk_code);

bool WatchMaster(std::string* master, int* zk_code);
bool WatchSafeModeMark(bool* is_exist, int* zk_code);
bool WatchKickMark(bool* is_exist, int* zk_code);
bool WatchSelfNode(bool* is_exist, int* zk_code);
bool WatchRootNode(bool* is_exist, std::string* root_tablet_addr, int* zk_errno);

void OnSafeModeMarkCreated();
void OnSafeModeMarkDeleted();
void OnKickMarkCreated();
void OnSelfNodeDeleted();
void OnRootNodeCreated();
void OnRootNodeDeleted();
void OnRootNodeChanged(const std::string& root_tablet_addr);
virtual bool Register(const std::string& session_id, int* zk_code);
virtual bool Unregister(int* zk_code);

virtual bool WatchMaster(std::string* master, int* zk_code);
virtual bool WatchSafeModeMark(bool* is_exist, int* zk_code);
virtual bool WatchKickMark(bool* is_exist, int* zk_code);
virtual bool WatchSelfNode(bool* is_exist, int* zk_code);
virtual bool WatchRootNode(bool* is_exist, std::string* root_tablet_addr, int* zk_errno);

virtual void OnSafeModeMarkCreated();
virtual void OnSafeModeMarkDeleted();
virtual void OnKickMarkCreated();
virtual void OnSelfNodeDeleted();
virtual void OnRootNodeCreated();
virtual void OnRootNodeDeleted();
virtual void OnRootNodeChanged(const std::string& root_tablet_addr);

virtual void OnChildrenChanged(const std::string& path,
const std::vector<std::string>& name_list,
Expand All @@ -72,6 +72,19 @@ class TabletNodeZkAdapter : public TabletNodeZkAdapterBase {
std::string kick_node_path_;
};

class MockTabletNodeZkAdapter : public TabletNodeZkAdapter {
public:
MockTabletNodeZkAdapter(TabletNodeImpl* tabletnode_impl,
const std::string & server_addr) :
TabletNodeZkAdapter(tabletnode_impl, server_addr) {}
virtual ~MockTabletNodeZkAdapter() {}
private:
virtual void OnKickMarkCreated() {}
virtual void OnSelfNodeDeleted() {}
virtual void OnWatchFailed(const std::string& /*path*/, int /*watch_type*/, int /*err*/) {}
virtual void OnSessionTimeout() {}
};

class FakeTabletNodeZkAdapter : public TabletNodeZkAdapterBase {
public:
FakeTabletNodeZkAdapter(TabletNodeImpl* tabletnode_impl,
Expand Down Expand Up @@ -109,8 +122,8 @@ class InsTabletNodeZkAdapter : public TabletNodeZkAdapterBase {
virtual ~InsTabletNodeZkAdapter() {}
virtual void Init();
virtual bool GetRootTableAddr(std::string* root_table_addr);
void OnKickMarkCreated();
void OnLockChange(std::string session_id, bool deleted);
virtual void OnKickMarkCreated();
virtual void OnLockChange(std::string session_id, bool deleted);
void OnMetaChange(std::string meta_addr, bool deleted);
private:
virtual void OnChildrenChanged(const std::string& path,
Expand All @@ -132,6 +145,16 @@ class InsTabletNodeZkAdapter : public TabletNodeZkAdapterBase {
galaxy::ins::sdk::InsSDK* ins_sdk_;
};

class MockInsTabletNodeZkAdapter : public InsTabletNodeZkAdapter {
public:
MockInsTabletNodeZkAdapter(TabletNodeImpl* tabletnode_impl,
const std::string& server_addr) :
InsTabletNodeZkAdapter(tabletnode_impl, server_addr) {}
virtual ~MockInsTabletNodeZkAdapter() {}
virtual void OnKickMarkCreated() {}
virtual void OnLockChange(std::string /*session_id*/, bool /*deleted*/) {}
};

} // namespace tabletnode
} // namespace tera

Expand Down
2 changes: 2 additions & 0 deletions src/tera_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ DEFINE_int32(tera_heartbeat_retry_times, 5, "the max retry times when fail to se
DEFINE_string(tera_working_dir, "./", "the base dir for system data");

DEFINE_bool(tera_zk_enabled, true, "enable zk adapter to collaborate with other master instances");
DEFINE_bool(tera_mock_zk_enabled, false, "enable mock zk adapter to collaborate with other master instances");
DEFINE_string(tera_zk_addr_list, "localhost:2180", "zookeeper server list");
DEFINE_string(tera_zk_root_path, "/tera", "zookeeper root path");
DEFINE_string(tera_fake_zk_path_prefix, "../fakezk", "fake zk path prefix in onebox tera");
Expand Down Expand Up @@ -274,6 +275,7 @@ DEFINE_int64(batch_scan_delay_retry_in_us, 1000000, "timewait in us before retry
DEFINE_string(tera_ins_addr_list, "", "the ins cluster addr. e.g. abc.com:1234,abb.com:1234");
DEFINE_string(tera_ins_root_path, "", "root path on ins. e.g /ps/sandbox");
DEFINE_bool(tera_ins_enabled, false, "option to open ins naming");
DEFINE_bool(tera_mock_ins_enabled, false, "option to open mock ins naming");

DEFINE_int64(tera_sdk_status_timeout, 600, "(s) check tablet/tabletnode status timeout");

Expand Down

0 comments on commit 88efa12

Please sign in to comment.