-
Notifications
You must be signed in to change notification settings - Fork 2
/
transaction.go
executable file
·158 lines (142 loc) · 3.95 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package gobinlog
import (
"encoding/json"
"time"
"github.com/Breeze0806/gobinlog/replication"
)
//Transaction 代表一组有事务的binlog evnet
type Transaction struct {
NowPosition Position //在binlog中的当前位置
NextPosition Position //在binlog中的下一个位置
Timestamp int64 //执行时间
Events []*StreamEvent //一组有事务的binlog evnet
}
//newTransaction 创建Transaction
func newTransaction(now, next Position, timestamp int64,
events []*StreamEvent) *Transaction {
return &Transaction{
NowPosition: now,
NextPosition: next,
Timestamp: timestamp,
Events: events,
}
}
//MarshalJSON 实现Transaction的json序列化
func (t *Transaction) MarshalJSON() ([]byte, error) {
tJSON := struct {
NowPosition Position `json:"nowPosition"`
NextPosition Position `json:"nextPosition"`
Timestamp string `json:"timestamp"`
Events []*StreamEvent `json:"events"`
}{
NowPosition: t.NowPosition,
NextPosition: t.NextPosition,
Timestamp: time.Unix(t.Timestamp, 0).Local().String(),
Events: t.Events,
}
return json.Marshal(tJSON)
}
//StreamEvent means a SQL or a rows in binlog
type StreamEvent struct {
Type StatementType //语句类型
Table MysqlTableName //表名
Query replication.Query //sql
Timestamp int64 //执行时间
RowValues []*RowData //which data come to used for StatementInsert and StatementUpdate
RowIdentifies []*RowData //which data come from used for StatementUpdate and StatementDelete
}
//newStreamEvent 创建StreamEvent
func newStreamEvent(tranType StatementType,
timestamp int64, table MysqlTableName) *StreamEvent {
return &StreamEvent{
Type: tranType,
Table: table,
Timestamp: timestamp,
Query: replication.Query{},
RowValues: make([]*RowData, 0, 10),
RowIdentifies: make([]*RowData, 0, 10),
}
}
type baseStreamEventJSON struct {
Table MysqlTableName `json:"name"`
Type string `json:"type"`
Timestamp string `json:"timestamp"`
}
//MarshalJSON 实现StreamEvent的json序列化
func (s *StreamEvent) MarshalJSON() ([]byte, error) {
b := baseStreamEventJSON{
Table: s.Table,
Type: s.Type.String(),
Timestamp: time.Unix(s.Timestamp, 0).Local().String(),
}
if s.Query.SQL != "" {
sqlJSON := struct {
baseStreamEventJSON
SQL string `json:"sql"`
}{
baseStreamEventJSON: b,
SQL: s.Query.SQL,
}
return json.Marshal(sqlJSON)
}
RowJSON := struct {
baseStreamEventJSON
RowValues []*RowData `json:"rowValues"`
RowIdentifies []*RowData `json:"rowIdentifies"`
}{
baseStreamEventJSON: b,
RowValues: s.RowValues,
RowIdentifies: s.RowIdentifies,
}
return json.Marshal(RowJSON)
}
//RowData 行数据
type RowData struct {
Columns []*ColumnData
}
//newRowData 创建RowData
func newRowData(cnt int) *RowData {
return &RowData{
Columns: make([]*ColumnData, 0, cnt),
}
}
//ColumnData 单个列的信息
type ColumnData struct {
Filed string // 字段信息
Type ColumnType // binlog中的列类型
IsEmpty bool // data is empty,即该列没有变化
Data []byte // the data
}
//newColumnData 创建ColumnData
func newColumnData(filed string, typ ColumnType, isEmpty bool) *ColumnData {
return &ColumnData{
Filed: filed,
Type: typ,
IsEmpty: isEmpty,
}
}
type baseColumnJSON struct {
Filed string `json:"filed"`
Type string `json:"type"`
IsEmpty bool `json:"isEmpty"`
}
//MarshalJSON 实现ColumnData的json序列化
func (c *ColumnData) MarshalJSON() ([]byte, error) {
b := baseColumnJSON{
Filed: c.Filed,
Type: c.Type.String(),
IsEmpty: c.IsEmpty,
}
var i interface{} = string(c.Data)
if c.Data == nil {
i = nil
}
notNullJSON := struct {
baseColumnJSON
Data interface{} `json:"data"`
}{
baseColumnJSON: b,
Data: i,
}
return json.Marshal(notNullJSON)
}