This repository has been archived by the owner on Jun 14, 2022. It is now read-only.
/
cleaner.go
206 lines (170 loc) · 4.84 KB
/
cleaner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package captain
import (
"errors"
"os"
)
var (
	// ErrSkipSegment may be returned by a CleanFn to make the cleaner stop
	// processing the current segment, leave it unmodified, and move on to the
	// next read-only segment.
	ErrSkipSegment = errors.New("skip this segment")
)
// Cleaner is responsible for cleaning the read-only portion of the stream.
// Policy for cleaning is based on the CleanFn passed to the Clean method.
//
// A Cleaner is obtained from Stream.OpenCleaner. Clean expects the caller to
// hold the cleaning lock acquired via Lock (and released via Unlock).
type Cleaner struct {
	path   string       // Stream directory; Clean scans it for segment files.
	header *MagicHeader // Header passed when opening segment cursors and rewriters.
	plock  fileLocker   // Cleaning process file lock (advisory).
	dir    syncer       // Used to sync the directory after renames for durability.
	fs     cleanerVFS   // Indirection for edge case error testing.
}
// OpenCleaner returns a Cleaner on the stream.
//
// It opens the stream directory (later synced after segment renames) and the
// advisory lock file used by Cleaner.Lock/Unlock. If the lock file cannot be
// opened, the already-opened directory handle is closed before the error is
// returned, so a failed call does not leak a file descriptor.
func (s *Stream) OpenCleaner() (*Cleaner, error) {
	dir, err := os.Open(s.path)
	if err != nil {
		return nil, err
	}
	plock, err := openFileMutex(s.path + "/" + readLockFile)
	if err != nil {
		// Best effort: the lock error is the one worth reporting, but the
		// directory handle must not outlive this failed call.
		_ = dir.Close()
		return nil, err
	}
	return &Cleaner{
		path:   s.path,
		header: s.header,
		plock:  plock,
		dir:    dir,
		fs:     &cleanerFS{},
	}, nil
}
// Lock acquires the Cleaner's advisory, cross-process lock. Clean must be
// called while this lock is held; it is the exclusive lock that blocks
// cursors.
func (c *Cleaner) Lock() error {
	locker := c.plock
	return locker.Lock()
}
// Unlock releases the Cleaner's advisory, cross-process lock previously
// acquired with Lock.
func (c *Cleaner) Unlock() error {
	locker := c.plock
	return locker.Unlock()
}
// Clean takes a CleanFn, iterates through every read-only segment file and
// invokes CleanFn against every log record:
//   - Deleting the record when CleanFn returns true.
//   - Retaining the record when CleanFn returns false.
//   - Skipping the segment file if ErrSkipSegment is returned (matched with
//     errors.Is, so a wrapped ErrSkipSegment also skips).
//   - Stopping the cleaning if CleanFn returns any other error.
//
// Cleaning works by rewriting the segment file with only the retained log
// records into a temporary "<segment>.rw" file, which then replaces the
// segment via rename. A segment whose records are all deleted is removed
// outright. The last segment (the active append segment) is never touched.
//
// Requires a cleaning lock via Lock() which is an exclusive lock, blocking cursors.
func (c *Cleaner) Clean(fn CleanFn) error {
	// No cursor lock needed here, Cleaner holds the exclusive lock.
	segs := scanSegments(c.path)
	// Skip the last seg which is the active append seg.
	if len(segs) > 0 {
		segs = segs[:len(segs)-1]
	}
	for i := range segs {
		if err := c.cleanSegment(segs[i].name, segs[i].seq, fn); err != nil {
			return err
		}
	}
	return nil
}

