Skip to content

Commit

Permalink
Merge pull request #187 from eraft-io/feature_20230701_multishard
Browse files Browse the repository at this point in the history
fix log conflict with snapshotting
  • Loading branch information
LLiuJJ committed Aug 29, 2023
2 parents 4eb5dc8 + 4009f81 commit 560c376
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 61 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ run-demo:
docker run --name kvserver-node2 --network mytestnetwork --ip 172.18.0.11 --privileged=true -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 1 /eraft/data/kv_db1 /eraft/data/log_db1 /eraft/data/snap_db1 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 /eraft/logs/eraftkv-2.log
docker run --name kvserver-node3 --network mytestnetwork --ip 172.18.0.12 --privileged=true -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftkv 2 /eraft/data/kv_db2 /eraft/data/log_db2 /eraft/data/snap_db2 172.18.0.10:8088,172.18.0.11:8089,172.18.0.12:8090 /eraft/logs/eraftkv-3.log
sleep 1
docker run --name metaserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 0 /tmp/meta_db0 /tmp/log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
docker run --name metaserver-node1 --network mytestnetwork --ip 172.18.0.2 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 0 /eraft/data/meta_db0 /eraft/data/meta_log_db0 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
sleep 3
docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 1 /tmp/meta_db1 /tmp/log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /tmp/meta_db2 /tmp/log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
docker run --name metaserver-node2 --network mytestnetwork --ip 172.18.0.3 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 1 /eraft/data/meta_db1 /eraft/data/meta_log_db1 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
docker run --name metaserver-node3 --network mytestnetwork --ip 172.18.0.4 -d --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraftmeta 2 /eraft/data/meta_db2 /eraft/data/meta_log_db2 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090
sleep 16
docker run --name vdbserver-node --network mytestnetwork --ip 172.18.0.6 -it --rm -v $(realpath .):/eraft eraft/eraftkv:v0.0.6 /eraft/build/eraft-kdb 172.18.0.6:12306 172.18.0.2:8088,172.18.0.3:8089,172.18.0.4:8090 /eraft/logs/eraftkdb.log

