Skip to content

Commit

Permalink
add on_restart callback to listener (#196)
Browse files Browse the repository at this point in the history
* add on_restart callback to listener

* call on_replica_restart only once

* add snapshot overrides for repl listener

* call on_replica_restart once per home_obj instance

---------

Co-authored-by: Ravi Nagarjun Akella <raakella1@$HOSTNAME>
  • Loading branch information
raakella1 and Ravi Nagarjun Akella authored Aug 14, 2024
1 parent 6382f41 commit 9042b8d
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 25 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.0.8"
version = "2.0.10"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ target_link_libraries(homestore_test PUBLIC
homeobject_homestore
${COMMON_TEST_DEPS}
)
add_test(NAME HomestoreTest COMMAND homestore_test -csv error --executor immediate)
add_test(NAME HomestoreTest COMMAND homestore_test -csv error --executor immediate --config_path ./ --override_config homestore_config.consensus.snapshot_freq_distance:0)
set_property(TEST HomestoreTest PROPERTY RUN_SERIAL 1)
39 changes: 23 additions & 16 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,29 +198,14 @@ void HSHomeObject::init_homestore() {
svc_sb.create(sizeof(svc_info_superblk_t));
svc_sb->svc_id_ = _our_id;
svc_sb.write();
on_replica_restart();
} else {
RELEASE_ASSERT(!_our_id.is_nil(), "No SvcId read after HomeStore recovery!");
auto const new_id = app->discover_svcid(_our_id);
RELEASE_ASSERT(new_id == _our_id, "Received new SvcId [{}] AFTER recovery of [{}]?!", to_string(new_id),
to_string(_our_id));
}

// recover PG
HomeStore::instance()->meta_service().register_handler(
_pg_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_pg_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true);
HomeStore::instance()->meta_service().read_sub_sb(_pg_meta_name);

// recover shard
HomeStore::instance()->meta_service().register_handler(
_shard_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf); },
[this](bool success) { on_shard_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_shard_meta_name);

recovery_done_ = true;
LOGI("Initialize and start HomeStore is successfully");

Expand All @@ -232,6 +217,28 @@ void HSHomeObject::init_homestore() {
}
}

void HSHomeObject::on_replica_restart() {
std::call_once(replica_restart_flag_, [this]() {
LOGI("Register PG and shard meta blk handlers");
using namespace homestore;
// recover PG
HomeStore::instance()->meta_service().register_handler(
_pg_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_pg_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true);
HomeStore::instance()->meta_service().read_sub_sb(_pg_meta_name);

// recover shard
HomeStore::instance()->meta_service().register_handler(
_shard_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf); },
[this](bool success) { on_shard_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_shard_meta_name);
});
}

