/
deCodeRawData.go
66 lines (56 loc) · 1.58 KB
/
deCodeRawData.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package replication
import (
"context"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"math/rand"
"net"
"time"
)
// NewRawDataDecode builds a BinlogSyncer whose parser is configured for
// decoding binlog event payloads via Decode.
//
// The syncer is set up with the "mysql" flavor, decimal support, time parsing,
// the "Asia/Shanghai" location for timestamp strings, a default net.Dialer and
// a randomly chosen server ID in the range [1, 11110].
//
// log is the logger stored in the syncer configuration; it must not be nil.
//
// An error is returned when the "Asia/Shanghai" time zone cannot be loaded
// (for example, when no time zone database is available) or when log is nil.
func NewRawDataDecode(log *logrus.Logger) (*BinlogSyncer, error) {
	local, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// Keep the underlying cause so a missing tz database is diagnosable.
		return nil, errors.Errorf("time zone cannot be empty: %v", err)
	}

	// Check the pointer itself rather than cfg.Logger: if that field is an
	// interface type, a nil *logrus.Logger stored in it would no longer
	// compare equal to nil and the check would silently pass.
	if log == nil {
		return nil, errors.Errorf("Log module cannot be empty")
	}

	// Int31n(n) returns a value in [0, n); shifting by one guarantees the
	// server ID is never 0, which is not a usable ID.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	serverID := uint32(rng.Int31n(11110)) + 1

	cfg := BinlogSyncerConfig{
		ServerID:                serverID,
		Flavor:                  "mysql",
		Logger:                  log,
		UseDecimal:              true,
		ParseTime:               true,
		TimestampStringLocation: local,
		Dialer:                  (&net.Dialer{}).DialContext,
	}

	b := new(BinlogSyncer)
	b.cfg = cfg

	// Mirror the relevant configuration switches onto the event parser.
	b.parser = NewBinlogParser()
	b.parser.SetFlavor(cfg.Flavor)
	b.parser.SetRawMode(b.cfg.RawModeEnabled)
	b.parser.SetParseTime(b.cfg.ParseTime)
	b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)
	b.parser.SetUseDecimal(b.cfg.UseDecimal)
	b.parser.SetVerifyChecksum(b.cfg.VerifyChecksum)

	b.ctx, b.cancel = context.WithCancel(context.Background())
	return b, nil
}
// Decode parses a single raw binlog packet into a BinlogEvent.
//
// data is expected to start with the one-byte OK marker (0x00), which is
// stripped before parsing. When semi-sync is enabled in the syncer
// configuration and the next byte equals SemiSyncIndicator, the two-byte
// semi-sync header is stripped as well. The remainder is handed to the
// syncer's parser.
//
// An error is returned when data is too short to contain the expected
// header bytes, or when the parser rejects the payload.
func (b *BinlogSyncer) Decode(data []byte) (*BinlogEvent, error) {
	if len(data) == 0 {
		return nil, errors.New("binlog packet is empty, missing OK byte")
	}
	// Skip the OK byte, 0x00.
	data = data[1:]

	if b.cfg.SemiSyncEnabled && len(data) > 0 && data[0] == SemiSyncIndicator {
		// The semi-sync header is two bytes long; the first is the indicator.
		if len(data) < 2 {
			return nil, errors.New("binlog packet is truncated, incomplete semi-sync header")
		}
		data = data[2:]
	}

	e, err := b.parser.Parse(data)
	if err != nil {
		return nil, err
	}
	return e, nil
}