-
Notifications
You must be signed in to change notification settings - Fork 270
/
segment.go
426 lines (376 loc) · 11.4 KB
/
segment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
package wkstore
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
"go.uber.org/atomic"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// segment is one message log file of a topic together with its index.
// The segment, index and backup file names are all derived from
// baseMessageSeq and live in segmentDir.
type segment struct {
cfg *StoreConfig
baseMessageSeq uint32 // sequence number used to build the segment/index/backup file names
segmentDir string // directory that holds the files of this segment
wklog.Log
segmentFile *os.File // message segment file, opened by init
position uint32 // current write position
isSanityCheck atomic.Bool // set to true once init has finished the sanity check
lastMsgSeq atomic.Uint32 // last message sequence, stored by init
fileSize int64 // file size reported by Stat in init; getFileSize falls back to it while position is 0
index *Index
sync.RWMutex
indexIntervalBytes int64 // index interval bytes
bytesSinceLastIndexEntry int64 // number of bytes written since the last index entry
topic *topic
}
// newSegment builds the in-memory descriptor for the segment of topic whose
// files are named after baseMessageSeq, and makes sure the "logs" directory
// under the topic directory exists.
//
// The segment file itself is not opened here; that is done by init.
// newSegment panics if the directory cannot be created.
func newSegment(topic *topic, baseMessageSeq uint32, cfg *StoreConfig) *segment {
	s := &segment{
		cfg:                cfg,
		segmentDir:         filepath.Join(topic.topicDir, "logs"),
		baseMessageSeq:     baseMessageSeq,
		indexIntervalBytes: 4 * 1024,
		topic:              topic,
	}
	// The logger has to exist before the first s.Error call below: until it
	// is assigned, the embedded Log is still its zero value. segmentPath only
	// needs segmentDir and baseMessageSeq, which are already set.
	s.Log = wklog.NewWKLog(fmt.Sprintf("segment[%s]", s.segmentPath()))

	if err := os.MkdirAll(s.segmentDir, FileDefaultMode); err != nil {
		s.Error("mkdir segment dir fail", zap.Error(err))
		panic(err)
	}
	s.index = NewIndex(s.indexPath(), baseMessageSeq)
	return s
}
// appendMessages encodes msgs, writes them to the segment file in a single
// write and returns the number of bytes written. An empty batch is a no-op.
//
// An index entry pointing at the first message of the batch is recorded when
// more than indexIntervalBytes have been written since the last entry, or
// when this batch alone is larger than indexIntervalBytes.
func (s *segment) appendMessages(msgs []Message) (int, error) {
	if len(msgs) == 0 {
		return 0, nil
	}

	// Concatenate every encoded message behind the first one.
	payload := msgs[0].Encode()
	for _, m := range msgs[1:] {
		payload = append(payload, m.Encode()...)
	}

	written, err := s.append(payload)
	if err != nil {
		return 0, err
	}

	intervalExceeded := s.bytesSinceLastIndexEntry > s.indexIntervalBytes
	oversizedBatch := len(payload) > int(s.indexIntervalBytes)
	if intervalExceeded || oversizedBatch {
		// s.position already includes this batch, so step back to its start.
		batchStart := s.position - uint32(written)
		if err = s.index.Append(msgs[0].GetSeq(), batchStart); err != nil {
			return 0, err
		}
		s.bytesSinceLastIndexEntry = 0
	}
	s.bytesSinceLastIndexEntry += int64(written)
	return written, nil
}
// append writes data to the segment file while holding the write lock and
// advances position by the number of bytes written.
// On a write error it returns 0 and leaves position untouched.
func (s *segment) append(data []byte) (int, error) {
	s.Lock()
	defer s.Unlock()

	written, writeErr := s.segmentFile.Write(data)
	if writeErr != nil {
		return 0, errors.Wrap(writeErr, "log write failed")
	}
	s.position = s.position + uint32(written)
	return written, nil
}
// readMessages reads up to limit messages starting at messageSeq and hands
// each one to callback, holding the read lock for the whole operation.
//
// The start position comes from the index entry returned by Lookup; when
// that entry is not exactly messageSeq, the file is scanned forward from the
// entry's position. If the scan runs past the end of the data (ErrorNotData)
// the call returns nil without invoking callback.
func (s *segment) readMessages(messageSeq uint32, limit uint64, callback func(msg Message) error) error {
	s.RLock()
	defer s.RUnlock()

	entry, err := s.index.Lookup(messageSeq)
	if err != nil {
		s.Error("readMessages-index.Lookup is error", zap.Error(err))
		return err
	}

	var from int64 = entry.Position
	if entry.MessageSeq != messageSeq {
		// The index is sparse: walk forward from the entry to the exact message.
		from, _, err = s.readTargetPosition(entry.Position, messageSeq)
		switch {
		case err == nil:
		case errors.Is(err, ErrorNotData):
			s.Debug("未读取到数据!")
			return nil
		default:
			s.Error("readMessages-readTargetPosition is error", zap.Error(err))
			return err
		}
	}

	if err = s.readMessagesAtPosition(from, limit, callback); err != nil {
		s.Error("readMessages.readLogsAtPosition is error", zap.Error(err), zap.Int64("startPosition", from), zap.Uint32("epectMessage", messageSeq), zap.Int64("actMessageSeq", int64(entry.MessageSeq)), zap.Int64("Position", entry.Position))
		return err
	}
	return nil
}
// readMessagesAtPosition decodes messages one after another starting at the
// given file position and passes each to callback (when it is non-nil).
// It stops after limit messages or once the position reaches the size
// reported by getFileSize, and returns the first decode or callback error.
func (s *segment) readMessagesAtPosition(position int64, limit uint64, callback func(m Message) error) error {
	cursor := position
	for read := uint64(0); read < limit && cursor < s.getFileSize(); read++ {
		msg, size, err := decodeMessageAt(s.segmentFile, cursor, s.cfg.DecodeMessageFnc)
		if err != nil {
			return err
		}
		// Advance before the callback, exactly like the decode order requires.
		cursor += int64(size)
		if callback == nil {
			continue
		}
		if err = callback(msg); err != nil {
			return err
		}
	}
	return nil
}
// readTargetPosition scans the segment file forward from startPosition,
// message by message, until it finds the message whose sequence equals
// targetMessageSeq.
//
// It returns the file position where that message starts and the position
// where the following message starts. ErrorNotData is returned when the scan
// reaches the size reported by getFileSize without a match.
func (s *segment) readTargetPosition(startPosition int64, targetMessageSeq uint32) (int64, int64, error) {
	cursor := startPosition
	for {
		if size := s.getFileSize(); cursor >= size {
			s.Debug("当前文件位置大于文件本身大小", zap.Int64("startPosition", cursor), zap.Int64("fileSize", size), zap.Uint32("targetMessageSeq", targetMessageSeq))
			return 0, 0, ErrorNotData
		}

		seq, dataLen, err := decodeMessageSeq(s.segmentFile, cursor)
		if err != nil {
			s.Error("DecodeLogOffset is error", zap.Error(err))
			return 0, 0, err
		}

		// File position at which the next message starts.
		next := cursor + int64(getMinMessageLen()+dataLen)
		if seq == targetMessageSeq {
			return cursor, next, nil
		}
		cursor = next
	}
}
// init opens the segment file, runs the sanity check and determines the last
// message sequence. It is a no-op once a previous call has succeeded.
//
// With SegmentModeAll the file is opened read-write in append mode; any other
// mode opens it read-only. The file is created if it does not exist.
// If anything fails after the file has been opened, the handle is closed
// again so that a failed init does not leak a file descriptor.
func (s *segment) init(mode SegmentMode) error {
	if s.isSanityCheck.Load() {
		return nil
	}

	var err error
	pathStr := s.segmentPath()
	if mode == SegmentModeAll {
		s.segmentFile, err = os.OpenFile(pathStr, os.O_RDWR|os.O_CREATE|os.O_APPEND, FileDefaultMode)
	} else {
		s.segmentFile, err = os.OpenFile(pathStr, os.O_RDONLY|os.O_CREATE, FileDefaultMode)
	}
	if err != nil {
		s.Error("open file fail!", zap.Error(err), zap.String("path", pathStr))
		return err
	}

	ready := false
	defer func() {
		if ready {
			return
		}
		// Best-effort cleanup: the caller needs the original error, not the
		// result of closing a file that is being abandoned anyway.
		_ = s.segmentFile.Close()
		s.segmentFile = nil
	}()

	fi, err := s.segmentFile.Stat()
	if err != nil {
		return err
	}
	if fi.Size() > 0 {
		s.fileSize = fi.Size()
	}

	lastMsgStartPosition, err := s.sanityCheck()
	if err != nil {
		return err
	}

	if lastMsgStartPosition == 0 {
		if s.position > 0 {
			// The last message starts at 0 but position is > 0, so the file
			// holds exactly one message.
			s.lastMsgSeq.Store(uint32(s.baseMessageSeq + 1))
		} else {
			s.lastMsgSeq.Store(uint32(s.baseMessageSeq))
		}
	} else {
		messageSeq, _, err := decodeMessageSeq(s.segmentFile, lastMsgStartPosition)
		if err != nil {
			return err
		}
		s.lastMsgSeq.Store(uint32(messageSeq))
	}

	s.isSanityCheck.Store(true)
	ready = true
	return nil
}
// readAt returns the single message whose sequence is messageSeq.
//
// It starts from the index entry returned by Lookup, scans forward to the
// exact message and decodes it. Nothing on the segment is modified, so the
// shared read lock is taken (as readMessages does) instead of the exclusive
// write lock; concurrent readers therefore do not block each other.
func (s *segment) readAt(messageSeq uint32) (Message, error) {
	s.RLock()
	defer s.RUnlock()

	messageSeqPosition, err := s.index.Lookup(messageSeq)
	if err != nil {
		return nil, err
	}
	targetPosition, _, err := s.readTargetPosition(messageSeqPosition.Position, messageSeq)
	if err != nil {
		return nil, err
	}
	m, _, err := decodeMessageAt(s.segmentFile, targetPosition, s.cfg.DecodeMessageFnc)
	return m, err
}
// sanityCheck validates the segment file and returns the file position at
// which the last message starts (0 for an empty file).
//
// The cheap "simple" check is used only when the file ends with the end
// magic number and is larger than one maximum-size message; in every other
// case, and whenever the simple check fails or errors, the full check runs.
// A Stat failure is returned to the caller rather than raised as a panic,
// matching how init treats Stat errors.
func (s *segment) sanityCheck() (int64, error) {
	stat, err := s.segmentFile.Stat()
	if err != nil {
		s.Error("Stat file fail!", zap.Error(err))
		return 0, err
	}
	segmentSizeOfByte := stat.Size()
	if segmentSizeOfByte == 0 {
		return 0, nil
	}

	hasEndMgNumer, err := s.hasEndMagicNumer(segmentSizeOfByte)
	if err != nil {
		return 0, err
	}
	if !hasEndMgNumer {
		s.Debug("No magic number at the end,sanity check mode is full")
		return s.sanityFullCheck(segmentSizeOfByte)
	}
	if segmentSizeOfByte <= int64(s.cfg.EachMessagegMaxSizeOfBytes)+int64(getMinMessageLen()) {
		s.Debug("File is too small,sanity check mode is full")
		return s.sanityFullCheck(segmentSizeOfByte)
	}

	s.Debug("sanity check mode is simple")
	check, lastMsgStartPosition, err := s.sanitySimpleCheck(segmentSizeOfByte)
	if err != nil {
		s.Warn("sanitySimpleCheck is error,start sanityFullCheck", zap.Error(err))
		return s.sanityFullCheck(segmentSizeOfByte)
	}
	if !check {
		s.Debug("sanity check simple mode is fail!Turn on full mode")
		return s.sanityFullCheck(segmentSizeOfByte)
	}
	return lastMsgStartPosition, nil
}
// sanitySimpleCheck validates only the tail of the segment file, starting at
// the position of the last index entry, and returns the start position of
// the last valid message.
//
// The boolean result is false when the simple check cannot be applied or
// finds no valid message; the caller then falls back to the full check. On
// success s.position is set to the position where the scan stopped.
//
// A non-EOF validation error is skipped over by the length the helper
// reports. If that length is not positive the scan cannot advance, so the
// error is returned instead of retrying the same position forever.
func (s *segment) sanitySimpleCheck(segmentSizeOfByte int64) (bool, int64, error) {
	assertDataSize := int64(s.cfg.EachMessagegMaxSizeOfBytes + getMinMessageLen()) // assumed upper bound of the last message
	if assertDataSize >= segmentSizeOfByte {
		s.Debug("assertDataSize > segmentSizeOfByte") // returning false sends the caller to the full check
		return false, 0, nil
	}

	offsetPosistion := s.index.LastPosition()
	if offsetPosistion.Position <= 0 && offsetPosistion.MessageSeq <= 0 {
		return false, 0, nil
	}

	startCheckPosition := offsetPosistion.Position
	s.Debug("sanity simple check start", zap.Int64("startCheckPosition", startCheckPosition), zap.Int64("segmentSizeOfByte", segmentSizeOfByte))

	lastMsgLen := 0 // length of the last valid message seen
	for startCheckPosition < segmentSizeOfByte {
		readLen, err := nextMessageIsVail(s.segmentFile, startCheckPosition)
		if err != nil {
			if errors.Is(err, io.EOF) { // nothing left to read
				break
			}
			if readLen <= 0 {
				// No length to skip by: bail out so the full check can run.
				return false, 0, err
			}
			startCheckPosition += int64(readLen)
			continue
		}
		if readLen == 0 {
			break
		}
		lastMsgLen = readLen
		startCheckPosition += int64(readLen)
	}

	if lastMsgLen == 0 {
		s.Debug("lastMsgLen is 0")
		return false, 0, nil
	}
	s.position = uint32(startCheckPosition)
	return true, startCheckPosition - int64(lastMsgLen), nil
}
// sanityFullCheck runs the full validation of the segment file by delegating
// to check and passes its result through unchanged.
func (s *segment) sanityFullCheck(segmentSizeOfByte int64) (int64, error) {
	lastMsgStart, err := s.check(segmentSizeOfByte)
	return lastMsgStart, err
}
// hasEndMagicNumer reports whether the last bytes of the segment file equal
// EndMagicNumber. segmentSizeOfByte is the current size of the file.
//
// The number of bytes read is taken from EndMagicNumber itself rather than
// hard-coded, and a file shorter than the marker simply yields false instead
// of a read at a negative offset.
func (s *segment) hasEndMagicNumer(segmentSizeOfByte int64) (bool, error) {
	magicLen := int64(len(EndMagicNumber))
	if segmentSizeOfByte < magicLen {
		return false, nil
	}
	tail := make([]byte, magicLen)
	if _, err := s.segmentFile.ReadAt(tail, segmentSizeOfByte-magicLen); err != nil {
		return false, err
	}
	return bytes.Equal(tail, EndMagicNumber[:]), nil
}
// check validates the whole segment file from the beginning and returns the
// file position at which the last valid message starts.
//
// Messages are validated one after another until the end of the file, a
// validation error, or a non-positive length (which would otherwise never
// advance the scan). s.position is set to the end of the valid data. If that
// is not the end of the file, the original file is backed up, the damaged
// tail is truncated away and the file offset is moved to the new end.
//
// NOTE(review): position is a uint32, so this presumably relies on segment
// files staying below 4 GiB — confirm against the segment size limit.
func (s *segment) check(segmentSizeOfByte int64) (int64, error) {
	var validLen int64 // total length of the valid messages found so far
	lastMsgLen := 0    // length of the last valid message
	for validLen < segmentSizeOfByte {
		msgLen, err := nextMessageIsVail(s.segmentFile, validLen)
		if err != nil || msgLen <= 0 {
			break
		}
		lastMsgLen = msgLen
		validLen += int64(msgLen)
	}

	s.position = uint32(validLen)
	if validLen != segmentSizeOfByte {
		s.Warn("Back up the original log and remove the damaged log")
		if err := s.backup(); err != nil {
			s.Error("backup fail!", zap.Error(err))
			return 0, err
		}
		if err := s.segmentFile.Truncate(validLen); err != nil {
			s.Error("truncate fail", zap.Error(err))
			return 0, err
		}
		if _, err := s.segmentFile.Seek(validLen, io.SeekStart); err != nil {
			return 0, err
		}
	}
	return validLen - int64(lastMsgLen), nil
}
// getFileSize returns the current write position when it is non-zero and
// otherwise falls back to the fileSize field.
func (s *segment) getFileSize() int64 {
	if pos := s.position; pos != 0 {
		return int64(pos)
	}
	return s.fileSize
}
// sync calls Sync on the segment file and returns its error.
func (s *segment) sync() error {
	syncErr := s.segmentFile.Sync()
	return syncErr
}
// close syncs the segment file, then closes the file and the index.
//
// The sync error takes precedence. If sync succeeded, a failure to close the
// segment file is returned instead of being silently dropped. The file and
// the index are always closed, whatever sync returned.
func (s *segment) close() error {
	err := s.sync()
	if closeErr := s.segmentFile.Close(); err == nil {
		err = closeErr
	}
	s.index.Close()
	return err
}
// backup calls wkutil.CopyFile with a timestamped backup path (current time
// in nanoseconds) and the segment path, and returns the copy's error.
// NOTE(review): presumably CopyFile takes (destination, source) — confirm.
func (s *segment) backup() error {
	bakPath := s.backupPath(time.Now().UnixNano())
	segPath := s.segmentPath()
	_, copyErr := wkutil.CopyFile(bakPath, segPath)
	return copyErr
}
// release closes the segment; the error returned by close is discarded.
func (s *segment) release() {
	_ = s.close()
}
// segmentPath returns the path of the segment file: segmentDir joined with
// the file name built from fileFormat, baseMessageSeq and segmentSuffix.
func (s *segment) segmentPath() string {
	fileName := fmt.Sprintf(fileFormat, s.baseMessageSeq, segmentSuffix)
	return filepath.Join(s.segmentDir, fileName)
}
// backupPath returns the path of a backup copy of the segment file. The name
// is built like the segment file name, with ".bak" followed by t appended to
// segmentSuffix.
func (s *segment) backupPath(t int64) string {
	bakSuffix := fmt.Sprintf("%s.bak%d", segmentSuffix, t)
	fileName := fmt.Sprintf(fileFormat, s.baseMessageSeq, bakSuffix)
	return filepath.Join(s.segmentDir, fileName)
}
// indexPath returns the path of the index file: segmentDir joined with the
// file name built from fileFormat, baseMessageSeq and indexSuffix.
func (s *segment) indexPath() string {
	fileName := fmt.Sprintf(fileFormat, s.baseMessageSeq, indexSuffix)
	return filepath.Join(s.segmentDir, fileName)
}