-
Notifications
You must be signed in to change notification settings - Fork 2
/
segment_tail.go
75 lines (70 loc) · 1.96 KB
/
segment_tail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package lstore
import (
"errors"
"fmt"
"github.com/edsrzf/mmap-go"
"github.com/esdb/gocodec"
"github.com/v2pro/plz"
"github.com/v2pro/plz/countlog"
"io"
"os"
)
var SegmentOverflowError = errors.New("please rotate to new chunk")
// tailSegment is owned by appender
type tailSegment struct {
*segmentHeader
tailEntriesCount int
writeBuf []byte
writeMMap mmap.MMap
}
func (chunk *tailSegment) Close() error {
if chunk.writeMMap == nil {
return nil
}
err := chunk.writeMMap.Unmap()
countlog.TraceCall("callee!writeMMap.Unmap", err)
return err
}
func openTailSegment(ctx *countlog.Context, path string, maxSize int64, headOffset Offset) (*tailSegment, []*Entry, error) {
file, err := os.OpenFile(path, os.O_RDWR, 0666)
if os.IsNotExist(err) {
file, err = createRawSegment(ctx, path, maxSize, headOffset)
if err != nil {
return nil, nil, err
}
} else if err != nil {
ctx.TraceCall("callee!os.OpenFile", err)
return nil, nil, err
}
defer plz.Close(file)
writeMMap, err := mmap.Map(file, mmap.RDWR, 0)
ctx.TraceCall("callee!mmap.Map", err)
if err != nil {
return nil, nil, err
}
segment := &tailSegment{writeMMap: writeMMap}
iter := gocodec.NewIterator(writeMMap)
segmentHeader, _ := iter.CopyThenUnmarshal((*segmentHeader)(nil)).(*segmentHeader)
ctx.TraceCall("callee!iter.Copy", iter.Error)
if iter.Error != nil {
plz.Close(plz.WrapCloser(writeMMap.Unmap))
return nil, nil, fmt.Errorf("openTailSegment: %s", iter.Error.Error())
}
var entries []*Entry
for {
entry, _ := iter.CopyThenUnmarshal((*Entry)(nil)).(*Entry)
if iter.Error == io.EOF {
break
}
ctx.TraceCall("callee!iter.Copy", iter.Error)
if iter.Error != nil {
plz.Close(plz.WrapCloser(writeMMap.Unmap))
return nil, nil, fmt.Errorf("openTailSegment: %s", iter.Error.Error())
}
entries = append(entries, entry)
}
segment.segmentHeader = segmentHeader
segment.writeBuf = iter.Buffer()
segment.tailEntriesCount = len(entries)
return segment, entries, nil
}