From 4fb859a3a226f20fe9ea9c70fa207108da120740 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 00:05:52 -0800 Subject: [PATCH 1/6] more threads --- server/handler.go | 165 ++++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 64 deletions(-) diff --git a/server/handler.go b/server/handler.go index 392181fc02..a49fa22ec7 100644 --- a/server/handler.go +++ b/server/handler.go @@ -629,14 +629,57 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s *err = goerrors.Join(*err, wrappedErr) } } + + // TODO: poll for closed connections should obviously also run even if + // we're doing something with an OK result or a single row result, etc. + // This should be in the caller. + pollCtx, cancelF := ctx.NewSubContext() + eg.Go(func() (err error) { + defer pan2err(&err) + return h.pollForClosedConnection(pollCtx, c) + }) + + // Default waitTime is one minute if there is no timeout configured, in which case + // it will loop to iterate again unless the socket died by the OS timeout or other problems. + // If there is a timeout, it will be enforced to ensure that Vitess has a chance to + // call Handler.CloseConnection() + waitTime := 1 * time.Minute + if h.readTimeout > 0 { + waitTime = h.readTimeout + } + timer := time.NewTimer(waitTime) + defer timer.Stop() + + // Wait for signal on the timer.C channel, and error accordingly + eg.Go(func() (err error) { + <-timer.C + if h.readTimeout != 0 { + // Cancel and return so Vitess can call the CloseConnection callback + ctx.GetLogger().Tracef("connection timeout") + return ErrRowTimeout.New() + } + return nil + }) + + // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to + // clean out rows that have already been spooled. + // A server-side cursor allows the caller to fetch results cached on the server-side, + // so if a cursor exists, we can't release the buffer memory yet. + resetCallback := callback + if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { + resetCallback = func(r *sqltypes.Result, more bool) error { + buf.Reset() + return callback(r, more) + } + } + wg := sync.WaitGroup{} - wg.Add(2) + wg.Add(3) - var r *sqltypes.Result var processedAtLeastOneBatch bool + iter, projs := GetDeferredProjections(iter) // Read rows off the row iterator and send them to the row channel. - iter, projs := GetDeferredProjections(iter) var rowChan = make(chan sql.Row, 512) eg.Go(func() (err error) { defer pan2err(&err) @@ -647,8 +690,8 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s case <-ctx.Done(): return context.Cause(ctx) default: - row, err := iter.Next(ctx) - if err == io.EOF { + row, iErr := iter.Next(ctx) + if iErr == io.EOF { return nil } if err != nil { @@ -663,97 +706,91 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } }) - // TODO: poll for closed connections should obviously also run even if - // we're doing something with an OK result or a single row result, etc. - // This should be in the caller. - pollCtx, cancelF := ctx.NewSubContext() - eg.Go(func() (err error) { - defer pan2err(&err) - return h.pollForClosedConnection(pollCtx, c) - }) - - // Default waitTime is one minute if there is no timeout configured, in which case - // it will loop to iterate again unless the socket died by the OS timeout or other problems. - // If there is a timeout, it will be enforced to ensure that Vitess has a chance to - // call Handler.CloseConnection() - waitTime := 1 * time.Minute - if h.readTimeout > 0 { - waitTime = h.readTimeout - } - timer := time.NewTimer(waitTime) - defer timer.Stop() - - // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to - // clean out rows that have already been spooled. - resetCallback := func(r *sqltypes.Result, more bool) error { - // A server-side cursor allows the caller to fetch results cached on the server-side, - // so if a cursor exists, we can't release the buffer memory yet. - if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - defer buf.Reset() - } - return callback(r, more) - } - - // Reads rows from the channel, converts them to wire format, - // and calls |callback| to give them to vitess. + // TODO: remember to deal with last result differently + // Read rows off the row channel and convert to wire format. + var resChan = make(chan *sqltypes.Result, 4) + var res *sqltypes.Result eg.Go(func() (err error) { defer pan2err(&err) - defer cancelF() defer wg.Done() + defer close(resChan) + for { - if r == nil { - r = &sqltypes.Result{ + if res == nil { + res = &sqltypes.Result{ Fields: resultFields, Rows: make([][]sqltypes.Value, 0, rowsBatch), } } - if r.RowsAffected == rowsBatch { - if err := resetCallback(r, more); err != nil { - return err - } - r = nil - processedAtLeastOneBatch = true - continue - } select { case <-ctx.Done(): return context.Cause(ctx) + case row, ok := <-rowChan: if !ok { return nil } + if types.IsOkResult(row) { - if len(r.Rows) > 0 { + if res.RowsAffected > 0 { panic("Got OkResult mixed with RowResult") } - r = resultFromOkResult(row[0].(types.OkResult)) + res = resultFromOkResult(row[0].(types.OkResult)) continue } - outputRow, err := RowToSQL(ctx, schema, row, projs, buf) - if err != nil { - return err + outRow, sqlErr := RowToSQL(ctx, schema, row, projs, buf) + if sqlErr != nil { + return sqlErr } - ctx.GetLogger().Tracef("spooling result row %s", outputRow) - r.Rows = append(r.Rows, outputRow) - r.RowsAffected++ + ctx.GetLogger().Tracef("spooling result row %s", outRow) + res.Rows = append(res.Rows, outRow) + res.RowsAffected++ + + // timer has fired, so send on the timer channel if !timer.Stop() { <-timer.C } - case <-timer.C: - // TODO: timer should probably go in its own thread, as rowChan is blocking - if h.readTimeout != 0 { - // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") - return ErrRowTimeout.New() + + if res.RowsAffected == rowsBatch { + select { + case resChan <- res: + res = nil + case <-ctx.Done(): + return context.Cause(ctx) + } } } + timer.Reset(waitTime) } }) + // Read sqltypes.Result from resChan and send to client + eg.Go(func() (err error) { + defer pan2err(&err) + defer cancelF() + defer wg.Done() + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + + case r, ok := <-resChan: + if !ok { + return nil + } + processedAtLeastOneBatch = true + err = resetCallback(r, more) + if err != nil { + return err + } + } + } + }) + // Close() kills this PID in the process list, // wait until all rows have be sent over the wire eg.Go(func() (err error) { @@ -770,7 +807,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } return nil, false, err } - return r, processedAtLeastOneBatch, nil + return res, processedAtLeastOneBatch, nil } func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema sql.Schema, iter sql.ValueRowIter, resultFields []*querypb.Field, buf *sql.ByteBuffer, callback func(*sqltypes.Result, bool) error, more bool) (*sqltypes.Result, bool, error) { From a71ab39ad39eb6f054432c143408aed3632081a8 Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 01:53:00 -0800 Subject: [PATCH 2/6] fix timer code --- server/handler.go | 138 ++++++++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 60 deletions(-) diff --git a/server/handler.go b/server/handler.go index a49fa22ec7..7c6c1b9a6e 100644 --- a/server/handler.go +++ b/server/handler.go @@ -650,17 +650,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s timer := time.NewTimer(waitTime) defer timer.Stop() - // Wait for signal on the timer.C channel, and error accordingly - eg.Go(func() (err error) { - <-timer.C - if h.readTimeout != 0 { - // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") - return ErrRowTimeout.New() - } - return nil - }) - // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to // clean out rows that have already been spooled. // A server-side cursor allows the caller to fetch results cached on the server-side, @@ -676,7 +665,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s wg := sync.WaitGroup{} wg.Add(3) - var processedAtLeastOneBatch bool iter, projs := GetDeferredProjections(iter) // Read rows off the row iterator and send them to the row channel. @@ -706,8 +694,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } }) - // TODO: remember to deal with last result differently - // Read rows off the row channel and convert to wire format. + // Drain rows from rowChan, convert to wire format, and send to resChan var resChan = make(chan *sqltypes.Result, 4) var res *sqltypes.Result eg.Go(func() (err error) { @@ -749,26 +736,29 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s res.Rows = append(res.Rows, outRow) res.RowsAffected++ - // timer has fired, so send on the timer channel - if !timer.Stop() { - <-timer.C - } - if res.RowsAffected == rowsBatch { select { - case resChan <- res: - res = nil case <-ctx.Done(): return context.Cause(ctx) + case resChan <- res: + res = nil } } } - timer.Reset(waitTime) + // timer has gone off + if !timer.Reset(waitTime) { + if h.readTimeout != 0 { + // Cancel and return so Vitess can call the CloseConnection callback + ctx.GetLogger().Tracef("connection timeout") + return ErrRowTimeout.New() + } + } } }) - // Read sqltypes.Result from resChan and send to client + // Drain sqltypes.Result from resChan and call callback (send to client and reset buffer) + var processedAtLeastOneBatch bool eg.Go(func() (err error) { defer pan2err(&err) defer cancelF() @@ -777,7 +767,6 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s select { case <-ctx.Done(): return context.Cause(ctx) - case r, ok := <-resChan: if !ok { return nil @@ -841,22 +830,22 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema timer := time.NewTimer(waitTime) defer timer.Stop() - wg := sync.WaitGroup{} - wg.Add(2) - // Wrap the callback to include a BytesBuffer.Reset() for non-cursor requests, to // clean out rows that have already been spooled. - resetCallback := func(r *sqltypes.Result, more bool) error { - // A server-side cursor allows the caller to fetch results cached on the server-side, - // so if a cursor exists, we can't release the buffer memory yet. - if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - defer buf.Reset() + // A server-side cursor allows the caller to fetch results cached on the server-side, + // so if a cursor exists, we can't release the buffer memory yet. + resetCallback := callback + if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { + resetCallback = func(r *sqltypes.Result, more bool) error { + buf.Reset() + return callback(r, more) } - return callback(r, more) } - // TODO: send results instead of rows? - // Read rows from iter and send them off + wg := sync.WaitGroup{} + wg.Add(3) + + // Drain rows from iter and send to rowsChan var rowChan = make(chan sql.ValueRow, 512) eg.Go(func() (err error) { defer pan2err(&err) @@ -867,12 +856,12 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema case <-ctx.Done(): return context.Cause(ctx) default: - row, err := iter.NextValueRow(ctx) - if err == io.EOF { + row, iErr := iter.NextValueRow(ctx) + if iErr == io.EOF { return nil } - if err != nil { - return err + if iErr != nil { + return iErr } select { case rowChan <- row: @@ -883,53 +872,81 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema } }) + // Drain rows from rowChan, convert to wire format, and send to resChan + var resChan = make(chan *sqltypes.Result, 4) var res *sqltypes.Result - var processedAtLeastOneBatch bool eg.Go(func() (err error) { defer pan2err(&err) - defer cancelF() + defer close(resChan) defer wg.Done() + for { if res == nil { res = &sqltypes.Result{ Fields: resultFields, - Rows: make([][]sqltypes.Value, 0, rowsBatch), - } - } - if res.RowsAffected == rowsBatch { - if err := resetCallback(res, more); err != nil { - return err + Rows: make([][]sqltypes.Value, rowsBatch), } - res = nil - processedAtLeastOneBatch = true - continue } select { case <-ctx.Done(): return context.Cause(ctx) - case <-timer.C: + + case row, ok := <-rowChan: + if !ok { + return nil + } + + outRow, sqlErr := RowValueToSQLValues(ctx, schema, row, buf) + if sqlErr != nil { + return sqlErr + } + + ctx.GetLogger().Tracef("spooling result row %s", outRow) + res.Rows[res.RowsAffected] = outRow + res.RowsAffected++ + + if res.RowsAffected == rowsBatch { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case resChan <- res: + res = nil + } + } + } + + // timer has gone off + if !timer.Reset(waitTime) { if h.readTimeout != 0 { // Cancel and return so Vitess can call the CloseConnection callback ctx.GetLogger().Tracef("connection timeout") return ErrRowTimeout.New() } - case row, ok := <-rowChan: + } + } + }) + + // Drain sqltypes.Result from resChan and call callback (send to client and reset buffer) + var processedAtLeastOneBatch bool + eg.Go(func() (err error) { + defer pan2err(&err) + defer cancelF() + defer wg.Done() + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case r, ok := <-resChan: if !ok { return nil } - resRow, err := RowValueToSQLValues(ctx, schema, row, buf) + processedAtLeastOneBatch = true + err = resetCallback(r, more) if err != nil { return err } - ctx.GetLogger().Tracef("spooling result row %s", resRow) - res.Rows = append(res.Rows, resRow) - res.RowsAffected++ - if !timer.Stop() { - <-timer.C - } } - timer.Reset(waitTime) } }) @@ -950,6 +967,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema return nil, false, err } + res.Rows = res.Rows[:res.RowsAffected] return res, processedAtLeastOneBatch, nil } From 877067dabf8dc116bffae80ecea7afe79884837a Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 02:25:54 -0800 Subject: [PATCH 3/6] fix timer again --- server/handler.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/server/handler.go b/server/handler.go index 7c6c1b9a6e..781e19201c 100644 --- a/server/handler.go +++ b/server/handler.go @@ -499,6 +499,9 @@ func (h *Handler) doQuery( r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more) } else { r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf) + if err != nil { + return remainder, err + } } if err != nil { return remainder, err @@ -714,6 +717,13 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s case <-ctx.Done(): return context.Cause(ctx) + case <-timer.C: + if h.readTimeout != 0 { + // Cancel and return so Vitess can call the CloseConnection callback + ctx.GetLogger().Tracef("connection timeout") + return ErrRowTimeout.New() + } + case row, ok := <-rowChan: if !ok { return nil @@ -892,6 +902,13 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema case <-ctx.Done(): return context.Cause(ctx) + case <-timer.C: + if h.readTimeout != 0 { + // Cancel and return so Vitess can call the CloseConnection callback + ctx.GetLogger().Tracef("connection timeout") + return ErrRowTimeout.New() + } + case row, ok := <-rowChan: if !ok { return nil @@ -968,7 +985,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema } res.Rows = res.Rows[:res.RowsAffected] - return res, processedAtLeastOneBatch, nil + return res, processedAtLeastOneBatch, err } // See https://dev.mysql.com/doc/internals/en/status-flags.html From ab4d1ac5a878b41b3a3b1c82b5bd0b8148057f0a Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 10:26:02 -0800 Subject: [PATCH 4/6] cleanup --- server/handler.go | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/server/handler.go b/server/handler.go index 781e19201c..51a3a3c1e1 100644 --- a/server/handler.go +++ b/server/handler.go @@ -660,7 +660,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s resetCallback := callback if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { resetCallback = func(r *sqltypes.Result, more bool) error { - buf.Reset() + defer buf.Reset() return callback(r, more) } } @@ -685,8 +685,8 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s if iErr == io.EOF { return nil } - if err != nil { - return err + if iErr != nil { + return iErr } select { case rowChan <- row: @@ -756,14 +756,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } } - // timer has gone off - if !timer.Reset(waitTime) { - if h.readTimeout != 0 { - // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") - return ErrRowTimeout.New() - } - } + timer.Reset(waitTime) } }) @@ -847,7 +840,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema resetCallback := callback if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { resetCallback = func(r *sqltypes.Result, more bool) error { - buf.Reset() + defer buf.Reset() return callback(r, more) } } @@ -933,14 +926,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema } } - // timer has gone off - if !timer.Reset(waitTime) { - if h.readTimeout != 0 { - // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") - return ErrRowTimeout.New() - } - } + timer.Reset(waitTime) } }) From feef0825af7602ff0418324ea43c30e44c7b5a9d Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 10:43:56 -0800 Subject: [PATCH 5/6] tidy --- server/handler.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/handler.go b/server/handler.go index 51a3a3c1e1..3c916eff65 100644 --- a/server/handler.go +++ b/server/handler.go @@ -499,9 +499,6 @@ func (h *Handler) doQuery( r, processedAtLeastOneBatch, err = h.resultForValueRowIter(sqlCtx, c, schema, vr, resultFields, buf, callback, more) } else { r, processedAtLeastOneBatch, err = h.resultForDefaultIter(sqlCtx, c, schema, rowIter, callback, resultFields, more, buf) - if err != nil { - return remainder, err - } } if err != nil { return remainder, err @@ -665,11 +662,11 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } } + iter, projs := GetDeferredProjections(iter) + wg := sync.WaitGroup{} wg.Add(3) - iter, projs := GetDeferredProjections(iter) - // Read rows off the row iterator and send them to the row channel. var rowChan = make(chan sql.Row, 512) eg.Go(func() (err error) { From 8ef1f74c5dbdadb28fb7734a0c7b6d56bba6645d Mon Sep 17 00:00:00 2001 From: James Cor Date: Wed, 19 Nov 2025 13:30:24 -0800 Subject: [PATCH 6/6] feedback --- server/handler.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/server/handler.go b/server/handler.go index 3c916eff65..827fe3cb53 100644 --- a/server/handler.go +++ b/server/handler.go @@ -654,9 +654,8 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s // clean out rows that have already been spooled. // A server-side cursor allows the caller to fetch results cached on the server-side, // so if a cursor exists, we can't release the buffer memory yet. - resetCallback := callback if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - resetCallback = func(r *sqltypes.Result, more bool) error { + callback = func(r *sqltypes.Result, more bool) error { defer buf.Reset() return callback(r, more) } @@ -717,7 +716,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s case <-timer.C: if h.readTimeout != 0 { // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") + ctx.GetLogger().Warn("connection timeout") return ErrRowTimeout.New() } @@ -757,7 +756,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s } }) - // Drain sqltypes.Result from resChan and call callback (send to client and reset buffer) + // Drain sqltypes.Result from resChan and call callback (send to client and potentially reset buffer) var processedAtLeastOneBatch bool eg.Go(func() (err error) { defer pan2err(&err) @@ -772,7 +771,7 @@ func (h *Handler) resultForDefaultIter(ctx *sql.Context, c *mysql.Conn, schema s return nil } processedAtLeastOneBatch = true - err = resetCallback(r, more) + err = callback(r, more) if err != nil { return err } @@ -834,9 +833,8 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema // clean out rows that have already been spooled. // A server-side cursor allows the caller to fetch results cached on the server-side, // so if a cursor exists, we can't release the buffer memory yet. - resetCallback := callback if c.StatusFlags&uint16(mysql.ServerCursorExists) != 0 { - resetCallback = func(r *sqltypes.Result, more bool) error { + callback = func(r *sqltypes.Result, more bool) error { defer buf.Reset() return callback(r, more) } @@ -895,7 +893,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema case <-timer.C: if h.readTimeout != 0 { // Cancel and return so Vitess can call the CloseConnection callback - ctx.GetLogger().Tracef("connection timeout") + ctx.GetLogger().Warn("connection timeout") return ErrRowTimeout.New() } @@ -942,7 +940,7 @@ func (h *Handler) resultForValueRowIter(ctx *sql.Context, c *mysql.Conn, schema return nil } processedAtLeastOneBatch = true - err = resetCallback(r, more) + err = callback(r, more) if err != nil { return err }