-
Notifications
You must be signed in to change notification settings - Fork 0
/
log_file.go
174 lines (159 loc) · 4 KB
/
log_file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package db
import (
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"sync"
"sync/atomic"
"github.com/he-wen-yao/bitcask-kvdb/util"
)
// 用来定义数据库所支持的日志类型
type logType int
const (
STR_TYPE logType = iota
LIST_TYPE
// 日志文件的权限 111
LOG_FILE_PERM = 0644
// LOG_FILE_PREFIX 日志文件的前缀
LOG_FILE_PREFIX = "kv.data"
)
// 定义错误信息
var (
// ErrWriteSizeNotEqual 写入数据大小不一致错误
ErrWriteSizeNotEqual = errors.New("logfile: write size is not equal to entry size")
LogType2FileName = map[logType]string{
STR_TYPE: "string",
}
FileName2LogType = map[string]logType{
"string": STR_TYPE,
}
)
// 日志文件
type logFile struct {
fid int64
// 读写锁
mu *sync.RWMutex
// 实际的日志文件
file *os.File
// 偏移量记录当前日志写到哪里
offset int64
}
// NewLogFile 根据目录和日志类型创建日志文件
func NewLogFile(filePath string, logType logType) (lf *logFile, err error) {
fileName := filepath.Join(filePath, fmt.Sprintf("%s.%s", LOG_FILE_PREFIX, LogType2FileName[logType]))
f, err := util.CreateFile(fileName)
if err != nil {
return nil, err
}
stat, err := os.Stat(fileName)
if err != nil {
return nil, err
}
return &logFile{offset: stat.Size(), file: f}, nil
}
// AppendEntry 向当前日志文件追加日志记录
func (lf *logFile) AppendEntry(le *logEntry) error {
lf.mu.Lock()
defer lf.mu.Unlock()
buf, _ := le.Encode()
// 如果日志内容为空,不写入
if len(buf) <= 0 {
return nil
}
// 将日志记录写道指定位置
n, err := lf.file.Write(buf)
if err != nil {
return err
}
// 如果实际写入和预计写入不一致,抛出错误
if n != len(buf) {
return ErrWriteSizeNotEqual
}
atomic.AddInt64(&lf.offset, int64(n))
return nil
}
// Remove 移除当前日志文件
func (lf *logFile) Remove() error {
lf.mu.Lock()
defer lf.mu.Unlock()
if err := lf.file.Close(); err != nil {
return err
}
return os.Remove(lf.file.Name())
}
// ToOlderLogFile 转为旧的日志文件
func (lf *logFile) ToOlderLogFile() error {
var (
err error
dstFile *os.File
srcInfo os.FileInfo
)
// 获取 lf 所在的目录以及文件名
filePath, fileName := path.Split(lf.file.Name())
// older logFile 所在目录
olderPath := path.Join(filePath, "older")
// 备份时需要检查有没有 older 目录
if !util.PathExist(olderPath) {
if err := os.MkdirAll(olderPath, os.ModePerm); err != nil {
return err
}
}
// older logFile 的文件名
destFileName := path.Join(olderPath, fileName)
if dstFile, err = os.Create(destFileName); err != nil {
return err
}
defer util.CloseFile(dstFile)
if _, err = io.Copy(dstFile, lf.file); err != nil {
return err
}
if srcInfo, err = os.Stat(filePath); err != nil {
return err
}
// 修改 older logFile 权限
return os.Chmod(destFileName, srcInfo.Mode())
}
// ReadLogEntry 在日志文件中读取一条日志
func (lf *logFile) ReadLogEntry(offset int64) (*logEntry, error) {
// 去除记录头
headerBuf, err := lf.readBytes(offset, LOG_ENTRY_HEADER_SIZE)
if err != nil {
return nil, err
}
header, err := DecodeHeader(headerBuf)
if err != nil {
return nil, err
}
// 获取 key 和 value 的值
buf, err := lf.readBytes(offset+LOG_ENTRY_HEADER_SIZE, int64(header.KeySize+header.ValueSize))
if err != nil {
return nil, err
}
header.Key = buf[0:header.KeySize]
header.Value = buf[header.KeySize:]
return header, nil
}
// ReadAllLogEntryFromStart 从头开始读取所有日志记录,读取时处理日志
func (lf *logFile) ReadAllLogEntryFromStart(process func(entry *logEntry, offset int64)) error {
offset := int64(0)
for {
entry, err := lf.ReadLogEntry(offset)
if err != nil {
if err.Error() == "EOF" || entry == nil {
return nil
}
return err
}
process(entry, offset)
offset += entry.GetSize()
}
}
// ReadLogEntry 读取长度为 n 字节数据
func (lf *logFile) readBytes(offset, n int64) (buf []byte, err error) {
buf = make([]byte, n)
_, err = lf.file.ReadAt(buf, offset)
return
}