-
-
Notifications
You must be signed in to change notification settings - Fork 546
/
session_backup.go
126 lines (104 loc) · 3 KB
/
session_backup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package session
import (
"context"
"fmt"
"strings"
"sync"
mysqlDriver "github.com/go-sql-driver/mysql"
log "github.com/sirupsen/logrus"
)
// chanBackup 备份channal数据,用来传递备份的sql等信息
type chanBackup struct {
// values []interface{}
// 库名
dbname string
values []interface{}
record *Record
}
func (s *session) ProcessChanBackup(wg *sync.WaitGroup) {
for {
r := <-s.chBackupRecord
if r == nil {
s.flushBackupRecord(s.lastBackupTable, s.myRecord)
wg.Done()
break
}
// flush标志. 不能在外面调用flush函数,会导致线程并发操作,写入数据错误
// 如数据尚未进入到ch通道,此时调用flush,数据无法正确入库
if r.values == nil {
s.flushBackupRecord(r.dbname, r.record)
} else {
s.writeBackupRecord(r.dbname, r.record, r.values)
}
}
}
func (s *session) runBackup(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(1)
s.chBackupRecord = make(chan *chanBackup, 50)
go s.ProcessChanBackup(&wg)
defer func() {
close(s.chBackupRecord)
wg.Wait()
// 清空临时的库名
s.lastBackupTable = ""
}()
for _, record := range s.recordSets.All() {
if s.checkSqlIsDML(record) || s.checkSqlIsDDL(record) {
s.myRecord = record
longDataType := s.mysqlCreateBackupTable(record)
// errno := s.mysqlCreateBackupTable(record)
// if errno == 2 {
// break
// }
if record.TableInfo == nil {
s.AppendErrorNo(ErrNotFoundTableInfo)
} else {
s.mysqlBackupSql(record, longDataType)
}
if s.hasError() {
break
}
}
// // 进程Killed
// if err := checkClose(ctx); err != nil {
// log.Warn("Killed: ", err)
// s.AppendErrorMessage("Operation has been killed!")
// break
// }
}
}
// 解析的sql写入缓存,并定期入库
func (s *session) writeBackupRecord(dbname string, record *Record, values []interface{}) {
s.insertBuffer = append(s.insertBuffer, values...)
// 每500行insert提交一次
if len(s.insertBuffer) >= 500*11 {
s.flushBackupRecord(dbname, record)
}
}
// flush用以写入当前insert缓存,并清空缓存.
func (s *session) flushBackupRecord(dbname string, record *Record) {
// log.Info("flush ", len(s.insertBuffer))
if len(s.insertBuffer) > 0 {
const backupRecordColumnCount int = 11
const rowSQL = "(?,?,?,?,?,?,?,?,?,?,NOW(),?),"
tableName := fmt.Sprintf("`%s`.`%s`", dbname, remoteBackupTable)
sql := "insert into %s values%s"
values := strings.TrimRight(
strings.Repeat(rowSQL, len(s.insertBuffer)/backupRecordColumnCount), ",")
err := s.backupdb.Exec(fmt.Sprintf(sql, tableName, values),
s.insertBuffer...).Error
if err != nil {
log.Error(err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.recordSets.MaxLevel = 2
record.StageStatus = StatusBackupFail
record.AppendErrorMessage(myErr.Message)
}
}
// s.BackupTotalRows += len(s.insertBuffer) / backupRecordColumnCount
// s.SetMyProcessInfo(record.Sql, time.Now(),
// float64(s.BackupTotalRows)/float64(s.TotalChangeRows))
s.insertBuffer = nil
}
}