// cleanSegment rewrites the read-only segment at name, keeping only the
// records for which fn returns false. Its deferred cleanup closes the cursor
// and rewriter and removes the temporary rewrite file on every return path,
// so no handles leak when an error aborts cleaning. It returns nil (leaving
// the segment unmodified) when fn returns ErrSkipSegment.
func (c *Cleaner) cleanSegment(name string, seq uint32, fn CleanFn) error {
	rwPath := name + ".rw"
	// Deferred first so it runs last, after the handles below are closed.
	// After a successful rename the file no longer exists; the error is moot.
	defer c.fs.remove(rwPath)

	w, err := c.fs.openSegmentRewriter(rwPath, seq, c.header)
	if err != nil {
		return err
	}
	defer w.Close()

	cur, err := c.fs.openSegmentCursor(name, c.header)
	if err != nil {
		return err
	}
	defer cur.Close()

	var total, cleaned int
	for {
		r, err := cur.Next()
		if err != nil {
			return err
		}
		if r == nil {
			break // Graceful end of cursor.
		}
		total++
		drop, err := fn(name, r)
		if errors.Is(err, ErrSkipSegment) {
			// Leave the segment as-is; deferred cleanup discards the rewrite.
			return nil
		}
		if err != nil {
			return err
		}
		if drop {
			// Ok to delete, skip the rewrite of this record.
			cleaned++
			continue
		}
		if err := writeRecord(w, r); err != nil {
			return err
		}
	}

	switch {
	case total > 0 && total == cleaned:
		// Every record was deleted: drop the entire segment file.
		return c.fs.remove(name)
	case w.Size() > 0:
		if err := w.Sync(); err != nil {
			return err
		}
		// Atomically replace the segment file with the rewrite version.
		if err := c.fs.rename(rwPath, name); err != nil {
			return err
		}
		// Sync the directory so the rename is durable.
		return c.dir.Sync()
	}
	return nil
}
// CleanFn is the callback that Clean invokes for every record of every
// read-only segment; path is the segment file the record was read from.
//   - Returning true removes the record.
//   - Returning false retains the record.
//   - Returning ErrSkipSegment leaves the current segment unchanged.
//   - Returning any other error stops cleaning; Clean returns that error.
type CleanFn func(path string, r *Record) (remove bool, err error)
// fileLocker is an advisory, cross-process lock. Cleaner uses the
// Lock/Unlock pair, which Clean's documentation describes as exclusive.
type fileLocker interface {
	// Lock acquires the lock exclusively.
	Lock() error
	// Unlock releases a hold taken by Lock.
	Unlock() error
	// RLock presumably acquires a shared hold (by analogy with
	// sync.RWMutex) — confirm against the implementation.
	RLock() error
	// RUnlock releases a hold taken by RLock.
	RUnlock() error
}
// cleanerVFS is the pseudo file-access layer used by Cleaner. The
// indirection exists so tests can exercise edge-case errors at each
// filesystem step.
type cleanerVFS interface {
	// openSegmentCursor opens a record cursor over an existing segment file.
	openSegmentCursor(name string, header *MagicHeader) (recordCursor, error)
	// openSegmentRewriter opens a writer for the temporary rewrite file.
	openSegmentRewriter(name string, seq uint32, header *MagicHeader) (*segmentWriter, error)
	// remove deletes the named file.
	remove(name string) error
	// rename moves oldPath to newPath.
	rename(oldPath, newPath string) error
}
// cleanerFS implements cleanerVFS and is the default implementation; its
// methods delegate directly to the os package and the package-level segment
// open helpers.
type cleanerFS struct{}
// openSegmentRewriter opens a segment writer at name for a rewrite. Any file
// already at name is removed first. Only a "does not exist" error from that
// removal is tolerated; any other failure could mean the old file is still
// hanging around, so it is returned.
func (fs *cleanerFS) openSegmentRewriter(name string, seq uint32, header *MagicHeader) (*segmentWriter, error) {
	if rmErr := os.Remove(name); rmErr != nil && !errors.Is(rmErr, os.ErrNotExist) {
		return nil, rmErr
	}
	return openSegmentWriter(name, seq, header)
}
// openSegmentCursor opens a record cursor over the segment file at name.
//
// On error it returns an untyped nil recordCursor rather than an interface
// wrapping a nil concrete pointer (which would compare non-nil), so callers
// that check the cursor against nil behave correctly.
func (fs *cleanerFS) openSegmentCursor(name string, header *MagicHeader) (recordCursor, error) {
	cur, err := openSegmentCursor(name, header)
	if err != nil {
		return nil, err
	}
	return cur, nil
}
// remove deletes the named file using os.Remove and reports its error.
func (fs *cleanerFS) remove(name string) error {
	if err := os.Remove(name); err != nil {
		return err
	}
	return nil
}
// rename moves oldPath to newPath using os.Rename, replacing newPath if it
// exists, and reports its error.
func (fs *cleanerFS) rename(oldPath, newPath string) error {
	if err := os.Rename(oldPath, newPath); err != nil {
		return err
	}
	return nil
}
// recordCursor iterates over the records of a single segment file. Clean
// treats a nil record with a nil error from Next as the end of the segment.
type recordCursor interface {
	// Close closes the cursor.
	Close() error
	// Next returns the next record, or (nil, nil) once no records remain.
	Next() (*Record, error)
}