diff --git a/command.go b/command.go index 1abadad..b21501f 100644 --- a/command.go +++ b/command.go @@ -135,8 +135,12 @@ func (cmd *Command) loadByteArgs() { } func (cmd *Command) appendArg(arg []byte) { - cmd.args[cmd.argn] = make([]byte, len(arg)) - copy(cmd.args[cmd.argn], arg) + if len(cmd.args) >= cmd.argn { + cmd.args = append(cmd.args, arg) + } else { + cmd.args[cmd.argn] = make([]byte, len(arg)) + copy(cmd.args[cmd.argn], arg) + } cmd.argn++ } @@ -149,6 +153,7 @@ type CommandReader struct { dec objconv.StreamDecoder multi bool done bool + retry bool err error } @@ -203,7 +208,6 @@ func (r *CommandReader) Read(cmd *Command) bool { r.done = !r.multi } - cmd.args = make([][]byte, r.dec.Len()) cmd.Args = newCmdArgsReader(r, cmd) return true } @@ -217,6 +221,10 @@ func (r *CommandReader) resetDecoder() { } func newCmdArgsReader(r *CommandReader, cmd *Command) *cmdArgsReader { + if r.retry { + cmd.args = make([][]byte, 0, r.dec.Len()) + } + args := &cmdArgsReader{r: r, cmd: cmd} args.b = args.a[:0] return args @@ -289,7 +297,9 @@ func (args *cmdArgsReader) Next(val interface{}) bool { return false } - args.cmd.appendArg(args.b[:]) + if args.r.retry { + args.cmd.appendArg(args.b[:]) + } } return true diff --git a/conn.go b/conn.go index 7f60365..f6ecb16 100644 --- a/conn.go +++ b/conn.go @@ -192,14 +192,15 @@ func (c *Conn) Flush() error { // The new CommandReader holds the connection's read lock, which is released // only when its Close method is called, so a program must make sure to call // that method or the connection will be left in an unusable state. -func (c *Conn) ReadCommands() *CommandReader { +func (c *Conn) ReadCommands(retry bool) *CommandReader { c.rmutex.Lock() c.resetDecoder() return &CommandReader{ - conn: c, - dec: c.decoder, + conn: c, + dec: c.decoder, + retry: retry, } } diff --git a/conn_test.go b/conn_test.go index 7d85300..e04f362 100644 --- a/conn_test.go +++ b/conn_test.go @@ -827,7 +827,7 @@ func writeCommands(t *testing.T, conn *redis.Conn, cmds ...redis.Command) { } func readCommands(t *testing.T, conn *redis.Conn, expectErr error, cmds ...redis.Command) { - r := conn.ReadCommands() + r := conn.ReadCommands(false) c := redis.Command{} i := 0 diff --git a/redistest/server.go b/redistest/server.go index d919a7a..d800193 100644 --- a/redistest/server.go +++ b/redistest/server.go @@ -128,11 +128,13 @@ func FakeTimeoutServer(handler redis.Handler, timeout time.Duration) (srv *redis } srv = &redis.Server{ - Handler: handler, - ReadTimeout: 3 * time.Second, - WriteTimeout: 5 * time.Second, - IdleTimeout: timeout, - ErrorLog: log.New(os.Stdout, "[Server Timeout] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND), + Handler: handler, + ReadTimeout: 3 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: timeout, + EnableRetry: true, + EnablePipeline: true, + ErrorLog: log.New(os.Stdout, "[Server Timeout] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND), } go func() { diff --git a/server.go b/server.go index eb92abf..27f6fa1 100644 --- a/server.go +++ b/server.go @@ -70,6 +70,10 @@ type Server struct { // Handler invoked to handle Redis requests, must not be nil. Handler Handler + // features of command retry and pipeline + EnableRetry bool + EnablePipeline bool + // ReadTimeout is the maximum duration for reading the entire request, // including the reading the argument list. ReadTimeout time.Duration @@ -205,6 +209,7 @@ func (s *Server) Serve(l net.Listener) error { idleTimeout: s.IdleTimeout, readTimeout: s.ReadTimeout, writeTimeout: s.WriteTimeout, + retryable: s.EnableRetry, } if config.idleTimeout == 0 { @@ -273,7 +278,7 @@ func (s *Server) serveConnection(ctx context.Context, c *Conn, config serverConf } c.setTimeout(config.readTimeout) - cmdReader := c.ReadCommands() + cmdReader := c.ReadCommands(config.retryable) cmds := make([]Command, 0, 4) cmds = append(cmds, Command{}) @@ -367,7 +372,7 @@ func (s *Server) serveCommands(c *Conn, addr string, cmds []Command, config serv // is this a pipeline? reqErr := req.Close() - if err == nil && reqErr == nil { + if s.EnablePipeline && err == nil && reqErr == nil { pipeErr := s.servePipeline(c, addr, cmds, config) if pipeErr != ErrNotPipeline { err = pipeErr @@ -575,6 +580,7 @@ type serverConfig struct { idleTimeout time.Duration readTimeout time.Duration writeTimeout time.Duration + retryable bool } func backoff(attempt int, minDelay time.Duration, maxDelay time.Duration) time.Duration {