forked from go-mysql-org/go-mysql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
command.go
226 lines (203 loc) · 5.76 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package server
import (
"bytes"
"fmt"
. "github.com/blackhu/go-mysql/mysql"
"github.com/blackhu/go-mysql/replication"
"github.com/siddontang/go/hack"
)
// Handler is the set of callbacks a Conn invokes while serving client commands.
// Conn.dispatch routes each decoded command packet to one of these methods.
type Handler interface {
// UseDB handles the COM_INIT_DB command. You can check whether dbName is valid, or apply any other policy.
UseDB(dbName string) error
// HandleQuery handles the COM_QUERY command, such as SELECT, INSERT, UPDATE, etc.
// If Result has a Resultset (SELECT, SHOW, etc.), that is sent as the response; otherwise Result itself is sent.
HandleQuery(query string) (*Result, error)
// HandleFieldList handles the COM_FIELD_LIST command.
// table and fieldWildcard are the two NUL-separated parts of the command payload.
HandleFieldList(table string, fieldWildcard string) ([]*Field, error)
// HandleStmtPrepare handles COM_STMT_PREPARE. params is the number of parameters of the statement and
// columns is the number of columns. context is stored with the prepared statement and is used later
// when the statement is executed.
HandleStmtPrepare(query string) (params int, columns int, context interface{}, err error)
// HandleStmtExecute handles COM_STMT_EXECUTE. context is the value previously returned by HandleStmtPrepare,
// query is the query text the statement was prepared with, and args are the parameters for this execution.
HandleStmtExecute(context interface{}, query string, args []interface{}) (*Result, error)
// HandleStmtClose handles COM_STMT_CLOSE. context is the value previously returned by HandleStmtPrepare.
// This handler has no response.
HandleStmtClose(context interface{}) error
// HandleOtherCommand handles any other command that is not currently handled by the library.
// cmd is the command byte and data is the payload that follows it.
// The default implementation (EmptyHandler) returns an ER_UNKNOWN_ERROR.
HandleOtherCommand(cmd byte, data []byte) error
}
// ReplicationHandler is an optional extension of Handler for the replication commands.
// Conn.dispatch type-asserts the connection's handler to this interface for COM_REGISTER_SLAVE,
// COM_BINLOG_DUMP and COM_BINLOG_DUMP_GTID; when the assertion fails those commands are passed to
// Handler.HandleOtherCommand instead.
type ReplicationHandler interface {
// HandleRegisterSlave handles COM_REGISTER_SLAVE. data is the raw command payload, without the command byte.
HandleRegisterSlave(data []byte) error
// HandleBinlogDump handles COM_BINLOG_DUMP. pos is the position parsed from the command payload.
HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error)
// HandleBinlogDumpGTID handles COM_BINLOG_DUMP_GTID. gtidSet is the GTID set parsed from the command payload.
HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error)
}
// HandleCommand serves exactly one client command: it reads a packet,
// dispatches it to the handler and writes the resulting value back.
//
// It returns an error immediately if the connection has already been torn
// down (c.Conn is nil). When reading the packet or writing the response
// fails, the connection is closed, c.Conn is set to nil and the error is
// returned to the caller.
func (c *Conn) HandleCommand() error {
	if c.Conn == nil {
		return fmt.Errorf("connection closed")
	}

	packet, readErr := c.ReadPacket()
	if readErr != nil {
		c.Close()
		c.Conn = nil
		return readErr
	}

	writeErr := c.WriteValue(c.dispatch(packet))

	// dispatch sets c.Conn to nil on COM_QUIT, so the sequence is only reset
	// while the connection is still present.
	if c.Conn != nil {
		c.ResetSequence()
	}

	if writeErr == nil {
		return nil
	}

	c.Close()
	c.Conn = nil
	return writeErr
}
// dispatch decodes one command packet and routes it to the matching handler
// method.
//
// The first byte of data is the command code and the remaining bytes are the
// command payload. The returned value is handed to WriteValue by
// HandleCommand and is one of:
//   - an error, when the packet is malformed or the handler failed;
//   - nil, for commands that succeed without a payload (for example COM_PING);
//   - a handler result (*Result, []*Field, *Stmt, *replication.BinlogStreamer);
//   - noResponse{} or eofResponse{}, marker values interpreted by WriteValue
//     (presumably "send nothing" and "send EOF" — confirm against WriteValue).
//
// Malformed input from the client is reported as an error instead of
// panicking: an empty packet has no command byte, and a COM_FIELD_LIST
// payload without a NUL separator cannot be split into table and wildcard.
func (c *Conn) dispatch(data []byte) interface{} {
	// A zero-length packet carries no command byte; indexing it would panic.
	if len(data) == 0 {
		return fmt.Errorf("empty command packet")
	}

	cmd := data[0]
	data = data[1:]

	switch cmd {
	case COM_QUIT:
		// Tear the connection down here; HandleCommand checks c.Conn for nil
		// before touching it again.
		c.Close()
		c.Conn = nil
		return noResponse{}

	case COM_QUERY:
		r, err := c.h.HandleQuery(hack.String(data))
		if err != nil {
			return err
		}
		return r

	case COM_PING:
		return nil

	case COM_INIT_DB:
		if err := c.h.UseDB(hack.String(data)); err != nil {
			return err
		}
		return nil

	case COM_FIELD_LIST:
		// The payload is "<table>\x00<wildcard>". Without the separator
		// IndexByte returns -1 and slicing data[0:-1] would panic.
		index := bytes.IndexByte(data, 0x00)
		if index < 0 {
			return fmt.Errorf("malformed COM_FIELD_LIST packet: missing NUL terminator after table name")
		}
		table := hack.String(data[0:index])
		wildcard := hack.String(data[index+1:])
		fs, err := c.h.HandleFieldList(table, wildcard)
		if err != nil {
			return err
		}
		return fs

	case COM_STMT_PREPARE:
		// The statement ID is consumed even when the handler rejects the
		// query, matching the previous behavior.
		c.stmtID++
		st := new(Stmt)
		st.ID = c.stmtID
		st.Query = hack.String(data)
		var err error
		if st.Params, st.Columns, st.Context, err = c.h.HandleStmtPrepare(st.Query); err != nil {
			return err
		}
		st.ResetParams()
		c.stmts[c.stmtID] = st
		return st

	case COM_STMT_EXECUTE:
		r, err := c.handleStmtExecute(data)
		if err != nil {
			return err
		}
		return r

	case COM_STMT_CLOSE:
		if err := c.handleStmtClose(data); err != nil {
			return err
		}
		return noResponse{}

	case COM_STMT_SEND_LONG_DATA:
		if err := c.handleStmtSendLongData(data); err != nil {
			return err
		}
		return noResponse{}

	case COM_STMT_RESET:
		r, err := c.handleStmtReset(data)
		if err != nil {
			return err
		}
		return r

	case COM_SET_OPTION:
		if err := c.h.HandleOtherCommand(cmd, data); err != nil {
			return err
		}
		return eofResponse{}

	case COM_REGISTER_SLAVE:
		// Replication commands are only served when the handler opts in by
		// implementing ReplicationHandler.
		h, ok := c.h.(ReplicationHandler)
		if !ok {
			return c.h.HandleOtherCommand(cmd, data)
		}
		return h.HandleRegisterSlave(data)

	case COM_BINLOG_DUMP:
		h, ok := c.h.(ReplicationHandler)
		if !ok {
			return c.h.HandleOtherCommand(cmd, data)
		}
		pos, err := parseBinlogDump(data)
		if err != nil {
			return err
		}
		s, err := h.HandleBinlogDump(pos)
		if err != nil {
			return err
		}
		return s

	case COM_BINLOG_DUMP_GTID:
		h, ok := c.h.(ReplicationHandler)
		if !ok {
			return c.h.HandleOtherCommand(cmd, data)
		}
		gtidSet, err := parseBinlogDumpGTID(data)
		if err != nil {
			return err
		}
		s, err := h.HandleBinlogDumpGTID(gtidSet)
		if err != nil {
			return err
		}
		return s

	default:
		return c.h.HandleOtherCommand(cmd, data)
	}
}
// EmptyHandler is a stateless base implementation of Handler. Its methods
// either succeed without doing anything or report that the command is not
// supported.
type EmptyHandler struct{}

