From 4fc166d0861030038345828798def97402debcc1 Mon Sep 17 00:00:00 2001 From: Lefteris Zafiris Date: Thu, 25 Sep 2025 20:31:17 +0300 Subject: [PATCH 1/2] Protocol compression updates switched to a faster zlib implementation added support for user defined compression level --- README.md | 8 +++++++ compress.go | 63 +++++++++++++++++++---------------------------------- dsn.go | 23 ++++++++++++++++--- dsn_test.go | 6 +++++ go.mod | 2 ++ go.sum | 2 ++ 6 files changed, 61 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 1d07bde1c..c1ea02856 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,14 @@ Default: false Toggles zlib compression. false by default. +##### `compressLevel` + +``` +Type: decimal number +Valid Values: 1-9 +Default: 2 +``` + ##### `interpolateParams` ``` diff --git a/compress.go b/compress.go index 38bfa000e..13aa80657 100644 --- a/compress.go +++ b/compress.go @@ -10,73 +10,56 @@ package mysql import ( "bytes" - "compress/zlib" "fmt" "io" - "sync" -) -var ( - zrPool *sync.Pool // Do not use directly. Use zDecompress() instead. - zwPool *sync.Pool // Do not use directly. Use zCompress() instead. + "github.com/klauspost/compress/zlib" ) -func init() { - zrPool = &sync.Pool{ - New: func() any { return nil }, - } - zwPool = &sync.Pool{ - New: func() any { - zw, err := zlib.NewWriterLevel(new(bytes.Buffer), 2) - if err != nil { - panic(err) // compress/zlib return non-nil error only if level is invalid - } - return zw - }, - } -} - -func zDecompress(src []byte, dst *bytes.Buffer) (int, error) { +func (c *compIO) zDecompress(src []byte) (int, error) { br := bytes.NewReader(src) - var zr io.ReadCloser var err error - - if a := zrPool.Get(); a == nil { - if zr, err = zlib.NewReader(br); err != nil { + if c.zr == nil { + c.zr, err = zlib.NewReader(br) + if err != nil { return 0, err } } else { - zr = a.(io.ReadCloser) - if err := zr.(zlib.Resetter).Reset(br, nil); err != nil { + err = c.zr.(zlib.Resetter).Reset(br, nil) + if err != nil { return 0, err } } - - n, _ := dst.ReadFrom(zr) // ignore err because zr.Close() will return it again. - err = zr.Close() // zr.Close() may return chuecksum error. - zrPool.Put(zr) + n, _ := c.buff.ReadFrom(c.zr) // ignore err because zr.Close() will return it again. + err = c.zr.Close() // zr.Close() may return chuecksum error. return int(n), err } -func zCompress(src []byte, dst io.Writer) error { - zw := zwPool.Get().(*zlib.Writer) - zw.Reset(dst) - if _, err := zw.Write(src); err != nil { +func (c *compIO) zCompress(src []byte) error { + c.zw.Reset(&c.buff) + if _, err := c.zw.Write(src); err != nil { return err } - err := zw.Close() - zwPool.Put(zw) + err := c.zw.Close() return err } type compIO struct { mc *mysqlConn buff bytes.Buffer + zw *zlib.Writer + zr io.ReadCloser } func newCompIO(mc *mysqlConn) *compIO { + w, err := zlib.NewWriterLevel(new(bytes.Buffer), mc.cfg.compressLevel) + if err != nil { + panic(err) // compress/zlib return non-nil error only if level is invalid + } return &compIO{ mc: mc, + zw: w, + zr: nil, } } @@ -133,7 +116,7 @@ func (c *compIO) readCompressedPacket() error { // use existing capacity in bytesBuf if possible c.buff.Grow(uncompressedLength) - nread, err := zDecompress(comprData, &c.buff) + nread, err := c.zDecompress(comprData) if err != nil { return err } @@ -167,7 +150,7 @@ func (c *compIO) writePackets(packets []byte) (int, error) { buf.Write(payload) uncompressedLen = 0 } else { - err := zCompress(payload, buf) + err := c.zCompress(payload) if debug && err != nil { fmt.Printf("zCompress error: %v", err) } diff --git a/dsn.go b/dsn.go index 89556bfba..d881dd91e 100644 --- a/dsn.go +++ b/dsn.go @@ -24,6 +24,8 @@ import ( "time" ) +const defaultCompressionLevel = 2 + var ( errInvalidDSNUnescaped = errors.New("invalid DSN: did you forget to escape a param value?") errInvalidDSNAddr = errors.New("invalid DSN: network address not terminated (missing closing brace)") @@ -75,7 +77,8 @@ type Config struct { // unexported fields. new options should be come here. // boolean first. alphabetical order. - compress bool // Enable zlib compression + compress bool // Enable zlib compression + compressLevel int // Compression level beforeConnect func(context.Context, *Config) error // Invoked before a connection is established pubKey *rsa.PublicKey // Server public key @@ -95,6 +98,7 @@ func NewConfig() *Config { Logger: defaultLogger, AllowNativePasswords: true, CheckConnLiveness: true, + compressLevel: defaultCompressionLevel, } return cfg } @@ -127,10 +131,14 @@ func BeforeConnect(fn func(context.Context, *Config) error) Option { } } -// EnableCompress sets the compression mode. -func EnableCompression(yes bool) Option { +// EnableCompress sets the compression mode and level. +func EnableCompression(yes bool, level int) Option { return func(cfg *Config) error { cfg.compress = yes + cfg.compressLevel = defaultCompressionLevel + if level > 0 { + cfg.compressLevel = level + } return nil } } @@ -563,6 +571,15 @@ func parseDSNParams(cfg *Config, params string) (err error) { if !isBool { return errors.New("invalid bool value: " + value) } + // Compression level + case "compressLevel": + cfg.compressLevel, err = strconv.Atoi(value) + if err != nil { + return + } + if cfg.compressLevel < 0 || cfg.compressLevel > 9 { + return errors.New("invalid compress level: " + value) + } // Enable client side placeholder substitution case "interpolateParams": diff --git a/dsn_test.go b/dsn_test.go index 436f77992..c2b1acce1 100644 --- a/dsn_test.go +++ b/dsn_test.go @@ -71,6 +71,12 @@ var testDSNs = []struct { }, { "tcp(127.0.0.1)/dbname", &Config{Net: "tcp", Addr: "127.0.0.1:3306", DBName: "dbname", Loc: time.UTC, MaxAllowedPacket: defaultMaxAllowedPacket, Logger: defaultLogger, AllowNativePasswords: true, CheckConnLiveness: true}, +}, { + "tcp(127.0.0.1)/dbname?compress=true,compressLevel=6", + &Config{Net: "tcp", Addr: "127.0.0.1:3306", DBName: "dbname", compress: true, compressLevel: 6, Loc: time.UTC, MaxAllowedPacket: defaultMaxAllowedPacket, Logger: defaultLogger, AllowNativePasswords: true, CheckConnLiveness: true}, +}, { + "tcp(127.0.0.1)/dbname?compress=true", + &Config{Net: "tcp", Addr: "127.0.0.1:3306", DBName: "dbname", compress: true, compressLevel: defaultCompressionLevel, Loc: time.UTC, MaxAllowedPacket: defaultMaxAllowedPacket, Logger: defaultLogger, AllowNativePasswords: true, CheckConnLiveness: true}, }, { "tcp(de:ad:be:ef::ca:fe)/dbname", &Config{Net: "tcp", Addr: "[de:ad:be:ef::ca:fe]:3306", DBName: "dbname", Loc: time.UTC, MaxAllowedPacket: defaultMaxAllowedPacket, Logger: defaultLogger, AllowNativePasswords: true, CheckConnLiveness: true}, diff --git a/go.mod b/go.mod index f17666dc8..5be5fd959 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,5 @@ module github.com/go-sql-driver/mysql go 1.22.0 require filippo.io/edwards25519 v1.1.0 + +require github.com/klauspost/compress v1.18.0 diff --git a/go.sum b/go.sum index 359ca94b4..5d07b39a5 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= From 767f52edc4c6f5620954ef6a55552b40eea66c16 Mon Sep 17 00:00:00 2001 From: Lefteris Zafiris Date: Thu, 25 Sep 2025 20:35:39 +0300 Subject: [PATCH 2/2] Updated AUTHORS --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index 05e71df48..6a83b5cb5 100644 --- a/AUTHORS +++ b/AUTHORS @@ -82,6 +82,7 @@ Kevin Malachowski Kieron Woodhouse Lance Tian Lennart Rudolph +Lefteris Zafiris Leonardo YongUk Kim Linh Tran Tuan Lion Yang