Skip to content

Commit

Permalink
Merge pull request #17 from dolab/cmd-retry
Browse files Browse the repository at this point in the history
Cmd retry
  • Loading branch information
mcspring committed Jul 27, 2019
2 parents 917f71c + 43c4595 commit 46a7061
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 15 deletions.
18 changes: 14 additions & 4 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,12 @@ func (cmd *Command) loadByteArgs() {
}

func (cmd *Command) appendArg(arg []byte) {
cmd.args[cmd.argn] = make([]byte, len(arg))
copy(cmd.args[cmd.argn], arg)
if len(cmd.args) >= cmd.argn {
cmd.args = append(cmd.args, arg)
} else {
cmd.args[cmd.argn] = make([]byte, len(arg))
copy(cmd.args[cmd.argn], arg)
}

cmd.argn++
}
Expand All @@ -149,6 +153,7 @@ type CommandReader struct {
dec objconv.StreamDecoder
multi bool
done bool
retry bool
err error
}

Expand Down Expand Up @@ -203,7 +208,6 @@ func (r *CommandReader) Read(cmd *Command) bool {
r.done = !r.multi
}

cmd.args = make([][]byte, r.dec.Len())
cmd.Args = newCmdArgsReader(r, cmd)
return true
}
Expand All @@ -217,6 +221,10 @@ func (r *CommandReader) resetDecoder() {
}

func newCmdArgsReader(r *CommandReader, cmd *Command) *cmdArgsReader {
if r.retry {
cmd.args = make([][]byte, 0, r.dec.Len())
}

args := &cmdArgsReader{r: r, cmd: cmd}
args.b = args.a[:0]
return args
Expand Down Expand Up @@ -289,7 +297,9 @@ func (args *cmdArgsReader) Next(val interface{}) bool {
return false
}

args.cmd.appendArg(args.b[:])
if args.r.retry {
args.cmd.appendArg(args.b[:])
}
}

return true
Expand Down
7 changes: 4 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,15 @@ func (c *Conn) Flush() error {
// The new CommandReader holds the connection's read lock, which is released
// only when its Close method is called, so a program must make sure to call
// that method or the connection will be left in an unusable state.
func (c *Conn) ReadCommands() *CommandReader {
func (c *Conn) ReadCommands(retry bool) *CommandReader {
c.rmutex.Lock()

c.resetDecoder()

return &CommandReader{
conn: c,
dec: c.decoder,
conn: c,
dec: c.decoder,
retry: retry,
}
}

Expand Down
2 changes: 1 addition & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ func writeCommands(t *testing.T, conn *redis.Conn, cmds ...redis.Command) {
}

func readCommands(t *testing.T, conn *redis.Conn, expectErr error, cmds ...redis.Command) {
r := conn.ReadCommands()
r := conn.ReadCommands(false)
c := redis.Command{}
i := 0

Expand Down
12 changes: 7 additions & 5 deletions redistest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,13 @@ func FakeTimeoutServer(handler redis.Handler, timeout time.Duration) (srv *redis
}

srv = &redis.Server{
Handler: handler,
ReadTimeout: 3 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: timeout,
ErrorLog: log.New(os.Stdout, "[Server Timeout] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND),
Handler: handler,
ReadTimeout: 3 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: timeout,
EnableRetry: true,
EnablePipeline: true,
ErrorLog: log.New(os.Stdout, "[Server Timeout] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND),
}

go func() {
Expand Down
10 changes: 8 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type Server struct {
// Handler invoked to handle Redis requests, must not be nil.
Handler Handler

// features of command retry and pipeline
EnableRetry bool
EnablePipeline bool

// ReadTimeout is the maximum duration for reading the entire request,
// including the reading the argument list.
ReadTimeout time.Duration
Expand Down Expand Up @@ -205,6 +209,7 @@ func (s *Server) Serve(l net.Listener) error {
idleTimeout: s.IdleTimeout,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
retryable: s.EnableRetry,
}

if config.idleTimeout == 0 {
Expand Down Expand Up @@ -273,7 +278,7 @@ func (s *Server) serveConnection(ctx context.Context, c *Conn, config serverConf
}

c.setTimeout(config.readTimeout)
cmdReader := c.ReadCommands()
cmdReader := c.ReadCommands(config.retryable)

cmds := make([]Command, 0, 4)
cmds = append(cmds, Command{})
Expand Down Expand Up @@ -367,7 +372,7 @@ func (s *Server) serveCommands(c *Conn, addr string, cmds []Command, config serv

// is this a pipeline?
reqErr := req.Close()
if err == nil && reqErr == nil {
if s.EnablePipeline && err == nil && reqErr == nil {
pipeErr := s.servePipeline(c, addr, cmds, config)
if pipeErr != ErrNotPipeline {
err = pipeErr
Expand Down Expand Up @@ -575,6 +580,7 @@ type serverConfig struct {
idleTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
retryable bool
}

func backoff(attempt int, minDelay time.Duration, maxDelay time.Duration) time.Duration {
Expand Down

0 comments on commit 46a7061

Please sign in to comment.