-
Notifications
You must be signed in to change notification settings - Fork 0
/
pend.go
83 lines (64 loc) · 1.45 KB
/
pend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package wal
import (
"sync"
)
func (w *Wal) Pending(capacity int) *Pending {
return &Pending{
wal: w,
pendingSize: 0,
pendingBytes: make([][]byte, 0, capacity),
}
}
// Pending provides a way to write data in batches
type Pending struct {
wal *Wal
pendingSize int64
pendingBytes [][]byte
mutex sync.Mutex
}
func (p *Pending) Write(data []byte) error {
p.mutex.Lock()
defer p.mutex.Unlock()
if int64(len(data)) >= p.wal.option.MaxFileSize {
return ErrDataExceedFile
}
p.pendingSize += EstimateBlockSize(int64(len(data)))
p.pendingBytes = append(p.pendingBytes, data)
return nil
}
func (p *Pending) Flush(sync bool) ([]ChunkPos, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.pendingSize == 0 {
return []ChunkPos{}, nil
}
defer p.rest()
if p.pendingSize >= p.wal.option.MaxFileSize {
return nil, ErrDataExceedFile
}
p.wal.mutex.Lock()
defer p.wal.mutex.Unlock()
// if no left space, rotate a new one
if p.pendingSize+p.wal.active.Size() >= p.wal.option.MaxFileSize {
if err := p.wal.rotate(); err != nil {
return nil, err
}
}
allpos, err := p.wal.active.WriteAll(p.pendingBytes)
if err != nil {
return nil, err
}
if err = p.wal.sync(p.pendingSize, sync); err != nil {
return nil, err
}
return allpos, nil
}
func (p *Pending) rest() {
p.pendingBytes = make([][]byte, 0, len(p.pendingBytes)/2)
p.pendingSize = 0
}
func (p *Pending) Reset() {
p.mutex.Lock()
p.rest()
p.mutex.Unlock()
}