diff --git a/contrib/NuRaft b/contrib/NuRaft index b56784be1aec..491eaf592d95 160000 --- a/contrib/NuRaft +++ b/contrib/NuRaft @@ -1 +1 @@ -Subproject commit b56784be1aec568fb72aff47f281097c017623cb +Subproject commit 491eaf592d950e0e37accbe8b3f217e068c9fecf diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp index 894fd93cfa75..c0dfbc2cbc38 100644 --- a/src/Coordination/Changelog.cpp +++ b/src/Coordination/Changelog.cpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB @@ -479,7 +480,7 @@ class ChangelogReader continue; /// Create log entry for read data - auto log_entry = nuraft::cs_new(record.header.term, record.blob, record.header.value_type); + auto log_entry = nuraft::cs_new(record.header.term, record.blob, static_cast(record.header.value_type)); if (result.first_read_index == 0) result.first_read_index = record.header.index; diff --git a/src/Coordination/Changelog.h b/src/Coordination/Changelog.h index 56b0475ba8bf..3c09370182d7 100644 --- a/src/Coordination/Changelog.h +++ b/src/Coordination/Changelog.h @@ -39,7 +39,7 @@ struct ChangelogRecordHeader ChangelogVersion version = CURRENT_CHANGELOG_VERSION; uint64_t index = 0; /// entry log number uint64_t term = 0; - nuraft::log_val_type value_type{}; + int32_t value_type{}; uint64_t blob_size = 0; }; diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 45db9e85fa57..6e47412cd3a9 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -607,12 +608,30 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ } } + const auto follower_preappend = [&](const auto & entry) + { + if (entry->get_val_type() != nuraft::app_log) + return nuraft::cb_func::ReturnCode::Ok; + + try + { + state_machine->parseRequest(entry->get_buf(), /*final=*/false); + } + catch (...) + { + tryLogCurrentException(log, "Failed to parse request from log entry"); + throw; + } + return nuraft::cb_func::ReturnCode::Ok; + + }; + if (initialized_flag) { switch (type) { // This event is called before a single log is appended to the entry on the leader node - case nuraft::cb_func::PreAppendLog: + case nuraft::cb_func::PreAppendLogLeader: { // we are relying on the fact that request are being processed under a mutex // and not a RW lock @@ -665,7 +684,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ if (request_for_session->digest->version != KeeperStorage::NO_DIGEST) writeIntBinary(request_for_session->digest->value, write_buf); - break; + return nuraft::cb_func::ReturnCode::Ok; + } + case nuraft::cb_func::PreAppendLogFollower: + { + const auto & entry = *static_cast(param->ctx); + return follower_preappend(entry); } case nuraft::cb_func::AppendLogFailed: { @@ -678,13 +702,11 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ auto & entry_buf = entry->get_buf(); auto request_for_session = state_machine->parseRequest(entry_buf, true); state_machine->rollbackRequest(*request_for_session, true); - break; + return nuraft::cb_func::ReturnCode::Ok; } default: - break; + return nuraft::cb_func::ReturnCode::Ok; } - - return nuraft::cb_func::ReturnCode::Ok; } size_t last_commited = state_machine->last_commit_index(); @@ -737,6 +759,11 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ initial_batch_committed = true; return nuraft::cb_func::ReturnCode::Ok; } + case nuraft::cb_func::PreAppendLogFollower: + { + const auto & entry = *static_cast(param->ctx); + return follower_preappend(entry); + } default: /// ignore other events return nuraft::cb_func::ReturnCode::Ok; } diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 65abee440504..7d251ad48b95 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -272,9 +272,8 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req } catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__); - rollbackRequestNoLock(request_for_session, true); - throw; + tryLogCurrentException(__PRETTY_FUNCTION__, "Failed to preprocess stored log, aborting to avoid inconsistent state"); + std::abort(); } if (keeper_context->digest_enabled && request_for_session.digest)