From d2c80c940ed75114ac4823342b2b5bd2b281c286 Mon Sep 17 00:00:00 2001 From: Spring MC Date: Sat, 27 Jul 2019 11:23:48 +0800 Subject: [PATCH 1/2] What: configurable of command retryable Why: * can disable command retry when creating server How: * nop --- command.go | 18 ++++++++++++++---- conn.go | 7 ++++--- conn_test.go | 2 +- redistest/server.go | 1 + server.go | 6 +++++- 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/command.go b/command.go index 1abadad..b21501f 100644 --- a/command.go +++ b/command.go @@ -135,8 +135,12 @@ func (cmd *Command) loadByteArgs() { } func (cmd *Command) appendArg(arg []byte) { - cmd.args[cmd.argn] = make([]byte, len(arg)) - copy(cmd.args[cmd.argn], arg) + if len(cmd.args) >= cmd.argn { + cmd.args = append(cmd.args, arg) + } else { + cmd.args[cmd.argn] = make([]byte, len(arg)) + copy(cmd.args[cmd.argn], arg) + } cmd.argn++ } @@ -149,6 +153,7 @@ type CommandReader struct { dec objconv.StreamDecoder multi bool done bool + retry bool err error } @@ -203,7 +208,6 @@ func (r *CommandReader) Read(cmd *Command) bool { r.done = !r.multi } - cmd.args = make([][]byte, r.dec.Len()) cmd.Args = newCmdArgsReader(r, cmd) return true } @@ -217,6 +221,10 @@ func (r *CommandReader) resetDecoder() { } func newCmdArgsReader(r *CommandReader, cmd *Command) *cmdArgsReader { + if r.retry { + cmd.args = make([][]byte, 0, r.dec.Len()) + } + args := &cmdArgsReader{r: r, cmd: cmd} args.b = args.a[:0] return args @@ -289,7 +297,9 @@ func (args *cmdArgsReader) Next(val interface{}) bool { return false } - args.cmd.appendArg(args.b[:]) + if args.r.retry { + args.cmd.appendArg(args.b[:]) + } } return true diff --git a/conn.go b/conn.go index 7f60365..f6ecb16 100644 --- a/conn.go +++ b/conn.go @@ -192,14 +192,15 @@ func (c *Conn) Flush() error { // The new CommandReader holds the connection's read lock, which is released // only when its Close method is called, so a program must make sure to call // that method or the connection will be left in an unusable state. -func (c *Conn) ReadCommands() *CommandReader { +func (c *Conn) ReadCommands(retry bool) *CommandReader { c.rmutex.Lock() c.resetDecoder() return &CommandReader{ - conn: c, - dec: c.decoder, + conn: c, + dec: c.decoder, + retry: retry, } } diff --git a/conn_test.go b/conn_test.go index 7d85300..e04f362 100644 --- a/conn_test.go +++ b/conn_test.go @@ -827,7 +827,7 @@ func writeCommands(t *testing.T, conn *redis.Conn, cmds ...redis.Command) { } func readCommands(t *testing.T, conn *redis.Conn, expectErr error, cmds ...redis.Command) { - r := conn.ReadCommands() + r := conn.ReadCommands(false) c := redis.Command{} i := 0 diff --git a/redistest/server.go b/redistest/server.go index d919a7a..3d19563 100644 --- a/redistest/server.go +++ b/redistest/server.go @@ -132,6 +132,7 @@ func FakeTimeoutServer(handler redis.Handler, timeout time.Duration) (srv *redis ReadTimeout: 3 * time.Second, WriteTimeout: 5 * time.Second, IdleTimeout: timeout, + EnableRetry: true, ErrorLog: log.New(os.Stdout, "[Server Timeout] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND), } diff --git a/server.go b/server.go index eb92abf..38e38ae 100644 --- a/server.go +++ b/server.go @@ -70,6 +70,8 @@ type Server struct { // Handler invoked to handle Redis requests, must not be nil. Handler Handler + EnableRetry bool + // ReadTimeout is the maximum duration for reading the entire request, // including the reading the argument list. ReadTimeout time.Duration @@ -205,6 +207,7 @@ func (s *Server) Serve(l net.Listener) error { idleTimeout: s.IdleTimeout, readTimeout: s.ReadTimeout, writeTimeout: s.WriteTimeout, + retryable: s.EnableRetry, } if config.idleTimeout == 0 { @@ -273,7 +276,7 @@ func (s *Server) serveConnection(ctx context.Context, c *Conn, config serverConf } c.setTimeout(config.readTimeout) - cmdReader := c.ReadCommands() + cmdReader := c.ReadCommands(config.retryable) cmds := make([]Command, 0, 4) cmds = append(cmds, Command{}) @@ -575,6 +578,7 @@ type serverConfig struct { idleTimeout time.Duration readTimeout time.Duration writeTimeout time.Duration + retryable bool } func backoff(attempt int, minDelay time.Duration, maxDelay time.Duration) time.Duration { From 43c45953e464becabd9917385039635a202146e5 Mon Sep 17 00:00:00 2001 From: Spring MC Date: Sat, 27 Jul 2019 11:27:54 +0800 Subject: [PATCH 2/2] What: configurable of pipeline feature Why: * can disable pipeline support by config How: * nop --- redistest/server.go | 13 +++++++------ server.go | 6 ++++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/redistest/server.go b/redistest/server.go index 3d19563..d800193 100644 --- a/redistest/server.go +++ b/redistest/server.go @@ -128,12 +128,13 @@ func FakeTimeoutServer(handler redis.Handler, timeout time.Duration) (srv *redis } srv = &redis.Server{ - Handler: handler, - ReadTimeout: 3 * time.Second, - WriteTimeout: 5 * time.Second, - IdleTimeout: timeout, - EnableRetry: true, - ErrorLog: log.New(os.Stdout, "[Server Timeout] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND), + Handler: handler, + ReadTimeout: 3 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: timeout, + EnableRetry: true, + EnablePipeline: true, + ErrorLog: log.New(os.Stdout, "[Server Timeout] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND), } go func() { diff --git a/server.go b/server.go index 38e38ae..27f6fa1 100644 --- a/server.go +++ b/server.go @@ -70,7 +70,9 @@ type Server struct { // Handler invoked to handle Redis requests, must not be nil. Handler Handler - EnableRetry bool + // features of command retry and pipeline + EnableRetry bool + EnablePipeline bool // ReadTimeout is the maximum duration for reading the entire request, // including the reading the argument list. @@ -370,7 +372,7 @@ func (s *Server) serveCommands(c *Conn, addr string, cmds []Command, config serv // is this a pipeline? reqErr := req.Close() - if err == nil && reqErr == nil { + if s.EnablePipeline && err == nil && reqErr == nil { pipeErr := s.servePipeline(c, addr, cmds, config) if pipeErr != ErrNotPipeline { err = pipeErr