Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 156 additions & 103 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,39 +629,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
*err = goerrors.Join(*err, wrappedErr)
}
}
wg := sync.WaitGroup{}
wg.Add(2)

var r *sqltypes.Result
var processedAtLeastOneBatch bool

// Read rows off the row iterator and send them to the row channel.
iter, projs := GetDeferredProjections(iter)
var rowChan = make(chan sql.Row, 512)
eg.Go(func() (err error) {
defer pan2err(&err)
defer wg.Done()
defer close(rowChan)
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
row, err := iter.Next(ctx)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
select {
case rowChan <- row:
case <-ctx.Done():
return nil
}
}
}
})

// TODO: poll for closed connections should obviously also run even if
// we're doing something with an OK result or a single row result, etc.
Expand All @@ -685,75 +652,133 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s

// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
// clean out rows that have already been spooled.
resetCallback := func(r *sqltypes.Result, more bool) error {
// A server-side cursor allows the caller to fetch results cached on the server-side,
// so if a cursor exists, we can't release the buffer memory yet.
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
// A server-side cursor allows the caller to fetch results cached on the server-side,
// so if a cursor exists, we can't release the buffer memory yet.
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
callback = func(r *sqltypes.Result, more bool) error {
defer buf.Reset()
return callback(r, more)
}
return callback(r, more)
}

// Reads rows from the channel, converts them to wire format,
// and calls |callback| to give them to vitess.
iter, projs := GetDeferredProjections(iter)

wg := sync.WaitGroup{}
wg.Add(3)

// Read rows off the row iterator and send them to the row channel.
var rowChan = make(chan sql.Row, 512)
eg.Go(func() (err error) {
defer pan2err(&err)
defer wg.Done()
defer close(rowChan)
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
row, iErr := iter.Next(ctx)
if iErr == io.EOF {
return nil
}
if iErr != nil {
return iErr
}
select {
case rowChan <- row:
case <-ctx.Done():
return nil
}
}
}
})