Expand Down
46 changes: 23 additions & 23 deletions protocol/eraftkv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ message AppendEntriesResp {


message SnapshotReq {
int64 term = 1;
int64 leader_id = 2;
string message_index = 3;
int64 last_included_index = 4;
int64 last_included_term = 5;
int64 offset = 6;
bytes data = 7;
bool done = 8;
int64 term = 1;
int64 leader_id = 2;
string message_index = 3;
int64 last_included_index = 4;
int64 last_included_term = 5;
int64 offset = 6;
bytes data = 7;
bool done = 8;
}

message SnapshotResp {
Expand All @@ -86,9 +86,9 @@ enum SlotStatus {
}

message Slot {
int64 id = 1;
SlotStatus slot_status = 2;
int64 status_modify_time = 3;
int64 id = 1;
SlotStatus slot_status = 2;
int64 status_modify_time = 3;
}

enum ServerStatus {
Expand Down Expand Up @@ -126,14 +126,14 @@ enum HandleServerType {
}

message ClusterConfigChangeReq {
ChangeType change_type = 1;
HandleServerType handle_server_type = 2;
int64 shard_id = 3;
Server server = 4;
int64 config_version = 5;
int64 op_count = 6;
int64 command_id = 7;
ShardGroup shard_group = 8;
ChangeType change_type = 1;
HandleServerType handle_server_type = 2;
int64 shard_id = 3;
Server server = 4;
int64 config_version = 5;
int64 op_count = 6;
int64 command_id = 7;
ShardGroup shard_group = 8;
}

message ClusterConfigChangeResp {
Expand All @@ -154,8 +154,8 @@ enum ClientOpType {

message KvOpPair {
ClientOpType op_type = 1;
string key = 2;
string value = 3;
string key = 2;
string value = 3;
bool success = 4;
int64 op_count = 5;
}
Expand All @@ -168,8 +168,8 @@ message ClientOperationReq {

message ClientOperationResp {
repeated KvOpPair ops = 1;
ErrorCode error_code = 2;
int64 leader_addr = 3;
ErrorCode error_code = 2;
int64 leader_addr = 3;
}

service ERaftKv {
Expand Down
6 changes: 3 additions & 3 deletions src/eraftkv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
* @brief
*
* @param argc
* @param argv (eg: eraftkv 0 /tmp/kv_db0 /tmp/log_db0
* @param argv (eg: eraftkv 0 /tmp/kv_db0 /tmp/log_db0 /tmp/snap_db0
* 127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090)
* eraftkv [node id] [kv data path] [log data path] [meta server addrs]
* [log_file_path]
* eraftkv [node id] [kv data path] [log data path] [snap db path] [meta server
* addrs] [log_file_path]
* @return int
*/
int main(int argc, char* argv[]) {
Expand Down
2 changes: 1 addition & 1 deletion src/eraftkv_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ grpc::Status ERaftKvServer::ClusterConfigChange(
}

EStatus ERaftKvServer::TakeSnapshot(int64_t log_idx) {
return raft_context_->SnapshotingStart(log_idx, options_.kv_db_path);
return raft_context_->SnapshotingStart(log_idx);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/eraftkv_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class ERaftKvServer : public eraftkv::ERaftKv::Service {
raft_config.peer_address_map[count] = peer;
count++;
}
raft_config.snap_path = option.snap_db_path;
raft_config.snap_path = options_.snap_db_path;
options_.svr_addr = raft_config.peer_address_map[options_.svr_id];
GRpcNetworkImpl* net_rpc = new GRpcNetworkImpl();
net_rpc->InitPeerNodeConnections(raft_config.peer_address_map);
Expand Down
22 changes: 14 additions & 8 deletions src/log_storage_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,11 @@ eraftkv::Entry* RocksDBSingleLogStorageImpl::Get(int64_t index) {
key.append("E:");
EncodeDecodeTool::PutFixed64(&key, static_cast<uint64_t>(index));
auto status = log_db_->Get(rocksdb::ReadOptions(), key, &new_ety_str);
assert(status.ok());
bool parse_ok = new_ety->ParseFromString(new_ety_str);
assert(parse_ok);
// assert(status.ok());
if (status.ok()) {
bool parse_ok = new_ety->ParseFromString(new_ety_str);
}
// assert(parse_ok);
return new_ety;
}

Expand Down Expand Up @@ -362,14 +364,16 @@ void RocksDBSingleLogStorageImpl::ResetFirstLogEntry(int64_t term,

eraftkv::Entry* ety = new eraftkv::Entry();
// write init log with index 0 to rocksdb
SPDLOG_INFO("reset first log with index {}, term {}", index, term);
ety->set_e_type(eraftkv::EntryType::NoOp);
ety->set_id(index);
ety->set_term(term);
std::string* key = new std::string();
key->append("E:");
EncodeDecodeTool::PutFixed64(key, static_cast<uint64_t>(this->FirstIndex()));
EncodeDecodeTool::PutFixed64(key, static_cast<uint64_t>(index));
std::string val = ety->SerializeAsString();
auto status = log_db_->Put(rocksdb::WriteOptions(), *key, val);
this->first_idx = index;
assert(status.ok());
}

Expand All @@ -383,12 +387,14 @@ int64_t RocksDBSingleLogStorageImpl::LastIndex() {
}

EStatus RocksDBSingleLogStorageImpl::Reinit() {
auto iter = log_db_->NewIterator(rocksdb::ReadOptions());
rocksdb::ReadOptions read_options;
auto iter = log_db_->NewIterator(read_options);
iter->Seek("E:");
while (iter->Valid()) {
auto st = log_db_->Delete(rocksdb::WriteOptions(), iter->key());
SPDLOG_INFO("delete log entry {}", iter->key().ToString());
assert(st.ok());
if (iter->key().ToString().rfind("E:", 0) == 0) {
auto st = log_db_->Delete(rocksdb::WriteOptions(), iter->key());
SPDLOG_INFO("delete log entry {}", iter->key().ToString());
}
iter->Next();
}
return EStatus::kOk;
Expand Down
47 changes: 27 additions & 20 deletions src/raft_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,14 @@ EStatus RaftServer::SendAppendEntries() {
this->log_store_->FirstIndex());

if (prev_log_index < this->log_store_->FirstIndex()) {
auto new_first_log_ent = this->log_store_->GetFirstEty();
auto new_first_log_ent = this->log_store_->GetFirstEty();

// TODO: loop send sst files
// auto snap_files = DirectoryTool::ListDirFiles(this->snap_db_path_);
// for (auto snapfile : snap_files) {
// SPDLOG_INFO("snapfile {}", snapfile);
// }

eraftkv::SnapshotReq* snap_req = new eraftkv::SnapshotReq();
snap_req->set_term(this->current_term_);
snap_req->set_leader_id(this->id_);
Expand Down Expand Up @@ -261,8 +268,9 @@ EStatus RaftServer::SendAppendEntries() {
* @return EStatus
*/
EStatus RaftServer::ApplyEntries() {
// std::lock_guard<std::mutex> guard(raft_op_mutex_);
this->store_->ApplyLog(this, 0, 0);
if (!this->IsSnapshoting()) {
this->store_->ApplyLog(this, 0, 0);
}
return EStatus::kOk;
}

Expand All @@ -282,7 +290,6 @@ bool RaftServer::IsUpToDate(int64_t last_idx, int64_t term) {
EStatus RaftServer::HandleRequestVoteReq(RaftNode* from_node,
const eraftkv::RequestVoteReq* req,
eraftkv::RequestVoteResp* resp) {
// std::lock_guard<std::mutex> guard(raft_op_mutex_);
resp->set_term(current_term_);
resp->set_prevote(req->prevote());
SPDLOG_INFO("handle vote req " + req->DebugString());
Expand Down Expand Up @@ -411,7 +418,6 @@ EStatus RaftServer::Propose(std::string payload,
int64_t* new_log_index,
int64_t* new_log_term,
bool* is_success) {
// std::lock_guard<std::mutex> guard(raft_op_mutex_);
if (this->role_ != NodeRaftRoleEnum::Leader) {
*new_log_index = -1;
*new_log_term = -1;
Expand Down Expand Up @@ -452,8 +458,7 @@ EStatus RaftServer::Propose(std::string payload,
EStatus RaftServer::HandleAppendEntriesReq(RaftNode* from_node,
const eraftkv::AppendEntriesReq* req,
eraftkv::AppendEntriesResp* resp) {
// std::lock_guard<std::mutex> guard(raft_op_mutex_);

SPDLOG_INFO("handle ae {}", req->DebugString());
ResetRandomElectionTimeout();
election_tick_count_ = 0;

Expand Down Expand Up @@ -485,15 +490,18 @@ EStatus RaftServer::HandleAppendEntriesReq(RaftNode* from_node,
this->leader_id_ = req->leader_id();

if (req->prev_log_index() < this->log_store_->FirstIndex()) {
resp->set_conflict_index(this->log_store_->FirstIndex());
resp->set_conflict_term(0);
resp->set_term(0);
resp->set_success(false);
return EStatus::kOk;
}

if (!this->MatchLog(req->prev_log_term(), req->prev_log_index())) {
resp->set_success(false);
// after snapshoting GetLastEty()->term() is 0
if (!(this->MatchLog(req->prev_log_term(), req->prev_log_index()) ||
this->log_store_->GetLastEty()->term() == 0)) {
resp->set_success(true);
if (this->log_store_->LastIndex() < req->prev_log_index()) {
SPDLOG_INFO("log conflict with index {} term {}",
this->log_store_->LastIndex(),
this->log_store_->GetLastEty()->term());
resp->set_conflict_index(this->log_store_->LastIndex());
resp->set_conflict_term(this->log_store_->GetLastEty()->term());
} else {
Expand Down Expand Up @@ -602,10 +610,12 @@ EStatus RaftServer::HandleSnapshotReq(RaftNode* from_node,
const eraftkv::SnapshotReq* req,
eraftkv::SnapshotResp* resp) {
SPDLOG_INFO("handle snapshot req {} ", req->DebugString());
this->is_snapshoting_ = true;
resp->set_term(this->current_term_);
resp->set_success(false);

if (req->term() < this->current_term_) {
this->is_snapshoting_ = false;
return EStatus::kOk;
}

Expand All @@ -621,6 +631,7 @@ EStatus RaftServer::HandleSnapshotReq(RaftNode* from_node,
resp->set_success(true);

if (req->last_included_index() <= this->commit_idx_) {
this->is_snapshoting_ = false;
return EStatus::kOk;
}

Expand All @@ -636,7 +647,7 @@ EStatus RaftServer::HandleSnapshotReq(RaftNode* from_node,

this->last_applied_idx_ = req->last_included_index();
this->commit_idx_ = req->last_included_index();

this->is_snapshoting_ = false;
return EStatus::kOk;
}

Expand Down Expand Up @@ -751,7 +762,6 @@ EStatus RaftServer::ProposeConfChange(std::string payload,
int64_t* new_log_index,
int64_t* new_log_term,
bool* is_success) {
// std::lock_guard<std::mutex> guard(raft_op_mutex_);
if (this->role_ != NodeRaftRoleEnum::Leader) {
*new_log_index = -1;
*new_log_term = -1;
Expand Down Expand Up @@ -894,10 +904,7 @@ EStatus RaftServer::ElectionStart(bool is_prevote) {
* @param snapdir
* @return EStatus
*/
EStatus RaftServer::SnapshotingStart(int64_t ety_idx, std::string snapdir) {

std::lock_guard<std::mutex> guard(raft_op_mutex_);

EStatus RaftServer::SnapshotingStart(int64_t ety_idx) {
this->is_snapshoting_ = true;
auto snap_index = this->log_store_->FirstIndex();

Expand All @@ -911,9 +918,9 @@ EStatus RaftServer::SnapshotingStart(int64_t ety_idx, std::string snapdir) {

this->log_store_->EraseBefore(ety_idx);
// reset first log index
this->log_store_->ResetFirstLogEntry(0, ety_idx);
this->log_store_->ResetFirstLogEntry(this->current_term_, ety_idx);

this->store_->CreateCheckpoint(snapdir);
this->store_->CreateCheckpoint(snap_db_path_);

this->is_snapshoting_ = false;

Expand Down
2 changes: 1 addition & 1 deletion src/raft_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class RaftServer {
* @param snapdir
* @return EStatus
*/
EStatus SnapshotingStart(int64_t ety_idx, std::string snapdir);
EStatus SnapshotingStart(int64_t ety_idx);

/**
* @brief
Expand Down
5 changes: 4 additions & 1 deletion src/rocksdb_storage_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft,
if (raft->commit_idx_ == raft->last_applied_idx_) {
return EStatus::kOk;
}
SPDLOG_INFO("appling entries from {} to {}",
raft->last_applied_idx_,
raft->commit_idx_);
auto etys =
raft->log_store_->Gets(raft->last_applied_idx_, raft->commit_idx_);
for (auto ety : etys) {
Expand Down Expand Up @@ -136,7 +139,7 @@ EStatus RocksDBStorageImpl::ApplyLog(RaftServer* raft,
delete op_pair;
if (raft->log_store_->LogCount() > raft->snap_threshold_log_count_) {
// to snapshot
raft->SnapshotingStart(ety->id(), raft->snap_db_path_);
raft->SnapshotingStart(ety->id());
}
break;
}
Expand Down

0 comments on commit 560c376

Please sign in to comment.