-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
191 lines (180 loc) · 5.74 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package canal
import (
"fmt"
"github.com/ayah-go/canal-go/client"
pbe "github.com/ayah-go/canal-go/protocol/entry"
"github.com/golang/protobuf/proto"
"log"
"strings"
"time"
)
// Sql is a single SQL statement rebuilt from a canal row change.
type Sql struct {
	// Content holds the statement text. Type names the statement kind; GetSql
	// sets it to "DELETE", "INSERT", "UPDATE" or "ALTER". Insert is filled in
	// by GetSql only for UPDATE events and holds an equivalent INSERT
	// statement for the updated row.
	Content, Type, Insert string
}
// Channel is the handle returned by RunCanalClient. It exposes the channel on
// which the background goroutine delivers SQL batches and a flag that asks
// that goroutine to stop.
type Channel struct {
// ChannelSql receives one []Sql per non-empty batch. RunCanalClient creates
// it unbuffered, so each send blocks until a receiver takes the batch.
ChannelSql chan []Sql
// Stop is checked by the goroutine at the top of every polling iteration;
// once it reads true the loop ends.
// NOTE(review): the goroutine reads this field without any synchronisation,
// so setting it from another goroutine is a data race — consider an
// atomic flag or a context in a future API revision.
Stop bool
}
// RunCanalClient starts a background goroutine that connects to a canal
// server, subscribes with the given filter and forwards every non-empty batch
// of generated SQL on the returned Channel. The call itself returns
// immediately.
//
// address, port, username, password, destination, soTimeOut and idleTimeOut
// are passed unchanged to client.NewSimpleCanalConnector; regex is passed to
// Subscribe.
//
// The goroutine ends when connecting, subscribing or fetching fails (the error
// is only logged), or when it observes Stop set to true at the top of its
// polling loop.
func RunCanalClient(address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32, regex string) *Channel {
	ch := &Channel{ChannelSql: make(chan []Sql)}

	consume := func() {
		defer fmt.Println("退出sql消费")

		// Open the connection.
		conn := client.NewSimpleCanalConnector(address, port, username, password, destination, soTimeOut, idleTimeOut)
		if connErr := conn.Connect(); connErr != nil {
			log.Println("canal连接失败:", connErr)
			return
		}
		log.Println("canal连接成功")

		// Install the subscription filter.
		if subErr := conn.Subscribe(regex); subErr != nil {
			log.Println("canal订阅失败:", subErr)
			return
		}
		log.Println("canal订阅成功")

		// Poll until asked to stop.
		for !ch.Stop {
			msg, getErr := conn.Get(100, nil, nil)
			if getErr != nil {
				log.Println("canal get获取失败:", getErr)
				return
			}

			// Nothing fetched: back off briefly before polling again.
			if msg.Id == -1 || len(msg.Entries) == 0 {
				time.Sleep(300 * time.Millisecond)
				log.Println("canal no sql data...")
				continue
			}

			batch := GetSql(msg.Entries)
			log.Println("canal get sqls, len is ", len(batch))
			if len(batch) == 0 {
				continue
			}

			// The channel is unbuffered, so this blocks until a receiver
			// takes the batch.
			log.Println("canal put sqls to channel", batch)
			ch.ChannelSql <- batch
		}
		log.Println("canal Stop")
	}

	go consume()
	return ch
}
/*
FormatColName formats a column name as a backtick-quoted MySQL identifier.

Any backtick that is part of the name itself is doubled, which is how MySQL
expects the quote character to be written inside a quoted identifier; without
this a name containing a backtick would terminate the identifier early and
produce invalid SQL.

col must not be nil.
*/
func FormatColName(col *pbe.Column) string {
	return "`" + strings.Replace(col.Name, "`", "``", -1) + "`"
}
// colValueEscaper escapes the characters that would otherwise end or alter a
// single-quoted MySQL string literal: a backslash becomes two backslashes and
// a single quote is doubled. This assumes the target server runs without the
// NO_BACKSLASH_ESCAPES sql_mode (the MySQL default).
var colValueEscaper = strings.NewReplacer(`\`, `\\`, `'`, `''`)

/*
FormatColValue formats a column value as a SQL literal.

An empty Value is rendered as null. Columns whose MysqlType contains "int" or
"bit" are emitted verbatim; every other value is wrapped in single quotes with
embedded single quotes and backslashes escaped, so that values such as
O'Brien no longer break (or inject into) the generated statement.

col must not be nil.
*/
func FormatColValue(col *pbe.Column) string {
	// An empty value is treated as SQL NULL regardless of the column type.
	if col.Value == "" {
		return "null"
	}
	if strings.Contains(col.MysqlType, "int") || strings.Contains(col.MysqlType, "bit") {
		return col.Value
	}
	return "'" + colValueEscaper.Replace(col.Value) + "'"
}
// FormatSql turns the columns of one row into the SQL fragments needed to
// build DELETE, INSERT and UPDATE statements.
//
// Returns:
//   - keyColName, keyColValue: the formatted name and value of the column
//     flagged IsKey. If several columns are flagged, the last one wins.
//   - colNames, colValues: comma-separated formatted names and values of all
//     non-key columns, in input order.
//   - updateChanges: comma-separated "name=value" pairs for the non-key
//     columns flagged Updated; only populated when isUpdate is true.
//
// The lists are assembled with strings.Join, so none of them carries a
// trailing comma no matter where the key column sits in cols. Appending a
// comma after every column except the one at the last index leaves a dangling
// comma in colNames and colValues whenever the key column is the last one.
func FormatSql(cols []*pbe.Column, isUpdate bool) (keyColName string, keyColValue string, colNames string, colValues string, updateChanges string) {
	log.Println(cols)

	names := make([]string, 0, len(cols))
	values := make([]string, 0, len(cols))
	var changes []string

	for _, col := range cols {
		name, value := FormatColName(col), FormatColValue(col)
		if col.IsKey {
			keyColName, keyColValue = name, value
			continue
		}
		names = append(names, name)
		values = append(values, value)
		if isUpdate && col.Updated {
			// An UPDATE needs each changed column as col=value.
			changes = append(changes, name+"="+value)
		}
	}

	colNames = strings.Join(names, ",")
	colValues = strings.Join(values, ",")
	updateChanges = strings.Join(changes, ",")
	return keyColName, keyColValue, colNames, colValues, updateChanges
}
// GetSql converts canal entries into SQL statements.
//
// Transaction begin/end entries are skipped. For every other entry the stored
// RowChange is decoded and each row produces one statement according to the
// event type:
//   - DELETE: a DELETE keyed on the key column of the before-image.
//   - INSERT: an INSERT built from the after-image.
//     NOTE(review): only non-key columns are listed, because FormatSql returns
//     the key column separately — confirm that omitting the key is intended.
//   - UPDATE: an UPDATE of the changed columns keyed on the key column (taken
//     from the after-image, falling back to the before-image), plus an
//     equivalent INSERT stored in Sql.Insert.
//
// In addition, if the RowChange carries a SQL text containing "alter" (case
// insensitive) it is emitted with Type "ALTER". When that text has no "`.`"
// in it, the table name after "alter table" is rewritten to the
// schema-qualified name from the entry header.
//
// If decoding any entry fails, the error is printed and nil is returned; the
// statements collected up to that point are discarded.
func GetSql(entrys []pbe.Entry) []Sql {
	var sqls []Sql
	// Index instead of ranging by value so each Entry is not copied.
	for i := range entrys {
		entry := &entrys[i]
		entryType := entry.GetEntryType()
		if entryType == pbe.EntryType_TRANSACTIONBEGIN || entryType == pbe.EntryType_TRANSACTIONEND {
			continue
		}

		rowChange := new(pbe.RowChange)
		if err := proto.Unmarshal(entry.GetStoreValue(), rowChange); err != nil {
			fmt.Println("出现异常", err)
			return nil
		}

		eventType := rowChange.GetEventType()
		header := entry.GetHeader()
		// The getters are nil-safe, unlike direct field access on header.
		schema, table := header.GetSchemaName(), header.GetTableName()

		for _, rowData := range rowChange.GetRowDatas() {
			switch eventType {
			case pbe.EventType_DELETE:
				keyColName, keyColValue, _, _, _ := FormatSql(rowData.GetBeforeColumns(), false)
				stmt := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s = %s ;\n", schema, table, keyColName, keyColValue)
				sqls = append(sqls, Sql{Content: stmt, Type: "DELETE"})

			case pbe.EventType_INSERT:
				_, _, colNames, colValues, _ := FormatSql(rowData.GetAfterColumns(), false)
				stmt := fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s) ;\n", schema, table, colNames, colValues)
				sqls = append(sqls, Sql{Content: stmt, Type: "INSERT"})

			case pbe.EventType_UPDATE:
				// A single pass over the after-image yields everything needed
				// for both the UPDATE and the companion INSERT.
				keyColName, keyColValue, colNames, colValues, colChange := FormatSql(rowData.GetAfterColumns(), true)
				if keyColName == "" || keyColValue == "" {
					keyColName, keyColValue, _, _, _ = FormatSql(rowData.GetBeforeColumns(), true)
				}
				stmt := fmt.Sprintf("UPDATE `%s`.`%s` SET %s WHERE %s=%s ;\n", schema, table, colChange, keyColName, keyColValue)
				// Also provide the row as an INSERT statement.
				insertStmt := fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s) ;\n", schema, table, keyColName+","+colNames, keyColValue+","+colValues)
				sqls = append(sqls, Sql{Content: stmt, Insert: insertStmt, Type: "UPDATE"})
			}
		}

		ddl := rowChange.Sql
		if ddl != "" && strings.Contains(strings.ToLower(ddl), "alter") {
			// A statement containing "`.`" is taken to be schema-qualified
			// already; otherwise the schema name has to be spliced in.
			if !strings.Contains(ddl, "`.`") {
				ddl = qualifyAlterTable(ddl, schema, table)
			}
			sqls = append(sqls, Sql{Content: ddl + ";\n", Type: "ALTER"})
		}
	}
	return sqls
}

// qualifyAlterTable replaces the table name that follows the first
// "alter table " in ddl with `schema`.`table`. The keyword is matched case
// insensitively, so "ALTER TABLE foo ..." is handled as well. ddl is returned
// unchanged when it contains no such keyword (for example when "alter" merely
// appears inside another identifier), instead of panicking.
func qualifyAlterTable(ddl, schema, table string) string {
	const keyword = "alter table "
	const blanks = " \t\r\n"

	// Scan the original string rather than a lower-cased copy so that byte
	// offsets stay valid even if lower-casing would change the string length.
	start := -1
	for i := 0; i+len(keyword) <= len(ddl); i++ {
		if strings.EqualFold(ddl[i:i+len(keyword)], keyword) {
			start = i + len(keyword)
			break
		}
	}
	if start < 0 {
		return ddl
	}

	// Tolerate extra whitespace between the keyword and the table name.
	rest := strings.TrimLeft(ddl[start:], blanks)
	start = len(ddl) - len(rest)

	// The table name runs up to the next whitespace character (or the end).
	end := strings.IndexAny(rest, blanks)
	if end < 0 {
		end = len(rest)
	}
	return ddl[:start] + "`" + schema + "`.`" + table + "`" + rest[end:]
}