Skip to content

Commit

Permalink
Merge pull request #50 from absolute8511/syncer-timestamp-lag
Browse files Browse the repository at this point in the history
Syncer stats support timestamp
  • Loading branch information
absolute8511 committed May 21, 2018
2 parents 84b12a6 + 11a2dd7 commit d6a974f
Show file tree
Hide file tree
Showing 16 changed files with 264 additions and 118 deletions.
9 changes: 5 additions & 4 deletions common/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ type NamespaceStats struct {
}

type LogSyncStats struct {
Name string `json:"name"`
Term uint64 `json:"term"`
Index uint64 `json:"index"`
IsLeader bool `json:"is_leader"`
Name string `json:"name"`
Term uint64 `json:"term"`
Index uint64 `json:"index"`
Timestamp int64 `json:"timestamp"`
IsLeader bool `json:"is_leader"`
}

type ScanStats struct {
Expand Down
11 changes: 6 additions & 5 deletions node/log_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,19 @@ func (s *RemoteLogSender) doSendOnce(r []*BatchInternalRaftRequest) error {
}

in := &syncerpb.RaftReqs{RaftLog: raftLogs}
if nodeLog.Level() >= common.LOG_DETAIL {
nodeLog.Debugf("sending log : %v", addr, in.String())
if nodeLog.Level() > common.LOG_DETAIL {
nodeLog.Debugf("sending(%v) log : %v", addr, in.String())
}
ctx, cancel := context.WithTimeout(context.Background(), sendLogTimeout)
defer cancel()
rpcErr, err := c.ApplyRaftReqs(ctx, in)
rpcErr, err := c.ApplyRaftReqs(ctx, in, grpc.MaxCallSendMsgSize(256<<20))
if err != nil {
nodeLog.Infof("sending(%v) log failed: %v, %v", addr, err.Error(), in.String())
nodeLog.Infof("sending(%v) log failed: %v", addr, err.Error())
return err
}
if rpcErr != nil && rpcErr.ErrCode != http.StatusOK &&
rpcErr.ErrCode != 0 {
nodeLog.Infof("sending(%v) log failed: %v, %v", addr, rpcErr, in.String())
nodeLog.Infof("sending(%v) log failed: %v", addr, rpcErr)
return errors.New(rpcErr.String())
}
return nil
Expand Down Expand Up @@ -383,6 +383,7 @@ func (s *RemoteLogSender) getRemoteSyncedRaftOnce() (SyncedState, error) {
}
state.SyncedTerm = rsp.Term
state.SyncedIndex = rsp.Index
state.Timestamp = rsp.Timestamp
nodeLog.Debugf("remote(%v) raft group %v synced : %v", addr, s.grpName, state)
return state, nil
}
Expand Down
24 changes: 23 additions & 1 deletion node/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,27 @@ func (nsm *NamespaceMgr) GetDBStats(leaderOnly bool) map[string]string {
return nsStats
}

// GetLogSyncStatsInSyncer collects, from every ready namespace node, the
// received-side and synced-side log stats reported by the node's sync
// learner. Nodes that are not running as a sync learner (nil stats) are
// skipped. It returns the received stats and the synced stats, in that order.
func (nsm *NamespaceMgr) GetLogSyncStatsInSyncer() ([]common.LogSyncStats, []common.LogSyncStats) {
	nsm.mutex.RLock()
	// defer instead of a trailing RUnlock so the read lock cannot leak if
	// any per-node call panics while we iterate.
	defer nsm.mutex.RUnlock()
	nsRecvStats := make([]common.LogSyncStats, 0, len(nsm.kvNodes))
	nsSyncStats := make([]common.LogSyncStats, 0, len(nsm.kvNodes))
	for k, n := range nsm.kvNodes {
		if !n.IsReady() {
			continue
		}
		recvStats, syncStats := n.Node.GetLogSyncStatsInSyncLearner()
		if recvStats == nil || syncStats == nil {
			// not a sync-learner node; nothing to report
			continue
		}
		recvStats.Name = k
		syncStats.Name = k
		nsRecvStats = append(nsRecvStats, *recvStats)
		nsSyncStats = append(nsSyncStats, *syncStats)
	}
	return nsRecvStats, nsSyncStats
}

func (nsm *NamespaceMgr) GetLogSyncStats(leaderOnly bool, srcClusterName string) []common.LogSyncStats {
if srcClusterName == "" {
return nil
Expand All @@ -461,7 +482,7 @@ func (nsm *NamespaceMgr) GetLogSyncStats(leaderOnly bool, srcClusterName string)
if leaderOnly && !n.Node.IsLead() {
continue
}
term, index := n.Node.GetRemoteClusterSyncedRaft(srcClusterName)
term, index, ts := n.Node.GetRemoteClusterSyncedRaft(srcClusterName)
if term == 0 && index == 0 {
continue
}
Expand All @@ -470,6 +491,7 @@ func (nsm *NamespaceMgr) GetLogSyncStats(leaderOnly bool, srcClusterName string)
s.IsLeader = n.Node.IsLead()
s.Term = term
s.Index = index
s.Timestamp = ts
nsStats = append(nsStats, s)
}
nsm.mutex.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (nd *KVNode) handleProposeReq() {
}()
for {
pc := nd.reqProposeC
if len(reqList.Reqs) >= proposeQueueLen*4 {
if len(reqList.Reqs) >= proposeQueueLen*2 {
pc = nil
}
select {
Expand Down
36 changes: 28 additions & 8 deletions node/remote_sync_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"errors"
"sync"
"time"

"github.com/absolute8511/ZanRedisDB/common"
)

// SyncedState records the raft log position (term/index) that has been
// synced with a remote cluster, together with the origin timestamp of that
// log entry (elsewhere compared against time.Now().UnixNano(), so unix
// nanoseconds) for lag accounting.
type SyncedState struct {
SyncedTerm uint64 `json:"synced_term,omitempty"`
SyncedIndex uint64 `json:"synced_index,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
// this is used to disallow compare using the ==
// (a []byte field makes the struct non-comparable, so accidental use of
// == fails to compile; compare with IsSame/IsNewer instead)
disableEqual []byte
}

func (ss *SyncedState) IsNewer(other *SyncedState) bool {
Expand All @@ -19,6 +24,10 @@ func (ss *SyncedState) IsNewer(other *SyncedState) bool {
return false
}

// IsSame reports whether both states refer to the same raft log position.
// Only the term and the index take part in the comparison; Timestamp is
// deliberately ignored.
func (ss *SyncedState) IsSame(other *SyncedState) bool {
	if ss.SyncedTerm != other.SyncedTerm {
		return false
	}
	return ss.SyncedIndex == other.SyncedIndex
}

func (ss *SyncedState) IsNewer2(term uint64, index uint64) bool {
if ss.SyncedTerm >= term && ss.SyncedIndex >= index {
return true
Expand Down Expand Up @@ -68,7 +77,7 @@ func newRemoteSyncedStateMgr() *remoteSyncedStateMgr {
func (rss *remoteSyncedStateMgr) RemoveApplyingSnap(name string, state SyncedState) {
rss.Lock()
sas, ok := rss.remoteSnapshotsApplying[name]
if ok && sas.SS == state {
if ok && sas.SS.IsSame(&state) {
delete(rss.remoteSnapshotsApplying, name)
}
rss.Unlock()
Expand Down Expand Up @@ -109,7 +118,7 @@ func (rss *remoteSyncedStateMgr) AddApplyingSnap(name string, state SyncedState)
func (rss *remoteSyncedStateMgr) UpdateApplyingSnapStatus(name string, ss SyncedState, status int) {
rss.Lock()
sas, ok := rss.remoteSnapshotsApplying[name]
if ok && status < len(applyStatusMsgs) && ss == sas.SS {
if ok && status < len(applyStatusMsgs) && ss.IsSame(&sas.SS) {
if sas.StatusCode != status {
sas.StatusCode = status
sas.Status = applyStatusMsgs[status]
Expand Down Expand Up @@ -175,7 +184,8 @@ func (nd *KVNode) isAlreadyApplied(reqList BatchInternalRaftRequest) bool {

// return as (cluster name, is transferring remote snapshot, is applying remote snapshot)
func (nd *KVNode) preprocessRemoteSnapApply(reqList BatchInternalRaftRequest) (bool, bool) {
ss := SyncedState{SyncedTerm: reqList.OrigTerm, SyncedIndex: reqList.OrigIndex}
ss := SyncedState{SyncedTerm: reqList.OrigTerm,
SyncedIndex: reqList.OrigIndex, Timestamp: reqList.Timestamp}
for _, req := range reqList.Reqs {
if req.Header.DataType == int32(CustomReq) {
var cr customProposeData
Expand All @@ -199,7 +209,7 @@ func (nd *KVNode) preprocessRemoteSnapApply(reqList BatchInternalRaftRequest) (b

func (nd *KVNode) postprocessRemoteSnapApply(reqList BatchInternalRaftRequest,
isRemoteSnapTransfer bool, isRemoteSnapApply bool, retErr error) {
ss := SyncedState{SyncedTerm: reqList.OrigTerm, SyncedIndex: reqList.OrigIndex}
ss := SyncedState{SyncedTerm: reqList.OrigTerm, SyncedIndex: reqList.OrigIndex, Timestamp: reqList.Timestamp}
// for remote snapshot transfer, we need wait apply success before update sync state
if !isRemoteSnapTransfer {
if retErr != errIgnoredRemoteApply {
Expand All @@ -221,12 +231,22 @@ func (nd *KVNode) postprocessRemoteSnapApply(reqList BatchInternalRaftRequest,
}
}

func (nd *KVNode) SetRemoteClusterSyncedRaft(name string, term uint64, index uint64) {
nd.remoteSyncedStates.UpdateState(name, SyncedState{SyncedTerm: term, SyncedIndex: index})
func (nd *KVNode) SetRemoteClusterSyncedRaft(name string, term uint64, index uint64, ts int64) {
nd.remoteSyncedStates.UpdateState(name, SyncedState{SyncedTerm: term, SyncedIndex: index, Timestamp: ts})
}
func (nd *KVNode) GetRemoteClusterSyncedRaft(name string) (uint64, uint64) {
func (nd *KVNode) GetRemoteClusterSyncedRaft(name string) (uint64, uint64, int64) {
state, _ := nd.remoteSyncedStates.GetState(name)
return state.SyncedTerm, state.SyncedIndex
return state.SyncedTerm, state.SyncedIndex, state.Timestamp
}

// GetLogSyncStatsInSyncLearner returns the received-side and synced-side log
// stats when this node's state machine is a log syncer (sync learner); for
// any other state machine it returns nil, nil.
func (nd *KVNode) GetLogSyncStatsInSyncLearner() (*common.LogSyncStats, *common.LogSyncStats) {
	logSyncer, ok := nd.sm.(*logSyncerSM)
	if !ok {
		return nil, nil
	}

	// named recvStats/syncStats instead of "sync" so the local does not
	// shadow the imported sync package in this file
	recvStats, syncStats := logSyncer.GetLogSyncStats()
	return &recvStats, &syncStats
}

func (nd *KVNode) ApplyRemoteSnapshot(skip bool, name string, term uint64, index uint64) error {
Expand Down
91 changes: 65 additions & 26 deletions node/syncer_learner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func EnableForTest() {
}

const (
	// logSendBufferLen bounds how many raft log batches may queue for the
	// remote sender before the producer blocks.
	// (The flattened diff showed both the old value 100 and the new value
	// 64; the post-change value 64 is kept.)
	logSendBufferLen = 64
)

var syncerNormalInit = false
Expand All @@ -29,14 +29,17 @@ func SetSyncerNormalInit() {
syncerNormalInit = true
}

// syncLearnerRecvStats accumulates the latency (in microseconds) between a
// log entry's origin timestamp and the moment the sync learner receives it
// in ApplyRaftRequest.
var syncLearnerRecvStats common.WriteStats

// syncLearnerDoneStats accumulates the latency (in microseconds) between a
// log entry's origin timestamp and the moment it has been successfully
// forwarded to the remote cluster by the send loop.
var syncLearnerDoneStats common.WriteStats

type logSyncerSM struct {
clusterInfo common.IClusterInfo
fullNS string
machineConfig MachineConfig
ID uint64
syncedCnt int64
syncedIndex uint64
syncedTerm uint64
receivedState SyncedState
syncedState SyncedState
lgSender *RemoteLogSender
stopping int32
sendCh chan *BatchInternalRaftRequest
Expand Down Expand Up @@ -96,13 +99,30 @@ func (sm *logSyncerSM) GetDBInternalStats() string {
return ""
}

// GetLogLatencyStats returns independent copies of the receive-side and
// done-side latency stats gathered by the sync learner.
func GetLogLatencyStats() (*common.WriteStats, *common.WriteStats) {
	recvCopy := syncLearnerRecvStats.Copy()
	doneCopy := syncLearnerDoneStats.Copy()
	return recvCopy, doneCopy
}

// GetLogSyncStats returns two snapshots for this namespace: the position of
// the last log received from the source cluster and the position of the last
// log confirmed synced to the remote cluster.
func (sm *logSyncerSM) GetLogSyncStats() (common.LogSyncStats, common.LogSyncStats) {
	var recvStats common.LogSyncStats
	recvStats.Name = sm.fullNS
	recvStats.Term = atomic.LoadUint64(&sm.receivedState.SyncedTerm)
	recvStats.Index = atomic.LoadUint64(&sm.receivedState.SyncedIndex)
	recvStats.Timestamp = atomic.LoadInt64(&sm.receivedState.Timestamp)

	var syncStats common.LogSyncStats
	syncStats.Name = sm.fullNS
	syncStats.Term, syncStats.Index, syncStats.Timestamp = sm.getSyncedState()
	return recvStats, syncStats
}

// GetStats exposes the log syncer's internal counters (role, total synced
// entries, and the last synced term/index/timestamp) as namespace stats.
func (sm *logSyncerSM) GetStats() common.NamespaceStats {
	syncedTerm, syncedIndex, syncedTs := sm.getSyncedState()
	stat := map[string]interface{}{
		"role":             common.LearnerRoleLogSyncer,
		"synced":           atomic.LoadInt64(&sm.syncedCnt),
		"synced_index":     syncedIndex,
		"synced_term":      syncedTerm,
		"synced_timestamp": syncedTs,
	}
	return common.NamespaceStats{InternalStats: stat}
}
Expand Down Expand Up @@ -136,32 +156,49 @@ func (sm *logSyncerSM) Close() {
sm.wg.Wait()
}

// setReceivedState records the raft term/index and origin timestamp of the
// latest log received from the source cluster. The three fields are stored
// with separate atomic operations, so a concurrent reader may observe a mix
// of old and new values; the readers here only use them for stats reporting.
func (sm *logSyncerSM) setReceivedState(term uint64, index uint64, ts int64) {
atomic.StoreUint64(&sm.receivedState.SyncedTerm, term)
atomic.StoreUint64(&sm.receivedState.SyncedIndex, index)
atomic.StoreInt64(&sm.receivedState.Timestamp, ts)
}

// setSyncedState records the raft term/index and origin timestamp of the
// last log confirmed synced to the remote cluster. The fields are stored
// with separate atomic operations (not one consistent snapshot); callers
// read them back via getSyncedState.
func (sm *logSyncerSM) setSyncedState(term uint64, index uint64, ts int64) {
atomic.StoreUint64(&sm.syncedState.SyncedTerm, term)
atomic.StoreUint64(&sm.syncedState.SyncedIndex, index)
atomic.StoreInt64(&sm.syncedState.Timestamp, ts)
}

// getSyncedState atomically loads and returns the last synced raft term,
// index and origin timestamp. Each field is loaded independently, so the
// triple is not guaranteed to be a single consistent snapshot.
func (sm *logSyncerSM) getSyncedState() (uint64, uint64, int64) {
	return atomic.LoadUint64(&sm.syncedState.SyncedTerm),
		atomic.LoadUint64(&sm.syncedState.SyncedIndex),
		atomic.LoadInt64(&sm.syncedState.Timestamp)
}

// switchIgnoreSend flips the ignoreSend flag: send=true resumes forwarding
// raft logs to the remote cluster, send=false makes the sender skip them.
// Each actual transition is logged with the synced position at that moment.
// NOTE(review): the load-then-store on ignoreSend is not a single atomic
// step; presumably callers are serialized — confirm before relying on it.
func (sm *logSyncerSM) switchIgnoreSend(send bool) {
	old := atomic.LoadInt32(&sm.ignoreSend)
	syncedTerm, syncedIndex, syncedTs := sm.getSyncedState()
	if send {
		if old == 0 {
			// already sending; nothing to change
			return
		}
		sm.Infof("switch to send log really at: %v-%v-%v", syncedTerm, syncedIndex, syncedTs)
		atomic.StoreInt32(&sm.ignoreSend, 0)
		return
	}
	if old == 1 {
		// already ignoring; nothing to change
		return
	}
	sm.Infof("switch to ignore send log at: %v-%v-%v", syncedTerm, syncedIndex, syncedTs)
	atomic.StoreInt32(&sm.ignoreSend, 1)
}

func (sm *logSyncerSM) handlerRaftLogs() {
defer func() {
sm.lgSender.Stop()
syncedTerm := atomic.LoadUint64(&sm.syncedTerm)
syncedIndex := atomic.LoadUint64(&sm.syncedIndex)
sm.Infof("raft log syncer send loop exit at synced: %v-%v", syncedTerm, syncedIndex)
syncedTerm, syncedIndex, syncedTs := sm.getSyncedState()
sm.Infof("raft log syncer send loop exit at synced: %v-%v-%v", syncedTerm, syncedIndex, syncedTs)
}()
raftLogs := make([]*BatchInternalRaftRequest, 0, logSendBufferLen)
var last *BatchInternalRaftRequest
Expand Down Expand Up @@ -225,17 +262,19 @@ func (sm *logSyncerSM) handlerRaftLogs() {
case <-sm.sendStop:
return
default:
syncedTerm := atomic.LoadUint64(&sm.syncedTerm)
syncedIndex := atomic.LoadUint64(&sm.syncedIndex)
sm.Errorf("failed to send raft log to remote: %v, %v, current: %v-%v",
err, raftLogs, syncedTerm, syncedIndex)
syncedTerm, syncedIndex, syncedTs := sm.getSyncedState()
sm.Errorf("failed to send raft log to remote: %v, %v, current: %v-%v-%v",
err, len(raftLogs), syncedTerm, syncedIndex, syncedTs)
}
continue
}
if handled {
atomic.AddInt64(&sm.syncedCnt, int64(len(raftLogs)))
atomic.StoreUint64(&sm.syncedIndex, last.OrigIndex)
atomic.StoreUint64(&sm.syncedTerm, last.OrigTerm)
sm.setSyncedState(last.OrigTerm, last.OrigIndex, last.Timestamp)
t := time.Now().UnixNano()
for _, rl := range raftLogs {
syncLearnerDoneStats.UpdateLatencyStats((t - rl.Timestamp) / time.Microsecond.Nanoseconds())
}
raftLogs = raftLogs[:0]
}
}
Expand Down Expand Up @@ -278,8 +317,7 @@ func (sm *logSyncerSM) waitIgnoreUntilChanged(term uint64, index uint64, stop ch
for {
if atomic.LoadInt32(&sm.ignoreSend) == 1 {
// check local to avoid call rpc too much
syncTerm := atomic.LoadUint64(&sm.syncedTerm)
syncIndex := atomic.LoadUint64(&sm.syncedIndex)
syncTerm, syncIndex, _ := sm.getSyncedState()
if syncTerm >= term && syncIndex >= index {
return true, nil
}
Expand All @@ -288,8 +326,7 @@ func (sm *logSyncerSM) waitIgnoreUntilChanged(term uint64, index uint64, stop ch
sm.Infof("failed to get the synced state from remote: %v, at %v-%v", err, term, index)
} else {
if state.IsNewer2(term, index) {
atomic.StoreUint64(&sm.syncedIndex, state.SyncedIndex)
atomic.StoreUint64(&sm.syncedTerm, state.SyncedTerm)
sm.setSyncedState(state.SyncedTerm, state.SyncedIndex, state.Timestamp)
return true, nil
}
}
Expand Down Expand Up @@ -319,8 +356,7 @@ func (sm *logSyncerSM) RestoreFromSnapshot(startup bool, raftSnapshot raftpb.Sna
}
if state.IsNewer2(raftSnapshot.Metadata.Term, raftSnapshot.Metadata.Index) {
sm.Infof("ignored restore snapshot since remote has newer raft: %v than %v", state, raftSnapshot.Metadata.String())
atomic.StoreUint64(&sm.syncedIndex, raftSnapshot.Metadata.Index)
atomic.StoreUint64(&sm.syncedTerm, raftSnapshot.Metadata.Term)
sm.setSyncedState(raftSnapshot.Metadata.Term, raftSnapshot.Metadata.Index, 0)
return nil
}

Expand Down Expand Up @@ -389,8 +425,7 @@ func (sm *logSyncerSM) RestoreFromSnapshot(startup bool, raftSnapshot raftpb.Sna
}

sm.Infof("apply snap done %v", raftSnapshot.Metadata)
atomic.StoreUint64(&sm.syncedIndex, raftSnapshot.Metadata.Index)
atomic.StoreUint64(&sm.syncedTerm, raftSnapshot.Metadata.Term)
sm.setSyncedState(raftSnapshot.Metadata.Term, raftSnapshot.Metadata.Index, 0)
return nil
}

Expand Down Expand Up @@ -425,6 +460,10 @@ func (sm *logSyncerSM) ApplyRaftRequest(isReplaying bool, reqList BatchInternalR
sm.Infof("ignore sync from cluster syncer, %v-%v:%v", term, index, reqList.String())
return false, nil
}
sm.setReceivedState(term, index, reqList.Timestamp)
latency := time.Now().UnixNano() - reqList.Timestamp
syncLearnerRecvStats.UpdateLatencyStats(latency / time.Microsecond.Nanoseconds())

forceBackup := false
reqList.OrigTerm = term
reqList.OrigIndex = index
Expand Down

0 comments on commit d6a974f

Please sign in to comment.