-
Notifications
You must be signed in to change notification settings - Fork 2
/
lstore_remover.go
121 lines (111 loc) · 2.92 KB
/
lstore_remover.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package lstore
import (
"context"
"github.com/esdb/gocodec"
"github.com/v2pro/plz/concurrent"
"github.com/v2pro/plz/countlog"
"io/ioutil"
"os"
)
// removerCommand is a unit of work handed to the remover goroutine through
// commandQueue. It is invoked with the context of that goroutine.
type removerCommand func(ctx *countlog.Context)
// remover runs removal requests one at a time on a dedicated goroutine:
// callers enqueue a removerCommand and the goroutine started by start
// executes it.
type remover struct {
// cfg supplies the tombstone segment paths used when writing and deleting
// the tombstone file.
cfg *removerConfig
// indexer is told which segments were removed (removeIndexedSegments).
indexer *indexer
// state is the store state whose head is trimmed via removeHead.
state *storeState
// commandQueue carries commands from callers to the remover goroutine.
// It is created unbuffered, so a send completes only when the goroutine
// receives it.
commandQueue chan removerCommand
// stream is reused across calls to encode the tombstone segment header.
stream *gocodec.Stream
}
// newRemover builds the remover for this store, loads any existing tombstone
// and, if that succeeds, launches the remover goroutine on the store's
// executor.
//
// It returns a nil remover together with the error when loadTombstone fails;
// in that case the goroutine is not started.
func (store *Store) newRemover(ctx *countlog.Context) (*remover, error) {
	cfg := store.cfg
	r := &remover{
		cfg:          &cfg.removerConfig,
		indexer:      store.indexer,
		state:        &store.storeState,
		commandQueue: make(chan removerCommand),
		stream:       gocodec.NewStream(nil),
	}
	if err := r.loadTombstone(ctx); err != nil {
		return nil, err
	}
	r.start(store.executor)
	return r, nil
}
// start launches the remover goroutine on executor. The goroutine receives
// commands from commandQueue and runs them one at a time via runCommand until
// the executor-provided context is done.
//
// Start and stop are logged under the remover's own event names so they can
// be told apart from the indexer's lifecycle events.
func (remover *remover) start(executor *concurrent.UnboundedExecutor) {
	executor.Go(func(ctx *countlog.Context) {
		defer countlog.Info("event!remover.stop")
		countlog.Info("event!remover.start")
		for {
			select {
			case <-ctx.Done():
				return
			case cmd := <-remover.commandQueue:
				remover.runCommand(ctx, cmd)
			}
		}
	})
}
// runCommand executes cmd with ctx and contains any panic it raises.
//
// A panic whose value equals concurrent.StopSignal is raised again unchanged;
// every other recovered value (including nil when cmd returned normally) is
// passed to countlog.LogPanic.
func (remover *remover) runCommand(ctx *countlog.Context, cmd removerCommand) {
	defer func() {
		cause := recover()
		if cause != concurrent.StopSignal {
			countlog.LogPanic(cause)
			return
		}
		panic(concurrent.StopSignal)
	}()
	cmd(ctx)
}
// asyncExecute hands cmd to the remover goroutine without waiting for it to
// run.
//
// It blocks until either the command has been received from commandQueue
// (returns nil) or ctx is done (returns ctx.Err()).
func (remover *remover) asyncExecute(ctx *countlog.Context, cmd removerCommand) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case remover.commandQueue <- cmd:
	}
	return nil
}
// Remove submits a removal for removingFrom to the remover goroutine and
// waits for its outcome.
//
// It returns the error produced by doRemove, or the context's error when
// ctxObj is done before the command is accepted or before its result
// arrives. In the latter case the already-queued removal may still run to
// completion in the background.
func (remover *remover) Remove(ctxObj context.Context, removingFrom Offset) error {
	ctx := countlog.Ctx(ctxObj)
	// Buffered by one so the remover goroutine can always deliver the result
	// and move on, even if this caller has stopped waiting.
	resultChan := make(chan error, 1)
	err := remover.asyncExecute(ctx, func(ctx *countlog.Context) {
		resultChan <- remover.doRemove(ctx, removingFrom)
	})
	if err != nil {
		// The command was never queued, so nothing will ever be sent on
		// resultChan; waiting on it would block forever.
		return err
	}
	select {
	case err = <-resultChan:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
// doRemove performs one removal request on the remover goroutine.
//
// It first writes the tombstone for removingFrom, then trims the store state
// with removeHead and tells the indexer about the removed segments. When
// removeHead reports the same offset that was requested, the tombstone file is
// deleted again.
//
// Only a writeTombstone failure is returned. The outcome of deleting the
// tombstone file is traced but does not affect the return value.
func (remover *remover) doRemove(ctx *countlog.Context, removingFrom Offset) error {
	if err := remover.writeTombstone(ctx, removingFrom); err != nil {
		return err
	}
	segments, reachedOffset := remover.state.removeHead(removingFrom)
	remover.indexer.removeIndexedSegments(ctx, segments)
	if reachedOffset != removingFrom {
		return nil
	}
	rmErr := os.Remove(remover.cfg.TombstoneSegmentPath())
	ctx.TraceCall("callee!os.Remove", rmErr)
	return nil
}
// writeTombstone persists a tombstone segment header recording
// removingOffset.
//
// The header is encoded with the remover's shared stream, written to the
// temporary tombstone path and then renamed onto the final tombstone path.
// It returns the stream's encoding error, the write error or the rename
// error, whichever occurs first; both file operations are traced.
func (remover *remover) writeTombstone(ctx *countlog.Context, removingOffset Offset) error {
	header := segmentHeader{
		segmentType: segmentTypeTombstone,
		headOffset:  removingOffset,
	}
	enc := remover.stream
	enc.Reset(nil)
	enc.Marshal(header)
	if enc.Error != nil {
		return enc.Error
	}
	writeErr := ioutil.WriteFile(remover.cfg.TombstoneSegmentTmpPath(), enc.Buffer(), 0666)
	ctx.TraceCall("callee!ioutil.WriteFile", writeErr)
	if writeErr != nil {
		return writeErr
	}
	renameErr := os.Rename(remover.cfg.TombstoneSegmentTmpPath(), remover.cfg.TombstoneSegmentPath())
	ctx.TraceCall("callee!os.Rename", renameErr)
	return renameErr
}