Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for context.Context #608

Merged
merged 18 commits into from Jun 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions AUTHORS
Expand Up @@ -14,18 +14,21 @@
Aaron Hopkins <go-sql-driver at die.net>
Arne Hormann <arnehormann at gmail.com>
Asta Xie <xiemengjun at gmail.com>
Bulat Gaifullin <gaifullinbf at gmail.com>
Carlos Nieto <jose.carlos at menteslibres.net>
Chris Moos <chris at tech9computers.com>
Daniel Nichter <nil at codenode.com>
Daniël van Eeden <git at myname.nl>
Dave Protasowski <dprotaso at gmail.com>
DisposaBoy <disposaboy at dby.me>
Egor Smolyakov <egorsmkv at gmail.com>
Evan Shaw <evan at vendhq.com>
Frederick Mayle <frederickmayle at gmail.com>
Gustavo Kristic <gkristic at gmail.com>
Hanno Braun <mail at hannobraun.com>
Henri Yandell <flamefew at gmail.com>
Hirotaka Yamamoto <ymmt2005 at gmail.com>
ICHINOSE Shogo <shogo82148 at gmail.com>
INADA Naoki <songofacandy at gmail.com>
Jacek Szwec <szwec.jacek at gmail.com>
James Harr <james.harr at gmail.com>
Expand All @@ -45,6 +48,7 @@ Luke Scott <luke at webconnex.com>
Michael Woolnough <michael.woolnough at gmail.com>
Nicola Peduzzi <thenikso at gmail.com>
Olivier Mengué <dolmen at cpan.org>
oscarzhao <oscarzhaosl at gmail.com>
Paul Bonser <misterpib at gmail.com>
Peter Schultz <peter.schultz at classmarkets.com>
Rebecca Chin <rchin at pivotal.io>
Expand Down
4 changes: 4 additions & 0 deletions README.md
Expand Up @@ -19,6 +19,7 @@ A MySQL-Driver for Go's [database/sql](https://golang.org/pkg/database/sql/) pac
* [LOAD DATA LOCAL INFILE support](#load-data-local-infile-support)
* [time.Time support](#timetime-support)
* [Unicode support](#unicode-support)
* [context.Context Support](#contextcontext-support)
* [Testing / Development](#testing--development)
* [License](#license)

Expand Down Expand Up @@ -443,6 +444,9 @@ Version 1.0 of the driver recommended adding `&charset=utf8` (alias for `SET NAM

See http://dev.mysql.com/doc/refman/5.7/en/charset-unicode.html for more details on MySQL's Unicode support.

## `context.Context` Support
Go 1.8 added `database/sql` support for `context.Context`. This driver supports query timeouts and cancellation via contexts.
See [context support in the database/sql package](https://golang.org/doc/go1.8#database_sql) for more details.

## Testing / Development
To run the driver tests you may need to adjust the configuration. See the [Testing Wiki-Page](https://github.com/go-sql-driver/mysql/wiki/Testing "Testing") for details.
Expand Down
93 changes: 93 additions & 0 deletions benchmark_go18_test.go
@@ -0,0 +1,93 @@
// Go MySQL Driver - A MySQL-Driver for Go's database/sql package
//
// Copyright 2017 The Go-MySQL-Driver Authors. All rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

// +build go1.8

package mysql

import (
"context"
"database/sql"
"fmt"
"runtime"
"testing"
)

func benchmarkQueryContext(b *testing.B, db *sql.DB, p int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db.SetMaxIdleConns(p * runtime.GOMAXPROCS(0))

tb := (*TB)(b)
stmt := tb.checkStmt(db.PrepareContext(ctx, "SELECT val FROM foo WHERE id=?"))
defer stmt.Close()

b.SetParallelism(p)
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var got string
for pb.Next() {
tb.check(stmt.QueryRow(1).Scan(&got))
if got != "one" {
b.Fatalf("query = %q; want one", got)
}
}
})
}

func BenchmarkQueryContext(b *testing.B) {
db := initDB(b,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
`INSERT INTO foo VALUES (1, "one")`,
`INSERT INTO foo VALUES (2, "two")`,
)
defer db.Close()
for _, p := range []int{1, 2, 3, 4} {
b.Run(fmt.Sprintf("%d", p), func(b *testing.B) {
benchmarkQueryContext(b, db, p)
})
}
}

func benchmarkExecContext(b *testing.B, db *sql.DB, p int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db.SetMaxIdleConns(p * runtime.GOMAXPROCS(0))

tb := (*TB)(b)
stmt := tb.checkStmt(db.PrepareContext(ctx, "DO 1"))
defer stmt.Close()

b.SetParallelism(p)
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if _, err := stmt.ExecContext(ctx); err != nil {
b.Fatal(err)
}
}
})
}

func BenchmarkExecContext(b *testing.B) {
db := initDB(b,
"DROP TABLE IF EXISTS foo",
"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
`INSERT INTO foo VALUES (1, "one")`,
`INSERT INTO foo VALUES (2, "two")`,
)
defer db.Close()
for _, p := range []int{1, 2, 3, 4} {
b.Run(fmt.Sprintf("%d", p), func(b *testing.B) {
benchmarkQueryContext(b, db, p)
})
}
}
95 changes: 84 additions & 11 deletions connection.go
Expand Up @@ -14,9 +14,21 @@ import (
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

// a copy of context.Context for Go 1.7 and later.
type mysqlContext interface {
Done() <-chan struct{}
Err() error

// They are defined in context.Context, but go-mysql-driver does not use them.
// Deadline() (deadline time.Time, ok bool)
// Value(key interface{}) interface{}
}

type mysqlConn struct {
buf buffer
netConn net.Conn
Expand All @@ -31,6 +43,19 @@ type mysqlConn struct {
sequence uint8
parseTime bool
strict bool

// for context support (From Go 1.8)
watching bool
watcher chan<- mysqlContext
closech chan struct{}
finished chan<- struct{}

// set non-zero when conn is closed, before closech is closed.
// accessed atomically.
closed int32

mu sync.Mutex // guards following fields
canceledErr error // set non-nil if conn is canceled
}

// Handles parameters set in DSN after the connection is established
Expand Down Expand Up @@ -64,7 +89,7 @@ func (mc *mysqlConn) handleParams() (err error) {
}

func (mc *mysqlConn) Begin() (driver.Tx, error) {
if mc.netConn == nil {
if mc.isBroken() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
Expand All @@ -78,7 +103,7 @@ func (mc *mysqlConn) Begin() (driver.Tx, error) {

func (mc *mysqlConn) Close() (err error) {
// Makes Close idempotent
if mc.netConn != nil {
if !mc.isBroken() {
err = mc.writeCommandPacket(comQuit)
}

Expand All @@ -92,19 +117,36 @@ func (mc *mysqlConn) Close() (err error) {
// is called before auth or on auth failure because MySQL will have already
// closed the network connection.
func (mc *mysqlConn) cleanup() {
if atomic.SwapInt32(&mc.closed, 1) != 0 {
return
}

// Makes cleanup idempotent
if mc.netConn != nil {
if err := mc.netConn.Close(); err != nil {
errLog.Print(err)
close(mc.closech)
if mc.netConn == nil {
return
}
if err := mc.netConn.Close(); err != nil {
errLog.Print(err)
}
}

func (mc *mysqlConn) isBroken() bool {
return atomic.LoadInt32(&mc.closed) != 0
}

func (mc *mysqlConn) error() error {
if mc.isBroken() {
if err := mc.canceled(); err != nil {
return err
}
mc.netConn = nil
return ErrInvalidConn
}
mc.cfg = nil
mc.buf.nc = nil
return nil
}

func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) {
if mc.netConn == nil {
if mc.isBroken() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
Expand Down Expand Up @@ -258,7 +300,7 @@ func (mc *mysqlConn) interpolateParams(query string, args []driver.Value) (strin
}

func (mc *mysqlConn) Exec(query string, args []driver.Value) (driver.Result, error) {
if mc.netConn == nil {
if mc.isBroken() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
Expand Down Expand Up @@ -315,7 +357,11 @@ func (mc *mysqlConn) exec(query string) error {
}

func (mc *mysqlConn) Query(query string, args []driver.Value) (driver.Rows, error) {
if mc.netConn == nil {
return mc.query(query, args)
}

func (mc *mysqlConn) query(query string, args []driver.Value) (*textRows, error) {
if mc.isBroken() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
Expand Down Expand Up @@ -387,3 +433,30 @@ func (mc *mysqlConn) getSystemVar(name string) ([]byte, error) {
}
return nil, err
}

// finish is called when the query has canceled.
func (mc *mysqlConn) cancel(err error) {
mc.mu.Lock()
mc.canceledErr = err
mc.mu.Unlock()
mc.cleanup()
}

// canceled returns non-nil if the connection was closed due to context cancelation.
func (mc *mysqlConn) canceled() error {
mc.mu.Lock()
defer mc.mu.Unlock()
return mc.canceledErr
}

// finish is called when the query has succeeded.
func (mc *mysqlConn) finish() {
if !mc.watching || mc.finished == nil {
return
}
select {
case mc.finished <- struct{}{}:
mc.watching = false
case <-mc.closech:
}
}