Skip to content

Commit

Permalink
Backport #50751 to 23.3: Avoid storing logs in Keeper containing unkn…
Browse files Browse the repository at this point in the history
…own operation
  • Loading branch information
robot-clickhouse committed Jun 12, 2023
1 parent 8d9d3c9 commit 3cb6c40
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 12 deletions.
2 changes: 1 addition & 1 deletion contrib/NuRaft
Submodule NuRaft updated 67 files
+3 −2 .gitignore
+3 −0 .gitmodules
+0 −25 .travis.yml
+60 −31 CMakeLists.txt
+20 −9 README.md
+1 −0 asio
+7 −7 docs/async_replication.md
+10 −10 docs/basic_operations.md
+28 −0 docs/custom_commit_policy.md
+43 −0 docs/custom_metadata.md
+3 −3 docs/custom_quorum_size.md
+32 −0 docs/custom_resolver.md
+6 −6 docs/dealing_with_buffer.md
+4 −4 docs/enabling_ssl.md
+6 −1 docs/how_to_use.md
+11 −11 docs/leader_election_priority.md
+7 −7 docs/leadership_expiration.md
+9 −0 docs/log_timestamp.md
+40 −0 docs/parallel_log_appending.md
+5 −5 docs/prevote_protocol.md
+6 −6 docs/readonly_member.md
+7 −7 docs/replication_flow.md
+7 −7 docs/snapshot_transmission.md
+4 −4 examples/backtrace.h
+4 −2 examples/calculator/calc_server.cxx
+1 −1 examples/in_memory_log_store.hxx
+3 −3 examples/logger.cc
+7 −0 include/libnuraft/asio_service_options.hxx
+14 −0 include/libnuraft/buffer.hxx
+7 −3 include/libnuraft/callback.hxx
+3 −1 include/libnuraft/event_awaiter.hxx
+2 −2 include/libnuraft/log_val_type.hxx
+4 −0 include/libnuraft/logger.hxx
+2 −3 include/libnuraft/msg_type.hxx
+30 −2 include/libnuraft/raft_server.hxx
+1 −0 include/libnuraft/resp_msg.hxx
+3 −0 include/libnuraft/rpc_exception.hxx
+3 −0 include/libnuraft/rpc_listener.hxx
+1 −1 include/libnuraft/snapshot_sync_ctx.hxx
+3 −1 include/libnuraft/srv_role.hxx
+2 −0 include/libnuraft/state_mgr.hxx
+4 −0 include/libnuraft/strfmt.hxx
+1 −1 manifest.sh
+1 −1 prepare.sh
+56 −37 src/asio_service.cxx
+19 −1 src/buffer.cxx
+1 −1 src/global_mgr.cxx
+80 −59 src/handle_append_entries.cxx
+16 −14 src/handle_client_request.cxx
+1 −1 src/handle_client_request.hxx
+54 −56 src/handle_commit.cxx
+2 −2 src/handle_custom_notification.cxx
+20 −12 src/handle_join_leave.cxx
+2 −2 src/handle_priority.cxx
+47 −30 src/handle_snapshot_sync.cxx
+8 −8 src/handle_timeout.cxx
+1 −1 src/handle_user_cmd.cxx
+39 −28 src/handle_vote.cxx
+5 −4 src/peer.cxx
+61 −52 src/raft_server.cxx
+4 −3 src/snapshot_sync_ctx.cxx
+10 −0 src/tracer.hxx
+14 −14 tests/unit/asio_service_test.cxx
+14 −0 tests/unit/buffer_test.cxx
+2 −2 tests/unit/failure_test.cxx
+4 −4 tests/unit/raft_package_fake.hxx
+10 −3 tests/unit/raft_server_test.cxx
3 changes: 2 additions & 1 deletion src/Coordination/Changelog.cpp
Expand Up @@ -13,6 +13,7 @@
#include <Common/logger_useful.h>
#include <IO/WriteBufferFromFile.h>
#include <base/errnoToString.h>
#include <libnuraft/log_val_type.hxx>


