-
Notifications
You must be signed in to change notification settings - Fork 0
/
delete.go
98 lines (82 loc) · 2.44 KB
/
delete.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package raftlog
import (
"errors"
"log"
"os"
"path"
"strconv"
)
// deleteLogEntry removes the on-disk file holding the log entry at index and
// subtracts that file's size from the tracked log size.
//
// The file lives under rl.logDir/LogEntryDirectoryName and is named after
// storageIndexToFileIndex(index). Any failure to stat or remove the file
// terminates the process via log.Fatalf. This function takes no lock itself.
func (rl *RaftLog) deleteLogEntry(index uint64) {
	entryPath := path.Join(
		rl.logDir,
		LogEntryDirectoryName,
		strconv.FormatUint(storageIndexToFileIndex(index), 10),
	)

	// Stat before removing so the file's size is still available for the
	// size bookkeeping below.
	fi, err := os.Stat(entryPath)
	if err != nil {
		log.Fatalf("unable to delete log of index %d with error: %v", index, err)
	}
	entrySize := uint64(fi.Size())

	if err = os.Remove(entryPath); err != nil {
		log.Fatalf("unable to delete log of index %d with error: %v", index, err)
	}

	// The size counter is unsigned: if the tracked total is smaller than the
	// file just removed, a plain subtraction would wrap around to a huge
	// value. Clamp at zero and report the mismatch instead.
	var remaining uint64
	if entrySize <= rl.logSizeBytes {
		remaining = rl.logSizeBytes - entrySize
	} else {
		log.Printf(
			"log size accounting mismatch deleting index %d: tracked %d bytes, entry was %d bytes",
			index, rl.logSizeBytes, entrySize,
		)
	}
	rl.setLogSizeBytes(remaining)
}
// DiscardLogEntriesAfter discards the log entry at startIndex and every entry
// after it, so that startIndex becomes the next index to be written.
//
// It returns an error when startIndex is not strictly greater than
// rl.startIndex and strictly less than rl.currentIndex. Failures while
// deleting entries or re-reading the new last entry terminate the process via
// log.Fatalf.
func (rl *RaftLog) DiscardLogEntriesAfter(startIndex uint64) error {
	rl.indexLock.Lock()
	defer rl.indexLock.Unlock()

	if startIndex <= rl.startIndex || startIndex >= rl.currentIndex {
		return errors.New("index out of bounds")
	}

	// Delete newest-first, shrinking currentIndex as each entry goes. The
	// bounds check above guarantees startIndex >= 1, so the unsigned loop
	// counter cannot wrap below zero.
	for i := rl.currentIndex - 1; i >= startIndex; i-- {
		rl.deleteLogEntry(i)
		rl.currentIndex--
	}

	// Recompute the term of whatever is now the last position in the log.
	lastIndex := rl.currentIndex - 1
	switch {
	case lastIndex == 0:
		// Nothing precedes the first entry.
		rl.mostRecentTerm = 0
	case lastIndex == rl.startIndex:
		// Every entry after the start boundary was discarded. The sibling
		// Discard* methods remove the entry at rl.startIndex itself, so it
		// cannot be read back; use the boundary term instead.
		// NOTE(review): assumes rl.startTerm is the field that setStartTerm
		// maintains — confirm against the RaftLog definition.
		rl.mostRecentTerm = rl.startTerm
	default:
		logEntry, err := rl.GetLogEntryUnsafe(lastIndex)
		if err != nil {
			log.Fatalf("error deleting log entries: %v", err)
		}
		rl.mostRecentTerm = logEntry.Term
	}
	return nil
}
// DiscardLogEntriesBefore discards the log entry at endIndex and all entries
// before it, stopping early at the last entry currently held
// (rl.currentIndex-1) when endIndex lies beyond it.
//
// Each discarded entry is first read to obtain its term; the entry is then
// deleted and the start index and start term are advanced to that entry's
// index and term. A failed read terminates the process via log.Fatalf.
//
// If the start index is not below rl.currentIndex once the loop finishes,
// rl.currentIndex is moved to one past the start index and endTerm becomes the
// most recent term.
// NOTE(review): the loop never advances the start index past
// rl.currentIndex-1, so that final adjustment appears to apply only when the
// log was already in that state on entry — confirm whether this is intended.
func (rl *RaftLog) DiscardLogEntriesBefore(endIndex, endTerm uint64) {
	rl.indexLock.Lock()
	defer rl.indexLock.Unlock()

	for idx := rl.startIndex + 1; ; idx++ {
		// The upper bound is re-evaluated on every pass.
		if idx > min(endIndex, rl.currentIndex-1) {
			break
		}

		// The term must be captured before the entry's file is removed.
		entry, readErr := rl.GetLogEntryUnsafe(idx)
		if readErr != nil {
			log.Fatalf("error deleting log entries: %v", readErr)
		}

		rl.deleteLogEntry(idx)
		rl.setStartIndex(idx)
		rl.setStartTerm(entry.Term)
	}

	if rl.startIndex < rl.currentIndex {
		return
	}
	rl.currentIndex = rl.startIndex + 1
	rl.mostRecentTerm = endTerm
}
// DiscardAllLogEntries wipes every stored log entry and restarts the log at
// the given snapshot position.
//
// The log entry directory under rl.logDir is removed and recreated empty with
// mode 0700; a failure of either step terminates the process via log.Fatalf.
// The tracked log size is then reset to zero, the start index and start term
// are set to snapshotIndex and snapshotTerm, rl.currentIndex becomes one past
// the start index, and snapshotTerm becomes the most recent term.
func (rl *RaftLog) DiscardAllLogEntries(snapshotIndex, snapshotTerm uint64) {
	rl.indexLock.Lock()
	defer rl.indexLock.Unlock()

	entriesDir := path.Join(rl.logDir, LogEntryDirectoryName)

	if rmErr := os.RemoveAll(entriesDir); rmErr != nil {
		log.Fatalf("error deleting all log entries: %v", rmErr)
	}
	if mkErr := os.Mkdir(entriesDir, 0700); mkErr != nil {
		log.Fatalf("error deleting all log entries: %v", mkErr)
	}

	rl.setLogSizeBytes(0)
	rl.setStartIndex(snapshotIndex)
	rl.setStartTerm(snapshotTerm)

	rl.mostRecentTerm = snapshotTerm
	rl.currentIndex = rl.startIndex + 1
}