forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sqldb_conn.go
174 lines (148 loc) · 4.45 KB
/
sqldb_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package mysqlconn
import (
"golang.org/x/net/context"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
querypb "github.com/youtube/vitess/go/vt/proto/query"
)
// This file contains the methods needed to implement the sqldb.Conn
// interface. These methods don't necessarily make sense, but it's
// easier to implement them as is, and then later on refactor
// everything, once the C version of mysql connection is gone.
//
// ExecuteFetch is in query.go.
// Close() is in conn.go.
// ExecuteStreamFetch is part of the sqldb.Conn interface.
// It sends the query, reads the response header and the column
// definitions, and stores the resulting fields on the connection.
// Returns a sqldb.SQLError.
func (c *Conn) ExecuteStreamFetch(query string) error {
	// Refuse to start while a previous streaming query still holds fields.
	if c.fields != nil {
		return sqldb.NewSQLError(CRCommandsOutOfSync, SSUnknownSQLState, "streaming query already in progress")
	}

	// A new command starts a new packet sequence.
	c.sequence = 0

	// The query goes out as a COM_QUERY packet.
	if err := c.writeComQuery(query); err != nil {
		return err
	}

	// Only the column count of the response is used here.
	_, _, columnCount, err := c.readComQueryResponse()
	if err != nil {
		return err
	}
	if columnCount == 0 {
		// OK packet, means no results. Store a non-nil, empty slice:
		// a nil c.fields is what the other methods treat as "no
		// streaming query in progress".
		c.fields = []*querypb.Field{}
		return nil
	}

	// One backing array holds the field values; the slice that ends up
	// on the connection holds pointers into it.
	storage := make([]querypb.Field, columnCount)
	columns := make([]*querypb.Field, columnCount)

	// Column headers are read one definition per column.
	for i := range columns {
		columns[i] = &storage[i]
		if err := c.readColumnDefinition(columns[i], i); err != nil {
			return err
		}
	}

	if c.Capabilities&CapabilityClientDeprecateEOF != 0 {
		// EOF is deprecated, so no EOF packet is read after the fields.
		c.fields = columns
		return nil
	}

	// EOF is only present here if it's not deprecated.
	pkt, readErr := c.readEphemeralPacket()
	if readErr != nil {
		return sqldb.NewSQLError(CRServerLost, SSUnknownSQLState, "%v", readErr)
	}
	switch pkt[0] {
	case ErrPacket:
		// Error packet.
		return parseErrorPacket(pkt)
	case EOFPacket:
		// This is what we expect.
		// Warnings and status flags are ignored.
	default:
		return sqldb.NewSQLError(CRCommandsOutOfSync, SSUnknownSQLState, "unexpected packet after fields: %v", pkt)
	}

	c.fields = columns
	return nil
}
// Fields is part of the sqldb.Conn interface.
// It returns the fields stored by the current streaming query, a nil
// slice when that query produced an empty field list, or an error when
// no streaming query is in progress.
func (c *Conn) Fields() ([]*querypb.Field, error) {
	switch {
	case c.fields == nil:
		return nil, sqldb.NewSQLError(CRCommandsOutOfSync, SSUnknownSQLState, "no streaming query in progress")
	case len(c.fields) == 0:
		// The query returned an empty field list.
		return nil, nil
	default:
		return c.fields, nil
	}
}
// FetchNext is part of the sqldb.Conn interface.
//
// It reads the next packet of the streaming query in progress and
// returns:
//   - (row, nil) for a regular row,
//   - (nil, nil) when the end-of-result marker is read (the streaming
//     query is then finished), or when the query produced no fields,
//   - (nil, err) when no streaming query is in progress, when the packet
//     cannot be read or is empty, or when the server sent an error
//     packet.
func (c *Conn) FetchNext() ([]sqltypes.Value, error) {
	if c.fields == nil {
		// We are already done, and the result was closed.
		return nil, sqldb.NewSQLError(CRCommandsOutOfSync, SSUnknownSQLState, "no streaming query in progress")
	}

	if len(c.fields) == 0 {
		// We received no fields, so there is no data.
		return nil, nil
	}

	data, err := c.ReadPacket()
	if err != nil {
		return nil, err
	}
	if len(data) == 0 {
		// Indexing data[0] below would panic on an empty payload;
		// report it as a protocol error instead.
		return nil, sqldb.NewSQLError(CRCommandsOutOfSync, SSUnknownSQLState, "unexpected empty packet while streaming rows")
	}

	switch data[0] {
	case EOFPacket:
		// The 0xfe header byte is ambiguous: it also introduces an
		// 8-byte length-encoded integer, so a row whose first column
		// is a very large value starts with the same byte. Per the
		// MySQL protocol documentation the packet length tells them
		// apart:
		// - a plain EOF packet is shorter than 9 bytes,
		// - with CapabilityClientDeprecateEOF, the terminator is an OK
		//   packet with an EOF header, shorter than 0xffffff bytes.
		eofLimit := 9
		if c.Capabilities&CapabilityClientDeprecateEOF != 0 {
			eofLimit = 0xffffff
		}
		if len(data) < eofLimit {
			// End of the result set. We do not parse the packet:
			// warnings and status flags are ignored.
			c.fields = nil
			return nil, nil
		}
		// Too long to be a terminator: parse it as a row below.
	case ErrPacket:
		// Error packet.
		return nil, parseErrorPacket(data)
	}

	// Regular row.
	return c.parseRow(data, c.fields)
}
// CloseResult is part of the sqldb.Conn interface.
// It discards whatever is left of the current streaming query by calling
// FetchNext until the fields are cleared.
func (c *Conn) CloseResult() {
	for c.fields != nil {
		if row, err := c.FetchNext(); err == nil && row != nil {
			// A regular row: drop it and keep draining.
			continue
		}
		// We either got an error, or got the last result.
		c.fields = nil
	}
}
// IsClosed is part of the sqldb.Conn interface.
// The connection is reported as closed when its ConnectionID is zero,
// which is the value Shutdown stores.
func (c *Conn) IsClosed() bool {
	const closedID = 0
	return c.ConnectionID == closedID
}
// Shutdown is part of the sqldb.Conn interface.
// It zeroes ConnectionID, which is the condition IsClosed tests, and
// then closes the underlying connection. Anything Close returns is not
// inspected here.
func (c *Conn) Shutdown() {
c.ConnectionID = 0
c.conn.Close()
}
// ID is part of the sqldb.Conn interface.
// It reports the ConnectionID field converted to an int64.
func (c *Conn) ID() int64 {
	id := int64(c.ConnectionID)
	return id
}
func init() {
sqldb.Register("mysqlconn", func(params sqldb.ConnParams) (sqldb.Conn, error) {
ctx := context.Background()
return Connect(ctx, ¶ms)
})
// Uncomment this and comment out the call to sqldb.RegisterDefault in
// go/mysql/mysql.go to make this the default.
// sqldb.RegisterDefault(func(params sqldb.ConnParams) (sqldb.Conn, error) {
// ctx := context.Background()
// return Connect(ctx, ¶ms)
// })
}