From 740137062524e5f9b1fd7a1fcad2a7ca1fee3385 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 24 Oct 2016 13:57:45 +0200 Subject: [PATCH 1/3] updated go-mysql library adapted code accordingly; will test thoroughly --- go/binlog/gomysql_reader.go | 19 +- vendor/github.com/ngaut/log/LICENSE | 165 ++++++++ vendor/github.com/ngaut/log/crash_unix.go | 18 + vendor/github.com/ngaut/log/crash_win.go | 37 ++ vendor/github.com/ngaut/log/log.go | 380 ++++++++++++++++++ .../siddontang/go-mysql/client/client_test.go | 8 +- .../siddontang/go-mysql/mysql/mysql_test.go | 18 +- .../siddontang/go-mysql/mysql/resultset.go | 58 ++- .../siddontang/go-mysql/mysql/util.go | 16 + .../siddontang/go-mysql/replication/backup.go | 19 +- .../go-mysql/replication/binlogstreamer.go | 35 +- .../go-mysql/replication/binlogsyncer.go | 377 ++++++++--------- .../siddontang/go-mysql/replication/bitmap.go | 38 -- .../siddontang/go-mysql/replication/parser.go | 22 +- .../go-mysql/replication/parser_test.go | 2 +- .../go-mysql/replication/replication_test.go | 54 ++- .../go-mysql/replication/row_event.go | 48 ++- .../go-mysql/replication/row_event_test.go | 61 ++- vendor/golang.org/x/net | 1 + 19 files changed, 1055 insertions(+), 321 deletions(-) create mode 100644 vendor/github.com/ngaut/log/LICENSE create mode 100644 vendor/github.com/ngaut/log/crash_unix.go create mode 100644 vendor/github.com/ngaut/log/crash_win.go create mode 100644 vendor/github.com/ngaut/log/log.go delete mode 100644 vendor/github.com/siddontang/go-mysql/replication/bitmap.go create mode 160000 vendor/golang.org/x/net diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index ac6d8909a..14aa61965 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -9,6 +9,8 @@ import ( "fmt" "sync" + "golang.org/x/net/context" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" @@ -40,7 +42,16 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G binlogSyncer: nil, binlogStreamer: nil, } - binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") + binlogSyncerConfig := &replication.BinlogSyncerConfig{ + ServerID: serverId, + Flavor: "mysql", + Host: connectionConfig.Key.Hostname, + Port: uint16(connectionConfig.Key.Port), + User: connectionConfig.User, + Password: connectionConfig.Password, + } + + binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig) return binlogReader, err } @@ -50,10 +61,6 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin if coordinates.IsEmpty() { return log.Errorf("Emptry coordinates at ConnectBinlogStreamer()") } - log.Infof("Registering replica at %+v:%+v", this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port)) - if err := this.binlogSyncer.RegisterSlave(this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port), this.connectionConfig.User, this.connectionConfig.Password); err != nil { - return err - } this.currentCoordinates = coordinates log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates) @@ -124,7 +131,7 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if canStopStreaming() { break } - ev, err := this.binlogStreamer.GetEvent() + ev, err := this.binlogStreamer.GetEvent(context.Background()) if err != nil { return err } diff --git a/vendor/github.com/ngaut/log/LICENSE b/vendor/github.com/ngaut/log/LICENSE new file mode 100644 index 000000000..6600f1c98 --- /dev/null +++ b/vendor/github.com/ngaut/log/LICENSE @@ -0,0 +1,165 @@ +GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. + + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. + + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/vendor/github.com/ngaut/log/crash_unix.go b/vendor/github.com/ngaut/log/crash_unix.go new file mode 100644 index 000000000..37f407def --- /dev/null +++ b/vendor/github.com/ngaut/log/crash_unix.go @@ -0,0 +1,18 @@ +// +build freebsd openbsd netbsd dragonfly darwin linux + +package log + +import ( + "log" + "os" + "syscall" +) + +func CrashLog(file string) { + f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Println(err.Error()) + } else { + syscall.Dup2(int(f.Fd()), 2) + } +} diff --git a/vendor/github.com/ngaut/log/crash_win.go b/vendor/github.com/ngaut/log/crash_win.go new file mode 100644 index 000000000..7d612eea6 --- /dev/null +++ b/vendor/github.com/ngaut/log/crash_win.go @@ -0,0 +1,37 @@ +// +build windows + +package log + +import ( + "log" + "os" + "syscall" +) + +var ( + kernel32 = syscall.MustLoadDLL("kernel32.dll") + procSetStdHandle = kernel32.MustFindProc("SetStdHandle") +) + +func setStdHandle(stdhandle int32, handle syscall.Handle) error { + r0, _, e1 := syscall.Syscall(procSetStdHandle.Addr(), 2, uintptr(stdhandle), uintptr(handle), 0) + if r0 == 0 { + if e1 != 0 { + return error(e1) + } + return syscall.EINVAL + } + return nil +} + +func CrashLog(file string) { + f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Println(err.Error()) + } else { + err = setStdHandle(syscall.STD_ERROR_HANDLE, syscall.Handle(f.Fd())) + if err != nil { + log.Println(err.Error()) + } + } +} diff --git a/vendor/github.com/ngaut/log/log.go b/vendor/github.com/ngaut/log/log.go new file mode 100644 index 000000000..896b39309 --- /dev/null +++ b/vendor/github.com/ngaut/log/log.go @@ -0,0 +1,380 @@ +//high level log wrapper, so it can output different log based on level +package log + +import ( + "fmt" + "io" + "log" + "os" + "runtime" + "sync" + "time" +) + +const ( + Ldate = log.Ldate + Llongfile = log.Llongfile + Lmicroseconds = log.Lmicroseconds + Lshortfile = log.Lshortfile + LstdFlags = log.LstdFlags + Ltime = log.Ltime +) + +type ( + LogLevel int + LogType int +) + +const ( + LOG_FATAL = LogType(0x1) + LOG_ERROR = LogType(0x2) + LOG_WARNING = LogType(0x4) + LOG_INFO = LogType(0x8) + LOG_DEBUG = LogType(0x10) +) + +const ( + LOG_LEVEL_NONE = LogLevel(0x0) + LOG_LEVEL_FATAL = LOG_LEVEL_NONE | LogLevel(LOG_FATAL) + LOG_LEVEL_ERROR = LOG_LEVEL_FATAL | LogLevel(LOG_ERROR) + LOG_LEVEL_WARN = LOG_LEVEL_ERROR | LogLevel(LOG_WARNING) + LOG_LEVEL_INFO = LOG_LEVEL_WARN | LogLevel(LOG_INFO) + LOG_LEVEL_DEBUG = LOG_LEVEL_INFO | LogLevel(LOG_DEBUG) + LOG_LEVEL_ALL = LOG_LEVEL_DEBUG +) + +const FORMAT_TIME_DAY string = "20060102" +const FORMAT_TIME_HOUR string = "2006010215" + +var _log *logger = New() + +func init() { + SetFlags(Ldate | Ltime | Lshortfile) + SetHighlighting(runtime.GOOS != "windows") +} + +func Logger() *log.Logger { + return _log._log +} + +func SetLevel(level LogLevel) { + _log.SetLevel(level) +} +func GetLogLevel() LogLevel { + return _log.level +} + +func SetOutput(out io.Writer) { + _log.SetOutput(out) +} + +func SetOutputByName(path string) error { + return _log.SetOutputByName(path) +} + +func SetFlags(flags int) { + _log._log.SetFlags(flags) +} + +func Info(v ...interface{}) { + _log.Info(v...) +} + +func Infof(format string, v ...interface{}) { + _log.Infof(format, v...) +} + +func Debug(v ...interface{}) { + _log.Debug(v...) +} + +func Debugf(format string, v ...interface{}) { + _log.Debugf(format, v...) +} + +func Warn(v ...interface{}) { + _log.Warning(v...) +} + +func Warnf(format string, v ...interface{}) { + _log.Warningf(format, v...) +} + +func Warning(v ...interface{}) { + _log.Warning(v...) +} + +func Warningf(format string, v ...interface{}) { + _log.Warningf(format, v...) +} + +func Error(v ...interface{}) { + _log.Error(v...) +} + +func Errorf(format string, v ...interface{}) { + _log.Errorf(format, v...) +} + +func Fatal(v ...interface{}) { + _log.Fatal(v...) +} + +func Fatalf(format string, v ...interface{}) { + _log.Fatalf(format, v...) +} + +func SetLevelByString(level string) { + _log.SetLevelByString(level) +} + +func SetHighlighting(highlighting bool) { + _log.SetHighlighting(highlighting) +} + +func SetRotateByDay() { + _log.SetRotateByDay() +} + +func SetRotateByHour() { + _log.SetRotateByHour() +} + +type logger struct { + _log *log.Logger + level LogLevel + highlighting bool + + dailyRolling bool + hourRolling bool + + fileName string + logSuffix string + fd *os.File + + lock sync.Mutex +} + +func (l *logger) SetHighlighting(highlighting bool) { + l.highlighting = highlighting +} + +func (l *logger) SetLevel(level LogLevel) { + l.level = level +} + +func (l *logger) SetLevelByString(level string) { + l.level = StringToLogLevel(level) +} + +func (l *logger) SetRotateByDay() { + l.dailyRolling = true + l.logSuffix = genDayTime(time.Now()) +} + +func (l *logger) SetRotateByHour() { + l.hourRolling = true + l.logSuffix = genHourTime(time.Now()) +} + +func (l *logger) rotate() error { + l.lock.Lock() + defer l.lock.Unlock() + + var suffix string + if l.dailyRolling { + suffix = genDayTime(time.Now()) + } else if l.hourRolling { + suffix = genHourTime(time.Now()) + } else { + return nil + } + + // Notice: if suffix is not equal to l.LogSuffix, then rotate + if suffix != l.logSuffix { + err := l.doRotate(suffix) + if err != nil { + return err + } + } + + return nil +} + +func (l *logger) doRotate(suffix string) error { + // Notice: Not check error, is this ok? + l.fd.Close() + + lastFileName := l.fileName + "." + l.logSuffix + err := os.Rename(l.fileName, lastFileName) + if err != nil { + return err + } + + err = l.SetOutputByName(l.fileName) + if err != nil { + return err + } + + l.logSuffix = suffix + + return nil +} + +func (l *logger) SetOutput(out io.Writer) { + l._log = log.New(out, l._log.Prefix(), l._log.Flags()) +} + +func (l *logger) SetOutputByName(path string) error { + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) + if err != nil { + log.Fatal(err) + } + + l.SetOutput(f) + + l.fileName = path + l.fd = f + + return err +} + +func (l *logger) log(t LogType, v ...interface{}) { + if l.level|LogLevel(t) != l.level { + return + } + + err := l.rotate() + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n", err.Error()) + return + } + + v1 := make([]interface{}, len(v)+2) + logStr, logColor := LogTypeToString(t) + if l.highlighting { + v1[0] = "\033" + logColor + "m[" + logStr + "]" + copy(v1[1:], v) + v1[len(v)+1] = "\033[0m" + } else { + v1[0] = "[" + logStr + "]" + copy(v1[1:], v) + v1[len(v)+1] = "" + } + + s := fmt.Sprintln(v1...) + l._log.Output(4, s) +} + +func (l *logger) logf(t LogType, format string, v ...interface{}) { + if l.level|LogLevel(t) != l.level { + return + } + + err := l.rotate() + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n", err.Error()) + return + } + + logStr, logColor := LogTypeToString(t) + var s string + if l.highlighting { + s = "\033" + logColor + "m[" + logStr + "] " + fmt.Sprintf(format, v...) + "\033[0m" + } else { + s = "[" + logStr + "] " + fmt.Sprintf(format, v...) + } + l._log.Output(4, s) +} + +func (l *logger) Fatal(v ...interface{}) { + l.log(LOG_FATAL, v...) + os.Exit(-1) +} + +func (l *logger) Fatalf(format string, v ...interface{}) { + l.logf(LOG_FATAL, format, v...) + os.Exit(-1) +} + +func (l *logger) Error(v ...interface{}) { + l.log(LOG_ERROR, v...) +} + +func (l *logger) Errorf(format string, v ...interface{}) { + l.logf(LOG_ERROR, format, v...) +} + +func (l *logger) Warning(v ...interface{}) { + l.log(LOG_WARNING, v...) +} + +func (l *logger) Warningf(format string, v ...interface{}) { + l.logf(LOG_WARNING, format, v...) +} + +func (l *logger) Debug(v ...interface{}) { + l.log(LOG_DEBUG, v...) +} + +func (l *logger) Debugf(format string, v ...interface{}) { + l.logf(LOG_DEBUG, format, v...) +} + +func (l *logger) Info(v ...interface{}) { + l.log(LOG_INFO, v...) +} + +func (l *logger) Infof(format string, v ...interface{}) { + l.logf(LOG_INFO, format, v...) +} + +func StringToLogLevel(level string) LogLevel { + switch level { + case "fatal": + return LOG_LEVEL_FATAL + case "error": + return LOG_LEVEL_ERROR + case "warn": + return LOG_LEVEL_WARN + case "warning": + return LOG_LEVEL_WARN + case "debug": + return LOG_LEVEL_DEBUG + case "info": + return LOG_LEVEL_INFO + } + return LOG_LEVEL_ALL +} + +func LogTypeToString(t LogType) (string, string) { + switch t { + case LOG_FATAL: + return "fatal", "[0;31" + case LOG_ERROR: + return "error", "[0;31" + case LOG_WARNING: + return "warning", "[0;33" + case LOG_DEBUG: + return "debug", "[0;36" + case LOG_INFO: + return "info", "[0;37" + } + return "unknown", "[0;37" +} + +func genDayTime(t time.Time) string { + return t.Format(FORMAT_TIME_DAY) +} + +func genHourTime(t time.Time) string { + return t.Format(FORMAT_TIME_HOUR) +} + +func New() *logger { + return Newlogger(os.Stderr, "") +} + +func Newlogger(w io.Writer, prefix string) *logger { + return &logger{_log: log.New(w, prefix, LstdFlags), level: LOG_LEVEL_ALL, highlighting: true} +} diff --git a/vendor/github.com/siddontang/go-mysql/client/client_test.go b/vendor/github.com/siddontang/go-mysql/client/client_test.go index dbf5a70d6..40e7a6af8 100644 --- a/vendor/github.com/siddontang/go-mysql/client/client_test.go +++ b/vendor/github.com/siddontang/go-mysql/client/client_test.go @@ -5,12 +5,13 @@ import ( "fmt" "testing" - . "gopkg.in/check.v1" + . "github.com/pingcap/check" "github.com/siddontang/go-mysql/mysql" ) -var testAddr = flag.String("addr", "127.0.0.1:3306", "MySQL server address") +var testHost = flag.String("host", "127.0.0.1", "MySQL server host") +var testPort = flag.Int("port", 3306, "MySQL server port") var testUser = flag.String("user", "root", "MySQL user") var testPassword = flag.String("pass", "", "MySQL password") var testDB = flag.String("db", "test", "MySQL test database") @@ -27,7 +28,8 @@ var _ = Suite(&clientTestSuite{}) func (s *clientTestSuite) SetUpSuite(c *C) { var err error - s.c, err = Connect(*testAddr, *testUser, *testPassword, *testDB) + addr := fmt.Sprintf("%s:%d", *testHost, *testPort) + s.c, err = Connect(addr, *testUser, *testPassword, *testDB) if err != nil { c.Fatal(err) } diff --git a/vendor/github.com/siddontang/go-mysql/mysql/mysql_test.go b/vendor/github.com/siddontang/go-mysql/mysql/mysql_test.go index ce251ee72..8fbaaabc6 100644 --- a/vendor/github.com/siddontang/go-mysql/mysql/mysql_test.go +++ b/vendor/github.com/siddontang/go-mysql/mysql/mysql_test.go @@ -3,7 +3,7 @@ package mysql import ( "testing" - "gopkg.in/check.v1" + "github.com/pingcap/check" ) func Test(t *testing.T) { @@ -114,40 +114,40 @@ func (t *mysqlTestSuite) TestMysqlParseBinaryUint8(c *check.C) { func (t *mysqlTestSuite) TestMysqlParseBinaryInt16(c *check.C) { i16 := ParseBinaryInt16([]byte{1, 128}) - c.Assert(i16, check.Equals, int16(-128*256 + 1)) + c.Assert(i16, check.Equals, int16(-128*256+1)) } func (t *mysqlTestSuite) TestMysqlParseBinaryUint16(c *check.C) { u16 := ParseBinaryUint16([]byte{1, 128}) - c.Assert(u16, check.Equals, uint16(128*256 + 1)) + c.Assert(u16, check.Equals, uint16(128*256+1)) } func (t *mysqlTestSuite) TestMysqlParseBinaryInt24(c *check.C) { i32 := ParseBinaryInt24([]byte{1, 2, 128}) - c.Assert(i32, check.Equals, int32(-128*65536 + 2*256 + 1)) + c.Assert(i32, check.Equals, int32(-128*65536+2*256+1)) } func (t *mysqlTestSuite) TestMysqlParseBinaryUint24(c *check.C) { u32 := ParseBinaryUint24([]byte{1, 2, 128}) - c.Assert(u32, check.Equals, uint32(128*65536 + 2*256 + 1)) + c.Assert(u32, check.Equals, uint32(128*65536+2*256+1)) } func (t *mysqlTestSuite) TestMysqlParseBinaryInt32(c *check.C) { i32 := ParseBinaryInt32([]byte{1, 2, 3, 128}) - c.Assert(i32, check.Equals, int32(-128*16777216 + 3*65536 + 2*256 + 1)) + c.Assert(i32, check.Equals, int32(-128*16777216+3*65536+2*256+1)) } func (t *mysqlTestSuite) TestMysqlParseBinaryUint32(c *check.C) { u32 := ParseBinaryUint32([]byte{1, 2, 3, 128}) - c.Assert(u32, check.Equals, uint32(128*16777216 + 3*65536 + 2*256 + 1)) + c.Assert(u32, check.Equals, uint32(128*16777216+3*65536+2*256+1)) } func (t *mysqlTestSuite) TestMysqlParseBinaryInt64(c *check.C) { i64 := ParseBinaryInt64([]byte{1, 2, 3, 4, 5, 6, 7, 128}) - c.Assert(i64, check.Equals, -128*int64(72057594037927936) + 7*int64(281474976710656) + 6*int64(1099511627776) + 5*int64(4294967296) + 4*16777216 + 3*65536 + 2*256 + 1) + c.Assert(i64, check.Equals, -128*int64(72057594037927936)+7*int64(281474976710656)+6*int64(1099511627776)+5*int64(4294967296)+4*16777216+3*65536+2*256+1) } func (t *mysqlTestSuite) TestMysqlParseBinaryUint64(c *check.C) { u64 := ParseBinaryUint64([]byte{1, 2, 3, 4, 5, 6, 7, 128}) - c.Assert(u64, check.Equals, 128*uint64(72057594037927936) + 7*uint64(281474976710656) + 6*uint64(1099511627776) + 5*uint64(4294967296) + 4*16777216 + 3*65536 + 2*256 + 1) + c.Assert(u64, check.Equals, 128*uint64(72057594037927936)+7*uint64(281474976710656)+6*uint64(1099511627776)+5*uint64(4294967296)+4*16777216+3*65536+2*256+1) } diff --git a/vendor/github.com/siddontang/go-mysql/mysql/resultset.go b/vendor/github.com/siddontang/go-mysql/mysql/resultset.go index 1e2ade590..a50d9f89c 100644 --- a/vendor/github.com/siddontang/go-mysql/mysql/resultset.go +++ b/vendor/github.com/siddontang/go-mysql/mysql/resultset.go @@ -1,6 +1,7 @@ package mysql import ( + "fmt" "strconv" "github.com/juju/errors" @@ -141,7 +142,7 @@ func (p RowData) ParseBinary(f []*Field) ([]interface{}, error) { continue case MYSQL_TYPE_DOUBLE: - data[i] = ParseBinaryFloat64(p[pos : pos+4]) + data[i] = ParseBinaryFloat64(p[pos : pos+8]) pos += 8 continue @@ -292,10 +293,28 @@ func (r *Resultset) GetUint(row, column int) (uint64, error) { } switch v := d.(type) { - case uint64: - return v, nil + case int: + return uint64(v), nil + case int8: + return uint64(v), nil + case int16: + return uint64(v), nil + case int32: + return uint64(v), nil case int64: return uint64(v), nil + case uint: + return uint64(v), nil + case uint8: + return uint64(v), nil + case uint16: + return uint64(v), nil + case uint32: + return uint64(v), nil + case uint64: + return uint64(v), nil + case float32: + return uint64(v), nil case float64: return uint64(v), nil case string: @@ -342,12 +361,30 @@ func (r *Resultset) GetFloat(row, column int) (float64, error) { } switch v := d.(type) { - case float64: - return v, nil - case uint64: + case int: + return float64(v), nil + case int8: + return float64(v), nil + case int16: + return float64(v), nil + case int32: return float64(v), nil case int64: return float64(v), nil + case uint: + return float64(v), nil + case uint8: + return float64(v), nil + case uint16: + return float64(v), nil + case uint32: + return float64(v), nil + case uint64: + return float64(v), nil + case float32: + return float64(v), nil + case float64: + return v, nil case string: return strconv.ParseFloat(v, 64) case []byte: @@ -378,10 +415,11 @@ func (r *Resultset) GetString(row, column int) (string, error) { return v, nil case []byte: return hack.String(v), nil - case int64: - return strconv.FormatInt(v, 10), nil - case uint64: - return strconv.FormatUint(v, 10), nil + case int, int8, int16, int32, int64, + uint, uint8, uint16, uint32, uint64: + return fmt.Sprintf("%d", v), nil + case float32: + return strconv.FormatFloat(float64(v), 'f', -1, 64), nil case float64: return strconv.FormatFloat(v, 'f', -1, 64), nil case nil: diff --git a/vendor/github.com/siddontang/go-mysql/mysql/util.go b/vendor/github.com/siddontang/go-mysql/mysql/util.go index fa874de5d..7fe41fa21 100644 --- a/vendor/github.com/siddontang/go-mysql/mysql/util.go +++ b/vendor/github.com/siddontang/go-mysql/mysql/util.go @@ -314,6 +314,22 @@ func GetNetProto(addr string) string { } } +// ErrorEqual returns a boolean indicating whether err1 is equal to err2. +func ErrorEqual(err1, err2 error) bool { + e1 := errors.Cause(err1) + e2 := errors.Cause(err2) + + if e1 == e2 { + return true + } + + if e1 == nil || e2 == nil { + return e1 == e2 + } + + return e1.Error() == e2.Error() +} + var encodeRef = map[byte]byte{ '\x00': '0', '\'': '\'', diff --git a/vendor/github.com/siddontang/go-mysql/replication/backup.go b/vendor/github.com/siddontang/go-mysql/replication/backup.go index f1fda2192..744c38ce5 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/backup.go +++ b/vendor/github.com/siddontang/go-mysql/replication/backup.go @@ -6,6 +6,8 @@ import ( "path" "time" + "golang.org/x/net/context" + "github.com/juju/errors" . "github.com/siddontang/go-mysql/mysql" ) @@ -18,7 +20,8 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du timeout = 30 * 3600 * 24 * time.Second } - b.SetRawMode(true) + // Force use raw mode + b.parser.SetRawMode(true) os.MkdirAll(backupDir, 0755) @@ -38,11 +41,15 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du }() for { - e, err := s.GetEventTimeout(timeout) - if errors.Cause(err) == ErrGetEventTimeout { - // for backward compatibility - return ErrGetEventTimeout - } else if err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + e, err := s.GetEvent(ctx) + cancel() + + if err == context.DeadlineExceeded { + return nil + } + + if err != nil { return errors.Trace(err) } diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go index 706fe08c5..e5b165c24 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go @@ -1,24 +1,27 @@ package replication import ( - "time" + "golang.org/x/net/context" "github.com/juju/errors" + "github.com/ngaut/log" ) var ( - ErrGetEventTimeout = errors.New("Get event timeout, try get later") - ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again") - ErrSyncClosed = errors.New("Sync was closed") + ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again") + ErrSyncClosed = errors.New("Sync was closed") ) +// BinlogStreamer gets the streaming event. type BinlogStreamer struct { ch chan *BinlogEvent ech chan error err error } -func (s *BinlogStreamer) GetEvent() (*BinlogEvent, error) { +// GetEvent gets the binlog event one by one, it will block until Syncer receives any events from MySQL +// or meets a sync error. You can pass a context (like Cancel or Timeout) to break the block. +func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) { if s.err != nil { return nil, ErrNeedSyncAgain } @@ -28,23 +31,8 @@ func (s *BinlogStreamer) GetEvent() (*BinlogEvent, error) { return c, nil case s.err = <-s.ech: return nil, s.err - } -} - -// if timeout, ErrGetEventTimeout will returns -// timeout value won't be set too large, otherwise it may waste lots of memory -func (s *BinlogStreamer) GetEventTimeout(d time.Duration) (*BinlogEvent, error) { - if s.err != nil { - return nil, ErrNeedSyncAgain - } - - select { - case c := <-s.ch: - return c, nil - case s.err = <-s.ech: - return nil, s.err - case <-time.After(d): - return nil, ErrGetEventTimeout + case <-ctx.Done(): + return nil, ctx.Err() } } @@ -56,6 +44,7 @@ func (s *BinlogStreamer) closeWithError(err error) { if err == nil { err = ErrSyncClosed } + log.Errorf("close sync with err: %v", err) select { case s.ech <- err: default: @@ -65,7 +54,7 @@ func (s *BinlogStreamer) closeWithError(err error) { func newBinlogStreamer() *BinlogStreamer { s := new(BinlogStreamer) - s.ch = make(chan *BinlogEvent, 1024) + s.ch = make(chan *BinlogEvent, 10240) s.ech = make(chan error, 4) return s diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go index bcea08af2..e6d5590ed 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go @@ -7,32 +7,52 @@ import ( "sync" "time" + "golang.org/x/net/context" + "github.com/juju/errors" - "github.com/satori/go.uuid" + "github.com/ngaut/log" "github.com/siddontang/go-mysql/client" . "github.com/siddontang/go-mysql/mysql" ) var ( - errSyncRunning = errors.New("Sync is running, must Close first") - errNotRegistered = errors.New("Syncer is not registered as a slave") + errSyncRunning = errors.New("Sync is running, must Close first") ) -type BinlogSyncer struct { - m sync.Mutex - - flavor string +// BinlogSyncerConfig is the configuration for BinlogSyncer. +type BinlogSyncerConfig struct { + // ServerID is the unique ID in cluster. + ServerID uint32 + // Flavor is "mysql" or "mariadb", if not set, use "mysql" default. + Flavor string + + // Host is for MySQL server host. + Host string + // Port is for MySQL server port. + Port uint16 + // User is for MySQL user. + User string + // Password is for MySQL password. + Password string + + // Localhost is local hostname if register salve. + // If not set, use os.Hostname() instead. + Localhost string + + // SemiSyncEnabled enables semi-sync or not. + SemiSyncEnabled bool + + // RawModeEanbled is for not parsing binlog event. + RawModeEanbled bool +} - c *client.Conn - serverID uint32 +// BinlogSyncer syncs binlog event from server. +type BinlogSyncer struct { + m sync.RWMutex - localhost string - host string - port uint16 - user string - password string + cfg *BinlogSyncerConfig - masterID uint32 + c *client.Conn wg sync.WaitGroup @@ -40,45 +60,29 @@ type BinlogSyncer struct { nextPos Position - running bool - semiSyncEnabled bool + running bool - stopCh chan struct{} + ctx context.Context + cancel context.CancelFunc } -func NewBinlogSyncer(serverID uint32, flavor string) *BinlogSyncer { - b := new(BinlogSyncer) - b.flavor = flavor - - b.serverID = serverID +// NewBinlogSyncer creates the BinlogSyncer with cfg. +func NewBinlogSyncer(cfg *BinlogSyncerConfig) *BinlogSyncer { + log.Infof("create BinlogSyncer with config %v", cfg) - b.masterID = 0 + b := new(BinlogSyncer) + b.cfg = cfg b.parser = NewBinlogParser() + b.parser.SetRawMode(b.cfg.RawModeEanbled) b.running = false - b.semiSyncEnabled = false - - b.stopCh = make(chan struct{}, 1) + b.ctx, b.cancel = context.WithCancel(context.Background()) return b } -// LocalHostname returns the hostname that register slave would register as. -func (b *BinlogSyncer) LocalHostname() string { - - if b.localhost == "" { - h, _ := os.Hostname() - return h - } - return b.localhost -} - -// SetLocalHostname set's the hostname that register salve would register as. -func (b *BinlogSyncer) SetLocalHostname(name string) { - b.localhost = name -} - +// Close closes the BinlogSyncer. func (b *BinlogSyncer) Close() { b.m.Lock() defer b.m.Unlock() @@ -87,99 +91,45 @@ func (b *BinlogSyncer) Close() { } func (b *BinlogSyncer) close() { - if b.c != nil { - b.c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) + if b.isClosed() { + return } - select { - case b.stopCh <- struct{}{}: - default: - } - - b.wg.Wait() - - if b.c != nil { - b.c.Close() - } + log.Info("syncer is closing...") b.running = false - b.c = nil -} + b.cancel() -func (b *BinlogSyncer) checkExec() error { - if b.running { - return errSyncRunning - } else if b.c == nil { - return errNotRegistered + if b.c != nil { + b.c.SetReadDeadline(time.Now().Add(100 * time.Millisecond)) } - return nil -} - -func (b *BinlogSyncer) GetMasterUUID() (uuid.UUID, error) { - b.m.Lock() - defer b.m.Unlock() + b.wg.Wait() - if err := b.checkExec(); err != nil { - return uuid.UUID{}, err + if b.c != nil { + b.c.Close() } - if r, err := b.c.Execute("SHOW GLOBAL VARIABLES LIKE 'SERVER_UUID'"); err != nil { - return uuid.UUID{}, err - } else { - s, _ := r.GetString(0, 1) - if s == "" || s == "NONE" { - return uuid.UUID{}, nil - } else { - return uuid.FromString(s) - } - } + log.Info("syncer is closed") } -// You must register slave at first before you do other operations -// This function will close old replication sync if exists -func (b *BinlogSyncer) RegisterSlave(host string, port uint16, user string, password string) error { - b.m.Lock() - defer b.m.Unlock() - - // first, close old replication sync - b.close() - - b.host = host - b.port = port - b.user = user - b.password = password - - err := b.registerSlave() - if err != nil { - b.close() +func (b *BinlogSyncer) isClosed() bool { + select { + case <-b.ctx.Done(): + return true + default: + return false } - return errors.Trace(err) } -// If you close sync before and want to restart again, you can call this before other operations -// This function will close old replication sync if exists -func (b *BinlogSyncer) ReRegisterSlave() error { - b.m.Lock() - defer b.m.Unlock() - - if len(b.host) == 0 || len(b.user) == 0 { - return errors.Errorf("empty host and user, you must register slave before") - } - - b.close() - - err := b.registerSlave() - if err != nil { - b.close() +func (b *BinlogSyncer) registerSlave() error { + if b.c != nil { + b.c.Close() } - return errors.Trace(err) -} - -func (b *BinlogSyncer) registerSlave() error { + log.Infof("register slave for master server %s:%d", b.cfg.Host, b.cfg.Port) var err error - b.c, err = client.Connect(fmt.Sprintf("%s:%d", b.host, b.port), b.user, b.password, "") + b.c, err = client.Connect(fmt.Sprintf("%s:%d", b.cfg.Host, b.cfg.Port), b.cfg.User, b.cfg.Password, "") if err != nil { return errors.Trace(err) } @@ -221,12 +171,9 @@ func (b *BinlogSyncer) registerSlave() error { return nil } -func (b *BinlogSyncer) EnableSemiSync() error { - b.m.Lock() - defer b.m.Unlock() - - if err := b.checkExec(); err != nil { - return errors.Trace(err) +func (b *BinlogSyncer) enalbeSemiSync() error { + if !b.cfg.SemiSyncEnabled { + return nil } if r, err := b.c.Execute("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';"); err != nil { @@ -234,20 +181,38 @@ func (b *BinlogSyncer) EnableSemiSync() error { } else { s, _ := r.GetString(0, 1) if s != "ON" { - return errors.Errorf("master does not support semi synchronous replication") + log.Errorf("master does not support semi synchronous replication, use no semi-sync") + b.cfg.SemiSyncEnabled = false + return nil } } _, err := b.c.Execute(`SET @rpl_semi_sync_slave = 1;`) if err != nil { - b.semiSyncEnabled = true + return errors.Trace(err) } - return errors.Trace(err) + + return nil +} + +func (b *BinlogSyncer) prepare() error { + if b.isClosed() { + return errors.Trace(ErrSyncClosed) + } + + if err := b.registerSlave(); err != nil { + return errors.Trace(err) + } + + if err := b.enalbeSemiSync(); err != nil { + return errors.Trace(err) + } + + return nil } func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { b.running = true - b.stopCh = make(chan struct{}, 1) s := newBinlogStreamer() @@ -256,66 +221,45 @@ func (b *BinlogSyncer) startDumpStream() *BinlogStreamer { return s } +// StartSync starts syncing from the `pos` position. func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { + log.Infof("begin to sync binlog from position %s", pos) + b.m.Lock() defer b.m.Unlock() - if err := b.checkExec(); err != nil { - return nil, err - } - - //always start from position 4 - if pos.Pos < 4 { - pos.Pos = 4 + if b.running { + return nil, errors.Trace(errSyncRunning) } - err := b.writeBinglogDumpCommand(pos) - if err != nil { - return nil, err + if err := b.prepareSyncPos(pos); err != nil { + return nil, errors.Trace(err) } return b.startDumpStream(), nil } -func (b *BinlogSyncer) SetRawMode(mode bool) error { - b.m.Lock() - defer b.m.Unlock() - - if err := b.checkExec(); err != nil { - return errors.Trace(err) - } - - b.parser.SetRawMode(mode) - return nil -} +// StartSyncGTID starts syncing from the `gset` GTIDSet. +func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { + log.Infof("begin to sync binlog from GTID %s", gset) -func (b *BinlogSyncer) ExecuteSql(query string, args ...interface{}) (*Result, error) { b.m.Lock() defer b.m.Unlock() - if err := b.checkExec(); err != nil { - return nil, err + if b.running { + return nil, errors.Trace(errSyncRunning) } - return b.c.Execute(query, args...) -} - -func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { - b.m.Lock() - defer b.m.Unlock() - - if err := b.checkExec(); err != nil { - return nil, err + if err := b.prepare(); err != nil { + return nil, errors.Trace(err) } var err error - switch b.flavor { - case MySQLFlavor: + if b.cfg.Flavor != MariaDBFlavor { + // default use MySQL err = b.writeBinlogDumpMysqlGTIDCommand(gset) - case MariaDBFlavor: + } else { err = b.writeBinlogDumpMariadbGTIDCommand(gset) - default: - err = fmt.Errorf("invalid flavor %s", b.flavor) } if err != nil { @@ -340,7 +284,7 @@ func (b *BinlogSyncer) writeBinglogDumpCommand(p Position) error { binary.LittleEndian.PutUint16(data[pos:], BINLOG_DUMP_NEVER_STOP) pos += 2 - binary.LittleEndian.PutUint32(data[pos:], b.serverID) + binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID) pos += 4 copy(data[pos:], p.Name) @@ -362,7 +306,7 @@ func (b *BinlogSyncer) writeBinlogDumpMysqlGTIDCommand(gset GTIDSet) error { binary.LittleEndian.PutUint16(data[pos:], 0) pos += 2 - binary.LittleEndian.PutUint32(data[pos:], b.serverID) + binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID) pos += 4 binary.LittleEndian.PutUint32(data[pos:], uint32(len(p.Name))) @@ -414,19 +358,28 @@ func (b *BinlogSyncer) writeBinlogDumpMariadbGTIDCommand(gset GTIDSet) error { return b.writeBinglogDumpCommand(Position{"", 0}) } +// localHostname returns the hostname that register slave would register as. +func (b *BinlogSyncer) localHostname() string { + if len(b.cfg.Localhost) == 0 { + h, _ := os.Hostname() + return h + } + return b.cfg.Localhost +} + func (b *BinlogSyncer) writeRegisterSlaveCommand() error { b.c.ResetSequence() - hostname := b.LocalHostname() + hostname := b.localHostname() // This should be the name of slave host not the host we are connecting to. - data := make([]byte, 4+1+4+1+len(hostname)+1+len(b.user)+1+len(b.password)+2+4+4) + data := make([]byte, 4+1+4+1+len(hostname)+1+len(b.cfg.User)+1+len(b.cfg.Password)+2+4+4) pos := 4 data[pos] = COM_REGISTER_SLAVE pos++ - binary.LittleEndian.PutUint32(data[pos:], b.serverID) + binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID) pos += 4 // This should be the name of slave hostname not the host we are connecting to. @@ -435,24 +388,25 @@ func (b *BinlogSyncer) writeRegisterSlaveCommand() error { n := copy(data[pos:], hostname) pos += n - data[pos] = uint8(len(b.user)) + data[pos] = uint8(len(b.cfg.User)) pos++ - n = copy(data[pos:], b.user) + n = copy(data[pos:], b.cfg.User) pos += n - data[pos] = uint8(len(b.password)) + data[pos] = uint8(len(b.cfg.Password)) pos++ - n = copy(data[pos:], b.password) + n = copy(data[pos:], b.cfg.Password) pos += n - binary.LittleEndian.PutUint16(data[pos:], b.port) + binary.LittleEndian.PutUint16(data[pos:], b.cfg.Port) pos += 2 //replication rank, not used binary.LittleEndian.PutUint32(data[pos:], 0) pos += 4 - binary.LittleEndian.PutUint32(data[pos:], b.masterID) + // master ID, 0 is OK + binary.LittleEndian.PutUint32(data[pos:], 0) return b.c.WritePacket(data) } @@ -482,6 +436,37 @@ func (b *BinlogSyncer) replySemiSyncACK(p Position) error { return errors.Trace(err) } +func (b *BinlogSyncer) retrySync() error { + b.m.Lock() + defer b.m.Unlock() + + log.Infof("begin to re-sync from %s", b.nextPos) + + b.parser.Reset() + if err := b.prepareSyncPos(b.nextPos); err != nil { + return errors.Trace(err) + } + + return nil +} + +func (b *BinlogSyncer) prepareSyncPos(pos Position) error { + // always start from position 4 + if pos.Pos < 4 { + pos.Pos = 4 + } + + if err := b.prepare(); err != nil { + return errors.Trace(err) + } + + if err := b.writeBinglogDumpCommand(pos); err != nil { + return errors.Trace(err) + } + + return nil +} + func (b *BinlogSyncer) onStream(s *BinlogStreamer) { defer func() { if e := recover(); e != nil { @@ -493,8 +478,34 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { for { data, err := b.c.ReadPacket() if err != nil { - s.closeWithError(err) - return + log.Error(err) + + // we meet connection error, should re-connect again with + // last nextPos we got. + if len(b.nextPos.Name) == 0 { + // we can't get the correct position, close. + s.closeWithError(err) + return + } + + // TODO: add a max retry count. + for { + select { + case <-b.ctx.Done(): + s.close() + return + case <-time.After(time.Second): + if err = b.retrySync(); err != nil { + log.Errorf("retry sync err: %v, wait 1s and retry again", err) + continue + } + } + + break + } + + // we connect the server and begin to re-sync again. + continue } switch data[0] { @@ -519,7 +530,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { data = data[1:] needACK := false - if b.semiSyncEnabled && (data[0] == SemiSyncIndicator) { + if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) { needACK = (data[1] == 0x01) //skip semi sync header data = data[2:] @@ -530,17 +541,21 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { return errors.Trace(err) } - b.nextPos.Pos = e.Header.LogPos + if e.Header.LogPos > 0 { + // Some events like FormatDescriptionEvent return 0, ignore. + b.nextPos.Pos = e.Header.LogPos + } if re, ok := e.Event.(*RotateEvent); ok { b.nextPos.Name = string(re.NextLogName) b.nextPos.Pos = uint32(re.Position) + log.Infof("rotate to %s", b.nextPos) } needStop := false select { case s.ch <- e: - case <-b.stopCh: + case <-b.ctx.Done(): needStop = true } diff --git a/vendor/github.com/siddontang/go-mysql/replication/bitmap.go b/vendor/github.com/siddontang/go-mysql/replication/bitmap.go deleted file mode 100644 index 5de993891..000000000 --- a/vendor/github.com/siddontang/go-mysql/replication/bitmap.go +++ /dev/null @@ -1,38 +0,0 @@ -package replication - -var bitCountInByte = [256]uint8{ - 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, - 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, - 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, - 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, - 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, - 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, - 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, - 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, - 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, - 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, - 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, - 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8, -} - -func bitCount(bitmap []byte) int { - var n uint32 = 0 - - for _, b := range bitmap { - n += uint32(bitCountInByte[b]) - } - - return int(n) -} - -func bitGet(bitmap []byte, i int) byte { - return bitmap[i/8] & (1 << (uint(i) & 7)) -} - -func isNullSet(nullBitmap []byte, i int) bool { - return nullBitmap[i/8]&(1<<(uint(i)%8)) > 0 -} diff --git a/vendor/github.com/siddontang/go-mysql/replication/parser.go b/vendor/github.com/siddontang/go-mysql/replication/parser.go index 4610ab9c2..cfc97e473 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/parser.go +++ b/vendor/github.com/siddontang/go-mysql/replication/parser.go @@ -26,6 +26,10 @@ func NewBinlogParser() *BinlogParser { return p } +func (p *BinlogParser) Reset() { + p.format = nil +} + type OnEventFunc func(*BinlogEvent) error func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error { @@ -50,12 +54,11 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) return errors.Errorf("seek %s to %d error %v", name, offset, err) } - return p.ParseReader(f, onEvent) + return p.parseReader(f, onEvent) } -func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error { - p.tables = make(map[uint64]*TableMapEvent) - p.format = nil +func (p *BinlogParser) parseReader(r io.Reader, onEvent OnEventFunc) error { + p.Reset() var err error var n int64 @@ -193,12 +196,11 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte) (Event, error) { p.tables[te.TableID] = te } - //If MySQL restart, it may use the same table id for different tables. - //We must clear the table map before parsing new events. - //We have no better way to known whether the event is before or after restart, - //So we have to clear the table map on every rotate event. - if _, ok := e.(*RotateEvent); ok { - p.tables = make(map[uint64]*TableMapEvent) + if re, ok := e.(*RowsEvent); ok { + if (re.Flags & RowsEventStmtEndFlag) > 0 { + // Refer https://github.com/alibaba/canal/blob/38cc81b7dab29b51371096fb6763ca3a8432ffee/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogEvent.java#L176 + p.tables = make(map[uint64]*TableMapEvent) + } } return e, nil diff --git a/vendor/github.com/siddontang/go-mysql/replication/parser_test.go b/vendor/github.com/siddontang/go-mysql/replication/parser_test.go index ba1c96e87..52f5f1fc4 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/parser_test.go +++ b/vendor/github.com/siddontang/go-mysql/replication/parser_test.go @@ -1,7 +1,7 @@ package replication import ( - . "gopkg.in/check.v1" + . "github.com/pingcap/check" ) func (t *testSyncerSuite) TestIndexOutOfRange(c *C) { diff --git a/vendor/github.com/siddontang/go-mysql/replication/replication_test.go b/vendor/github.com/siddontang/go-mysql/replication/replication_test.go index 4061089f4..e9f3e37ef 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/replication_test.go +++ b/vendor/github.com/siddontang/go-mysql/replication/replication_test.go @@ -8,15 +8,18 @@ import ( "testing" "time" + "golang.org/x/net/context" + + . "github.com/pingcap/check" + uuid "github.com/satori/go.uuid" "github.com/siddontang/go-mysql/client" "github.com/siddontang/go-mysql/mysql" - . "gopkg.in/check.v1" ) // Use docker mysql to test, mysql is 3306, mariadb is 3316 var testHost = flag.String("host", "127.0.0.1", "MySQL master host") -var testOutputLogs = flag.Bool("out", true, "output binlog event") +var testOutputLogs = flag.Bool("out", false, "output binlog event") func TestBinLogSyncer(t *testing.T) { TestingT(t) @@ -69,15 +72,19 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) { return } + eventCount := 0 for { - e, err := s.GetEventTimeout(2 * time.Second) - if err != nil { - if err != ErrGetEventTimeout { - c.Fatal(err) - } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + e, err := s.GetEvent(ctx) + cancel() + + if err == context.DeadlineExceeded { + eventCount += 1 return } + c.Assert(err, IsNil) + if *testOutputLogs { e.Dump(os.Stdout) os.Stdout.Sync() @@ -164,10 +171,16 @@ func (t *testSyncerSuite) setupTest(c *C, flavor string) { t.b.Close() } - t.b = NewBinlogSyncer(100, flavor) + cfg := BinlogSyncerConfig{ + ServerID: 100, + Flavor: flavor, + Host: *testHost, + Port: port, + User: "root", + Password: "", + } - err = t.b.RegisterSlave(*testHost, port, "root", "") - c.Assert(err, IsNil) + t.b = NewBinlogSyncer(&cfg) } func (t *testSyncerSuite) testPositionSync(c *C) { @@ -180,6 +193,11 @@ func (t *testSyncerSuite) testPositionSync(c *C) { s, err := t.b.StartSync(mysql.Position{binFile, uint32(binPos)}) c.Assert(err, IsNil) + // Test re-sync. + time.Sleep(100 * time.Millisecond) + t.b.c.SetReadDeadline(time.Now().Add(time.Millisecond)) + time.Sleep(100 * time.Millisecond) + t.testSync(c, s) } @@ -198,9 +216,15 @@ func (t *testSyncerSuite) TestMysqlGTIDSync(c *C) { c.Skip("GTID mode is not ON") } - masterUuid, err := t.b.GetMasterUUID() + r, err = t.c.Execute("SHOW GLOBAL VARIABLES LIKE 'SERVER_UUID'") c.Assert(err, IsNil) + var masterUuid uuid.UUID + if s, _ := r.GetString(0, 1); len(s) > 0 && s != "NONE" { + masterUuid, err = uuid.FromString(s) + c.Assert(err, IsNil) + } + set, _ := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d-%d", masterUuid.String(), 1, 2)) s, err := t.b.StartSyncGTID(set) @@ -234,10 +258,7 @@ func (t *testSyncerSuite) TestMariadbGTIDSync(c *C) { func (t *testSyncerSuite) TestMysqlSemiPositionSync(c *C) { t.setupTest(c, mysql.MySQLFlavor) - err := t.b.EnableSemiSync() - if err != nil { - c.Skip(fmt.Sprintf("mysql doest not support semi synchronous replication %v", err)) - } + t.b.cfg.SemiSyncEnabled = true t.testPositionSync(c) } @@ -249,6 +270,7 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec(c *C) { var wg sync.WaitGroup wg.Add(1) + defer wg.Wait() go func() { defer wg.Done() @@ -263,7 +285,7 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec(c *C) { os.RemoveAll("./var") err := t.b.StartBackup("./var", mysql.Position{"", uint32(0)}, 2*time.Second) - c.Check(err, Equals, ErrGetEventTimeout) + c.Assert(err, IsNil) p := NewBinlogParser() diff --git a/vendor/github.com/siddontang/go-mysql/replication/row_event.go b/vendor/github.com/siddontang/go-mysql/replication/row_event.go index 9506b1e95..f565832b6 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/row_event.go +++ b/vendor/github.com/siddontang/go-mysql/replication/row_event.go @@ -10,6 +10,7 @@ import ( "time" "github.com/juju/errors" + "github.com/ngaut/log" . "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go/hack" ) @@ -180,6 +181,7 @@ func (e *TableMapEvent) decodeMeta(data []byte) error { func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintf(w, "TableID: %d\n", e.TableID) + fmt.Fprintf(w, "TableID size: %d\n", e.tableIDSize) fmt.Fprintf(w, "Flags: %d\n", e.Flags) fmt.Fprintf(w, "Schema: %s\n", e.Schema) fmt.Fprintf(w, "Table: %s\n", e.Table) @@ -189,6 +191,9 @@ func (e *TableMapEvent) Dump(w io.Writer) { fmt.Fprintln(w) } +// RowsEventStmtEndFlag is set in the end of the statement. +const RowsEventStmtEndFlag = 0x01 + type RowsEvent struct { //0, 1, 2 Version int @@ -257,13 +262,13 @@ func (e *RowsEvent) Decode(data []byte) error { var err error // ... repeat rows until event-end - // Why using pos < len(data) - 1 instead of origin pos < len(data) to check? - // See mysql - // https://github.com/mysql/mysql-server/blob/5.7/sql/log_event.cc#L9006 - // https://github.com/mysql/mysql-server/blob/5.7/sql/log_event.cc#L2492 - // A user panics here but he can't give me more information, and using len(data) - 1 - // fixes this panic, so I will try to construct a test case for this later. - for pos < len(data)-1 { + defer func() { + if r := recover(); r != nil { + log.Fatalf("parse rows event panic %v, data %q, parsed rows %#v, table map %#v", r, data, e, e.Table) + } + }() + + for pos < len(data) { if n, err = e.decodeRows(data[pos:], e.Table, e.ColumnBitmap1); err != nil { return errors.Trace(err) } @@ -280,12 +285,23 @@ func (e *RowsEvent) Decode(data []byte) error { return nil } +func isBitSet(bitmap []byte, i int) bool { + return bitmap[i>>3]&(1<<(uint(i)&7)) > 0 +} + func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte) (int, error) { row := make([]interface{}, e.ColumnCount) pos := 0 - count := (bitCount(bitmap) + 7) / 8 + // refer: https://github.com/alibaba/canal/blob/c3e38e50e269adafdd38a48c63a1740cde304c67/dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/RowsLogBuffer.java#L63 + count := 0 + for i := 0; i < int(e.ColumnCount); i++ { + if isBitSet(bitmap, i) { + count++ + } + } + count = (count + 7) / 8 nullBitmap := data[pos : pos+count] pos += count @@ -295,15 +311,15 @@ func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte) var n int var err error for i := 0; i < int(e.ColumnCount); i++ { - if bitGet(bitmap, i) == 0 { + if !isBitSet(bitmap, i) { continue } isNull := (uint32(nullBitmap[nullbitIndex/8]) >> uint32(nullbitIndex%8)) & 0x01 + nullbitIndex++ if isNull > 0 { row[i] = nil - nullbitIndex++ continue } @@ -313,8 +329,6 @@ func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte) return 0, nil } pos += n - - nullbitIndex++ } e.Rows = append(e.Rows, row) @@ -436,10 +450,10 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{ err = fmt.Errorf("Unknown ENUM packlen=%d", l) } case MYSQL_TYPE_SET: - nbits := meta & 0xFF - n = int(nbits+7) / 8 + n = int(meta & 0xFF) + nbits := n * 8 - v, err = decodeBit(data, int(nbits), n) + v, err = decodeBit(data, nbits, n) case MYSQL_TYPE_BLOB: switch meta { case 1: @@ -615,7 +629,7 @@ func decodeTimestamp2(data []byte, dec uint16) (interface{}, int, error) { const DATETIMEF_INT_OFS int64 = 0x8000000000 -func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) { +func decodeDatetime2(data []byte, dec uint16) (string, int, error) { //get datetime binary length n := int(5 + (dec+1)/2) @@ -657,7 +671,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) { minute := int((hms >> 6) % (1 << 6)) hour := int((hms >> 12)) - return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second), n, nil // commented by Shlomi Noach. Yes I know about `git blame` + return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second), n, nil } const TIMEF_OFS int64 = 0x800000000000 diff --git a/vendor/github.com/siddontang/go-mysql/replication/row_event_test.go b/vendor/github.com/siddontang/go-mysql/replication/row_event_test.go index 539bab680..5d02c87fe 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/row_event_test.go +++ b/vendor/github.com/siddontang/go-mysql/replication/row_event_test.go @@ -3,7 +3,7 @@ package replication import ( "fmt" - . "gopkg.in/check.v1" + . "github.com/pingcap/check" ) type testDecodeSuite struct{} @@ -326,3 +326,62 @@ func (_ *testDecodeSuite) TestDecodeDecimal(c *C) { c.Assert(value, DecodeDecimalsEquals, pos, err, tc.Expected, tc.ExpectedPos, tc.ExpectedErr, i) } } + +func (_ *testDecodeSuite) TestLastNull(c *C) { + // Table format: + // desc funnytable; + // +-------+------------+------+-----+---------+-------+ + // | Field | Type | Null | Key | Default | Extra | + // +-------+------------+------+-----+---------+-------+ + // | value | tinyint(4) | YES | | NULL | | + // +-------+------------+------+-----+---------+-------+ + + // insert into funnytable values (1), (2), (null); + // insert into funnytable values (1), (null), (2); + // all must get 3 rows + + tableMapEventData := []byte("\xd3\x01\x00\x00\x00\x00\x01\x00\x04test\x00\nfunnytable\x00\x01\x01\x00\x01") + + tableMapEvent := new(TableMapEvent) + tableMapEvent.tableIDSize = 6 + err := tableMapEvent.Decode(tableMapEventData) + c.Assert(err, IsNil) + + rows := new(RowsEvent) + rows.tableIDSize = 6 + rows.tables = make(map[uint64]*TableMapEvent) + rows.tables[tableMapEvent.TableID] = tableMapEvent + rows.Version = 2 + + tbls := [][]byte{ + []byte("\xd3\x01\x00\x00\x00\x00\x01\x00\x02\x00\x01\xff\xfe\x01\xff\xfe\x02"), + []byte("\xd3\x01\x00\x00\x00\x00\x01\x00\x02\x00\x01\xff\xfe\x01\xfe\x02\xff"), + } + + for _, tbl := range tbls { + rows.Rows = nil + err = rows.Decode(tbl) + c.Assert(err, IsNil) + c.Assert(rows.Rows, HasLen, 3) + } +} + +func (_ *testDecodeSuite) TestParseRowPanic(c *C) { + tableMapEvent := new(TableMapEvent) + tableMapEvent.tableIDSize = 6 + tableMapEvent.TableID = 1810 + tableMapEvent.ColumnType = []byte{3, 15, 15, 15, 9, 15, 15, 252, 3, 3, 3, 15, 3, 3, 3, 15, 3, 15, 1, 15, 3, 1, 252, 15, 15, 15} + tableMapEvent.ColumnMeta = []uint16{0, 108, 60, 765, 0, 765, 765, 4, 0, 0, 0, 765, 0, 0, 0, 3, 0, 3, 0, 765, 0, 0, 2, 108, 108, 108} + + rows := new(RowsEvent) + rows.tableIDSize = 6 + rows.tables = make(map[uint64]*TableMapEvent) + rows.tables[tableMapEvent.TableID] = tableMapEvent + rows.Version = 2 + + data := []byte{18, 7, 0, 0, 0, 0, 1, 0, 2, 0, 26, 1, 1, 16, 252, 248, 142, 63, 0, 0, 13, 0, 0, 0, 13, 0, 0, 0} + + err := rows.Decode(data) + c.Assert(err, IsNil) + c.Assert(rows.Rows[0][0], Equals, int32(16270)) +} diff --git a/vendor/golang.org/x/net b/vendor/golang.org/x/net new file mode 160000 index 000000000..644ffc062 --- /dev/null +++ b/vendor/golang.org/x/net @@ -0,0 +1 @@ +Subproject commit 644ffc06205c045a7d696b9a212191615d30b77b From a02476f80c27355f68f64e4bc6d8267f4a02dafb Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 24 Oct 2016 13:59:05 +0200 Subject: [PATCH 2/3] release version externalized --- RELEASE_VERSION | 0 build.sh | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 RELEASE_VERSION diff --git a/RELEASE_VERSION b/RELEASE_VERSION new file mode 100644 index 000000000..e69de29bb diff --git a/build.sh b/build.sh index c12b6a1e0..3e1ce6f03 100755 --- a/build.sh +++ b/build.sh @@ -2,7 +2,7 @@ # # -RELEASE_VERSION="1.0.26" +RELEASE_VERSION=$(cat RELEASE_VERSION) function build { osname=$1 From 7ea571665af5f317c9dc456eccfd30661a684b2d Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 27 Oct 2016 15:41:06 +0200 Subject: [PATCH 3/3] adding vendor/golang.org/x/net --- vendor/golang.org/x/net | 1 - vendor/golang.org/x/net/LICENSE | 27 ++ vendor/golang.org/x/net/PATENTS | 22 + vendor/golang.org/x/net/context/context.go | 447 +++++++++++++++++++++ 4 files changed, 496 insertions(+), 1 deletion(-) delete mode 160000 vendor/golang.org/x/net create mode 100644 vendor/golang.org/x/net/LICENSE create mode 100644 vendor/golang.org/x/net/PATENTS create mode 100644 vendor/golang.org/x/net/context/context.go diff --git a/vendor/golang.org/x/net b/vendor/golang.org/x/net deleted file mode 160000 index 644ffc062..000000000 --- a/vendor/golang.org/x/net +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 644ffc06205c045a7d696b9a212191615d30b77b diff --git a/vendor/golang.org/x/net/LICENSE b/vendor/golang.org/x/net/LICENSE new file mode 100644 index 000000000..6a66aea5e --- /dev/null +++ b/vendor/golang.org/x/net/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/net/PATENTS b/vendor/golang.org/x/net/PATENTS new file mode 100644 index 000000000..733099041 --- /dev/null +++ b/vendor/golang.org/x/net/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/net/context/context.go b/vendor/golang.org/x/net/context/context.go new file mode 100644 index 000000000..77b64d0c6 --- /dev/null +++ b/vendor/golang.org/x/net/context/context.go @@ -0,0 +1,447 @@ +// Copyright 2014 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package context defines the Context type, which carries deadlines, +// cancelation signals, and other request-scoped values across API boundaries +// and between processes. +// +// Incoming requests to a server should create a Context, and outgoing calls to +// servers should accept a Context. The chain of function calls between must +// propagate the Context, optionally replacing it with a modified copy created +// using WithDeadline, WithTimeout, WithCancel, or WithValue. +// +// Programs that use Contexts should follow these rules to keep interfaces +// consistent across packages and enable static analysis tools to check context +// propagation: +// +// Do not store Contexts inside a struct type; instead, pass a Context +// explicitly to each function that needs it. The Context should be the first +// parameter, typically named ctx: +// +// func DoSomething(ctx context.Context, arg Arg) error { +// // ... use ctx ... +// } +// +// Do not pass a nil Context, even if a function permits it. Pass context.TODO +// if you are unsure about which Context to use. +// +// Use context Values only for request-scoped data that transits processes and +// APIs, not for passing optional parameters to functions. +// +// The same Context may be passed to functions running in different goroutines; +// Contexts are safe for simultaneous use by multiple goroutines. +// +// See http://blog.golang.org/context for example code for a server that uses +// Contexts. +package context // import "golang.org/x/net/context" + +import ( + "errors" + "fmt" + "sync" + "time" +) + +// A Context carries a deadline, a cancelation signal, and other values across +// API boundaries. +// +// Context's methods may be called by multiple goroutines simultaneously. +type Context interface { + // Deadline returns the time when work done on behalf of this context + // should be canceled. Deadline returns ok==false when no deadline is + // set. Successive calls to Deadline return the same results. + Deadline() (deadline time.Time, ok bool) + + // Done returns a channel that's closed when work done on behalf of this + // context should be canceled. Done may return nil if this context can + // never be canceled. Successive calls to Done return the same value. + // + // WithCancel arranges for Done to be closed when cancel is called; + // WithDeadline arranges for Done to be closed when the deadline + // expires; WithTimeout arranges for Done to be closed when the timeout + // elapses. + // + // Done is provided for use in select statements: + // + // // Stream generates values with DoSomething and sends them to out + // // until DoSomething returns an error or ctx.Done is closed. + // func Stream(ctx context.Context, out <-chan Value) error { + // for { + // v, err := DoSomething(ctx) + // if err != nil { + // return err + // } + // select { + // case <-ctx.Done(): + // return ctx.Err() + // case out <- v: + // } + // } + // } + // + // See http://blog.golang.org/pipelines for more examples of how to use + // a Done channel for cancelation. + Done() <-chan struct{} + + // Err returns a non-nil error value after Done is closed. Err returns + // Canceled if the context was canceled or DeadlineExceeded if the + // context's deadline passed. No other values for Err are defined. + // After Done is closed, successive calls to Err return the same value. + Err() error + + // Value returns the value associated with this context for key, or nil + // if no value is associated with key. Successive calls to Value with + // the same key returns the same result. + // + // Use context values only for request-scoped data that transits + // processes and API boundaries, not for passing optional parameters to + // functions. + // + // A key identifies a specific value in a Context. Functions that wish + // to store values in Context typically allocate a key in a global + // variable then use that key as the argument to context.WithValue and + // Context.Value. A key can be any type that supports equality; + // packages should define keys as an unexported type to avoid + // collisions. + // + // Packages that define a Context key should provide type-safe accessors + // for the values stores using that key: + // + // // Package user defines a User type that's stored in Contexts. + // package user + // + // import "golang.org/x/net/context" + // + // // User is the type of value stored in the Contexts. + // type User struct {...} + // + // // key is an unexported type for keys defined in this package. + // // This prevents collisions with keys defined in other packages. + // type key int + // + // // userKey is the key for user.User values in Contexts. It is + // // unexported; clients use user.NewContext and user.FromContext + // // instead of using this key directly. + // var userKey key = 0 + // + // // NewContext returns a new Context that carries value u. + // func NewContext(ctx context.Context, u *User) context.Context { + // return context.WithValue(ctx, userKey, u) + // } + // + // // FromContext returns the User value stored in ctx, if any. + // func FromContext(ctx context.Context) (*User, bool) { + // u, ok := ctx.Value(userKey).(*User) + // return u, ok + // } + Value(key interface{}) interface{} +} + +// Canceled is the error returned by Context.Err when the context is canceled. +var Canceled = errors.New("context canceled") + +// DeadlineExceeded is the error returned by Context.Err when the context's +// deadline passes. +var DeadlineExceeded = errors.New("context deadline exceeded") + +// An emptyCtx is never canceled, has no values, and has no deadline. It is not +// struct{}, since vars of this type must have distinct addresses. +type emptyCtx int + +func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { + return +} + +func (*emptyCtx) Done() <-chan struct{} { + return nil +} + +func (*emptyCtx) Err() error { + return nil +} + +func (*emptyCtx) Value(key interface{}) interface{} { + return nil +} + +func (e *emptyCtx) String() string { + switch e { + case background: + return "context.Background" + case todo: + return "context.TODO" + } + return "unknown empty Context" +} + +var ( + background = new(emptyCtx) + todo = new(emptyCtx) +) + +// Background returns a non-nil, empty Context. It is never canceled, has no +// values, and has no deadline. It is typically used by the main function, +// initialization, and tests, and as the top-level Context for incoming +// requests. +func Background() Context { + return background +} + +// TODO returns a non-nil, empty Context. Code should use context.TODO when +// it's unclear which Context to use or it is not yet available (because the +// surrounding function has not yet been extended to accept a Context +// parameter). TODO is recognized by static analysis tools that determine +// whether Contexts are propagated correctly in a program. +func TODO() Context { + return todo +} + +// A CancelFunc tells an operation to abandon its work. +// A CancelFunc does not wait for the work to stop. +// After the first call, subsequent calls to a CancelFunc do nothing. +type CancelFunc func() + +// WithCancel returns a copy of parent with a new Done channel. The returned +// context's Done channel is closed when the returned cancel function is called +// or when the parent context's Done channel is closed, whichever happens first. +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete. +func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { + c := newCancelCtx(parent) + propagateCancel(parent, &c) + return &c, func() { c.cancel(true, Canceled) } +} + +// newCancelCtx returns an initialized cancelCtx. +func newCancelCtx(parent Context) cancelCtx { + return cancelCtx{ + Context: parent, + done: make(chan struct{}), + } +} + +// propagateCancel arranges for child to be canceled when parent is. +func propagateCancel(parent Context, child canceler) { + if parent.Done() == nil { + return // parent is never canceled + } + if p, ok := parentCancelCtx(parent); ok { + p.mu.Lock() + if p.err != nil { + // parent has already been canceled + child.cancel(false, p.err) + } else { + if p.children == nil { + p.children = make(map[canceler]bool) + } + p.children[child] = true + } + p.mu.Unlock() + } else { + go func() { + select { + case <-parent.Done(): + child.cancel(false, parent.Err()) + case <-child.Done(): + } + }() + } +} + +// parentCancelCtx follows a chain of parent references until it finds a +// *cancelCtx. This function understands how each of the concrete types in this +// package represents its parent. +func parentCancelCtx(parent Context) (*cancelCtx, bool) { + for { + switch c := parent.(type) { + case *cancelCtx: + return c, true + case *timerCtx: + return &c.cancelCtx, true + case *valueCtx: + parent = c.Context + default: + return nil, false + } + } +} + +// removeChild removes a context from its parent. +func removeChild(parent Context, child canceler) { + p, ok := parentCancelCtx(parent) + if !ok { + return + } + p.mu.Lock() + if p.children != nil { + delete(p.children, child) + } + p.mu.Unlock() +} + +// A canceler is a context type that can be canceled directly. The +// implementations are *cancelCtx and *timerCtx. +type canceler interface { + cancel(removeFromParent bool, err error) + Done() <-chan struct{} +} + +// A cancelCtx can be canceled. When canceled, it also cancels any children +// that implement canceler. +type cancelCtx struct { + Context + + done chan struct{} // closed by the first cancel call. + + mu sync.Mutex + children map[canceler]bool // set to nil by the first cancel call + err error // set to non-nil by the first cancel call +} + +func (c *cancelCtx) Done() <-chan struct{} { + return c.done +} + +func (c *cancelCtx) Err() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.err +} + +func (c *cancelCtx) String() string { + return fmt.Sprintf("%v.WithCancel", c.Context) +} + +// cancel closes c.done, cancels each of c's children, and, if +// removeFromParent is true, removes c from its parent's children. +func (c *cancelCtx) cancel(removeFromParent bool, err error) { + if err == nil { + panic("context: internal error: missing cancel error") + } + c.mu.Lock() + if c.err != nil { + c.mu.Unlock() + return // already canceled + } + c.err = err + close(c.done) + for child := range c.children { + // NOTE: acquiring the child's lock while holding parent's lock. + child.cancel(false, err) + } + c.children = nil + c.mu.Unlock() + + if removeFromParent { + removeChild(c.Context, c) + } +} + +// WithDeadline returns a copy of the parent context with the deadline adjusted +// to be no later than d. If the parent's deadline is already earlier than d, +// WithDeadline(parent, d) is semantically equivalent to parent. The returned +// context's Done channel is closed when the deadline expires, when the returned +// cancel function is called, or when the parent context's Done channel is +// closed, whichever happens first. +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete. +func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) { + if cur, ok := parent.Deadline(); ok && cur.Before(deadline) { + // The current deadline is already sooner than the new one. + return WithCancel(parent) + } + c := &timerCtx{ + cancelCtx: newCancelCtx(parent), + deadline: deadline, + } + propagateCancel(parent, c) + d := deadline.Sub(time.Now()) + if d <= 0 { + c.cancel(true, DeadlineExceeded) // deadline has already passed + return c, func() { c.cancel(true, Canceled) } + } + c.mu.Lock() + defer c.mu.Unlock() + if c.err == nil { + c.timer = time.AfterFunc(d, func() { + c.cancel(true, DeadlineExceeded) + }) + } + return c, func() { c.cancel(true, Canceled) } +} + +// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to +// implement Done and Err. It implements cancel by stopping its timer then +// delegating to cancelCtx.cancel. +type timerCtx struct { + cancelCtx + timer *time.Timer // Under cancelCtx.mu. + + deadline time.Time +} + +func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { + return c.deadline, true +} + +func (c *timerCtx) String() string { + return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now())) +} + +func (c *timerCtx) cancel(removeFromParent bool, err error) { + c.cancelCtx.cancel(false, err) + if removeFromParent { + // Remove this timerCtx from its parent cancelCtx's children. + removeChild(c.cancelCtx.Context, c) + } + c.mu.Lock() + if c.timer != nil { + c.timer.Stop() + c.timer = nil + } + c.mu.Unlock() +} + +// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)). +// +// Canceling this context releases resources associated with it, so code should +// call cancel as soon as the operations running in this Context complete: +// +// func slowOperationWithTimeout(ctx context.Context) (Result, error) { +// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) +// defer cancel() // releases resources if slowOperation completes before timeout elapses +// return slowOperation(ctx) +// } +func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { + return WithDeadline(parent, time.Now().Add(timeout)) +} + +// WithValue returns a copy of parent in which the value associated with key is +// val. +// +// Use context Values only for request-scoped data that transits processes and +// APIs, not for passing optional parameters to functions. +func WithValue(parent Context, key interface{}, val interface{}) Context { + return &valueCtx{parent, key, val} +} + +// A valueCtx carries a key-value pair. It implements Value for that key and +// delegates all other calls to the embedded Context. +type valueCtx struct { + Context + key, val interface{} +} + +func (c *valueCtx) String() string { + return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val) +} + +func (c *valueCtx) Value(key interface{}) interface{} { + if c.key == key { + return c.val + } + return c.Context.Value(key) +}