Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: distinguish between normal node startup and snapshot loading #319

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmake/braft.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ ExternalProject_Add(
extern_braft
${EXTERNAL_PROJECT_LOG_ARGS}
DEPENDS brpc
GIT_REPOSITORY "https://github.com/pikiwidb/braft.git"
GIT_TAG master
# The pr on braft is not merged, so I am using my own warehouse to run the test for the time being
GIT_REPOSITORY https://github.com/panlei-coder/braft.git
GIT_TAG merge-master-playback
# GIT_REPOSITORY "https://github.com/pikiwidb/braft.git"
# GIT_TAG master
GIT_SHALLOW true
PREFIX ${BRAFT_SOURCES_DIR}
UPDATE_COMMAND ""
Expand Down
4 changes: 2 additions & 2 deletions cmake/openssl.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# of patent rights can be found in the PATENTS file in the same directory.

INCLUDE(ExternalProject)

SET(OPENSSL_SOURCE_DIR ${THIRD_PARTY_PATH}/openssl)
SET(OPENSSL_INSTALL_DIR ${THIRD_PARTY_PATH}/install/openssl)
SET(OPENSSL_INCLUDE_DIR ${OPENSSL_INSTALL_DIR}/include)
Expand Down Expand Up @@ -49,4 +49,4 @@ ADD_DEPENDENCIES(crypto OpenSSL)
SET(OPENSSL_CRYPTO_LIBRARY ${OPENSSL_INSTALL_DIR}/${OPENSSL_LIB}/libcrypto.a)

SET(OPENSSL_INCLUDE_DIR ${THIRD_PARTY_PATH}/install/openssl/include)
SET(OPENSSL_ROOT_DIR ${THIRD_PARTY_PATH}/install/openssl/bin/openssl)
SET(OPENSSL_ROOT_DIR ${THIRD_PARTY_PATH}/install/openssl/bin/openssl)
2 changes: 1 addition & 1 deletion pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ slowlog-max-len 128
# to the same DB is distributed among several RocksDB instances.

# RocksDB instances number per DB
db-instance-num 3
db-instance-num 1
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
# default is 86400 * 7
small-compaction-threshold 604800
# default is 86400 * 3
Expand Down
6 changes: 3 additions & 3 deletions save_load.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/bin/bash

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The save_load.sh script contains glob patterns (rm -fr *) that could potentially be affected by filenames with dashes. To prevent filenames with dashes from being interpreted as options, consider using ./* instead of *.

  • Line 2: rm -fr * should be updated to rm -fr ./ *
  • Line 7: rm -fr * should be updated to rm -fr ./ *
Analysis chain

Consider using ./ or -- with glob patterns to prevent filenames with dashes from being interpreted as options.

Also applies to: 7-7

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify if filenames with dashes are being interpreted as options.

# Test: Check if filenames with dashes exist and could potentially be interpreted as options.
find . -name "*-*" -exec echo "Filename with dash found: {}" \;

Length of output: 3925



Script:

#!/bin/bash
# Description: Check for glob patterns in save_load.sh that could be affected by filenames with dashes.

# Test: Search for glob patterns in save_load.sh
grep -E '\*|\?' save_load.sh

Length of output: 217

killall -9 pikiwidb
mkdir leader follower1

Expand All @@ -10,9 +11,8 @@ redis-cli -p 7777 raft.cluster init

redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset
redis-cli -p 7777 raft.node dosnapshot
redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset
redis-cli -p 7777 raft.node dosnapshot

sleep 10

redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset

redis-cli -p 8888 raft.cluster join 127.0.0.1:7777
3 changes: 1 addition & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ TARGET_INCLUDE_DIRECTORIES(pikiwidb
PRIVATE ${BRPC_INCLUDE_DIR}
)


TARGET_LINK_LIBRARIES(pikiwidb net; dl; fmt; storage; rocksdb; pstd braft brpc ssl crypto zlib protobuf leveldb gflags z praft praft_pb "${LIB}")
TARGET_LINK_LIBRARIES(pikiwidb net; dl; fmt; storage; rocksdb; pstd braft brpc ssl crypto zlib protobuf leveldb gflags z praft praft_pb "${LIB}")

SET_TARGET_PROPERTIES(pikiwidb PROPERTIES LINKER_LANGUAGE CXX)
4 changes: 3 additions & 1 deletion src/cmd_raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) {
}

void RaftNodeCmd::DoCmdSnapshot(PClient* client) {
auto s = PRAFT.DoSnapshot();
// @todo need to get self_snapshot_index
// auto self_snapshot_index = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetSmallestFlushedLogIndex();
auto s = PRAFT.DoSnapshot(); // self_snapshot_index
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complete the implementation for self_snapshot_index or clarify the TODO.

The commented-out code suggests that self_snapshot_index is intended to be used but is not implemented. Would you like assistance in completing this part of the code?

if (s.ok()) {
client->SetRes(CmdRes::kOK);
}
Expand Down
6 changes: 4 additions & 2 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ rocksdb::Status DB::Open() {
storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
r.AppendLog(log, std::move(promise));
};
storage_options.do_snapshot_function =
std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2);
storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
std::forward<decltype(is_sync)>(is_sync));
};
Comment on lines +40 to +43
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议:在 Lambda 表达式中使用值捕获。

使用引用捕获 raft 可能导致潜在的生命周期问题,尤其是在 raft 对象的生命周期可能比 storage_options 短的场景中。建议改为值捕获或确保 raft 的生命周期总是足够长。

- storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
+ storage_options.do_snapshot_function = [raft = pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
  raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
                   std::forward<decltype(is_sync)>(is_sync));
};
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
std::forward<decltype(is_sync)>(is_sync));
};
storage_options.do_snapshot_function = [raft = pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
std::forward<decltype(is_sync)>(is_sync));
};

}

storage_options.db_instance_num = g_config.db_instance_num.load();
Expand Down
64 changes: 59 additions & 5 deletions src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <cassert>

#include "braft/raft.h"
Copy link

@coderabbitai coderabbitai bot Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The verification process did not find any direct or indirect usage of entities from braft/raft.h in src/praft/praft.cc. The references to raft found were in comments and error messages, which do not necessitate the inclusion of the header file.

  • Lines: 12-12
  • File: src/praft/praft.cc
Analysis chain

Ensure the newly added #include "braft/raft.h" is necessary.


Let's further investigate if there are any indirect usages or dependencies that might require the inclusion of braft/raft.h.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify if the newly added include is used in the file.

# Test: Search for usage of entities from `braft/raft.h`. Expect: At least one usage.
ast-grep --lang cpp --pattern $'$_ { $$$ braft::raft::$$$ }' src/praft/praft.cc

Length of output: 80



Script:

#!/bin/bash
# Description: Verify if there are any indirect usages or dependencies that might require the inclusion of `braft/raft.h`.

# Test 1: Search for any usage of `braft` namespace in the file.
ast-grep --lang cpp --pattern $'$_ { $$$ braft::$$$ }' src/praft/praft.cc

# Test 2: Search for any usage of `raft` keyword in the file.
rg --word-regexp 'raft' src/praft/praft.cc

Length of output: 343

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

兔子 留言说这个头文件没用到,却引入了,请确认

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AlexStocks 经过验证,#include "braft/raft.h" 头文件在 src/praft/praft.cc 中没有被直接使用。仅在注释和错误消息中出现了 raft 关键字。因此,可以安全地移除这个头文件。

Analysis chain
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify if the newly added include is used in the file.

# Test: Search for usage of entities from `braft/raft.h`. Expect: At least one usage.
ast-grep --lang cpp --pattern $'$_ { $$$ braft::raft::$$$ }' src/praft/praft.cc

# Test 1: Search for any usage of `braft` namespace in the file.
ast-grep --lang cpp --pattern $'$_ { $$$ braft::$$$ }' src/praft/praft.cc

# Test 2: Search for any usage of `raft` keyword in the file.
rg --word-regexp 'raft' src/praft/praft.cc

Length of output: 424

#include "braft/snapshot.h"
#include "braft/util.h"
#include "brpc/server.h"
Expand Down Expand Up @@ -150,7 +151,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) {
node_options_.fsm = this;
node_options_.node_owns_fsm = false;
node_options_.snapshot_interval_s = 0;
std::string prefix = "local://" + g_config.db_path.ToString() + "_praft";
std::string prefix = "local://" + g_config.db_path.ToString() + std::to_string(db_id_) + "/_praft";
node_options_.log_uri = prefix + "/log";
node_options_.raft_meta_uri = prefix + "/raft_meta";
node_options_.snapshot_uri = prefix + "/snapshot";
Expand Down Expand Up @@ -241,6 +242,24 @@ butil::Status PRaft::GetListPeers(std::vector<braft::PeerId>* peers) {
return node_->list_peers(peers);
}

uint64_t PRaft::GetTerm(uint64_t log_index) {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
if (!node_) {
ERROR("Node is not initialized");
return 0;
}

return node_->get_term(log_index);
}

uint64_t PRaft::GetLastLogIndex(bool is_flush) {
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
if (!node_) {
ERROR("Node is not initialized");
return 0;
}

return node_->get_last_log_index(is_flush);
}

void PRaft::SendNodeRequest(PClient* client) {
assert(client);

Expand Down Expand Up @@ -521,10 +540,17 @@ butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index, bool is_sync) {
if (!node_) {
return ERROR_LOG_AND_STATUS("Node is not initialized");
}
braft::SynchronizedClosure done;
node_->snapshot(&done, self_snapshot_index);
done.wait();
return done.status();

if (is_sync) {
braft::SynchronizedClosure done;
node_->snapshot(&done, self_snapshot_index);
done.wait();
return done.status();
} else {
node_->snapshot(nullptr, self_snapshot_index);
butil::Status status;
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
return status;
}
}

void PRaft::OnClusterCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) {
Expand Down Expand Up @@ -629,10 +655,38 @@ void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done
int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
CHECK(!IsLeader()) << "Leader is not supposed to load snapshot";
assert(reader);

if (is_node_first_start_up_) {
// get replay point of one db's
/*
1. When a node starts normally, because all memory data is flushed to disks and
snapshots are truncated to the latest, the flush-index and apply-index are the
same when the node starts, so the maximum log index should be obtained.
2. When a node is improperly shut down and restarted, the minimum flush-index should
be obtained as the starting point for fault recovery.
*/
// @todo GetSmallestFlushedLogIndex
uint64_t replay_point = PSTORE.GetBackend(db_id_)->GetStorage()->GetSmallestFlushedLogIndex();
node_->set_self_playback_point(replay_point);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node_->set_self_playback_point(replay_point+1)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 set_self_playback_point 设置的 last_applied_index,所以就是 replay_point

is_node_first_start_up_ = false;
INFO("set replay_point: {}", replay_point);

/*
If a node has just joined the cluster and does not have any data,
it does not load the local snapshot at startup. Therefore,
LoadDBFromCheckPoint is required after loading the snapshot from the leader.
*/
if (GetLastLogIndex() != 0) {
return 0;
}
}

