/
file_writer.go
168 lines (137 loc) · 3.92 KB
/
file_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package tunnel
import (
"bytes"
"encoding/binary"
"io"
"os"
"sync/atomic"
"time"
LOG "github.com/vinllen/log4go"
)
const (
	// OPEN_FILE_FLAGS is the flag set handed to os.OpenFile by _Open:
	// create the file if missing, open it read/write, and truncate any
	// existing content.
	OPEN_FILE_FLAGS = os.O_CREATE | os.O_RDWR | os.O_TRUNC

	// FILE_MAGIC_NUMBER is the value WriteHeader stores in FileHeader.Magic.
	FILE_MAGIC_NUMBER uint64 = 0xeeeeeeeeee201314
	// FILE_PROTOCOL_NUMBER is the value WriteHeader stores in
	// FileHeader.Protocol.
	FILE_PROTOCOL_NUMBER uint32 = 1
	// BLOCK_HEADER_SIZE is not referenced in this file; presumably the byte
	// size of the per-block header -- TODO confirm against the reader side.
	BLOCK_HEADER_SIZE = 20
)
var (
	// globalInitializer starts at 0; Prepare flips it to 1 with a
	// compare-and-swap so the file tunnel setup runs only for the caller
	// that wins the swap.
	globalInitializer int32

	// oplogMessage carries messages from Send to SyncToDisk. It is nil until
	// Prepare allocates it.
	oplogMessage chan *TMessage
)
// FileWriter is the tunnel writer whose Name is "file". Prepare opens the
// data file and starts SyncToDisk, which appends the messages queued by Send.
type FileWriter struct {
	// Local is the path of the data file. Prepare passes it to os.OpenFile
	// (create + truncate) and rejects it if it refers to a directory.
	Local string

	// data file header; not read or written by any method visible in this file
	fileHeader *FileHeader
	// data file handle, assigned by Prepare
	dataFile *DataFile

	// logs counts the raw oplog entries processed by SyncToDisk
	logs uint64
}
// FileHeader is the fixed-size header at the start of a data file. Its
// fields are encoded big-endian in declaration order by WriteHeader, giving
// 8 + 4 + 4 + 16 = 32 bytes.
//
// File structure:
//
//	|----- Header ------|------ OplogBlock ------|------ OplogBlock --------| ......
//	|<--- 32bytes ---->|
type FileHeader struct {
	// Magic is set to FILE_MAGIC_NUMBER by WriteHeader.
	Magic uint64
	// Protocol is set to FILE_PROTOCOL_NUMBER by WriteHeader.
	Protocol uint32
	// Checksum is left at zero by WriteHeader; what it is meant to cover is
	// not visible in this file -- TODO confirm.
	Checksum uint32
	// Reserved fills the remaining 16 bytes of the 32-byte header.
	Reserved [16]byte
}
// DataFile wraps the open file that stores the header and oplog blocks.
type DataFile struct {
	// filehandle is the *os.File that WriteHeader, ReadHeader and
	// FileWriter.SyncToDisk read from and write to.
	filehandle *os.File
}
// WriteHeader encodes a new FileHeader (magic and protocol set, checksum and
// reserved bytes zero) in big-endian order, writes it to the file, syncs it,
// and positions the file offset at byte 32, right after the header.
//
// Errors from the write, sync and seek calls are not checked.
func (dataFile *DataFile) WriteHeader() {
	header := FileHeader{
		Magic:    FILE_MAGIC_NUMBER,
		Protocol: FILE_PROTOCOL_NUMBER,
	}

	// Encode the fields one by one, in on-disk order.
	var encoded bytes.Buffer
	fields := []interface{}{
		header.Magic,
		header.Protocol,
		header.Checksum,
		header.Reserved,
	}
	for _, field := range fields {
		binary.Write(&encoded, binary.BigEndian, field)
	}

	handle := dataFile.filehandle
	handle.Write(encoded.Bytes())
	handle.Sync()
	handle.Seek(32, io.SeekStart)
}
// ReadHeader reads 32 bytes from the current file offset and decodes them,
// big-endian, into a new FileHeader.
//
// The read error is not checked: if fewer than 32 bytes are available, the
// missing bytes stay zero and are decoded as such. The result is never nil.
func (dataFile *DataFile) ReadHeader() *FileHeader {
	var raw [32]byte
	io.ReadFull(dataFile.filehandle, raw[:])

	parsed := new(FileHeader)
	source := bytes.NewReader(raw[:])
	// Decode the fields in on-disk order.
	targets := []interface{}{
		&parsed.Magic,
		&parsed.Protocol,
		&parsed.Checksum,
		&parsed.Reserved,
	}
	for _, target := range targets {
		binary.Read(source, binary.BigEndian, target)
	}
	return parsed
}
// Name returns the identifier of this tunnel writer, "file".
func (*FileWriter) Name() string {
	const name = "file"
	return name
}
// Send queues the message's TMessage on oplogMessage for SyncToDisk, unless
// the MsgProbe bit is set in message.Tag, in which case nothing is queued.
// It blocks while the channel is full and always returns 0.
func (*FileWriter) Send(message *WMessage) int64 {
	isProbe := message.Tag&MsgProbe != 0
	if isProbe {
		return 0
	}
	oplogMessage <- message.TMessage
	return 0
}
// SyncToDisk drains oplogMessage forever and appends one block per message
// to the data file. Prepare runs it in its own goroutine; it never returns.
//
// Each block is a header (checksum, tag, shard, compress, the marker
// 0xeeeeeeee, payload length) followed by the payload, in which every raw
// log is prefixed by its uint32 length. All integers are big-endian.
//
// When no message arrives for one second the file is fsynced. File write
// and sync failures are logged instead of being silently dropped.
func (tunnel *FileWriter) SyncToDisk() {
	buffer := &bytes.Buffer{}

	for {
		select {
		case message := <-oplogMessage:
			// oplogs array: length-prefixed raw logs
			for _, log := range message.RawLogs {
				tunnel.logs++
				binary.Write(buffer, binary.BigEndian, uint32(len(log)))
				binary.Write(buffer, binary.BigEndian, log)
			}

			// Blocks written here are always marked persistent and
			// storage-backed, in addition to the message's own tag bits.
			tag := message.Tag | MsgPersistent | MsgStorageBackend
			headerBuffer := &bytes.Buffer{}
			binary.Write(headerBuffer, binary.BigEndian, message.Checksum)
			binary.Write(headerBuffer, binary.BigEndian, tag)
			binary.Write(headerBuffer, binary.BigEndian, message.Shard)
			binary.Write(headerBuffer, binary.BigEndian, message.Compress)
			binary.Write(headerBuffer, binary.BigEndian, uint32(0xeeeeeeee))
			binary.Write(headerBuffer, binary.BigEndian, uint32(buffer.Len()))

			file := tunnel.dataFile.filehandle
			_, err := file.Write(headerBuffer.Bytes())
			if err != nil {
				// Do not append a payload after a failed header write.
				LOG.Critical("File tunnel write block header failed. %v", err)
			} else if _, err = file.Write(buffer.Bytes()); err != nil {
				LOG.Critical("File tunnel write block body failed. %v", err)
			}

			buffer.Reset()

		case <-time.After(time.Millisecond * 1000):
			// Idle for one second: flush what has been written so far.
			LOG.Info("File tunnel sync flush. total oplogs %d", tunnel.logs)
			if err := tunnel.dataFile.filehandle.Sync(); err != nil {
				LOG.Critical("File tunnel sync failed. %v", err)
			}
		}
	}
}
// _Open opens path with OPEN_FILE_FLAGS (create, read/write, truncate) and
// permission os.ModePerm. It returns the file and true on success; on
// failure it logs the path and the underlying error and returns nil, false.
func _Open(path string) (*os.File, bool) {
	file, err := os.OpenFile(path, OPEN_FILE_FLAGS, os.ModePerm)
	if err != nil {
		// Include the cause; the bare message gave no hint why the open failed.
		LOG.Critical("File tunnel create data file failed. path %s, error %v", path, err)
		return nil, false
	}
	return file, true
}
// Prepare performs the one-time setup of the file tunnel: it opens (creating
// or truncating) the file at tunnel.Local, writes the file header, allocates
// the oplogMessage channel and starts SyncToDisk in a goroutine.
//
// Only the caller that wins the compare-and-swap on globalInitializer runs
// the setup; every other call returns true immediately. It returns false
// when the file cannot be opened or the path fails the stat check. In that
// case the opened file is closed and globalInitializer is reset, so a later
// call retries the setup instead of reporting success without a channel or
// a writer goroutine.
func (tunnel *FileWriter) Prepare() bool {
	if !atomic.CompareAndSwapInt32(&globalInitializer, 0, 1) {
		return true
	}

	file, ok := _Open(tunnel.Local)
	if !ok {
		LOG.Critical("File tunnel open failed")
		atomic.StoreInt32(&globalInitializer, 0)
		return false
	}

	if info, err := os.Stat(tunnel.Local); err != nil || info.IsDir() {
		LOG.Critical("File tunnel check path failed. %v", err)
		// Best-effort cleanup: the handle is unusable to the caller anyway.
		_ = file.Close()
		atomic.StoreInt32(&globalInitializer, 0)
		return false
	}

	tunnel.dataFile = &DataFile{filehandle: file}
	tunnel.dataFile.WriteHeader()

	oplogMessage = make(chan *TMessage, 8192)
	go tunnel.SyncToDisk()
	return true
}
// AckRequired always returns false for the file tunnel.
func (*FileWriter) AckRequired() bool {
	const required = false
	return required
}
// ParsedLogsRequired always returns false for the file tunnel; SyncToDisk
// only reads the message's RawLogs.
func (*FileWriter) ParsedLogsRequired() bool {
	const required = false
	return required
}