diff --git a/server/handler.go b/server/handler.go index 392181fc02..827fe3cb53 100644 --- a/server/handler.go +++ b/server/handler.go @@ -629,39 +629,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s *err = goerrors.Join(*err, wrappedErr) } } - wg := sync.WaitGroup{} - wg.Add(2) - - var r *sqltypes.Result - var processedAtLeastOneBatch bool - - // Read rows off the row iterator and send them to the row channel. - iter, projs := GetDeferredProjections(iter) - var rowChan = make(chan sql.Row, 512) - eg.Go(func() (err error) { - defer pan2err(&err) - defer wg.Done() - defer close(rowChan) - for { - select { - case <-ctx.Done(): - return context.Cause(ctx) - default: - row, err := iter.Next(ctx) - if err == io.EOF { - return nil - } - if err != nil { - return err - } - select { - case rowChan <- row: - case <-ctx.Done(): - return nil - } - } - } - }) // TODO: poll for closed connections should obviously also run even if // we're doing something with an OK result or a single row result, etc. @@ -685,75 +652,133 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to // clean out rows that have already been spooled. - resetCallback := func(r *sqltypes.Result, more bool) error { - // A server-side cursor allows the caller to fetch results cached on the server-side, - // so if a cursor exists, we can't release the buffer memory yet. - if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { + // A server-side cursor allows the caller to fetch results cached on the server-side, + // so if a cursor exists, we can't release the buffer memory yet. + if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { + callback = func(r *sqltypes.Result, more bool) error { defer buf.Reset() + return callback(r, more) } - return callback(r, more) } - // Reads rows from the channel, converts them to wire format, - // and calls |callback| to give them to vitess. + iter, projs := GetDeferredProjections(iter) + + wg := sync.WaitGroup{} + wg.Add(3) + + // Read rows off the row iterator and send them to the row channel. + var rowChan = make(chan sql.Row, 512) + eg.Go(func() (err error) { + defer pan2err(&err) + defer wg.Done() + defer close(rowChan) + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + row, iErr := iter.Next(ctx) + if iErr == io.EOF { + return nil + } + if iErr != nil { + return iErr + } + select { + case rowChan <- row: + case <-ctx.Done(): + return nil + } + } + } + }) + + // Drain rows from rowChan, convert to wire format, and send to resChan + var resChan = make(chan *sqltypes.Result, 4) + var res *sqltypes.Result eg.Go(func() (err error) { defer pan2err(&err) - defer cancelF() defer wg.Done() + defer close(resChan) + for { - if r == nil { - r = &sqltypes.Result{ + if res == nil { + res = &sqltypes.Result{ Fields: resultFields, Rows: make([][]sqltypes.Value, 0, rowsBatch), } } - if r.RowsAffected == rowsBatch { - if err := resetCallback(r, more); err != nil { - return err - } - r = nil - processedAtLeastOneBatch = true - continue - } select { case <-ctx.Done(): return context.Cause(ctx) + + case <-timer.C: + if h.readTimeout != 0 { + // Cancel and return so Vitess can call the CloseConnection callback + ctx.GetLogger().Warn("connection timeout") + return ErrRowTimeout.New() + } + case row, ok := <-rowChan: if !ok { return nil } + if types.IsOkResult(row) { - if len(r.Rows) > 0 { + if res.RowsAffected > 0 { panic("Got OkResult mixed with RowResult") } - r = resultFromOkResult(row[0].(types.OkResult)) + res = resultFromOkResult(row[0].(types.OkResult)) continue } - outputRow, err := RowToSQL(ctx, schema, row, projs, buf) - if err != nil { - return err + outRow, sqlErr := RowToSQL(ctx, schema, row, projs, buf) + if sqlErr != nil { + return sqlErr } - ctx.GetLogger().Tracef("spooling result row %s", outputRow) - r.Rows = append(r.Rows, outputRow) - r.RowsAffected++ - if !timer.Stop() { - <-timer.C - } - case <-timer.C: - // TODO: timer should probably go in its own thread, as rowChan is blocking - if h.readTimeout != 0 { - // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") - return ErrRowTimeout.New() + ctx.GetLogger().Tracef("spooling result row %s", outRow) + res.Rows = append(res.Rows, outRow) + res.RowsAffected++ + + if res.RowsAffected == rowsBatch { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case resChan <- res: + res = nil + } } } + timer.Reset(waitTime) } }) + // Drain sqltypes.Result from resChan and call callback (send to client and potentially reset buffer) + var processedAtLeastOneBatch bool + eg.Go(func() (err error) { + defer pan2err(&err) + defer cancelF() + defer wg.Done() + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case r, ok := <-resChan: + if !ok { + return nil + } + processedAtLeastOneBatch = true + err = callback(r, more) + if err != nil { + return err + } + } + } + }) + // Close() kills this PID in the process list, // wait until all rows have be sent over the wire eg.Go(func() (err error) { @@ -770,7 +795,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } return nil, false, err } - return r, processedAtLeastOneBatch, nil + return res, processedAtLeastOneBatch, nil } func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) { @@ -804,22 +829,21 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema timer := time.NewTimer(waitTime) defer timer.Stop() - wg := sync.WaitGroup{} - wg.Add(2) - // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to // clean out rows that have already been spooled. - resetCallback := func(r *sqltypes.Result, more bool) error { - // A server-side cursor allows the caller to fetch results cached on the server-side, - // so if a cursor exists, we can't release the buffer memory yet. - if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { + // A server-side cursor allows the caller to fetch results cached on the server-side, + // so if a cursor exists, we can't release the buffer memory yet. + if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { + callback = func(r *sqltypes.Result, more bool) error { defer buf.Reset() + return callback(r, more) } - return callback(r, more) } - // TODO: send results instead of rows? - // Read rows from iter and send them off + wg := sync.WaitGroup{} + wg.Add(3) + + // Drain rows from iter and send to rowsChan var rowChan = make(chan sql.ValueRow, 512) eg.Go(func() (err error) { defer pan2err(&err) @@ -830,12 +854,12 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema case <-ctx.Done(): return context.Cause(ctx) default: - row, err := iter.NextValueRow(ctx) - if err == io.EOF { + row, iErr := iter.NextValueRow(ctx) + if iErr == io.EOF { return nil } - if err != nil { - return err + if iErr != nil { + return iErr } select { case rowChan <- row: @@ -846,56 +870,84 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema } }) + // Drain rows from rowChan, convert to wire format, and send to resChan + var resChan = make(chan *sqltypes.Result, 4) var res *sqltypes.Result - var processedAtLeastOneBatch bool eg.Go(func() (err error) { defer pan2err(&err) - defer cancelF() + defer close(resChan) defer wg.Done() + for { if res == nil { res = &sqltypes.Result{ Fields: resultFields, - Rows: make([][]sqltypes.Value, 0, rowsBatch), - } - } - if res.RowsAffected == rowsBatch { - if err := resetCallback(res, more); err != nil { - return err + Rows: make([][]sqltypes.Value, rowsBatch), } - res = nil - processedAtLeastOneBatch = true - continue } select { case <-ctx.Done(): return context.Cause(ctx) + case <-timer.C: if h.readTimeout != 0 { // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") + ctx.GetLogger().Warn("connection timeout") return ErrRowTimeout.New() } + case row, ok := <-rowChan: if !ok { return nil } - resRow, err := RowValueToSQLValues(ctx, schema, row, buf) - if err != nil { - return err + + outRow, sqlErr := RowValueToSQLValues(ctx, schema, row, buf) + if sqlErr != nil { + return sqlErr } - ctx.GetLogger().Tracef("spooling result row %s", resRow) - res.Rows = append(res.Rows, resRow) + + ctx.GetLogger().Tracef("spooling result row %s", outRow) + res.Rows[res.RowsAffected] = outRow res.RowsAffected++ - if !timer.Stop() { - <-timer.C + + if res.RowsAffected == rowsBatch { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case resChan <- res: + res = nil + } } } + timer.Reset(waitTime) } }) + // Drain sqltypes.Result from resChan and call callback (send to client and reset buffer) + var processedAtLeastOneBatch bool + eg.Go(func() (err error) { + defer pan2err(&err) + defer cancelF() + defer wg.Done() + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case r, ok := <-resChan: + if !ok { + return nil + } + processedAtLeastOneBatch = true + err = callback(r, more) + if err != nil { + return err + } + } + } + }) + // Close() kills this PID in the process list, // wait until all rows have be sent over the wire eg.Go(func() (err error) { @@ -913,7 +965,8 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema return nil, false, err } - return res, processedAtLeastOneBatch, nil + res.Rows = res.Rows[:res.RowsAffected] + return res, processedAtLeastOneBatch, err } // See https://dev.mysql.com/doc/internals/en/status-flags.html