Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid storing logs in Keeper containing unknown operation #50751

Merged
merged 4 commits into from Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/NuRaft
Submodule NuRaft updated 67 files
+3 −2 .gitignore
+3 −0 .gitmodules
+0 −25 .travis.yml
+60 −31 CMakeLists.txt
+20 −9 README.md
+1 −0 asio
+7 −7 docs/async_replication.md
+10 −10 docs/basic_operations.md
+28 −0 docs/custom_commit_policy.md
+43 −0 docs/custom_metadata.md
+3 −3 docs/custom_quorum_size.md
+32 −0 docs/custom_resolver.md
+6 −6 docs/dealing_with_buffer.md
+4 −4 docs/enabling_ssl.md
+6 −1 docs/how_to_use.md
+11 −11 docs/leader_election_priority.md
+7 −7 docs/leadership_expiration.md
+9 −0 docs/log_timestamp.md
+40 −0 docs/parallel_log_appending.md
+5 −5 docs/prevote_protocol.md
+6 −6 docs/readonly_member.md
+7 −7 docs/replication_flow.md
+7 −7 docs/snapshot_transmission.md
+4 −4 examples/backtrace.h
+4 −2 examples/calculator/calc_server.cxx
+1 −1 examples/in_memory_log_store.hxx
+3 −3 examples/logger.cc
+7 −0 include/libnuraft/asio_service_options.hxx
+14 −0 include/libnuraft/buffer.hxx
+7 −3 include/libnuraft/callback.hxx
+3 −1 include/libnuraft/event_awaiter.hxx
+2 −2 include/libnuraft/log_val_type.hxx
+4 −0 include/libnuraft/logger.hxx
+2 −3 include/libnuraft/msg_type.hxx
+30 −2 include/libnuraft/raft_server.hxx
+1 −0 include/libnuraft/resp_msg.hxx
+3 −0 include/libnuraft/rpc_exception.hxx
+3 −0 include/libnuraft/rpc_listener.hxx
+1 −1 include/libnuraft/snapshot_sync_ctx.hxx
+3 −1 include/libnuraft/srv_role.hxx
+2 −0 include/libnuraft/state_mgr.hxx
+4 −0 include/libnuraft/strfmt.hxx
+1 −1 manifest.sh
+1 −1 prepare.sh
+56 −37 src/asio_service.cxx
+19 −1 src/buffer.cxx
+1 −1 src/global_mgr.cxx
+80 −59 src/handle_append_entries.cxx
+16 −14 src/handle_client_request.cxx
+1 −1 src/handle_client_request.hxx
+54 −56 src/handle_commit.cxx
+2 −2 src/handle_custom_notification.cxx
+20 −12 src/handle_join_leave.cxx
+2 −2 src/handle_priority.cxx
+47 −30 src/handle_snapshot_sync.cxx
+8 −8 src/handle_timeout.cxx
+1 −1 src/handle_user_cmd.cxx
+39 −28 src/handle_vote.cxx
+5 −4 src/peer.cxx
+61 −52 src/raft_server.cxx
+4 −3 src/snapshot_sync_ctx.cxx
+10 −0 src/tracer.hxx
+14 −14 tests/unit/asio_service_test.cxx
+14 −0 tests/unit/buffer_test.cxx
+2 −2 tests/unit/failure_test.cxx
+4 −4 tests/unit/raft_package_fake.hxx
+10 −3 tests/unit/raft_server_test.cxx
3 changes: 2 additions & 1 deletion src/Coordination/Changelog.cpp
Expand Up @@ -13,6 +13,7 @@
#include <Common/logger_useful.h>
#include <IO/WriteBufferFromFile.h>
#include <base/errnoToString.h>
#include <libnuraft/log_val_type.hxx>


namespace DB
Expand Down Expand Up @@ -479,7 +480,7 @@ class ChangelogReader
continue;

/// Create log entry for read data
auto log_entry = nuraft::cs_new<nuraft::log_entry>(record.header.term, record.blob, record.header.value_type);
auto log_entry = nuraft::cs_new<nuraft::log_entry>(record.header.term, record.blob, static_cast<nuraft::log_val_type>(record.header.value_type));
if (result.first_read_index == 0)
result.first_read_index = record.header.index;

Expand Down
2 changes: 1 addition & 1 deletion src/Coordination/Changelog.h
Expand Up @@ -39,7 +39,7 @@ struct ChangelogRecordHeader
ChangelogVersion version = CURRENT_CHANGELOG_VERSION;
uint64_t index = 0; /// entry log number
uint64_t term = 0;
nuraft::log_val_type value_type{};
int32_t value_type{};
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint64_t blob_size = 0;
};

Expand Down
39 changes: 33 additions & 6 deletions src/Coordination/KeeperServer.cpp
Expand Up @@ -21,6 +21,7 @@
#include <libnuraft/raft_server.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
Expand Down Expand Up @@ -607,12 +608,30 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
}
}

const auto follower_preappend = [&](const auto & entry)
{
if (entry->get_val_type() != nuraft::app_log)
return nuraft::cb_func::ReturnCode::Ok;

try
{
state_machine->parseRequest(entry->get_buf(), /*final=*/false);
}
catch (...)
{
tryLogCurrentException(log, "Failed to parse request from log entry");
throw;
}
return nuraft::cb_func::ReturnCode::Ok;

};

if (initialized_flag)
{
switch (type)
{
// This event is called before a single log is appended to the entry on the leader node
case nuraft::cb_func::PreAppendLog:
case nuraft::cb_func::PreAppendLogLeader:
{
// we are relying on the fact that request are being processed under a mutex
// and not a RW lock
Expand Down Expand Up @@ -665,7 +684,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
if (request_for_session->digest->version != KeeperStorage::NO_DIGEST)
writeIntBinary(request_for_session->digest->value, write_buf);

break;
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::PreAppendLogFollower:
{
const auto & entry = *static_cast<LogEntryPtr *>(param->ctx);
return follower_preappend(entry);
}
case nuraft::cb_func::AppendLogFailed:
{
Expand All @@ -678,13 +702,11 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
auto & entry_buf = entry->get_buf();
auto request_for_session = state_machine->parseRequest(entry_buf, true);
state_machine->rollbackRequest(*request_for_session, true);
break;
return nuraft::cb_func::ReturnCode::Ok;
}
default:
break;
return nuraft::cb_func::ReturnCode::Ok;
}

return nuraft::cb_func::ReturnCode::Ok;
}

size_t last_commited = state_machine->last_commit_index();
Expand Down Expand Up @@ -737,6 +759,11 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
initial_batch_committed = true;
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::PreAppendLogFollower:
{
const auto & entry = *static_cast<LogEntryPtr *>(param->ctx);
return follower_preappend(entry);
}
default: /// ignore other events
return nuraft::cb_func::ReturnCode::Ok;
}
Expand Down
5 changes: 2 additions & 3 deletions src/Coordination/KeeperStateMachine.cpp
Expand Up @@ -272,9 +272,8 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
rollbackRequestNoLock(request_for_session, true);
throw;
tryLogCurrentException(__PRETTY_FUNCTION__, "Failed to preprocess stored log, aborting to avoid inconsistent state");
std::abort();
}

if (keeper_context->digest_enabled && request_for_session.digest)
Expand Down