// 3. When a snapshot is installed on a node, you do not need to set a playback point.
auto reader_path = reader->get_path(); // xx/snapshot_0000001
auto path = g_config.db_path.ToString() + std::to_string(db_id_); // db/db_id
TasksVector tasks(1, {TaskType::kLoadDBFromCheckpoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true});
PSTORE.HandleTaskSpecificDB(tasks);
INFO("load snapshot success!");
return 0;
}

Expand Down
4 changes: 4 additions & 0 deletions src/praft/praft.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class PRaft : public braft::StateMachine {
std::string GetGroupID() const;
braft::NodeStatus GetNodeStatus() const;
butil::Status GetListPeers(std::vector<braft::PeerId>* peers);
uint64_t GetTerm(uint64_t log_index);
uint64_t GetLastLogIndex(bool is_flush = false);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如上所示,把 uint64_t 改为 LogIndex

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; }

Expand Down Expand Up @@ -164,6 +166,8 @@ class PRaft : public braft::StateMachine {
ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command
std::string group_id_; // group id
int db_id_ = 0; // db_id

bool is_node_first_start_up_ = true;
};

} // namespace pikiwidb
27 changes: 24 additions & 3 deletions src/praft/psnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,44 @@
#include "psnapshot.h"

#include "braft/local_file_meta.pb.h"
#include "braft/snapshot.h"
#include "butil/files/file_path.h"

