optimize log
LLiuJJ committed Jan 29, 2024
1 parent ab4c347 commit f47f2a8
Showing 7 changed files with 54 additions and 54 deletions.
2 changes: 1 addition & 1 deletion cmd/shardsvr/shardsvr.go
@@ -67,7 +67,7 @@ func main() {
svr_peer_map[i] = addr
}

shard_svr := shardkvserver.MakeShardKVServer(svr_peer_map, node_id, gid, os.Args[3])
shard_svr := shardkvserver.MakeShardKVServer(svr_peer_map, int64(node_id), gid, os.Args[3])
lis, err := net.Listen("tcp", svr_peer_map[node_id])
if err != nil {
fmt.Printf("failed to listen: %v", err)
2 changes: 1 addition & 1 deletion metaserver/metaserver.go
@@ -65,7 +65,7 @@ func MakeMetaServer(peerMaps map[int]string, nodeId int) *MetaServer {
newdb_eng := storage.EngineFactory("leveldb", "./data/db/metanode_"+strconv.Itoa(nodeId))
logdb_eng := storage.EngineFactory("leveldb", "./data/log/metanode_"+strconv.Itoa(nodeId))

newRf := raftcore.MakeRaft(client_ends, nodeId, logdb_eng, newApplyCh, 50, 150)
newRf := raftcore.MakeRaft(client_ends, int64(nodeId), logdb_eng, newApplyCh, 50, 150)
meta_server := &MetaServer{
Rf: newRf,
applyCh: newApplyCh,
56 changes: 30 additions & 26 deletions raftcore/raft.go
@@ -61,7 +61,7 @@ func NodeToString(role NodeRole) string {
type Raft struct {
mu sync.RWMutex
peers []*RaftClientEnd // rpc client end
me_ int
id int64
dead int32
applyCh chan *pb.ApplyMsg
applyCond *sync.Cond
@@ -85,10 +85,10 @@ type Raft struct {
baseElecTimeout uint64
}

func MakeRaft(peers []*RaftClientEnd, me int, newdbEng storage_eng.KvStore, applyCh chan *pb.ApplyMsg, heartbeatTimeOutMs uint64, baseElectionTimeOutMs uint64) *Raft {
func MakeRaft(peers []*RaftClientEnd, me int64, newdbEng storage_eng.KvStore, applyCh chan *pb.ApplyMsg, heartbeatTimeOutMs uint64, baseElectionTimeOutMs uint64) *Raft {
rf := &Raft{
peers: peers,
me_: me,
id: me,
dead: 0,
applyCh: applyCh,
replicatorCond: make([]*sync.Cond, len(peers)),
@@ -107,13 +107,12 @@ func MakeRaft(peers []*RaftClientEnd, me int, newdbEng storage_eng.KvStore, appl
heartBeatTimeout: heartbeatTimeOutMs,
}
rf.curTerm, rf.votedFor = rf.persister.ReadRaftState()
rf.ReInitLog()
rf.applyCond = sync.NewCond(&rf.mu)
last_log := rf.logs.GetLast()
for _, peer := range peers {
logger.ELogger().Sugar().Debugf("peer addr %s id %d", peer.addr, peer.id)
rf.matchIdx[peer.id], rf.nextIdx[peer.id] = 0, int(last_log.Index+1)
if int(peer.id) != me {
if int64(peer.id) != me {
rf.replicatorCond[peer.id] = sync.NewCond(&sync.Mutex{})
go rf.Replicator(peer)
}
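The constructor only wires up a condition variable per follower and starts rf.Replicator(peer); the replicator body itself is not part of this diff. As a rough sketch of the usual pattern behind such a setup (an assumption, not code from this repository; pendingWork and pushEntries are invented names), each replicator goroutine parks on its condition variable until BroadcastAppend signals it, then pushes entries to its peer:

package raftsketch

import "sync"

// replicator sketches the per-follower worker wired up above: BroadcastAppend calls
// Signal, and the goroutine started as rf.Replicator(peer) waits on the condition
// variable and then replicates one round.
type replicator struct {
	mu          sync.Mutex
	cond        *sync.Cond
	pendingWork bool
}

func newReplicator() *replicator {
	r := &replicator{}
	r.cond = sync.NewCond(&r.mu)
	return r
}

// Signal marks that the leader has new entries for this follower and wakes the loop.
func (r *replicator) Signal() {
	r.mu.Lock()
	r.pendingWork = true
	r.mu.Unlock()
	r.cond.Signal()
}

// Loop blocks until signalled, then replicates one round to the follower.
func (r *replicator) Loop(pushEntries func()) {
	for {
		r.mu.Lock()
		for !r.pendingWork {
			r.cond.Wait()
		}
		r.pendingWork = false
		r.mu.Unlock()
		pushEntries()
	}
}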
@@ -156,7 +155,7 @@ func (rf *Raft) SwitchRaftNodeRole(role NodeRole) {
case NodeRoleLeader:
// become leader, reset the replication progress table (matchIdx and nextIdx)
lastLog := rf.logs.GetLast()
rf.leaderId = int64(rf.me_)
rf.leaderId = int64(rf.id)
for i := 0; i < len(rf.peers); i++ {
rf.matchIdx[i], rf.nextIdx[i] = 0, int(lastLog.Index+1)
}
@@ -201,7 +200,7 @@ func (rf *Raft) HandleRequestVote(req *pb.RequestVoteRequest, resp *pb.RequestVo

last_log := rf.logs.GetLast()

if !(req.LastLogTerm > int64(last_log.Term) || (req.LastLogTerm == int64(last_log.Term) && req.LastLogIndex >= last_log.Index)) {
if req.LastLogTerm < int64(last_log.Term) || (req.LastLogTerm == int64(last_log.Term) && req.LastLogIndex < last_log.Index) {
resp.Term, resp.VoteGranted = rf.curTerm, false
return
}
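The rewritten guard states Raft's up-to-date rule directly instead of negating the grant condition, and the two forms are logically equivalent: refuse the vote when the candidate's last entry has a smaller term, or the same term but a smaller index. A minimal standalone predicate (simplified names, not the repository's types) capturing the same check:

package raftsketch

// candidateLogStale reports whether the candidate's last log entry is strictly older
// than ours, i.e. the vote should be refused; this mirrors the rewritten guard in
// HandleRequestVote.
func candidateLogStale(reqLastTerm, reqLastIndex, ourLastTerm, ourLastIndex int64) bool {
	return reqLastTerm < ourLastTerm ||
		(reqLastTerm == ourLastTerm && reqLastIndex < ourLastIndex)
}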
@@ -244,7 +243,7 @@ func (rf *Raft) HandleAppendEntries(req *pb.AppendEntriesRequest, resp *pb.Appen
if req.PrevLogIndex < int64(rf.logs.GetFirst().Index) {
resp.Term = 0
resp.Success = false
logger.ELogger().Sugar().Debugf("peer %d reject append entires request from %d", rf.me_, req.LeaderId)
logger.ELogger().Sugar().Debugf("peer %d reject append entires request from %d", rf.id, req.LeaderId)
return
}

@@ -295,7 +294,7 @@ func (rf *Raft) CondInstallSnapshot(lastIncluedTerm int, lastIncludedIndex int,
if lastIncludedIndex > int(rf.logs.GetLast().Index) {
rf.logs.ReInitLogs()
} else {
rf.logs.EraseBeforeWithDel(int64(lastIncludedIndex) - rf.logs.GetFirst().Index)
rf.logs.EraseBefore(int64(lastIncludedIndex)-rf.logs.GetFirst().Index, true)
rf.logs.SetEntFirstData([]byte{})
}
// update dummy entry with lastIncludedTerm and lastIncludedIndex
@@ -318,7 +317,7 @@ func (rf *Raft) Snapshot(index int, snapshot []byte) {
logger.ELogger().Sugar().Warnf("reject snapshot, current snapshotIndex is larger in cur term")
return
}
rf.logs.EraseBeforeWithDel(int64(index) - int64(snapshot_index))
rf.logs.EraseBefore(int64(index)-int64(snapshot_index), true)
rf.logs.SetEntFirstData([]byte{})
logger.ELogger().Sugar().Debugf("del log entry before idx %d", index)
rf.isSnapshoting = false
@@ -377,10 +376,13 @@ func (rf *Raft) GetLogCount() int {
func (rf *Raft) advanceCommitIndexForLeader() {
sort.Ints(rf.matchIdx)
n := len(rf.matchIdx)
// [18 18 '19 19 20] majority replicated log index is 19
// [18 '18 19] majority replicated log index is 18
// [18 '18 19 20] majority replicated log index is 18
new_commit_index := rf.matchIdx[n-(n/2+1)]
if new_commit_index > int(rf.commitIdx) {
if rf.MatchLog(rf.curTerm, int64(new_commit_index)) {
logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.me_, rf.commitIdx, rf.curTerm)
logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.id, rf.commitIdx, rf.curTerm)
rf.commitIdx = int64(new_commit_index)
rf.applyCond.Signal()
}
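The new comments spell out the median rule; as a small self-contained illustration, sorting matchIdx ascending and taking element n-(n/2+1) yields the highest index already replicated on a majority:

package main

import (
	"fmt"
	"sort"
)

// majorityMatched returns the highest log index that a majority of peers have
// replicated, using the same pick as advanceCommitIndexForLeader.
func majorityMatched(matchIdx []int) int {
	sort.Ints(matchIdx)
	n := len(matchIdx)
	return matchIdx[n-(n/2+1)]
}

func main() {
	fmt.Println(majorityMatched([]int{18, 18, 19, 19, 20})) // 19
	fmt.Println(majorityMatched([]int{18, 18, 19}))         // 18
	fmt.Println(majorityMatched([]int{18, 18, 19, 20}))     // 18
}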
@@ -390,7 +392,7 @@ func (rf *Raft) advanceCommitIndexForFollower(leaderCommit int) {
func (rf *Raft) advanceCommitIndexForFollower(leaderCommit int) {
new_commit_index := Min(leaderCommit, int(rf.logs.GetLast().Index))
if new_commit_index > int(rf.commitIdx) {
logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.me_, rf.commitIdx, rf.curTerm)
logger.ELogger().Sugar().Debugf("peer %d advance commit index %d at term %d", rf.id, rf.commitIdx, rf.curTerm)
rf.commitIdx = int64(new_commit_index)
rf.applyCond.Signal()
}
@@ -403,19 +405,19 @@ func (rf *Raft) MatchLog(term, index int64) bool {

// Election make a new election
func (rf *Raft) Election() {
logger.ELogger().Sugar().Debugf("%d start election ", rf.me_)
logger.ELogger().Sugar().Debugf("%d start election ", rf.id)

rf.IncrGrantedVotes()
rf.votedFor = int64(rf.me_)
rf.votedFor = int64(rf.id)
vote_req := &pb.RequestVoteRequest{
Term: rf.curTerm,
CandidateId: int64(rf.me_),
CandidateId: int64(rf.id),
LastLogIndex: int64(rf.logs.GetLast().Index),
LastLogTerm: int64(rf.logs.GetLast().Term),
}
rf.PersistRaftState()
for _, peer := range rf.peers {
if int(peer.id) == rf.me_ {
if int64(peer.id) == rf.id {
continue
}
go func(peer *RaftClientEnd) {
@@ -434,7 +436,7 @@ func (rf *Raft) Election() {
// success granted the votes
rf.IncrGrantedVotes()
if rf.grantedVotes > len(rf.peers)/2 {
logger.ELogger().Sugar().Debugf("I'm win this term, (node %d) get majority votes int term %d ", rf.me_, rf.curTerm)
logger.ELogger().Sugar().Debugf("I'm win this term, (node %d) get majority votes int term %d ", rf.id, rf.curTerm)
rf.SwitchRaftNodeRole(NodeRoleLeader)
rf.BroadcastHeartbeat()
rf.grantedVotes = 0
@@ -457,7 +459,7 @@ func (rf *Raft) Election() {

func (rf *Raft) BroadcastAppend() {
for _, peer := range rf.peers {
if peer.id == uint64(rf.me_) {
if peer.id == uint64(rf.id) {
continue
}
rf.replicatorCond[peer.id].Signal()
@@ -467,7 +469,7 @@ func (rf *Raft) BroadcastAppend() {
// BroadcastHeartbeat broadcast heartbeat to peers
func (rf *Raft) BroadcastHeartbeat() {
for _, peer := range rf.peers {
if int(peer.id) == rf.me_ {
if int64(peer.id) == rf.id {
continue
}
logger.ELogger().Sugar().Debugf("send heart beat to %s", peer.addr)
@@ -529,8 +531,8 @@ func (rf *Raft) Append(command []byte) *pb.Entry {
Data: command,
}
rf.logs.Append(newLog)
rf.matchIdx[rf.me_] = int(newLog.Index)
rf.nextIdx[rf.me_] = int(newLog.Index) + 1
rf.matchIdx[rf.id] = int(newLog.Index)
rf.nextIdx[rf.id] = int(newLog.Index) + 1
rf.PersistRaftState()
return newLog
}
@@ -568,7 +570,7 @@ func (rf *Raft) replicateOneRound(peer *RaftClientEnd) {
first_log := rf.logs.GetFirst()
snap_shot_req := &pb.InstallSnapshotRequest{
Term: rf.curTerm,
LeaderId: int64(rf.me_),
LeaderId: int64(rf.id),
LastIncludedIndex: first_log.Index,
LastIncludedTerm: int64(first_log.Term),
Data: rf.ReadSnapshot(),
@@ -604,11 +606,13 @@ func (rf *Raft) replicateOneRound(peer *RaftClientEnd) {
} else {
first_index := rf.logs.GetFirst().Index
logger.ELogger().Sugar().Debugf("first log index %d", first_index)
entries := make([]*pb.Entry, len(rf.logs.EraseBefore(int64(prev_log_index)+1-first_index)))
copy(entries, rf.logs.EraseBefore(int64(prev_log_index)+1-first_index))
_, new_ents := rf.logs.EraseBefore(int64(prev_log_index)+1-first_index, false)
entries := make([]*pb.Entry, len(new_ents))
copy(entries, new_ents)

append_ent_req := &pb.AppendEntriesRequest{
Term: rf.curTerm,
LeaderId: int64(rf.me_),
LeaderId: int64(rf.id),
PrevLogIndex: int64(prev_log_index),
PrevLogTerm: int64(rf.logs.GetEntry(int64(prev_log_index) - first_index).Term),
Entries: entries,
@@ -666,7 +670,7 @@ func (rf *Raft) Applier() {
first_index, commit_index, last_applied := rf.logs.GetFirst().Index, rf.commitIdx, rf.lastApplied
entries := make([]*pb.Entry, commit_index-last_applied)
copy(entries, rf.logs.GetRange(last_applied+1-int64(first_index), commit_index+1-int64(first_index)))
logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.me_, rf.lastApplied, commit_index, rf.curTerm)
logger.ELogger().Sugar().Debugf("%d, applies entries %d-%d in term %d", rf.id, rf.lastApplied, commit_index, rf.curTerm)

rf.mu.Unlock()
for _, entry := range entries {
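Throughout this file, replicateOneRound and Applier convert absolute Raft indexes into offsets relative to the first (dummy) entry before reading from the log store. A tiny worked example of that arithmetic, with invented values:

package main

import "fmt"

func main() {
	firstIndex := int64(5)   // absolute index of the first (dummy) entry after compaction
	prevLogIndex := int64(9) // highest absolute index known to match on the follower
	lastApplied := int64(9)
	commitIndex := int64(12)

	// replicateOneRound: entries to ship start right after prevLogIndex
	sendFrom := prevLogIndex + 1 - firstIndex // offset 5 into the stored log
	// Applier: entries to apply cover (lastApplied, commitIndex]
	applyFrom := lastApplied + 1 - firstIndex // offset 5
	applyTo := commitIndex + 1 - firstIndex   // offset 8, exclusive upper bound

	fmt.Println(sendFrom, applyFrom, applyTo) // 5 5 8
}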
25 changes: 10 additions & 15 deletions raftcore/raft_persistent_log.go
@@ -229,29 +229,24 @@ func (rfLog *RaftLog) Append(newEnt *pb.Entry) {
// EraseBefore
// erase log entries before the given index and return a copy of the remaining [idx:] entries;
// when withDel is true, the erased prefix is also deleted from the storage engine
func (rfLog *RaftLog) EraseBefore(idx int64) []*pb.Entry {
func (rfLog *RaftLog) EraseBefore(idx int64, withDel bool) (error, []*pb.Entry) {
rfLog.mu.Lock()
defer rfLog.mu.Unlock()
ents := []*pb.Entry{}
lastlog_id := rfLog.GetLastLogId()
firstlog_id := rfLog.GetFirstLogId()
if withDel {
for i := firstlog_id; i < firstlog_id+uint64(idx); i++ {
if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(i)); err != nil {
return err, ents
}
logger.ELogger().Sugar().Debugf("del log with id %d success", i)
}
}
for i := int64(firstlog_id) + idx; i <= int64(lastlog_id); i++ {
ents = append(ents, rfLog.GetEnt(i-int64(firstlog_id)))
}
return ents
}

func (rfLog *RaftLog) EraseBeforeWithDel(idx int64) error {
rfLog.mu.Lock()
defer rfLog.mu.Unlock()
firstlog_id := rfLog.GetFirstLogId()
for i := firstlog_id; i < firstlog_id+uint64(idx); i++ {
if err := rfLog.dbEng.DeleteBytesK(EncodeRaftLogKey(i)); err != nil {
return err
}
logger.ELogger().Sugar().Debugf("del log with id %d success", i)
}
return nil
return nil, ents
}
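With this change the former EraseBeforeWithDel is folded into EraseBefore behind the withDel flag: callers that only need a copy of the suffix pass false (as replicateOneRound does), while snapshot compaction passes true so the erased prefix is also removed from the storage engine. A standalone sketch of that contract on plain slices (illustrative types, not the repository's RaftLog):

package main

import "fmt"

type entry struct{ Index uint64 }

// eraseBefore returns a copy of the entries from offset idx onward and, when withDel
// is true, also deletes the erased prefix from the backing store, keeping the same
// error-first return shape as the merged RaftLog.EraseBefore.
func eraseBefore(store map[uint64]entry, log []entry, idx int, withDel bool) (error, []entry) {
	if withDel {
		for i := 0; i < idx && i < len(log); i++ {
			delete(store, log[i].Index) // stands in for dbEng.DeleteBytesK(EncodeRaftLogKey(i))
		}
	}
	tail := make([]entry, len(log[idx:]))
	copy(tail, log[idx:])
	return nil, tail
}

func main() {
	log := []entry{{5}, {6}, {7}, {8}}
	store := map[uint64]entry{5: {5}, 6: {6}, 7: {7}, 8: {8}}
	_, tail := eraseBefore(store, log, 2, true) // compacting call, as in Snapshot
	fmt.Println(tail, len(store))               // [{7} {8}] 2
}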

// EraseAfter
10 changes: 5 additions & 5 deletions raftcore/raft_persistent_log_test.go
@@ -60,7 +60,7 @@ func TestEraseBefore1(t *testing.T) {
t.Logf("first log %s", fristEnt.String())
lastEnt := raftLog.GetLast()
t.Logf("last log %s", lastEnt.String())
ents := raftLog.EraseBefore(1)
_, ents := raftLog.EraseBefore(1, false)
t.Logf("%v", ents)
RemoveDir("./log_data_test")
}
@@ -103,7 +103,7 @@ func TestPersisEraseBefore0And1(t *testing.T) {
t.Logf("first log %s", fristEnt.String())
lastEnt := raftLog.GetLast()
t.Logf("last log %s", lastEnt.String())
ents := raftLog.EraseBefore(0)
_, ents := raftLog.EraseBefore(0, false)
t.Logf("%v", ents)
raftLog.Append(&pb.Entry{
Index: 1,
Expand All @@ -113,7 +113,7 @@ func TestPersisEraseBefore0And1(t *testing.T) {
Index: 2,
Term: 1,
})
ents = raftLog.EraseBefore(1)
_, ents = raftLog.EraseBefore(1, false)
t.Logf("%v", ents)
t.Logf("%d", raftLog.LogItemCount())
RemoveDir("./log_data_test")
@@ -173,7 +173,7 @@ func TestTestPersisLogErase(t *testing.T) {
Term: 1,
Data: []byte{0x01, 0x02},
})
raftLog.EraseBefore(0)
raftLog.EraseBefore(0, false)
fristEnt := raftLog.GetFirst()
t.Logf("first log %s", fristEnt.String())
lastEnt := raftLog.GetLast()
@@ -262,7 +262,7 @@ func TestPersisLogGetRangeAfterGc(t *testing.T) {
Term: 1,
Data: []byte{0x01, 0x02},
})
raftLog.EraseBeforeWithDel(2)
raftLog.EraseBefore(2, true)
ents := raftLog.GetRange(1, 2)
for _, ent := range ents {
t.Logf("got ent %s", ent.String())
6 changes: 3 additions & 3 deletions shardkvserver/shard_kvserver.go
@@ -71,17 +71,17 @@ type ShardKV struct {
// nodeId: the peer's nodeId in the raft group
// gid: the node's raft group id
// configServerAddr: config server addr (leader addr, needs to be optimized into a config server peer map)
func MakeShardKVServer(peerMaps map[int]string, nodeId int, gid int, configServerAddrs string) *ShardKV {
func MakeShardKVServer(peerMaps map[int]string, nodeId int64, gid int, configServerAddrs string) *ShardKV {
client_ends := []*raftcore.RaftClientEnd{}
for id, addr := range peerMaps {
new_end := raftcore.MakeRaftClientEnd(addr, uint64(id))
client_ends = append(client_ends, new_end)
}
new_apply_ch := make(chan *pb.ApplyMsg)

log_db_eng := storage_eng.EngineFactory("leveldb", "./data/log/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(nodeId))
log_db_eng := storage_eng.EngineFactory("leveldb", "./data/log/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId)))
new_rf := raftcore.MakeRaft(client_ends, nodeId, log_db_eng, new_apply_ch, 50, 150)
newdb_eng := storage_eng.EngineFactory("leveldb", "./data/db/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(nodeId))
newdb_eng := storage_eng.EngineFactory("leveldb", "./data/db/datanode_group_"+strconv.Itoa(gid)+"_nodeid_"+strconv.Itoa(int(nodeId)))

shard_kv := &ShardKV{
dead: 0,
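With the widened signature, node ids are still parsed as plain ints and only converted to int64 at the constructor boundary. A hedged usage sketch based on the call sites in this commit; the import path, peer addresses, and config server addresses below are placeholders:

package main

import "path/to/shardkvserver" // placeholder import path; use the repository's module path

func main() {
	// peer addresses are placeholders; node ids stay ints in the peer map and are
	// widened to int64 only when calling the constructor.
	peers := map[int]string{0: "127.0.0.1:8088", 1: "127.0.0.1:8089", 2: "127.0.0.1:8090"}
	nodeId, gid := 0, 1
	metaAddrs := "127.0.0.1:8070,127.0.0.1:8071,127.0.0.1:8072" // placeholder config server addrs
	shardSvr := shardkvserver.MakeShardKVServer(peers, int64(nodeId), gid, metaAddrs)
	_ = shardSvr
}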
7 changes: 4 additions & 3 deletions tests/integration_test.go
@@ -5,6 +5,7 @@ import (
"net"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"testing"
@@ -59,7 +60,7 @@ func RunShardKvServer(svrPeerMaps map[int]string, nodeId int, groupId int, metaa
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

shardSvr := shardkvserver.MakeShardKVServer(svrPeerMaps, nodeId, groupId, metaaddrs)
shardSvr := shardkvserver.MakeShardKVServer(svrPeerMaps, int64(nodeId), groupId, metaaddrs)
lis, err := net.Listen("tcp", svrPeerMaps[nodeId])
if err != nil {
fmt.Printf("failed to listen: %v", err)
@@ -165,12 +166,12 @@ func TestClusterRwBench(t *testing.T) {
// R-W test
shardkvcli := shardkvserver.MakeKvClient("127.0.0.1:8088,127.0.0.1:8089,127.0.0.1:8090")

N := 1000
N := 300
KEY_SIZE := 64
VAL_SIZE := 64
bench_kvs := map[string]string{}
for i := 0; i <= N; i++ {
k := common.RandStringRunes(KEY_SIZE)
k := strconv.Itoa(i) + "-" + common.RandStringRunes(KEY_SIZE)
v := common.RandStringRunes(VAL_SIZE)
bench_kvs[k] = v
}
