Skip to content

Commit

Permalink
send recv sst ok
Browse files Browse the repository at this point in the history
  • Loading branch information
LLiuJJ committed Aug 31, 2023
1 parent c37feba commit c233308
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 6 deletions.
4 changes: 3 additions & 1 deletion src/consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@

#pragma once

#define SG_META_PREFIX "SG_META"
#define SG_META_PREFIX "SG_META"

#define SNAPSHOTING_KEY_SCAN_PRE_COOUNT 500
4 changes: 2 additions & 2 deletions src/eraftkv_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,11 @@ grpc::Status ERaftKvServer::PutSSTFile(
eraftkv::SSTFileId* fileId) {
eraftkv::SSTFileContent sst_file;
SequentialFileWriter writer;
SPDLOG_INFO("recv sst filename {} id {}", sst_file.name(), sst_file.id());
DirectoryTool::MkDir("/eraft/data/sst_recv/");
uint64_t sec = std::chrono::duration_cast<std::chrono::seconds>(
uint64_t sec = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
SPDLOG_INFO("recv sst filename {} id {} sec {}", sst_file.name(), sst_file.id(), sec);
writer.OpenIfNecessary("/eraft/data/sst_recv/" + std::to_string(sec) +
".sst");
while (reader->Read(&sst_file)) {
Expand Down
29 changes: 27 additions & 2 deletions src/raft_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#include "rocksdb_storage_impl.h"
#include "util.h"
#include "consts.h"

/**
* @brief Construct a new Raft Server object
Expand All @@ -68,7 +69,7 @@ RaftServer::RaftServer(RaftConfig raft_config,
, max_entries_per_append_req_(100)
, tick_interval_(1000)
, granted_votes_(0)
, snap_threshold_log_count_(20)
, snap_threshold_log_count_(300)
, open_auto_apply_(true)
, is_snapshoting_(false)
, snap_db_path_(raft_config.snap_path)
Expand Down Expand Up @@ -208,10 +209,30 @@ EStatus RaftServer::SendAppendEntries() {
if (prev_log_index < this->log_store_->FirstIndex()) {
auto new_first_log_ent = this->log_store_->GetFirstEty();

RocksDBStorageImpl* snapshot_db = new RocksDBStorageImpl(snap_db_path_);
auto kvs = snapshot_db->PrefixScan("", 0, SNAPSHOTING_KEY_SCAN_PRE_COOUNT);
DirectoryTool::MkDir("/eraft/data/sst_send/");
uint64_t count = 1;
while (kvs.size() != 0)
{
SPDLOG_INFO("scan find {} keys", kvs.size());
rocksdb::Options options;
rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), options);
sst_file_writer.Open("/eraft/data/sst_send/" + std::to_string(count) + ".sst");
for (auto kv : kvs) {
SPDLOG_INFO("key {} -> val {}", kv.first, kv.second);
sst_file_writer.Put(kv.first, kv.second);
}
sst_file_writer.Finish();
kvs = snapshot_db->PrefixScan("", count * SNAPSHOTING_KEY_SCAN_PRE_COOUNT, SNAPSHOTING_KEY_SCAN_PRE_COOUNT);
count += 1;
}


//
// loop send sst files
//
auto snap_files = DirectoryTool::ListDirFiles(this->snap_db_path_);
auto snap_files = DirectoryTool::ListDirFiles("/eraft/data/sst_send/");
for (auto snapfile : snap_files) {
if (StringUtil::endsWith(snapfile, ".sst")) {
SPDLOG_INFO("snapfile {}", snapfile);
Expand Down Expand Up @@ -643,6 +664,10 @@ EStatus RaftServer::HandleSnapshotReq(RaftNode* from_node,
// sstfile << req->data();
// sstfile.close();
// }
auto snap_files = DirectoryTool::ListDirFiles("/eraft/data/sst_recv/");
for (auto snapfile : snap_files) {
this->store_->IngestSST(snapfile);
}

if (req->last_included_index() <= this->commit_idx_) {
this->is_snapshoting_ = false;
Expand Down
11 changes: 11 additions & 0 deletions src/rocksdb_storage_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft,
*/
EStatus RocksDBStorageImpl::CreateCheckpoint(std::string snap_path) {
rocksdb::Checkpoint* checkpoint;
DirectoryTool::DeleteDir(snap_path);
auto st = rocksdb::Checkpoint::Create(this->kv_db_, &checkpoint);
if (!st.ok()) {
return EStatus::kError;
Expand Down Expand Up @@ -402,6 +403,16 @@ std::map<std::string, std::string> RocksDBStorageImpl::PrefixScan(
return kvs;
}

EStatus RocksDBStorageImpl::IngestSST(std::string sst_file_path) {
rocksdb::IngestExternalFileOptions ifo;
auto st = kv_db_->IngestExternalFile({sst_file_path}, ifo);
if (!st.ok()) {
SPDLOG_ERROR("ingest sst file {} error", sst_file_path);
return EStatus::kError;
}
return EStatus::kOk;
}

/**
* @brief
*
Expand Down
9 changes: 9 additions & 0 deletions src/rocksdb_storage_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ class RocksDBStorageImpl : public Storage {
int64_t offset,
int64_t limit);

/**
* @brief
*
* @param sst_file_path
* @return EStatus
*/
EStatus IngestSST(std::string sst_file_path);


/**
* @brief
*
Expand Down
8 changes: 8 additions & 0 deletions src/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ class Storage {
int64_t offset,
int64_t limit) = 0;

/**
* @brief
*
* @param sst_file_path
* @return EStatus
*/
virtual EStatus IngestSST(std::string sst_file_path) = 0;

/**
* @brief
*
Expand Down
5 changes: 4 additions & 1 deletion utils/run-kdb-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ redis-cli -h 172.18.0.6 -p 12306 shardgroup query
sleep 1 # test mode raft interval is 1s

redis-cli -h 172.18.0.6 -p 12306 info
cat /eraft/utils/test_commands.txt | redis-cli -h 172.18.0.6 -p 12306

for i in {1024..1124}; do redis-cli -h 172.18.0.6 -p 12306 set $i $i; done

# cat /eraft/utils/test_commands.txt | redis-cli -h 172.18.0.6 -p 12306

0 comments on commit c233308

Please sign in to comment.