#include "pstd/log.h"
#include "pstd/pstd_string.h"

#include "config.h"
#include "praft.h"
Comment on lines +14 to +21
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The additions and changes to snapshot path parsing are necessary for the new functionality. Consider adding unit tests to cover potential edge cases in the parsing logic.

Would you like me to help in creating unit tests for the snapshot path parsing logic?

Also applies to: 31-51

#include "store.h"

namespace pikiwidb {

extern PConfig g_config;

braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int oflag,
const ::google::protobuf::Message* file_meta, butil::File::Error* e) {
if ((oflag & IS_RDONLY) == 0) { // This is a read operation
bool snapshots_exists = false;
std::string snapshot_path;
int db_id = -1;

// parse snapshot path
butil::FilePath parse_snapshot_path(path);
std::vector<std::string> components;
bool is_find_db = false;
parse_snapshot_path.GetComponents(&components);
for (auto component : components) {
for (const auto& component : components) {
snapshot_path += component + "/";

if (is_find_db && pstd::String2int(component, &db_id)) {
is_find_db = false;
}

if (component.find("snapshot_") != std::string::npos) {
break;
} else if (component == "db") {
is_find_db = true;
}
}

Comment on lines +31 to +51
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议:为快照路径解析逻辑添加单元测试

快照路径解析逻辑对于新功能是必要的。建议添加单元测试以覆盖潜在的边界情况。

需要我帮助创建快照路径解析逻辑的单元测试吗?

// check whether snapshots have been created
std::lock_guard<braft::raft_mutex_t> guard(mutex_);
if (!snapshot_path.empty()) {
Expand All @@ -55,6 +66,8 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o

// Snapshot generation
if (!snapshots_exists) {
assert(db_id >= 0);

braft::LocalSnapshotMetaTable snapshot_meta_memtable;
std::string meta_path = snapshot_path + "/" PRAFT_SNAPSHOT_META_FILE;
INFO("start to generate snapshot in path {}", snapshot_path);
Expand All @@ -66,6 +79,14 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o
PSTORE.HandleTaskSpecificDB(tasks);
AddAllFiles(snapshot_path, &snapshot_meta_memtable, snapshot_path);

// update snapshot last log index and last_log_term
auto& new_meta = const_cast<braft::SnapshotMeta&>(snapshot_meta_memtable.meta());
auto last_log_index = 30000; // @todo PSTORE.GetBackend(db_id)->GetStorage()->GetSmallestFlushedLogIndex();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这行是不是忘换了

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯嗯 忘记换了

new_meta.set_last_included_index(last_log_index);
auto last_log_term = PRAFT.GetTerm(last_log_index);
new_meta.set_last_included_term(last_log_term);
INFO("Succeed to fix snapshot meta: {}, {}", last_log_index, last_log_term);

auto rc = snapshot_meta_memtable.save_to_file(fs, meta_path);
if (rc == 0) {
INFO("Succeed to save snapshot in path {}", snapshot_path);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,8 @@ class Storage {
void GetRocksDBInfo(std::string& info);
Status OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx);

LogIndex GetSmallestFlushedLogIndex() const;

private:
std::vector<std::unique_ptr<Redis>> insts_;
std::unique_ptr<SlotIndexer> slot_indexer_;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/log_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "log_index.h"

#include "pstd/log.h"

#include <algorithm>
#include <cinttypes>
#include <set>
Expand Down Expand Up @@ -167,6 +169,7 @@ void LogIndexAndSequenceCollectorPurger::OnFlushCompleted(rocksdb::DB *db,
auto count = count_.fetch_add(1);

if (count % 10 == 0) {
INFO("do snapshot after flush: {}", smallest_flushed_log_index);
callback_(smallest_flushed_log_index, false);
}

Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class Redis {
void UpdateAppliedLogIndexOfColumnFamily(size_t cf_idx, LogIndex logidx, SequenceNumber seqno) {
log_index_of_all_cfs_.Update(cf_idx, logidx, seqno);
}
LogIndex GetSmallestFlushedLogIndex() const {
return log_index_of_all_cfs_.GetSmallestLogIndex(-1).smallest_flushed_log_index;
}
bool IsRestarting() const { return is_starting_; }
void StartingPhaseEnd() { is_starting_ = false; }

Expand Down
9 changes: 9 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2517,4 +2517,13 @@ Status Storage::OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx) {
return s;
}

LogIndex Storage::GetSmallestFlushedLogIndex() const {
LogIndex smallest_flushed_log_index = INT64_MAX;
for (auto& inst : insts_) {
smallest_flushed_log_index = std::min(smallest_flushed_log_index, inst->GetSmallestFlushedLogIndex());
}

return smallest_flushed_log_index;
}

} // namespace storage
Loading