#if 0
void HSHomeObject::init_timer_thread() {
auto ctx = std::make_shared< std::latch >(1);
Expand Down
7 changes: 7 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class HSHomeObject : public HomeObjectImpl {
std::shared_ptr< BlobIndexTable > index_table;
};
std::unordered_map< std::string, PgIndexTable > index_table_pg_map_;
std::once_flag replica_restart_flag_;

public:
#pragma pack(1)
Expand Down Expand Up @@ -371,6 +372,12 @@ class HSHomeObject : public HomeObjectImpl {
*/
std::optional< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const;

/**
* @brief recover PG and shard from the superblock.
*
*/
void on_replica_restart();

/**
* @brief Returns any chunk number for the given pg ID.
*
Expand Down
36 changes: 32 additions & 4 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
namespace homeobject {
void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& ctx) {
LOGI("applying raft log commit with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
LOGD("applying raft log commit with lsn:{}, msg type: {}", lsn, msg_header->msg_type);
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_PG_MSG: {
home_object_->on_create_pg_message_commit(lsn, header, repl_dev(), ctx);
Expand All @@ -32,7 +32,6 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c

bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& ctx) {
LOGI("on_pre_commit with lsn:{}", lsn);
// For shard creation, since homestore repldev inside will write shard header to data service first before this
// function is called. So there is nothing is needed to do and we can get the binding chunk_id with the newly shard
// from the blkid in on_commit()
Expand All @@ -41,6 +40,7 @@ bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& heade
LOGE("corrupted message in pre_commit, lsn:{}", lsn);
return false;
}
LOGD("on_pre_commit with lsn:{}, msg type: {}", lsn, msg_header->msg_type);
switch (msg_header->msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
return home_object_->on_shard_message_pre_commit(lsn, header, key, ctx);
Expand Down Expand Up @@ -71,6 +71,8 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header,
}
}

void ReplicationStateMachine::on_restart() { home_object_->on_replica_restart(); }

void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) {
RELEASE_ASSERT(ctx, "ctx should not be nullptr in on_error");
Expand Down Expand Up @@ -151,10 +153,36 @@ void ReplicationStateMachine::on_destroy() {
LOGI("replica destroyed");
}

homestore::AsyncReplResult<> ReplicationStateMachine::create_snapshot(homestore::repl_snapshot& s) {
homestore::AsyncReplResult<>
ReplicationStateMachine::create_snapshot(std::shared_ptr< homestore::snapshot_context > context) {
// TODO::add create snapshot logic
LOGI("create snapshot, last_log_idx_: {} , last_log_term_: {}", s.last_log_idx_, s.last_log_term_);
auto ctx = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context);
auto s = ctx->nuraft_snapshot();
LOGI("create snapshot, last_log_idx_: {} , last_log_term_: {}", s->get_last_log_idx(), s->get_last_log_term());
return folly::makeSemiFuture< homestore::ReplResult< folly::Unit > >(folly::Unit{});
}

bool ReplicationStateMachine::apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) {
LOGE("apply_snapshot not implemented");
return false;
}

std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_snapshot() {
LOGE("last_snapshot not implemented");
return nullptr;
}

int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) {
LOGE("read_snapshot_data not implemented");
return -1;
}

void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) {
LOGE("write_snapshot_data not implemented");
}

void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { LOGE("free_user_snp_ctx not implemented"); }

} // namespace homeobject
15 changes: 14 additions & 1 deletion src/lib/homestore_backend/replication_state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
void on_rollback(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
cintrusive< homestore::repl_req_ctx >& ctx) override;

/// @brief Called when the raft service is created after restart.
///
/// homeobject should recover all the necessary components to serve log replay/commit requests.
void on_restart() override;

/// @brief Called when the async_alloc_write call failed to initiate replication
///
/// Called only on the node which called async_alloc_write
Expand Down Expand Up @@ -166,8 +171,16 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
/// @brief Called when the replica is being destroyed by nuraft;
void on_destroy() override;

/// Not Implemented
/// @brief Called when the snapshot is being created by nuraft;
homestore::AsyncReplResult<> create_snapshot(homestore::repl_snapshot& s) override;
homestore::AsyncReplResult<> create_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual bool apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual std::shared_ptr< homestore::snapshot_context > last_snapshot() override;
virtual int read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) override;
virtual void write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) override;
virtual void free_user_snp_ctx(void*& user_snp_ctx) override;

private:
HSHomeObject* home_object_{nullptr};
Expand Down
6 changes: 4 additions & 2 deletions src/lib/tests/fixture_app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ SISL_OPTION_GROUP(

SISL_LOGGING_INIT(HOMEOBJECT_LOG_MODS)

SISL_OPTIONS_ENABLE(logging, homeobject, test_home_object)
#define test_options logging, homeobject, config, test_home_object

SISL_OPTIONS_ENABLE(test_options)

FixtureApp::FixtureApp(bool is_hybrid) : is_hybrid_(is_hybrid) {
clean();
Expand Down Expand Up @@ -88,7 +90,7 @@ void TestFixture::TearDown() { std::dynamic_pointer_cast< FixtureApp >(app)->cle
int main(int argc, char* argv[]) {
int parsed_argc = argc;
::testing::InitGoogleTest(&parsed_argc, argv);
SISL_OPTIONS_LOAD(parsed_argc, argv, logging, homeobject, test_home_object);
SISL_OPTIONS_LOAD(parsed_argc, argv, test_options);
sisl::logging::SetLogger(std::string(argv[0]));
sisl::logging::SetLogPattern("[%D %T%z] [%^%L%$] [%t] %v");
parsed_argc = 1;
Expand Down

0 comments on commit 9042b8d

Please sign in to comment.