// EmptyReplicationHandler embeds EmptyHandler and additionally provides the
// ReplicationHandler methods, all of which report that the command is not
// supported.
type EmptyReplicationHandler struct {
	EmptyHandler
}
// UseDB accepts every database name and always returns nil.
func (EmptyHandler) UseDB(dbName string) error {
	return nil
}
// HandleQuery rejects every query: it returns a nil Result together with a
// "not supported now" error.
func (EmptyHandler) HandleQuery(query string) (*Result, error) {
	err := fmt.Errorf("not supported now")
	return nil, err
}
// HandleFieldList rejects every field-list request: it returns a nil slice
// together with a "not supported now" error.
func (EmptyHandler) HandleFieldList(table string, fieldWildcard string) ([]*Field, error) {
	err := fmt.Errorf("not supported now")
	return nil, err
}
// HandleStmtPrepare rejects every prepare request: the counts are zero, the
// context is nil and the error is "not supported now".
func (EmptyHandler) HandleStmtPrepare(query string) (int, int, interface{}, error) {
	err := fmt.Errorf("not supported now")
	return 0, 0, nil, err
}
// HandleStmtExecute rejects every execute request: it returns a nil Result
// together with a "not supported now" error.
func (EmptyHandler) HandleStmtExecute(context interface{}, query string, args []interface{}) (*Result, error) {
	err := fmt.Errorf("not supported now")
	return nil, err
}
// HandleStmtClose ignores the statement context and always returns nil.
func (EmptyHandler) HandleStmtClose(context interface{}) error {
	return nil
}
// HandleRegisterSlave rejects every registration with a "not supported now"
// error.
func (EmptyReplicationHandler) HandleRegisterSlave(data []byte) error {
	err := fmt.Errorf("not supported now")
	return err
}
// HandleBinlogDump rejects every binlog dump request: it returns a nil
// streamer together with a "not supported now" error.
func (EmptyReplicationHandler) HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error) {
	err := fmt.Errorf("not supported now")
	return nil, err
}
// HandleBinlogDumpGTID rejects every GTID-based binlog dump request: it
// returns a nil streamer together with a "not supported now" error.
func (EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) {
	err := fmt.Errorf("not supported now")
	return nil, err
}
// HandleOtherCommand rejects every command: it returns the error built by
// NewError with code ER_UNKNOWN_ERROR and a message naming the numeric
// command byte. The payload in data is ignored.
func (EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error {
	msg := fmt.Sprintf("command %d is not supported now", cmd)
	return NewError(ER_UNKNOWN_ERROR, msg)
}