feat: distinguish between normal node startup and snapshot loading #319

Open
wants to merge 9 commits into base: unstable
7 changes: 5 additions & 2 deletions cmake/braft.cmake
@@ -16,8 +16,11 @@ ExternalProject_Add(
extern_braft
${EXTERNAL_PROJECT_LOG_ARGS}
DEPENDS brpc
GIT_REPOSITORY "https://github.com/pikiwidb/braft.git"
GIT_TAG master
# The PR on braft has not been merged yet, so this temporarily uses my own repository to run the tests
GIT_REPOSITORY https://github.com/panlei-coder/braft.git
GIT_TAG merge-master-playback
# GIT_REPOSITORY "https://github.com/pikiwidb/braft.git"
# GIT_TAG master
GIT_SHALLOW true
PREFIX ${BRAFT_SOURCES_DIR}
UPDATE_COMMAND ""
4 changes: 2 additions & 2 deletions cmake/openssl.cmake
@@ -4,7 +4,7 @@
# of patent rights can be found in the PATENTS file in the same directory.

INCLUDE(ExternalProject)

SET(OPENSSL_SOURCE_DIR ${THIRD_PARTY_PATH}/openssl)
SET(OPENSSL_INSTALL_DIR ${THIRD_PARTY_PATH}/install/openssl)
SET(OPENSSL_INCLUDE_DIR ${OPENSSL_INSTALL_DIR}/include)
@@ -49,4 +49,4 @@ ADD_DEPENDENCIES(crypto OpenSSL)
SET(OPENSSL_CRYPTO_LIBRARY ${OPENSSL_INSTALL_DIR}/${OPENSSL_LIB}/libcrypto.a)

SET(OPENSSL_INCLUDE_DIR ${THIRD_PARTY_PATH}/install/openssl/include)
SET(OPENSSL_ROOT_DIR ${THIRD_PARTY_PATH}/install/openssl/bin/openssl)
SET(OPENSSL_ROOT_DIR ${THIRD_PARTY_PATH}/install/openssl/bin/openssl)
2 changes: 1 addition & 1 deletion pikiwidb.conf
@@ -319,7 +319,7 @@ slowlog-max-len 128
# to the same DB is distributed among several RocksDB instances.

# RocksDB instances number per DB
db-instance-num 3
db-instance-num 1
Collaborator:
Since db-instance-num is set to 1 here, should the databases count above also be changed to default to 1?

Collaborator (Author):
Sure.

# default is 86400 * 7
small-compaction-threshold 604800
# default is 86400 * 3
6 changes: 3 additions & 3 deletions save_load.sh
@@ -1,4 +1,5 @@
#!/bin/bash

Tip

Codebase Verification

The save_load.sh script contains glob patterns (rm -fr *) that could potentially be affected by filenames with dashes. To prevent filenames with dashes from being interpreted as options, consider using ./* instead of *.

  • Line 2: rm -fr * should be updated to rm -fr ./*
  • Line 7: rm -fr * should be updated to rm -fr ./*
Analysis chain

Consider using ./ or -- with glob patterns to prevent filenames with dashes from being interpreted as options.

Also applies to: 7-7

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify if filenames with dashes are being interpreted as options.

# Test: Check if filenames with dashes exist and could potentially be interpreted as options.
find . -name "*-*" -exec echo "Filename with dash found: {}" \;

Length of output: 3925



Script:

#!/bin/bash
# Description: Check for glob patterns in save_load.sh that could be affected by filenames with dashes.

# Test: Search for glob patterns in save_load.sh
grep -E '\*|\?' save_load.sh

Length of output: 217

killall -9 pikiwidb
mkdir leader follower1

@@ -10,9 +11,8 @@ redis-cli -p 7777 raft.cluster init

redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset
redis-cli -p 7777 raft.node dosnapshot
redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset
redis-cli -p 7777 raft.node dosnapshot

sleep 10

redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset

redis-cli -p 8888 raft.cluster join 127.0.0.1:7777
3 changes: 1 addition & 2 deletions src/CMakeLists.txt
@@ -31,7 +31,6 @@ TARGET_INCLUDE_DIRECTORIES(pikiwidb
PRIVATE ${BRPC_INCLUDE_DIR}
)


TARGET_LINK_LIBRARIES(pikiwidb net; dl; fmt; storage; rocksdb; pstd braft brpc ssl crypto zlib protobuf leveldb gflags z praft praft_pb "${LIB}")
TARGET_LINK_LIBRARIES(pikiwidb net; dl; fmt; storage; rocksdb; pstd braft brpc ssl crypto zlib protobuf leveldb gflags z praft praft_pb "${LIB}")

SET_TARGET_PROPERTIES(pikiwidb PROPERTIES LINKER_LANGUAGE CXX)
4 changes: 3 additions & 1 deletion src/cmd_raft.cc
@@ -122,7 +122,9 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) {
}

void RaftNodeCmd::DoCmdSnapshot(PClient* client) {
auto s = PRAFT.DoSnapshot();
// @todo need to get self_snapshot_index
// auto self_snapshot_index = PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetSmallestFlushedLogIndex();
auto s = PRAFT.DoSnapshot(); // self_snapshot_index
Comment on lines +125 to +127

Complete the implementation for self_snapshot_index or clarify the TODO.

The commented-out code suggests that self_snapshot_index is intended to be used but is not implemented. Would you like assistance in completing this part of the code?
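
For illustration only, a minimal sketch of what the completed command might look like, assuming the PSTORE / GetStorage() / GetSmallestFlushedLogIndex() chain referenced in the commented-out line is available with those signatures, and that a synchronous snapshot is wanted here:

// Hypothetical sketch, not part of this PR.
void RaftNodeCmd::DoCmdSnapshot(PClient* client) {
  // Snapshot at the smallest flushed log index of the client's current DB,
  // so the snapshot never claims log entries that are not yet on disk.
  auto self_snapshot_index =
      PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->GetSmallestFlushedLogIndex();
  auto s = PRAFT.DoSnapshot(self_snapshot_index, /*is_sync=*/true);
  if (s.ok()) {
    client->SetRes(CmdRes::kOK);
  }
}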

if (s.ok()) {
client->SetRes(CmdRes::kOK);
}
6 changes: 4 additions & 2 deletions src/db.cc
@@ -37,8 +37,10 @@ rocksdb::Status DB::Open() {
storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
r.AppendLog(log, std::move(promise));
};
storage_options.do_snapshot_function =
std::bind(&pikiwidb::PRaft::DoSnapshot, &pikiwidb::PRAFT, std::placeholders::_1, std::placeholders::_2);
storage_options.do_snapshot_function = [raft = &pikiwidb::PRAFT](auto&& self_snapshot_index, auto&& is_sync) {
raft->DoSnapshot(std::forward<decltype(self_snapshot_index)>(self_snapshot_index),
std::forward<decltype(is_sync)>(is_sync));
};
}

storage_options.db_instance_num = g_config.db_instance_num.load();
64 changes: 59 additions & 5 deletions src/praft/praft.cc
@@ -9,6 +9,7 @@

#include <cassert>

#include "braft/raft.h"

Tip

Codebase Verification

The verification process did not find any direct or indirect usage of entities from braft/raft.h in src/praft/praft.cc. The references to raft found were in comments and error messages, which do not necessitate the inclusion of the header file.

  • Lines: 12-12
  • File: src/praft/praft.cc
Analysis chain

Ensure the newly added #include "braft/raft.h" is necessary.


Let's further investigate if there are any indirect usages or dependencies that might require the inclusion of braft/raft.h.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify if the newly added include is used in the file.

# Test: Search for usage of entities from `braft/raft.h`. Expect: At least one usage.
ast-grep --lang cpp --pattern $'$_ { $$$ braft::raft::$$$ }' src/praft/praft.cc

Length of output: 80



Script:

#!/bin/bash
# Description: Verify if there are any indirect usages or dependencies that might require the inclusion of `braft/raft.h`.

# Test 1: Search for any usage of `braft` namespace in the file.
ast-grep --lang cpp --pattern $'$_ { $$$ braft::$$$ }' src/praft/praft.cc

# Test 2: Search for any usage of `raft` keyword in the file.
rg --word-regexp 'raft' src/praft/praft.cc

Length of output: 343

#include "braft/snapshot.h"
#include "braft/util.h"
#include "brpc/server.h"
@@ -150,7 +151,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) {
node_options_.fsm = this;
node_options_.node_owns_fsm = false;
node_options_.snapshot_interval_s = 0;
std::string prefix = "local://" + g_config.db_path.ToString() + "_praft";
std::string prefix = "local://" + g_config.db_path.ToString() + std::to_string(db_id_) + "/_praft";
node_options_.log_uri = prefix + "/log";
node_options_.raft_meta_uri = prefix + "/raft_meta";
node_options_.snapshot_uri = prefix + "/snapshot";
@@ -241,6 +242,24 @@ butil::Status PRaft::GetListPeers(std::vector<braft::PeerId>* peers) {
return node_->list_peers(peers);
}

uint64_t PRaft::GetTerm(uint64_t log_index) {
if (!node_) {
ERROR("Node is not initialized");
return 0;
} else {
return node_->get_term(log_index);
}
}

uint64_t PRaft::GetLastLogIndex(bool is_flush) {
if (!node_) {
ERROR("Node is not initialized");
return 0;
} else {
return node_->get_last_log_index(is_flush);
}
}

void PRaft::SendNodeRequest(PClient* client) {
assert(client);

@@ -521,10 +540,17 @@ butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index, bool is_sync) {
if (!node_) {
return ERROR_LOG_AND_STATUS("Node is not initialized");
}
braft::SynchronizedClosure done;
node_->snapshot(&done, self_snapshot_index);
done.wait();
return done.status();

if (is_sync) {
braft::SynchronizedClosure done;
node_->snapshot(&done, self_snapshot_index);
done.wait();
return done.status();
} else {
node_->snapshot(nullptr, self_snapshot_index);
butil::Status status;
return status;
}
}

void PRaft::OnClusterCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) {
@@ -629,10 +655,38 @@ void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done
int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
CHECK(!IsLeader()) << "Leader is not supposed to load snapshot";
assert(reader);

if (is_node_first_start_up_) {
// get the replay point for this db
/*
1. When a node starts up after a clean shutdown, all in-memory data has already been
flushed to disk and the snapshot has been truncated to the latest state, so the
flush-index and apply-index are identical at startup and the maximum log index
should be used.
2. When a node restarts after an improper shutdown, the smallest flush-index should
be used as the starting point for recovery.
*/
// @todo GetSmallestFlushedLogIndex
uint64_t replay_point = PSTORE.GetBackend(db_id_)->GetStorage()->GetSmallestFlushedLogIndex();
node_->set_self_playback_point(replay_point);
is_node_first_start_up_ = false;
INFO("set replay_point: {}", replay_point);

/*
If a node has just joined the cluster and does not have any data,
it does not load the local snapshot at startup. Therefore,
LoadDBFromCheckPoint is required after loading the snapshot from the leader.
*/
if (GetLastLogIndex() != 0) {
return 0;
}
}

// 3. When a snapshot is installed on a node, you do not need to set a playback point.
auto reader_path = reader->get_path(); // xx/snapshot_0000001
auto path = g_config.db_path.ToString() + std::to_string(db_id_); // db/db_id
TasksVector tasks(1, {TaskType::kLoadDBFromCheckpoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true});
PSTORE.HandleTaskSpecificDB(tasks);
INFO("load snapshot success!");
return 0;
}

4 changes: 4 additions & 0 deletions src/praft/praft.h
@@ -137,6 +137,8 @@ class PRaft : public braft::StateMachine {
std::string GetGroupID() const;
braft::NodeStatus GetNodeStatus() const;
butil::Status GetListPeers(std::vector<braft::PeerId>* peers);
uint64_t GetTerm(uint64_t log_index);
uint64_t GetLastLogIndex(bool is_flush = false);

bool IsInitialized() const { return node_ != nullptr && server_ != nullptr; }

@@ -164,6 +166,8 @@ ClusterCmdContext cluster_cmd_ctx_;  // context for cluster join/remove command
ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command
std::string group_id_; // group id
int db_id_ = 0; // db_id

bool is_node_first_start_up_ = true;
};

} // namespace pikiwidb
27 changes: 24 additions & 3 deletions src/praft/psnapshot.cc
@@ -11,33 +11,44 @@
#include "psnapshot.h"

#include "braft/local_file_meta.pb.h"
#include "braft/snapshot.h"
#include "butil/files/file_path.h"

#include "pstd/log.h"
#include "pstd/pstd_string.h"

#include "config.h"
#include "praft.h"
Comment on lines +14 to +21

The additions and changes to snapshot path parsing are necessary for the new functionality. Consider adding unit tests to cover potential edge cases in the parsing logic.

Would you like me to help in creating unit tests for the snapshot path parsing logic?

Also applies to: 31-51
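
As a starting point only, here is a self-contained sketch of how the component walk could be factored into a testable helper. It deliberately uses std::filesystem and plain asserts instead of butil::FilePath, pstd::String2int, and the project's test framework, and the helper name ParseSnapshotPath, the path layout, and the example paths are all hypothetical:

#include <algorithm>
#include <cassert>
#include <cctype>
#include <filesystem>
#include <optional>
#include <string>

struct ParsedSnapshotPath {
  std::string snapshot_path;  // path up to and including the snapshot_xxx component
  int db_id = -1;             // numeric component seen after a "db" component, or -1
};

// Mirrors the loop in PPosixFileSystemAdaptor::open(): accumulate components,
// remember the db id that follows a "db" component, and stop at the first
// component containing "snapshot_".
static std::optional<ParsedSnapshotPath> ParseSnapshotPath(const std::string& path) {
  ParsedSnapshotPath result;
  bool is_find_db = false;
  for (const auto& part : std::filesystem::path(path)) {
    const std::string component = part.string();
    result.snapshot_path += component + "/";

    const bool all_digits =
        !component.empty() &&
        std::all_of(component.begin(), component.end(),
                    [](unsigned char c) { return std::isdigit(c) != 0; });
    if (is_find_db && all_digits) {
      result.db_id = std::stoi(component);
      is_find_db = false;
    }

    if (component.find("snapshot_") != std::string::npos) {
      return result;  // reached the snapshot directory, parsing is complete
    } else if (component == "db") {
      is_find_db = true;
    }
  }
  return std::nullopt;  // no snapshot_ component in this path
}

int main() {
  // Typical case: both the db id and the snapshot directory are present.
  auto parsed = ParseSnapshotPath("db/2/_praft/snapshot/snapshot_0000001/meta");
  assert(parsed && parsed->db_id == 2);
  assert(parsed->snapshot_path == "db/2/_praft/snapshot/snapshot_0000001/");

  // Edge case: a path without any snapshot_ component yields no result.
  assert(!ParseSnapshotPath("db/2/_praft/log"));
  return 0;
}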

#include "store.h"

namespace pikiwidb {

extern PConfig g_config;

braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int oflag,
const ::google::protobuf::Message* file_meta, butil::File::Error* e) {
if ((oflag & IS_RDONLY) == 0) { // This is a read operation
bool snapshots_exists = false;
std::string snapshot_path;
int db_id = -1;

// parse snapshot path
butil::FilePath parse_snapshot_path(path);
std::vector<std::string> components;
bool is_find_db = false;
parse_snapshot_path.GetComponents(&components);
for (auto component : components) {
for (const auto& component : components) {
snapshot_path += component + "/";

if (is_find_db && pstd::String2int(component, &db_id)) {
is_find_db = false;
}

if (component.find("snapshot_") != std::string::npos) {
break;
} else if (component == "db") {
is_find_db = true;
}
}

// check whether snapshots have been created
std::lock_guard<braft::raft_mutex_t> guard(mutex_);
if (!snapshot_path.empty()) {
@@ -55,6 +66,8 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o

// Snapshot generation
if (!snapshots_exists) {
assert(db_id >= 0);

braft::LocalSnapshotMetaTable snapshot_meta_memtable;
std::string meta_path = snapshot_path + "/" PRAFT_SNAPSHOT_META_FILE;
INFO("start to generate snapshot in path {}", snapshot_path);
@@ -66,6 +79,14 @@ braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int o
PSTORE.HandleTaskSpecificDB(tasks);
AddAllFiles(snapshot_path, &snapshot_meta_memtable, snapshot_path);

// update snapshot last log index and last_log_term
auto& new_meta = const_cast<braft::SnapshotMeta&>(snapshot_meta_memtable.meta());
auto last_log_index = 30000; // @todo PSTORE.GetBackend(db_id)->GetStorage()->GetSmallestFlushedLogIndex();
Collaborator:
Did you forget to update this line?

Collaborator (Author):
Yes, I forgot to change it.
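
For reference, the intended replacement suggested by the @todo comment would presumably be along these lines (a sketch only, reusing the accessors introduced elsewhere in this PR):

// Sketch: replace the hard-coded 30000 with the smallest flushed log index of
// this db, as the @todo comment indicates.
auto last_log_index = PSTORE.GetBackend(db_id)->GetStorage()->GetSmallestFlushedLogIndex();
new_meta.set_last_included_index(last_log_index);
new_meta.set_last_included_term(PRAFT.GetTerm(last_log_index));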

new_meta.set_last_included_index(last_log_index);
auto last_log_term = PRAFT.GetTerm(last_log_index);
new_meta.set_last_included_term(last_log_term);
INFO("Succeed to fix snapshot meta: {}, {}", last_log_index, last_log_term);

auto rc = snapshot_meta_memtable.save_to_file(fs, meta_path);
if (rc == 0) {
INFO("Succeed to save snapshot in path {}", snapshot_path);
2 changes: 2 additions & 0 deletions src/storage/include/storage/storage.h
@@ -1123,6 +1123,8 @@ class Storage {
void GetRocksDBInfo(std::string& info);
Status OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx);

LogIndex GetSmallestFlushedLogIndex() const;

private:
std::vector<std::unique_ptr<Redis>> insts_;
std::unique_ptr<SlotIndexer> slot_indexer_;
3 changes: 3 additions & 0 deletions src/storage/src/log_index.cc
@@ -7,6 +7,8 @@

#include "log_index.h"

#include "pstd/log.h"

#include <algorithm>
#include <cinttypes>
#include <set>
@@ -167,6 +169,7 @@ void LogIndexAndSequenceCollectorPurger::OnFlushCompleted(rocksdb::DB *db,
auto count = count_.fetch_add(1);

if (count % 10 == 0) {
INFO("do snapshot after flush: {}", smallest_flushed_log_index);
callback_(smallest_flushed_log_index, false);
}

3 changes: 3 additions & 0 deletions src/storage/src/redis.h
@@ -110,6 +110,9 @@ class Redis {
void UpdateAppliedLogIndexOfColumnFamily(size_t cf_idx, LogIndex logidx, SequenceNumber seqno) {
log_index_of_all_cfs_.Update(cf_idx, logidx, seqno);
}
LogIndex GetSmallestFlushedLogIndex() const {
return log_index_of_all_cfs_.GetSmallestLogIndex(-1).smallest_flushed_log_index;
}
bool IsRestarting() const { return is_starting_; }
void StartingPhaseEnd() { is_starting_ = false; }

9 changes: 9 additions & 0 deletions src/storage/src/storage.cc
@@ -2517,4 +2517,13 @@ Status Storage::OnBinlogWrite(const pikiwidb::Binlog& log, LogIndex log_idx) {
return s;
}

LogIndex Storage::GetSmallestFlushedLogIndex() const {
LogIndex smallest_flushed_log_index = INT64_MAX;
for (auto& inst : insts_) {
smallest_flushed_log_index = std::min(smallest_flushed_log_index, inst->GetSmallestFlushedLogIndex());
}

return smallest_flushed_log_index;
}

} // namespace storage