forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
binlog_event.go
282 lines (244 loc) · 8.76 KB
/
binlog_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mysqlctl
import (
"bytes"
"encoding/binary"
"fmt"
mproto "github.com/youtube/vitess/go/mysql/proto"
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
)
// binlogEvent wraps a raw packet buffer and provides methods to examine it
// by partially implementing blproto.BinlogEvent. These methods can be composed
// into flavor-specific event types to pull in common parsing code.
//
// The header accessors index the buffer at fixed offsets without checking its
// length, so callers should confirm IsValid() before using them.
//
// The default v4 header format is:
//                  offset : size
//   +============================+
//   | timestamp         0 : 4    |
//   +----------------------------+
//   | type_code         4 : 1    |
//   +----------------------------+
//   | server_id         5 : 4    |
//   +----------------------------+
//   | event_length      9 : 4    |
//   +----------------------------+
//   | next_position    13 : 4    |
//   +----------------------------+
//   | flags            17 : 2    |
//   +----------------------------+
//   | extra_headers    19 : x-19 |
//   +============================+
// where x is the total header length (see the "header length" field decoded
// by Format). All multi-byte fields are read as little-endian.
//
// http://dev.mysql.com/doc/internals/en/event-header-fields.html
type binlogEvent []byte
// IsValid implements BinlogEvent.IsValid().
//
// An event is considered valid when the buffer can hold the fixed 19-byte
// header and the event_length field stored in that header matches the actual
// buffer size. Passing this check makes it safe to read header fields and
// other constant-offset fields of the header; offsets computed from values
// inside the buffer still have to be bounds-checked by the caller.
func (ev binlogEvent) IsValid() bool {
	const minHeaderLen = 19

	size := len(ev.Bytes())
	if size < minHeaderLen {
		// Too short to even contain event_length; do not touch the header.
		return false
	}

	// The header is fully present, so the event can tell us how long it
	// believes it is. It has to agree with what we actually received.
	declared := ev.Length()
	return declared >= minHeaderLen && declared == uint32(size)
}
// Bytes exposes the event as a plain []byte. The result is a conversion of
// the same slice, not a copy, so it shares storage with the event.
func (ev binlogEvent) Bytes() []byte {
	raw := []byte(ev)
	return raw
}
// Type reports the type_code header field, the single byte at offset 4.
// The buffer is indexed directly, so it must be at least 5 bytes long.
func (ev binlogEvent) Type() byte {
	const typeCodeOffset = 4
	buf := ev.Bytes()
	return buf[typeCodeOffset]
}
// Flags decodes the 2-byte little-endian flags header field found at
// offset 17.
func (ev binlogEvent) Flags() uint16 {
	const (
		flagsOffset = 17
		flagsSize   = 2
	)
	field := ev.Bytes()[flagsOffset : flagsOffset+flagsSize]
	return binary.LittleEndian.Uint16(field)
}
// Timestamp decodes the 4-byte little-endian timestamp header field that
// occupies the very start of the buffer.
func (ev binlogEvent) Timestamp() uint32 {
	const timestampSize = 4
	field := ev.Bytes()[:timestampSize]
	return binary.LittleEndian.Uint32(field)
}
// ServerID decodes the 4-byte little-endian server_id header field found at
// offset 5.
func (ev binlogEvent) ServerID() uint32 {
	const (
		serverIDOffset = 5
		serverIDSize   = 4
	)
	field := ev.Bytes()[serverIDOffset : serverIDOffset+serverIDSize]
	return binary.LittleEndian.Uint32(field)
}
// Length decodes the 4-byte little-endian event_length header field found at
// offset 9.
func (ev binlogEvent) Length() uint32 {
	const (
		lengthOffset = 9
		lengthSize   = 4
	)
	field := ev.Bytes()[lengthOffset : lengthOffset+lengthSize]
	return binary.LittleEndian.Uint32(field)
}
// IsFormatDescription implements BinlogEvent.IsFormatDescription().
// It reports whether the header's type_code equals 15.
func (ev binlogEvent) IsFormatDescription() bool {
	const formatDescriptionEvent = 15
	return ev.Type() == formatDescriptionEvent
}
// IsQuery implements BinlogEvent.IsQuery().
// It reports whether the header's type_code equals 2.
func (ev binlogEvent) IsQuery() bool {
	const queryEvent = 2
	return ev.Type() == queryEvent
}
// IsRotate implements BinlogEvent.IsRotate().
// It reports whether the header's type_code equals 4.
func (ev binlogEvent) IsRotate() bool {
	const rotateEvent = 4
	return ev.Type() == rotateEvent
}
// IsXID implements BinlogEvent.IsXID().
// It reports whether the header's type_code equals 16.
func (ev binlogEvent) IsXID() bool {
	const xidEvent = 16
	return ev.Type() == xidEvent
}
// IsIntVar implements BinlogEvent.IsIntVar().
// It reports whether the header's type_code equals 5.
func (ev binlogEvent) IsIntVar() bool {
	const intVarEvent = 5
	return ev.Type() == intVarEvent
}
// IsRand implements BinlogEvent.IsRand().
// It reports whether the header's type_code equals 13.
func (ev binlogEvent) IsRand() bool {
	const randEvent = 13
	return ev.Type() == randEvent
}
// Format implements BinlogEvent.Format().
//
// It decodes the payload of a FORMAT_DESCRIPTION_EVENT into a BinlogFormat.
// An error is returned, rather than a panic, when the buffer is too short to
// contain the fields listed below, when the format version is not 4, or when
// the advertised header length is smaller than 19. On error the returned
// BinlogFormat may be partially populated and should not be used.
//
// Expected format (L = total length of event data):
//   # bytes   field
//   2         format version
//   50        server version string, 0-padded but not necessarily 0-terminated
//   4         timestamp (same as timestamp header field)
//   1         header length
func (ev binlogEvent) Format() (f blproto.BinlogFormat, err error) {
	const (
		// FORMAT_DESCRIPTION_EVENT has a fixed header size of 19 because we
		// have to read it before we know the header_length.
		fixedHeaderLen = 19

		versionLen       = 2
		serverVersionLen = 50
		timestampLen     = 4
		// Offset of the 1-byte header length within the event data.
		headerLenPos = versionLen + serverVersionLen + timestampLen
	)

	buf := ev.Bytes()
	// IsValid() only guarantees the header itself, so make sure the format
	// version is really there before reading it.
	if len(buf) < fixedHeaderLen+versionLen {
		return f, fmt.Errorf("format description event too short to hold a format version (%v < %v)", len(buf), fixedHeaderLen+versionLen)
	}
	data := buf[fixedHeaderLen:]

	f.FormatVersion = binary.LittleEndian.Uint16(data[:versionLen])
	if f.FormatVersion != 4 {
		return f, fmt.Errorf("format version = %d, we only support version 4", f.FormatVersion)
	}

	// The remaining fixed-offset fields end with the header length byte.
	if len(data) <= headerLenPos {
		return f, fmt.Errorf("format description event data too short for fixed-size fields (%v < %v)", len(data), headerLenPos+1)
	}
	f.ServerVersion = string(bytes.TrimRight(data[versionLen:versionLen+serverVersionLen], "\x00"))
	f.HeaderLength = data[headerLenPos]
	if f.HeaderLength < 19 {
		return f, fmt.Errorf("header length = %d, should be >= 19", f.HeaderLength)
	}

	// MySQL/MariaDB 5.3+ always adds a 4-byte checksum to the end of a
	// FORMAT_DESCRIPTION_EVENT, regardless of the server setting. The byte
	// immediately before that checksum tells us which checksum algorithm (if any)
	// is used for the rest of the events.
	// The length check above guarantees len(data)-5 is a valid index.
	f.ChecksumAlgorithm = data[len(data)-5]

	return f, nil
}
// Query implements BinlogEvent.Query().
//
// It decodes a QUERY_EVENT payload, using f.HeaderLength to skip the event
// header. The database name and SQL text are extracted, and the status vars
// block is scanned for the charset settings (Q_CHARSET_CODE), which are
// stored in query.Charset when present. query.Sql aliases the event buffer;
// it is not copied.
//
// An error is returned, rather than a panic, when the buffer is shorter than
// the header length or the fixed-size fields, or when a length stored in the
// event points outside the buffer.
//
// Expected format (L = total length of event data):
//   # bytes   field
//   4         thread_id
//   4         execution time
//   1         length of db_name, not including NULL terminator (X)
//   2         error code
//   2         length of status vars block (Y)
//   Y         status vars block
//   X+1       db_name + NULL terminator
//   L-X-1-Y   SQL statement (no NULL terminator)
func (ev binlogEvent) Query(f blproto.BinlogFormat) (query blproto.Query, err error) {
	const (
		dbLenPos   = 4 + 4            // after thread_id and execution time
		varsLenPos = dbLenPos + 1 + 2 // after db_name length and error code
		varsPos    = varsLenPos + 2   // status vars follow their length field
	)

	buf := ev.Bytes()
	headerLen := int(f.HeaderLength)
	if headerLen > len(buf) {
		return query, fmt.Errorf("header length overflows buffer (%v > %v)", headerLen, len(buf))
	}
	data := buf[headerLen:]
	// Everything up to the status vars block sits at a constant offset, but
	// nothing so far has verified that the event data is that long.
	if len(data) < varsPos {
		return query, fmt.Errorf("query event data too short for fixed-size fields (%v < %v)", len(data), varsPos)
	}

	// length of database name
	dbLen := int(data[dbLenPos])
	// length of status variables block
	varsLen := int(binary.LittleEndian.Uint16(data[varsLenPos : varsLenPos+2]))

	// position of database name
	dbPos := varsPos + varsLen
	// position of SQL query
	sqlPos := dbPos + dbLen + 1 // +1 for NULL terminator
	if sqlPos > len(data) {
		return query, fmt.Errorf("SQL query position overflows buffer (%v > %v)", sqlPos, len(data))
	}

	// We've checked that the buffer is big enough for sql, so everything before
	// it (db and vars) is in-bounds too.
	query.Database = string(data[dbPos : dbPos+dbLen])
	query.Sql = data[sqlPos:]

	// Scan the status vars for ones we care about. This requires us to know the
	// size of every var that comes before the ones we're interested in.
	vars := data[varsPos : varsPos+varsLen]

varsLoop:
	for pos := 0; pos < len(vars); {
		code := vars[pos]
		pos++

		// All codes are optional, but if present they must occur in numerically
		// increasing order (except for 6 which occurs in the place of 2) to allow
		// for backward compatibility.
		switch code {
		case 0, 3: // Q_FLAGS2_CODE, Q_AUTO_INCREMENT
			pos += 4
		case 1: // Q_SQL_MODE_CODE
			pos += 8
		case 2: // Q_CATALOG_CODE (used in MySQL 5.0.0 - 5.0.3)
			if pos+1 > len(vars) {
				return query, fmt.Errorf("Q_CATALOG_CODE status var overflows buffer (%v + 1 > %v)", pos, len(vars))
			}
			// 1 length byte + catalog name + NULL terminator
			pos += 1 + int(vars[pos]) + 1
		case 6: // Q_CATALOG_NZ_CODE (used in MySQL > 5.0.3 to replace 2)
			if pos+1 > len(vars) {
				return query, fmt.Errorf("Q_CATALOG_NZ_CODE status var overflows buffer (%v + 1 > %v)", pos, len(vars))
			}
			// 1 length byte + catalog name (no NULL terminator)
			pos += 1 + int(vars[pos])
		case 4: // Q_CHARSET_CODE
			if pos+6 > len(vars) {
				return query, fmt.Errorf("Q_CHARSET_CODE status var overflows buffer (%v + 6 > %v)", pos, len(vars))
			}
			query.Charset = &mproto.Charset{
				Client: int(binary.LittleEndian.Uint16(vars[pos : pos+2])),
				Conn:   int(binary.LittleEndian.Uint16(vars[pos+2 : pos+4])),
				Server: int(binary.LittleEndian.Uint16(vars[pos+4 : pos+6])),
			}
			pos += 6
		default:
			// If we see something higher than what we're interested in, we can stop.
			break varsLoop
		}
	}

	return query, nil
}
// IntVar implements BinlogEvent.IntVar().
//
// It decodes an INTVAR_EVENT payload, using f.HeaderLength to skip the event
// header, and returns the variable name ("LAST_INSERT_ID" for ID 1,
// "INSERT_ID" for ID 2) together with its little-endian 64-bit value.
//
// An error is returned, rather than a panic, when the buffer is too short to
// hold the header plus the 9 bytes of data, or when the variable ID is not
// one of the two known values.
//
// Expected format (L = total length of event data):
//   # bytes   field
//   1         variable ID
//   8         variable value
func (ev binlogEvent) IntVar(f blproto.BinlogFormat) (name string, value uint64, err error) {
	const (
		idLen    = 1
		valueLen = 8
	)

	buf := ev.Bytes()
	headerLen := int(f.HeaderLength)
	// Nothing has verified the event data length yet, so check before slicing.
	if headerLen > len(buf) || len(buf)-headerLen < idLen+valueLen {
		return "", 0, fmt.Errorf("event data too short for IntVar (%v < %v)", len(buf), headerLen+idLen+valueLen)
	}
	data := buf[headerLen:]

	switch data[0] {
	case 1:
		name = "LAST_INSERT_ID"
	case 2:
		name = "INSERT_ID"
	default:
		return "", 0, fmt.Errorf("invalid IntVar ID: %v", data[0])
	}

	value = binary.LittleEndian.Uint64(data[idLen : idLen+valueLen])
	return name, value, nil
}
// Rand implements BinlogEvent.Rand().
//
// It decodes a RAND_EVENT payload, using f.HeaderLength to skip the event
// header, and returns the two little-endian 64-bit seeds it carries.
//
// An error is returned, rather than a panic, when the buffer is too short to
// hold the header plus the 16 bytes of seed data.
//
// Expected format (L = total length of event data):
//   # bytes   field
//   8         seed 1
//   8         seed 2
func (ev binlogEvent) Rand(f blproto.BinlogFormat) (seed1 uint64, seed2 uint64, err error) {
	const seedLen = 8

	buf := ev.Bytes()
	headerLen := int(f.HeaderLength)
	// Nothing has verified the event data length yet, so check before slicing.
	if headerLen > len(buf) || len(buf)-headerLen < 2*seedLen {
		return 0, 0, fmt.Errorf("event data too short for Rand (%v < %v)", len(buf), headerLen+2*seedLen)
	}
	data := buf[headerLen:]

	seed1 = binary.LittleEndian.Uint64(data[:seedLen])
	seed2 = binary.LittleEndian.Uint64(data[seedLen : 2*seedLen])
	return seed1, seed2, nil
}
// IsBeginGTID implements BinlogEvent.IsBeginGTID().
//
// This base implementation ignores f and always reports false.
// NOTE(review): presumably flavor-specific event types that compose
// binlogEvent provide their own version where GTID events can begin a
// transaction; confirm against the flavor implementations.
func (ev binlogEvent) IsBeginGTID(f blproto.BinlogFormat) bool {
	return false
}
// These constants are common between MariaDB 10.0 and MySQL 5.6.
//
// They name values of the checksum algorithm byte. Format() in this file
// reads that byte from the format description event into ChecksumAlgorithm.
// NOTE(review): presumably callers compare that field against these
// constants; confirm at the call sites.
const (
	// BinlogChecksumAlgOff indicates that checksums are supported but off.
	BinlogChecksumAlgOff = 0
	// BinlogChecksumAlgCRC32 indicates that CRC32 checksums are used.
	BinlogChecksumAlgCRC32 = 1
	// BinlogChecksumAlgUndef indicates that checksums are not supported.
	BinlogChecksumAlgUndef = 255
)