namespace DB
Expand Down Expand Up @@ -469,7 +470,7 @@ class ChangelogReader
continue;

/// Create log entry for read data
auto log_entry = nuraft::cs_new<nuraft::log_entry>(record.header.term, record.blob, record.header.value_type);
auto log_entry = nuraft::cs_new<nuraft::log_entry>(record.header.term, record.blob, static_cast<nuraft::log_val_type>(record.header.value_type));
if (result.first_read_index == 0)
result.first_read_index = record.header.index;

Expand Down
2 changes: 1 addition & 1 deletion src/Coordination/Changelog.h
Expand Up @@ -38,7 +38,7 @@ struct ChangelogRecordHeader
ChangelogVersion version = CURRENT_CHANGELOG_VERSION;
uint64_t index = 0; /// entry log number
uint64_t term = 0;
nuraft::log_val_type value_type{};
int32_t value_type{};
uint64_t blob_size = 0;
};

Expand Down
39 changes: 33 additions & 6 deletions src/Coordination/KeeperServer.cpp
Expand Up @@ -21,6 +21,7 @@
#include <libnuraft/raft_server.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <Common/Exception.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
Expand Down Expand Up @@ -621,12 +622,30 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
}
}

const auto follower_preappend = [&](const auto & entry)
{
if (entry->get_val_type() != nuraft::app_log)
return nuraft::cb_func::ReturnCode::Ok;

try
{
state_machine->parseRequest(entry->get_buf(), /*final=*/false);
}
catch (...)
{
tryLogCurrentException(log, "Failed to parse request from log entry");
throw;
}
return nuraft::cb_func::ReturnCode::Ok;

};

if (initialized_flag)
{
switch (type)
{
// This event is called before a single log is appended to the entry on the leader node
case nuraft::cb_func::PreAppendLog:
case nuraft::cb_func::PreAppendLogLeader:
{
// we are relying on the fact that request are being processed under a mutex
// and not a RW lock
Expand All @@ -643,7 +662,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ

request_for_session.digest = state_machine->getNodesDigest();
entry = nuraft::cs_new<nuraft::log_entry>(entry->get_term(), getZooKeeperLogEntry(request_for_session), entry->get_val_type());
break;
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::PreAppendLogFollower:
{
const auto & entry = *static_cast<LogEntryPtr *>(param->ctx);
return follower_preappend(entry);
}
case nuraft::cb_func::AppendLogFailed:
{
Expand All @@ -656,13 +680,11 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
auto & entry_buf = entry->get_buf();
auto request_for_session = state_machine->parseRequest(entry_buf);
state_machine->rollbackRequest(request_for_session, true);
break;
return nuraft::cb_func::ReturnCode::Ok;
}
default:
break;
return nuraft::cb_func::ReturnCode::Ok;
}

return nuraft::cb_func::ReturnCode::Ok;
}

size_t last_commited = state_machine->last_commit_index();
Expand Down Expand Up @@ -715,6 +737,11 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
initial_batch_committed = true;
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::PreAppendLogFollower:
{
const auto & entry = *static_cast<LogEntryPtr *>(param->ctx);
return follower_preappend(entry);
}
default: /// ignore other events
return nuraft::cb_func::ReturnCode::Ok;
}
Expand Down
5 changes: 2 additions & 3 deletions src/Coordination/KeeperStateMachine.cpp
Expand Up @@ -218,9 +218,8 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
rollbackRequestNoLock(request_for_session, true);
throw;
tryLogCurrentException(__PRETTY_FUNCTION__, "Failed to preprocess stored log, aborting to avoid inconsistent state");
std::abort();
}

if (keeper_context->digest_enabled && request_for_session.digest)
Expand Down

0 comments on commit 3cb6c40

Please sign in to comment.