Skip to content

Commit

Permalink
return data to client via channel (#158)
Browse files Browse the repository at this point in the history
* return data to client via channel

* fix: conn leak and query block

Co-authored-by: Jeffsky <jjeffcaii@outlook.com>
  • Loading branch information
Penglq and jjeffcaii committed May 16, 2022
1 parent ec1334b commit df363ef
Show file tree
Hide file tree
Showing 27 changed files with 532 additions and 172 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/nacos-group/nacos-sdk-go/v2 v2.0.1
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/olekukonko/tablewriter v0.0.5
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.28.0 // indirect
Expand All @@ -29,6 +28,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0-alpha.0
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -960,8 +960,9 @@ go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpK
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (executor *RedirectExecutor) ExecuteUseDB(ctx *proto.Context) error {

func (executor *RedirectExecutor) ExecuteFieldList(ctx *proto.Context) ([]proto.Field, error) {
index := bytes.IndexByte(ctx.Data, 0x00)
table := string(ctx.Data[0:index])
table := string(ctx.Data[1:index])
wildcard := string(ctx.Data[index+1:])

rt, err := runtime.Load(ctx.Schema)
Expand Down Expand Up @@ -309,7 +309,7 @@ func (executor *RedirectExecutor) ExecutorComStmtExecute(ctx *proto.Context) (pr
}

query := ctx.Stmt.StmtNode.Text()
log.Debugf(query)
log.Debugf("ComStmtExecute: %s", query)

executor.doPreFilter(ctx)
result, warn, err = executable.Execute(ctx)
Expand Down
75 changes: 47 additions & 28 deletions pkg/mysql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,16 +860,22 @@ func (conn *BackendConnection) WriteComSetOption(operation uint16) error {

func (conn *BackendConnection) WriteComFieldList(table string, wildcard string) error {
conn.c.sequence = 0
length := 1 +
lenNullString(table) +
lenNullString(wildcard)
length := lenNullString(table) + lenNullString(wildcard)
if len(wildcard) > 0 {
length++
}

data := conn.c.startEphemeralPacket(length)
pos := 0

pos = writeByte(data, 0, mysql.ComFieldList)
pos = writeNullString(data, pos, table)
writeNullString(data, pos, wildcard)
if len(wildcard) > 0 {
pos = writeNullString(data, pos, table)
writeNullString(data, pos, wildcard)
} else {
pos = writeEOFString(data, pos, table)
writeEOFString(data, pos, wildcard)
}

if err := conn.c.writeEphemeralPacket(); err != nil {
return err
Expand All @@ -878,13 +884,13 @@ func (conn *BackendConnection) WriteComFieldList(table string, wildcard string)
return nil
}

func (conn *BackendConnection) readResultSetHeaderPacket() (colNumber int, more bool, warnings uint16, err error) {
func (conn *BackendConnection) readResultSetHeaderPacket() (affectedRows, lastInsertID uint64, colNumber int, more bool, warnings uint16, err error) {
// Get the result.
_, _, colNumber, more, warning, err := conn.ReadComQueryResponse()
affectedRows, lastInsertID, colNumber, more, warning, err := conn.ReadComQueryResponse()
if err != nil {
return colNumber, more, warning, err
return affectedRows, lastInsertID, colNumber, more, warning, err
}
return colNumber, more, warning, nil
return affectedRows, lastInsertID, colNumber, more, warning, nil
}

func (conn *BackendConnection) readResultSetColumnsPacket(colNumber int) (columns []proto.Field, err error) {
Expand All @@ -902,11 +908,21 @@ func (conn *BackendConnection) readResultSetColumnsPacket(colNumber int) (column
}

// ReadQueryRow returns iterator, and the line reads the results set
func (conn *BackendConnection) ReadQueryRow() (iter *IterRow, more bool, warnings uint16, err error) {
colNumber, more, warning, err := conn.readResultSetHeaderPacket()
func (conn *BackendConnection) ReadQueryRow() (iter *IterRow, affectedRows, lastInsertID uint64, more bool, warnings uint16, err error) {
iterRow := &IterRow{BackendConnection: conn,
Row: &Row{
Content: []byte{}, ResultSet: &ResultSet{
Columns: make([]proto.Field, 0),
}},
hasNext: true,
}
affectedRows, lastInsertID, colNumber, more, warning, err := conn.readResultSetHeaderPacket()
if err != nil {
return iterRow, affectedRows, lastInsertID, more, warning, err
}
if colNumber == 0 {
// OK packet, means no results. Just use the numbers.
return nil, more, warning, nil
return iterRow, affectedRows, lastInsertID, more, warning, nil
}

// Read column headers. One packet per column.
Expand All @@ -919,19 +935,14 @@ func (conn *BackendConnection) ReadQueryRow() (iter *IterRow, more bool, warning
return
}
}
iterRow := &IterRow{BackendConnection: conn,
Row: &Row{
Content: []byte{}, ResultSet: &ResultSet{
Columns: columns,
}},
hasNext: true,
}

iterRow.Row.ResultSet.Columns = columns

if conn.capabilities&mysql.CapabilityClientDeprecateEOF == 0 {
// EOF is only present here if it's not deprecated.
data, err := conn.c.readEphemeralPacket()
if err != nil {
return nil, more, warning, err2.NewSQLError(mysql.CRServerLost, mysql.SSUnknownSQLState, "%v", err)
return nil, affectedRows, lastInsertID, more, warning, err2.NewSQLError(mysql.CRServerLost, mysql.SSUnknownSQLState, "%v", err)
}
if isEOFPacket(data) {

Expand All @@ -942,14 +953,14 @@ func (conn *BackendConnection) ReadQueryRow() (iter *IterRow, more bool, warning

} else if isErrorPacket(data) {
defer conn.c.recycleReadPacket()
return nil, more, warning, ParseErrorPacket(data)
return nil, affectedRows, lastInsertID, more, warning, ParseErrorPacket(data)
} else {
defer conn.c.recycleReadPacket()
return nil, more, warning, fmt.Errorf("unexpected packet after fields: %v", data)
return nil, affectedRows, lastInsertID, more, warning, fmt.Errorf("unexpected packet after fields: %v", data)
}
}

return iterRow, more, warnings, err
return iterRow, affectedRows, lastInsertID, more, warnings, err
}

// ReadQueryResult gets the result from the last written query.
Expand Down Expand Up @@ -1369,7 +1380,7 @@ func (conn *BackendConnection) ExecuteMulti(query string, wantFields bool) (resu
// ExecuteWithWarningCountIterRow is for fetching results and a warning count
// Note: In a future iteration this should be abolished and merged into the
// Execute API.
func (conn *BackendConnection) ExecuteWithWarningCountIterRow(query string) (iterRow Iter, warnings uint16, err error) {
func (conn *BackendConnection) ExecuteWithWarningCountIterRow(query string) (res *Result, warnings uint16, err error) {
defer func() {
if err != nil {
if sqlErr, ok := err.(*err2.SQLError); ok {
Expand All @@ -1383,8 +1394,16 @@ func (conn *BackendConnection) ExecuteWithWarningCountIterRow(query string) (ite
return nil, 0, err
}

iterTextRow, _, warnings, err := conn.ReadQueryRow()
iterRow = &TextIterRow{iterTextRow}
iterTextRow, affectedRows, lastInsertID, _, warnings, err := conn.ReadQueryRow()
iterRow := &TextIterRow{iterTextRow}

res = &Result{
AffectedRows: affectedRows,
InsertId: lastInsertID,
Fields: iterRow.Fields(),
Rows: []proto.Row{iterRow},
DataChan: make(chan proto.Row, 1),
}
return
}

Expand Down Expand Up @@ -1417,15 +1436,15 @@ func (conn *BackendConnection) PrepareExecuteArgs(query string, args []interface
return stmt.execArgs(args)
}

func (conn *BackendConnection) PrepareQueryArgsIterRow(query string, data []interface{}) (iterRow Iter, warnings uint16, err error) {
func (conn *BackendConnection) PrepareQueryArgsIterRow(query string, data []interface{}) (result *Result, warnings uint16, err error) {
stmt, err := conn.prepare(query)
if err != nil {
return nil, 0, err
}
return stmt.queryArgsIterRow(data)
}

func (conn *BackendConnection) PrepareQueryArgs(query string, data []interface{}) (Result *Result, warnings uint16, err error) {
func (conn *BackendConnection) PrepareQueryArgs(query string, data []interface{}) (result *Result, warnings uint16, err error) {
stmt, err := conn.prepare(query)
if err != nil {
return nil, 0, err
Expand Down
10 changes: 10 additions & 0 deletions pkg/mysql/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,16 @@ func TestWriteComFieldList(t *testing.T) {
assert.Equal(t, mysql.ComFieldList, int(written[4]))
assert.Equal(t, table, string(written[5:5+len(table)]))
assert.Equal(t, column, string(written[5+len(table)+1:5+len(table)+1+len(column)]))

conn.c = newConn(new(mockConn))
column = ""
err = conn.WriteComFieldList(table, column)
assert.NoError(t, err)
written = conn.c.conn.(*mockConn).written
assert.Equal(t, uint8(1+len(table)+len(column)+1), written[0])
assert.Equal(t, mysql.ComFieldList, int(written[4]))
assert.Equal(t, table, string(written[5:5+len(table)]))
assert.Equal(t, column, string(written[5+len(table)+1:5+len(table)+1+len(column)]))
}

func TestPrepare(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (c *Conn) startEphemeralPacket(length int) []byte {
}

c.currentEphemeralPolicy = ephemeralWrite
// get buffer from pool or it'll be allocated if length is too big
// get buffer from pool, or it'll be allocated if length is too big
c.currentEphemeralBuffer = bufPool.Get(length)
return *c.currentEphemeralBuffer
}
Expand Down
24 changes: 20 additions & 4 deletions pkg/mysql/execute_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,19 @@ func (l *Listener) handleQuery(c *Conn, ctx *proto.Context) error {
result, warn, err := l.executor.ExecutorComQuery(ctx)
if err != nil {
if wErr := c.writeErrorPacketFromError(err); wErr != nil {
log.Error("Error writing query error to client %v: %v", l.connectionID, wErr)
log.Errorf("Error writing query error to client %v: %v", l.connectionID, wErr)
return wErr
}
return nil
}

if cr, ok := result.(*proto.CloseableResult); ok {
result = cr.Result
defer func() {
_ = cr.Close()
}()
}

if len(result.GetFields()) == 0 {
// A successful callback with no fields means that this was a
// DML or other write-only operation.
Expand All @@ -99,7 +107,7 @@ func (l *Listener) handleQuery(c *Conn, ctx *proto.Context) error {
if err = c.writeFields(l.capabilities, result); err != nil {
return err
}
if err = c.writeRows(result); err != nil {
if err = c.writeRowChan(result); err != nil {
return err
}
if err = c.writeEndResult(l.capabilities, false, 0, 0, warn); err != nil {
Expand Down Expand Up @@ -158,11 +166,19 @@ func (l *Listener) handleStmtExecute(c *Conn, ctx *proto.Context) error {
result, warn, err := l.executor.ExecutorComStmtExecute(ctx)
if err != nil {
if wErr := c.writeErrorPacketFromError(err); wErr != nil {
log.Error("Error writing query error to client %v: %v", l.connectionID, wErr)
log.Errorf("Error writing query error to client %v: %v, executor error: %v", l.connectionID, wErr, err)
return wErr
}
return nil
}

if cr, ok := result.(*proto.CloseableResult); ok {
result = cr.Result
defer func() {
_ = cr.Close()
}()
}

if len(result.GetFields()) == 0 {
// A successful callback with no fields means that this was a
// DML or other write-only operation.
Expand All @@ -177,7 +193,7 @@ func (l *Listener) handleStmtExecute(c *Conn, ctx *proto.Context) error {
if err = c.writeFields(l.capabilities, result); err != nil {
return err
}
if err = c.writeBinaryRows(result); err != nil {
if err = c.writeBinaryRowChan(result); err != nil {
return err
}
if err = c.writeEndResult(l.capabilities, false, 0, 0, warn); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/mysql/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,9 @@ func (rows *BinaryIterRow) Decode() ([]*proto.Value, error) {
dest[i] = &proto.Value{
Typ: field.fieldType,
Flags: field.flags,
Len: n,
Len: int(num),
Val: val,
Raw: rows.Content[pos : pos+n],
Raw: rows.Content[pos : pos+int(num)],
}
if err == nil {
break
Expand All @@ -389,7 +389,7 @@ func (rows *BinaryIterRow) Decode() ([]*proto.Value, error) {
dest[i] = &proto.Value{
Typ: field.fieldType,
Flags: field.flags,
Len: n,
Len: int(num),
Val: val,
Raw: rows.Content[pos : pos+n],
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/mysql/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Result struct {
AffectedRows uint64
InsertId uint64
Rows []proto.Row
DataChan chan proto.Row
}

func (res *Result) GetFields() []proto.Field {
Expand All @@ -43,3 +44,7 @@ func (res *Result) LastInsertId() (uint64, error) {
func (res *Result) RowsAffected() (uint64, error) {
return res.AffectedRows, nil
}

func (res *Result) GetDataChan() chan proto.Row {
return res.DataChan
}
Loading

0 comments on commit df363ef

Please sign in to comment.