forked from go-mysql-org/go-mysql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dump.go
120 lines (98 loc) · 2.59 KB
/
dump.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package canal
import (
"strconv"
"time"
"github.com/juju/errors"
"github.com/siddontang/go-mysql/dump"
"github.com/siddontang/go-mysql/schema"
"github.com/siddontang/go/log"
)
// dumpParseHandler is the handler handed to the dumper's DumpAndParse call in
// tryDump. Its BinLog method stores the reported binlog coordinates and its
// Data method turns each dumped row into an insert rows event.
type dumpParseHandler struct {
// c is the canal that owns this handler; Data uses it to look up table
// information and to dispatch the resulting rows events.
c *Canal
// name is the binlog file name most recently passed to BinLog.
name string
// pos is the binlog position most recently passed to BinLog.
pos uint64
}
// BinLog remembers the binlog file name and position it is given so that
// tryDump can read them back once parsing has finished. It always returns nil.
func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
	h.name, h.pos = name, pos
	return nil
}
// Data converts one dumped row of db.table into an insert rows event and hands
// it to the canal's rows event handlers.
//
// Each raw value is decoded as follows:
//   - the literal NULL becomes nil;
//   - a value starting with a single quote is kept as a string with its first
//     and last byte removed;
//   - any other value is parsed as an int64 when the column type is
//     schema.TYPE_NUMBER, or as a float64 when it is schema.TYPE_FLOAT.
//
// It returns errCanalClosed when the canal is closed, a traced error when the
// table information cannot be fetched, and dump.ErrSkip (after logging the
// row) when a value cannot be decoded. Malformed values that previously caused
// an index-out-of-range panic (an empty value, a lone quote, or an unquoted
// value with no matching column) are now reported through dump.ErrSkip too.
func (h *dumpParseHandler) Data(db string, table string, values []string) error {
	if h.c.isClosed() {
		return errCanalClosed
	}

	tableInfo, err := h.c.GetTable(db, table)
	if err != nil {
		log.Errorf("get %s.%s information err: %v", db, table, err)
		return errors.Trace(err)
	}

	vs := make([]interface{}, len(values))
	for i, v := range values {
		switch {
		case v == "NULL":
			vs[i] = nil
		case v == "":
			// Indexing v[0] below would panic on an empty value.
			log.Errorf("parse row %v error, empty value at %d, skip", values, i)
			return dump.ErrSkip
		case v[0] == '\'':
			// A quoted value needs both an opening and a closing byte to strip.
			if len(v) < 2 {
				log.Errorf("parse row %v error, unterminated quoted value at %d, skip", values, i)
				return dump.ErrSkip
			}
			vs[i] = v[1 : len(v)-1]
		case i >= len(tableInfo.Columns):
			// Unquoted values need the column type; the row has more values
			// than the table information has columns.
			log.Errorf("parse row %v error, no column for value at %d, skip", values, i)
			return dump.ErrSkip
		case tableInfo.Columns[i].Type == schema.TYPE_NUMBER:
			n, err := strconv.ParseInt(v, 10, 64)
			if err != nil {
				log.Errorf("parse row %v at %d error %v, skip", values, i, err)
				return dump.ErrSkip
			}
			vs[i] = n
		case tableInfo.Columns[i].Type == schema.TYPE_FLOAT:
			f, err := strconv.ParseFloat(v, 64)
			if err != nil {
				log.Errorf("parse row %v at %d error %v, skip", values, i, err)
				return dump.ErrSkip
			}
			vs[i] = f
		default:
			log.Errorf("parse row %v error, invalid type at %d, skip", values, i)
			return dump.ErrSkip
		}
	}

	events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs})
	return h.c.travelRowsEventHandler(events)
}
// AddDumpDatabases forwards the given database names to the dumper's
// AddDatabases. It does nothing when the canal has no dumper.
func (c *Canal) AddDumpDatabases(dbs ...string) {
	if c.dumper != nil {
		c.dumper.AddDatabases(dbs...)
	}
}
// AddDumpTables forwards the given tables of db to the dumper's AddTables.
// It does nothing when the canal has no dumper.
func (c *Canal) AddDumpTables(db string, tables ...string) {
	if c.dumper != nil {
		c.dumper.AddTables(db, tables...)
	}
}
// AddDumpIgnoreTables forwards the given tables of db to the dumper's
// AddIgnoreTables. It does nothing when the canal has no dumper.
func (c *Canal) AddDumpIgnoreTables(db string, tables ...string) {
	if c.dumper != nil {
		c.dumper.AddIgnoreTables(db, tables...)
	}
}
// tryDump runs the dumper and parses its output through a dumpParseHandler,
// then updates and saves the master info with the binlog name and position
// the handler recorded.
//
// The dump is skipped (nil is returned) when the master info already holds a
// binlog name and a positive position, or when the canal has no dumper. An
// error from DumpAndParse is returned traced.
func (c *Canal) tryDump() error {
	switch {
	case len(c.master.Name) > 0 && c.master.Position > 0:
		// we will sync with binlog name and position
		log.Infof("skip dump, use last binlog replication pos (%s, %d)", c.master.Name, c.master.Position)
		return nil
	case c.dumper == nil:
		log.Info("skip dump, no mysqldump")
		return nil
	}

	handler := &dumpParseHandler{c: c}
	begin := time.Now()
	log.Info("try dump MySQL and parse")

	err := c.dumper.DumpAndParse(handler)
	if err != nil {
		return errors.Trace(err)
	}

	elapsed := time.Since(begin).Seconds()
	log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at (%s, %d)",
		elapsed, handler.name, handler.pos)

	// The handler keeps the position as uint64; it is narrowed to uint32 here.
	c.master.Update(handler.name, uint32(handler.pos))
	// NOTE(review): any result of Save is not inspected here — confirm whether
	// it can fail and should be propagated.
	c.master.Save(true)
	return nil
}