Skip to content

Commit

Permalink
What: refactor response with retry
Browse files Browse the repository at this point in the history
Why:

  * try to support redis cluster with smart client

How:

  * nop
  • Loading branch information
mcspring committed Jul 13, 2019
1 parent 13b58a9 commit e5fee5f
Show file tree
Hide file tree
Showing 15 changed files with 525 additions and 264 deletions.
22 changes: 15 additions & 7 deletions args.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ func MultiArgs(args ...Args) Args {

type multiArgs struct {
args []Args
argn int
err error
}

func (m *multiArgs) Close() (err error) {
for _, a := range m.args {
if e := a.Close(); e != nil && err == nil {
err = e
for _, arg := range m.args {
if cerr := arg.Close(); cerr != nil && err == nil {
err = cerr
}
}

Expand All @@ -112,16 +113,19 @@ func (m *multiArgs) Len() (n int) {
}

func (m *multiArgs) Next(dst interface{}) bool {
if len(m.args) == 0 || m.err != nil {
if m.argn >= len(m.args) || m.err != nil {
return false
}

for !m.args[0].Next(dst) {
if err := m.args[0].Close(); err != nil {
for !m.args[m.argn].Next(dst) {
if err := m.args[m.argn].Close(); err != nil {
m.err = err
return false
}
if m.args = m.args[1:]; len(m.args) == 0 {

m.argn++

if m.argn >= len(m.args) {
return false
}
}
Expand Down Expand Up @@ -317,6 +321,10 @@ func (args *byteArgs) Next(dst interface{}) (ok bool) {
}

func (args *byteArgs) next(v reflect.Value, a []byte) error {
for v.Kind() == reflect.Ptr {
v = v.Elem()
}

switch v.Kind() {
case reflect.Bool:
return args.parseBool(v, a)
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *Client) Query(ctx context.Context, cmd string, args ...interface{}) Arg

r, err := c.Do(&Request{
Addr: addr,
Cmds: []Command{{cmd, List(args...)}},
Cmds: []Command{{Cmd: cmd, Args: List(args...)}},
Context: ctx,
})
if err != nil {
Expand Down
64 changes: 47 additions & 17 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Command struct {
// For server request, Args is never nil, even if there are no values in
// the argument list.
Args Args

// for retry
args [][]byte
argn int
}

// ParseArgs parses the list of arguments from the command into the destination
Expand All @@ -30,6 +34,16 @@ func (cmd *Command) ParseArgs(dsts ...interface{}) error {
return ParseArgs(cmd.Args, dsts...)
}

// retry resets args of command for request reuse.
//
// NOTE: It CANNOT be exported cause it should ensure the command is idempotent, see Response.Retry() for details!
func (cmd *Command) New() Command {
return Command{
Cmd: cmd.Cmd,
Args: &byteArgs{args: cmd.args},
}
}

func (cmd *Command) getKeys(keys []string) []string {
lastIndex := len(keys)
keys = append(keys, "")
Expand Down Expand Up @@ -64,18 +78,25 @@ func (cmd *Command) loadByteArgs() {
}
}

func (cmd *Command) appendArg(arg []byte) {
cmd.args[cmd.argn] = make([]byte, len(arg))
copy(cmd.args[cmd.argn], arg)

cmd.argn++
}

// CommandReader is a type produced by the Conn.ReadCommands method to read a
// single command or a sequence of commands belonging to the same transaction.
type CommandReader struct {
mutex sync.Mutex
conn *Conn
decoder objconv.StreamDecoder
multi bool
done bool
err error
mutex sync.Mutex
conn *Conn
dec objconv.StreamDecoder
multi bool
done bool
err error
}

// Close closes the comand reader, it must be called when all commands have been
// Close closes the command reader, it must be called when all commands have been
// read from the reader in order to release the parent connection's read lock.
func (r *CommandReader) Close() error {
r.mutex.Lock()
Expand All @@ -102,7 +123,6 @@ func (r *CommandReader) Close() error {
// The method returns true if a command could be read, or false if there were
// no more commands to read from the reader.
func (r *CommandReader) Read(cmd *Command) bool {
*cmd = Command{}
r.mutex.Lock()
r.resetDecoder()

Expand All @@ -111,8 +131,10 @@ func (r *CommandReader) Read(cmd *Command) bool {
return false
}

if err := r.decoder.Decode(&cmd.Cmd); err != nil {
r.err = r.decoder.Err()
*cmd = Command{}

if err := r.dec.Decode(&cmd.Cmd); err != nil {
r.err = r.dec.Err()
r.done = true
r.mutex.Unlock()
return false
Expand All @@ -125,24 +147,26 @@ func (r *CommandReader) Read(cmd *Command) bool {
r.done = !r.multi
}

cmd.Args = newCmdArgsReader(r.decoder, r)
cmd.args = make([][]byte, r.dec.Len())
cmd.Args = newCmdArgsReader(r.dec, r, cmd)
return true
}

func (r *CommandReader) resetDecoder() {
r.decoder = objconv.StreamDecoder{Parser: r.decoder.Parser}
r.dec = objconv.StreamDecoder{Parser: r.dec.Parser}
}

func newCmdArgsReader(d objconv.StreamDecoder, r *CommandReader) *cmdArgsReader {
args := &cmdArgsReader{dec: d, r: r}
func newCmdArgsReader(d objconv.StreamDecoder, r *CommandReader, cmd *Command) *cmdArgsReader {
args := &cmdArgsReader{cmd: cmd, dec: d, r: r}
args.b = args.a[:0]
return args
}

type cmdArgsReader struct {
once sync.Once
err error
cmd *Command
dec objconv.StreamDecoder
err error
r *CommandReader
b []byte
a [128]byte
Expand Down Expand Up @@ -178,8 +202,6 @@ func (args *cmdArgsReader) Len() int {
}

func (args *cmdArgsReader) Next(val interface{}) bool {
args.b = args.b[:0]

if args.err != nil {
return false
}
Expand All @@ -191,6 +213,8 @@ func (args *cmdArgsReader) Next(val interface{}) bool {
}
}

args.b = args.b[:0]

if err := args.dec.Decode(&args.b); err != nil {
args.err = args.dec.Err()
return false
Expand All @@ -201,6 +225,8 @@ func (args *cmdArgsReader) Next(val interface{}) bool {
args.err = err
return false
}

args.cmd.appendArg(args.b[:])
}

return true
Expand Down Expand Up @@ -240,6 +266,7 @@ func (args *cmdArgsReader) parseBool(v reflect.Value) error {
if err != nil {
return err
}

v.SetBool(i != 0)
return nil
}
Expand All @@ -249,6 +276,7 @@ func (args *cmdArgsReader) parseInt(v reflect.Value) error {
if err != nil {
return err
}

v.SetInt(i)
return nil
}
Expand All @@ -258,6 +286,7 @@ func (args *cmdArgsReader) parseUint(v reflect.Value) error {
if err != nil {
return err
}

v.SetUint(u)
return nil
}
Expand All @@ -267,6 +296,7 @@ func (args *cmdArgsReader) parseFloat(v reflect.Value) error {
if err != nil {
return err
}

v.SetFloat(f)
return nil
}
Expand Down
Loading

0 comments on commit e5fee5f

Please sign in to comment.