-
Notifications
You must be signed in to change notification settings - Fork 0
/
aof.go
145 lines (135 loc) · 3.18 KB
/
aof.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// A simple concurrency safe AppendOnlyFile format for log-structured storage purposes .
package aof
import (
"encoding/hex"
"bytes"
"sync"
"fmt"
"os"
"io"
)
// Our AOF struct .
type AOF struct {
file *os.File
size int64
sync.RWMutex
}
// Open an AOF datafile .
func Open(filename string, mode os.FileMode) (this *AOF, err error) {
this = new(AOF)
this.file, err = os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, mode)
if err != nil {
return nil, err
}
finfo, err := this.file.Stat()
if err != nil {
this.Close()
return nil, err
}
this.size = finfo.Size()
return this, nil
}
// Write from an io.Reader .
// It returns the id 'pointer' of the inserted data and error if any .
func (this *AOF) Put(src io.Reader) (string, error) {
this.Lock()
defer this.Unlock()
offset := this.size
length, err := io.Copy(this.file, src)
if length == 0 && err != nil {
return ``, err
}
if err = this.file.Sync(); err != nil {
return ``, err
}
this.size += int64(length)
return hex.EncodeToString([]byte(fmt.Sprintf(`%d:%d`, offset, length))), nil
}
// Read the data of the pointer "id" .
func (this *AOF) Get(id string) *io.SectionReader {
this.RLock()
defer this.RUnlock()
var offset, length int64
idBytes, e := hex.DecodeString(id)
if e != nil {
return nil
}
id = string(idBytes)
fmt.Sscanf(id, `%d:%d`, &offset, &length)
return io.NewSectionReader(this.file, offset, length)
}
// Scan the datafile using a custom separator and function.
// The provided function has two params, data and whether we at the end or not .
// This function will lock the whole file till it ends .
func (this *AOF) Scan(sep []byte, fn func(data []byte, atEOF bool) bool) {
this.Lock()
defer this.Unlock()
this.file.Seek(0, 0)
data := []byte{}
for {
tmp := make([]byte, len(sep))
n, e := this.file.Read(tmp)
if n > 0 {
data = append(data, tmp[0:n] ...)
}
if e != nil || n == 0 {
if len(data) > 0 {
fn(bytes.Trim(data, string(sep)), true)
}
break
}
if bytes.Equal(sep, tmp) {
if ! fn(bytes.Trim(data, string(sep)), false) {
break
}
data = []byte{}
}
}
data = []byte{}
}
// Scan the datafile in reverse order using a custom separator and function.
// The provided function has two params, data and whether we at the end or not .
// This function will lock the whole file till it ends .
func (this *AOF) ReverseScan(sep []byte, fn func(data []byte, atEOF bool) bool) {
this.Lock()
defer this.Unlock()
pos := int64(0)
done := int64(0)
data := []byte{}
for {
this.file.Seek(pos, 2)
tmp := make([]byte, len(sep))
n, _ := this.file.Read(tmp)
pos -= int64(len(sep))
if n > 0 {
done += int64(n)
data = append(tmp, data ...)
}
if bytes.Equal(sep, tmp) {
if ! fn(bytes.Trim(data, string(sep)), false) {
break
}
data = []byte{}
}
if done >= this.size {
fn(bytes.Trim(data, string(sep)), true)
break
}
}
data = []byte{}
}
// Clear the contents of the file
func (this *AOF) Clear() error {
return this.file.Truncate(0)
}
// Return the size of our log file .
func (this *AOF) Size() int64 {
this.RLock()
defer this.RUnlock()
return this.size
}
// Close the AOF file .
func (this *AOF) Close() {
this.file.Close()
this = nil
}