Skip to content

Commit

Permalink
feat(fs): metaserver support asynchronous snapshot
Browse files Browse the repository at this point in the history
close: opencurve#1617

Signed-off-by: NaturalSelect <2145973003@qq.com>
  • Loading branch information
NaturalSelect committed Aug 31, 2023
1 parent e0d7fbf commit d1a29d3
Show file tree
Hide file tree
Showing 44 changed files with 1,940 additions and 1,705 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ build --verbose_failures
build --define=with_glog=true --define=libunwind=true
build --copt -DHAVE_ZLIB=1 --copt -DGFLAGS_NS=google --copt -DUSE_BTHREAD_MUTEX
build --cxxopt -Wno-error=format-security
build:gcc7-later --cxxopt -faligned-new --cxxopt -Wno-error=implicit-fallthrough
build:gcc7-later --cxxopt -faligned-new
build --incompatible_blacklisted_protos_requires_proto_info=false
build --copt=-fdiagnostics-color=always

Expand Down
6 changes: 5 additions & 1 deletion curvefs/proto/metaserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ message PrepareRenameTxRequest {
}

message TransactionRequest {
required uint32 type = 1;
enum TransactionType {
None = 0;
Rename = 1;
}
required TransactionType type = 1;
required string rawPayload = 2;
}

Expand Down
90 changes: 83 additions & 7 deletions curvefs/src/metaserver/copyset/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,15 @@ CopysetNode::CopysetNode(PoolId poolId, CopysetId copysetId,
confChangeMtx_(),
ongoingConfChange_(),
metric_(absl::make_unique<OperatorMetric>(poolId_, copysetId_)),
isLoading_(false) {}
isLoading_(false),
enableSnapshot_(false),
snapshotCv_(),
snapshotLock_(),
snapshotTask_(),
snapshotFuture_() {}

CopysetNode::~CopysetNode() {
StopSnapshotTask();
Stop();
raftNode_.reset();
applyQueue_.reset();
Expand Down Expand Up @@ -155,6 +161,8 @@ bool CopysetNode::Init(const CopysetNodeOptions& options) {
peerId_ = braft::PeerId(addr, 0);
raftNode_ = absl::make_unique<RaftNode>(groupId_, peerId_);

// init snapshot task
StartSnapshotTask();
return true;
}

Expand Down Expand Up @@ -362,6 +370,79 @@ class OnSnapshotSaveDoneClosureImpl : public OnSnapshotSaveDoneClosure {

} // namespace

void CopysetNode::SnapshotTaskRun() {
while (enableSnapshot_) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(snapshotLock_);
snapshotCv_.wait(lock);
std::swap(task, snapshotTask_);
}
if (task) {
task();
}
}
}

void CopysetNode::StartSnapshotTask() {
enableSnapshot_ = true;
{
std::unique_lock<std::mutex> lock(snapshotLock_);
snapshotFuture_ = std::async(std::launch::async,
&CopysetNode::SnapshotTaskRun,
this);
}
}

void CopysetNode::StopSnapshotTask() {
enableSnapshot_ = false;
std::future<void> future;
{
std::lock_guard<std::mutex> lock(snapshotLock_);
std::swap(future, snapshotFuture_);
snapshotCv_.notify_one();
}
if (future.valid()) {
future.wait();
}
}

void CopysetNode::DoSnapshot(OnSnapshotSaveDoneClosure* done) {
// NOTE: save metadata cannot be asynchronous
// we need maintain the consistency with
// raft snapshot metadata
std::vector<std::string> files;
brpc::ClosureGuard doneGuard(done);
auto *writer = done->GetSnapshotWriter();
if (!metaStore_->SaveMeta(writer->get_path(), &files)) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
LOG(ERROR) << "Save meta store metadata failed";
return;
}
// asynchronous save data
{
std::lock_guard<std::mutex> lock(snapshotLock_);
snapshotTask_ = [files, done, this]() mutable {
brpc::ClosureGuard doneGuard(done);
auto *writer = done->GetSnapshotWriter();
// save data files
if (!metaStore_->SaveData(writer->get_path(), &files)) {
done->SetError(MetaStatusCode::SAVE_META_FAIL);
LOG(ERROR) << "Save meta store data failed";
return;
}
// add files to snapshot writer
// file is a relative path under the given directory
for (const auto &f : files) {
writer->add_file(f);
}
done->SetSuccess();
};
snapshotCv_.notify_one();
}
doneGuard.release();
}

void CopysetNode::on_snapshot_save(braft::SnapshotWriter* writer,
braft::Closure* done) {
LOG(INFO) << "Copyset " << name_ << " saving snapshot to '"
Expand Down Expand Up @@ -390,12 +471,7 @@ void CopysetNode::on_snapshot_save(braft::SnapshotWriter* writer,

writer->add_file(kConfEpochFilename);

// TODO(wuhanqing): MetaStore::Save will start a thread and do task
// asynchronously, after task completed it will call
// OnSnapshotSaveDoneImpl::Run
// BUT, this manner is not so clear, maybe it better to make thing
// asynchronous directly in here
metaStore_->Save(writer->get_path(), new OnSnapshotSaveDoneClosureImpl(
DoSnapshot(new OnSnapshotSaveDoneClosureImpl(
this, writer, done, metricCtx));
doneGuard.release();

Expand Down
27 changes: 27 additions & 0 deletions curvefs/src/metaserver/copyset/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <string>
#include <vector>
#include <map>
#include <condition_variable>

#include "curvefs/src/metaserver/common/types.h"
#include "curvefs/src/metaserver/copyset/concurrent_apply_queue.h"
Expand Down Expand Up @@ -220,6 +221,16 @@ class CopysetNode : public braft::StateMachine {

FRIEND_TEST(CopysetNodeBlockGroupTest, Test_AggregateBlockStatInfo);

private:
// for snapshot
void SnapshotTaskRun();

void StartSnapshotTask();

void StopSnapshotTask();

void DoSnapshot(OnSnapshotSaveDoneClosure* done);

private:
const PoolId poolId_;
const CopysetId copysetId_;
Expand Down Expand Up @@ -267,6 +278,22 @@ class CopysetNode : public braft::StateMachine {
std::unique_ptr<OperatorMetric> metric_;

std::atomic<bool> isLoading_;

// for asynchronous snapshot
std::atomic_bool enableSnapshot_;

std::condition_variable snapshotCv_;

mutable std::mutex snapshotLock_;

// NOTE: maintain a queue is unnecessary
// we only need one item
std::function<void()> snapshotTask_;
// NOTE: we need to use a thread to
// save snapshot, and this thread
// will blocking on `KVStorage::Checkpoint()`
// so we cannot use coroutine(bthread)
std::future<void> snapshotFuture_;
};

inline void CopysetNode::Propose(const braft::Task& task) {
Expand Down
Loading

0 comments on commit d1a29d3

Please sign in to comment.