// Drain rows from rowChan, convert to wire format, and send to resChan
var resChan = make(chan *sqltypes.Result, 4)
var res *sqltypes.Result
eg.Go(func() (err error) {
defer pan2err(&err)
defer cancelF()
defer wg.Done()
defer close(resChan)

for {
if r == nil {
r = &sqltypes.Result{
if res == nil {
res = &sqltypes.Result{
Fields: resultFields,
Rows: make([][]sqltypes.Value, 0, rowsBatch),
}
}
if r.RowsAffected == rowsBatch {
if err := resetCallback(r, more); err != nil {
return err
}
r = nil
processedAtLeastOneBatch = true
continue
}

select {
case <-ctx.Done():
return context.Cause(ctx)

case <-timer.C:
if h.readTimeout != 0 {
// Cancel and return so Vitess can call the CloseConnection callback
ctx.GetLogger().Warn("connection timeout")
return ErrRowTimeout.New()
}

case row, ok := <-rowChan:
if !ok {
return nil
}

if types.IsOkResult(row) {
if len(r.Rows) > 0 {
if res.RowsAffected > 0 {
panic("Got OkResult mixed with RowResult")
}
r = resultFromOkResult(row[0].(types.OkResult))
res = resultFromOkResult(row[0].(types.OkResult))
continue
}

outputRow, err := RowToSQL(ctx, schema, row, projs, buf)
if err != nil {
return err
outRow, sqlErr := RowToSQL(ctx, schema, row, projs, buf)
if sqlErr != nil {
return sqlErr
}

ctx.GetLogger().Tracef("spooling result row %s", outputRow)
r.Rows = append(r.Rows, outputRow)
r.RowsAffected++
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
// TODO: timer should probably go in its own thread, as rowChan is blocking
if h.readTimeout != 0 {
// Cancel and return so Vitess can call the CloseConnection callback
ctx.GetLogger().Tracef("connection timeout")
return ErrRowTimeout.New()
ctx.GetLogger().Tracef("spooling result row %s", outRow)
res.Rows = append(res.Rows, outRow)
res.RowsAffected++

if res.RowsAffected == rowsBatch {
select {
case <-ctx.Done():
return context.Cause(ctx)
case resChan <- res:
res = nil
}
}
}

timer.Reset(waitTime)
}
})

// Drain sqltypes.Result from resChan and call callback (send to client and potentially reset buffer)
var processedAtLeastOneBatch bool
eg.Go(func() (err error) {
defer pan2err(&err)
defer cancelF()
defer wg.Done()
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case r, ok := <-resChan:
if !ok {
return nil
}
processedAtLeastOneBatch = true
err = callback(r, more)
if err != nil {
return err
}
}
}
})

// Close() kills this PID in the process list,
// wait until all rows have be sent over the wire
eg.Go(func() (err error) {
Expand All @@ -770,7 +795,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s
}
return nil, false, err
}
return r, processedAtLeastOneBatch, nil
return res, processedAtLeastOneBatch, nil
}

func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) {
Expand Down Expand Up @@ -804,22 +829,21 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
timer := time.NewTimer(waitTime)
defer timer.Stop()

wg := sync.WaitGroup{}
wg.Add(2)

// Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to
// clean out rows that have already been spooled.
resetCallback := func(r *sqltypes.Result, more bool) error {
// A server-side cursor allows the caller to fetch results cached on the server-side,
// so if a cursor exists, we can't release the buffer memory yet.
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
// A server-side cursor allows the caller to fetch results cached on the server-side,
// so if a cursor exists, we can't release the buffer memory yet.
if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 {
callback = func(r *sqltypes.Result, more bool) error {
defer buf.Reset()
return callback(r, more)
}
return callback(r, more)
}

// TODO: send results instead of rows?
// Read rows from iter and send them off
wg := sync.WaitGroup{}
wg.Add(3)

// Drain rows from iter and send to rowsChan
var rowChan = make(chan sql.ValueRow, 512)
eg.Go(func() (err error) {
defer pan2err(&err)
Expand All @@ -830,12 +854,12 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
case <-ctx.Done():
return context.Cause(ctx)
default:
row, err := iter.NextValueRow(ctx)
if err == io.EOF {
row, iErr := iter.NextValueRow(ctx)
if iErr == io.EOF {
return nil
}
if err != nil {
return err
if iErr != nil {
return iErr
}
select {
case rowChan <- row:
Expand All @@ -846,56 +870,84 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
}
})

// Drain rows from rowChan, convert to wire format, and send to resChan
var resChan = make(chan *sqltypes.Result, 4)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason for this buffer size being different than the row one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a buffer of *sqltypes.Result, which hold rowBatch = 128 rows each, so 512/128 = 4.

var res *sqltypes.Result
var processedAtLeastOneBatch bool
eg.Go(func() (err error) {
defer pan2err(&err)
defer cancelF()
defer close(resChan)
defer wg.Done()

for {
if res == nil {
res = &sqltypes.Result{
Fields: resultFields,
Rows: make([][]sqltypes.Value, 0, rowsBatch),
}
}
if res.RowsAffected == rowsBatch {
if err := resetCallback(res, more); err != nil {
return err
Rows: make([][]sqltypes.Value, rowsBatch),
}
res = nil
processedAtLeastOneBatch = true
continue
}

select {
case <-ctx.Done():
return context.Cause(ctx)

case <-timer.C:
if h.readTimeout != 0 {
// Cancel and return so Vitess can call the CloseConnection callback
ctx.GetLogger().Tracef("connection timeout")
ctx.GetLogger().Warn("connection timeout")
return ErrRowTimeout.New()
}

case row, ok := <-rowChan:
if !ok {
return nil
}
resRow, err := RowValueToSQLValues(ctx, schema, row, buf)
if err != nil {
return err

outRow, sqlErr := RowValueToSQLValues(ctx, schema, row, buf)
if sqlErr != nil {
return sqlErr
}
ctx.GetLogger().Tracef("spooling result row %s", resRow)
res.Rows = append(res.Rows, resRow)

ctx.GetLogger().Tracef("spooling result row %s", outRow)
res.Rows[res.RowsAffected] = outRow
res.RowsAffected++
if !timer.Stop() {
<-timer.C

if res.RowsAffected == rowsBatch {
select {
case <-ctx.Done():
return context.Cause(ctx)
case resChan <- res:
res = nil
}
}
}

timer.Reset(waitTime)
}
})

// Drain sqltypes.Result from resChan and call callback (send to client and reset buffer)
var processedAtLeastOneBatch bool
eg.Go(func() (err error) {
defer pan2err(&err)
defer cancelF()
defer wg.Done()
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case r, ok := <-resChan:
if !ok {
return nil
}
processedAtLeastOneBatch = true
err = callback(r, more)
if err != nil {
return err
}
}
}
})

// Close() kills this PID in the process list,
// wait until all rows have be sent over the wire
eg.Go(func() (err error) {
Expand All @@ -913,7 +965,8 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema
return nil, false, err
}

return res, processedAtLeastOneBatch, nil
res.Rows = res.Rows[:res.RowsAffected]
return res, processedAtLeastOneBatch, err
}

// See https://dev.mysql.com/doc/internals/en/status-flags.html
Expand Down