forked from alibaba/MongoShake
/
file_reader.go
137 lines (120 loc) · 3.66 KB
/
file_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package tunnel
import (
	"bufio"
	"bytes"
	"encoding/binary"
	"errors"
	"io"
	"os"

	LOG "github.com/vinllen/log4go"
)
// FileReader is a tunnel reader that takes its messages from a local data
// file and hands them to a set of replayers.
type FileReader struct {
// File is the path of the data file; Link opens it with os.Open.
File string
// dataFile wraps the opened file handle; it is assigned by Link.
dataFile *DataFile
// pipe holds one channel per replayer. Link creates the channels, read
// sends on the channel indexed by the message shard.
pipe []chan *TMessage
// replayers are the replayers passed to Link; consume indexes this slice
// with the message shard.
replayers []Replayer
}
// Link opens the data file named by tunnel.File, validates its header and
// then starts the background goroutines that replay its content: one consume
// goroutine per entry of relativeReplayer plus a single read goroutine that
// feeds them.
//
// It returns the os.Open error when the file cannot be opened, and a
// dedicated error when the magic or protocol number of the file header does
// not match. Nothing is started and no file handle is left open in either
// case.
func (tunnel *FileReader) Link(relativeReplayer []Replayer) error {
	file, err := os.Open(tunnel.File)
	if err != nil {
		LOG.Critical("File tunnel reader open %s failed, %v", tunnel.File, err)
		return err
	}

	dataFile := &DataFile{filehandle: file}
	if fileHeader := dataFile.ReadHeader(); fileHeader.Magic != FILE_MAGIC_NUMBER || fileHeader.Protocol != FILE_PROTOCOL_NUMBER {
		// The handle was only opened for reading and is being abandoned, so
		// a Close error carries no information worth reporting here.
		file.Close()
		LOG.Critical("File is not belong to mongoshake. magic header or protocol header is invalid")
		return errors.New("file magic number or protocol number is invalid")
	}
	tunnel.dataFile = dataFile

	// The consumers are started only once the file is known to be usable;
	// starting them earlier would leave them blocked forever on their pipes
	// whenever one of the checks above fails.
	tunnel.replayers = relativeReplayer
	tunnel.pipe = make([]chan *TMessage, 0, len(relativeReplayer))
	for range relativeReplayer {
		ch := make(chan *TMessage)
		tunnel.pipe = append(tunnel.pipe, ch)
		go tunnel.consume(ch)
	}

	go tunnel.read()
	return nil
}
// consume receives messages from pipe until the channel is closed and passes
// each one to the replayer indexed by the message's Shard. The reply returned
// by Sync decides which log line, if any, is written.
func (tunnel *FileReader) consume(pipe <-chan *TMessage) {
	// onSynced builds the callback passed to Sync. The message and sequence
	// number are taken as arguments so every callback keeps the values of
	// the iteration that created it.
	onSynced := func(context *TMessage, seq int) func() {
		return func() {
			LOG.Info("Sync tunnel message successful, signature: %d, %d", context.Checksum, seq)
		}
	}

	seqKey := 1
	for msg := range pipe {
		seqKey++

		// hash corresponding replayer
		reply := tunnel.replayers[msg.Shard].Sync(msg, onSynced(msg, seqKey))

		switch reply {
		case ReplyChecksumInvalid, ReplyRetransmission, ReplyCompressorNotSupported, ReplyNetworkOpFail:
			LOG.Warn("File tunnel rejected by replayer-%d", msg.Shard)
		case ReplyError, ReplyServerFault:
			LOG.Critical("File tunnel handle server fault")
		}
	}
}
// read decodes the data file block by block and sends every decoded block,
// as a TMessage, to the pipe selected by the block's shard. It is started by
// Link as a goroutine and owns the file handle and the sending side of the
// pipes: both are closed when it returns.
//
// Reading stops at the end of the file or at the first block that is
// truncated or malformed. An incomplete block is dropped rather than
// dispatched with only part of its oplogs.
// NOTE(review): the previous code dispatched a partially read block; confirm
// no caller depends on that.
func (tunnel *FileReader) read() {
	defer tunnel.dataFile.filehandle.Close()

	pipes := tunnel.pipe
	// This goroutine is the only sender on the pipes, so it is the one that
	// closes them. The consume goroutines range over the pipes and would
	// otherwise stay blocked forever once the file is exhausted.
	defer func() {
		for _, ch := range pipes {
			close(ch)
		}
	}()

	shardCount := uint32(len(pipes))
	if shardCount == 0 {
		// Without this guard the resharding modulo below divides by zero.
		LOG.Critical("File tunnel reader has no replayer pipe to dispatch to")
		return
	}

	// Block layout as consumed below, every integer big endian:
	//   checksum(4) tag(4) shard(4) compress(4) magic(4) body length(4)
	// followed by the body, a sequence of [entry length(4) | entry bytes].
	const headerSize = 24
	blockMagic := []byte{0xee, 0xee, 0xee, 0xee}

	// The file is consumed through many tiny reads; buffer them so that each
	// one does not turn into a read on the underlying file handle.
	reader := bufio.NewReader(tunnel.dataFile.filehandle)
	header := make([]byte, headerSize)
	lengthBuf := make([]byte, 4)

	// readEntries decodes the body of one block. It reports false, after
	// logging the reason, when the body is truncated or its entry lengths do
	// not add up to the announced body length.
	readEntries := func(remained uint32) ([][]byte, bool) {
		logs := [][]byte{}
		for remained > 0 {
			if remained < 4 {
				LOG.Critical("File oplog block is corrupted. %d remaining bytes can not hold an entry length", remained)
				return nil, false
			}
			if _, err := io.ReadFull(reader, lengthBuf); err != nil {
				LOG.Critical("File oplog entry length is incomplete, %v", err)
				return nil, false
			}
			oplogLength := binary.BigEndian.Uint32(lengthBuf)
			// Checked before allocating: a corrupted length would otherwise
			// make the unsigned subtraction below wrap around and could
			// request an arbitrarily large buffer.
			if oplogLength > remained-4 {
				LOG.Critical("File oplog entry length %d exceeds the %d bytes left in its block", oplogLength, remained-4)
				return nil, false
			}
			entry := make([]byte, oplogLength)
			// io.ReadFull reports io.ErrUnexpectedEOF for a partial read, so
			// every error is treated as a truncated entry.
			if _, err := io.ReadFull(reader, entry); err != nil {
				LOG.Critical("File oplog entry is incomplete, %v", err)
				return nil, false
			}
			logs = append(logs, entry)
			// header + body
			remained -= 4 + oplogLength
		}
		return logs, true
	}

	totalLogs := 0
	for {
		if _, err := io.ReadFull(reader, header); err != nil {
			// io.EOF means the file ended exactly on a block boundary, which
			// is the regular way out of this loop. Any other error is a
			// truncated or unreadable header.
			if err != io.EOF {
				LOG.Critical("File oplog block header is incomplete, %v", err)
			}
			break
		}

		magic := header[16:20]
		if !bytes.Equal(magic, blockMagic) {
			LOG.Critical("File oplog block magic is not 0xeeeeeeee. found 0x%x", magic)
			break
		}

		logs, ok := readEntries(binary.BigEndian.Uint32(header[20:24]))
		if !ok {
			break
		}
		totalLogs += len(logs)

		message := new(TMessage)
		message.Checksum = binary.BigEndian.Uint32(header[0:4])
		message.Tag = binary.BigEndian.Uint32(header[4:8]) | MsgRetransmission
		message.Shard = binary.BigEndian.Uint32(header[8:12])
		message.Compress = binary.BigEndian.Uint32(header[12:16])
		message.RawLogs = logs

		// resharding
		if message.Shard >= shardCount {
			message.Shard %= shardCount
		}

		// Captured before the send: once the message is handed over it
		// belongs to the consumer and must not be read from here any more.
		shard, compress, count := message.Shard, message.Compress, len(logs)
		pipes[shard] <- message
		LOG.Info("File tunnel reader extract oplogs with shard[%d], compressor[%d], count (%d)", shard, compress, count)
	}

	LOG.Info("File tunnel reader complete. total oplogs %d", totalLogs)
}