Skip to content

Commit

Permalink
Merge pull request #16 from dolab/pipeline
Browse files Browse the repository at this point in the history
What: reset conn read timeout for pipeline
  • Loading branch information
mcspring committed Jul 25, 2019
2 parents 9b1dff7 + f78f546 commit 917f71c
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 26 deletions.
19 changes: 12 additions & 7 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func (cmd *Command) newCommand() Command {

// pipeCommand returns a new Command for for pipeline, or an error of ErrNotPipeline indicates no pipeline.
func (cmd *Command) pipeCommand() (pipe Command, err error) {
if cmd.Args == nil {
err = ErrNotPipeline
return
}

// shortcut for ping cmd
if cmd.Cmd == "PING" {
err = ErrNotPipeline
Expand All @@ -60,7 +65,7 @@ func (cmd *Command) pipeCommand() (pipe Command, err error) {
switch cmd.Args.(type) {
case *cmdArgsReader:
cmdArgs := cmd.Args.(*cmdArgsReader)
if !cmdArgs.nextCommand(&pipe) {
if !cmdArgs.parseCommand(&pipe) {
err = ErrNotPipeline
}

Expand All @@ -82,7 +87,7 @@ func (cmd *Command) pipeCommand() (pipe Command, err error) {
if cmdArgs == nil {
err = ErrNotPipeline
} else {
if !cmdArgs.nextCommand(&pipe) {
if !cmdArgs.parseCommand(&pipe) {
err = ErrNotPipeline
}
}
Expand Down Expand Up @@ -203,10 +208,8 @@ func (r *CommandReader) Read(cmd *Command) bool {
return true
}

func (r *CommandReader) resetReader(cmd *Command) bool {
func (r *CommandReader) resetReader() {
r.done = false

return r.Read(cmd)
}

func (r *CommandReader) resetDecoder() {
Expand Down Expand Up @@ -292,8 +295,10 @@ func (args *cmdArgsReader) Next(val interface{}) bool {
return true
}

func (args *cmdArgsReader) nextCommand(cmd *Command) bool {
return args.r.resetReader(cmd)
func (args *cmdArgsReader) parseCommand(cmd *Command) bool {
args.r.resetReader()

return args.r.Read(cmd)
}

func (args *cmdArgsReader) parse(v reflect.Value) error {
Expand Down
19 changes: 6 additions & 13 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/dolab/objconv/resp"
"github.com/dolab/redis-go"
"github.com/dolab/redis-go/metrics"
"github.com/dolab/redis-go/redistest"
"github.com/golib/assert"
"github.com/google/uuid"

"github.com/dolab/redis-go"
"github.com/dolab/redis-go/redistest"
)

func TestServerMetrics(t *testing.T) {
Expand All @@ -23,24 +24,16 @@ func TestServerMetrics(t *testing.T) {
respErr := resp.NewError("ERR something went wrong")

var counter int64
srv, addr := redistest.FakeServer(redis.HandlerFunc(func(res redis.ResponseWriter, req *redis.Request) {
srv, addr := redistest.FakeMetricsServer(redis.HandlerFunc(func(res redis.ResponseWriter, req *redis.Request) {
if atomic.AddInt64(&counter, 1)%2 == 0 {
res.Write(respErr)
} else {
res.Write("OK")
}

}))
}), time.Second)
defer srv.Close()

opts := metrics.Options{
Subsystem: "proxy",
Labels: nil,
EnableServerMetrics: true,
}

srv.WithMetrics(opts)

tr := &redis.Transport{MaxIdleConns: 1}
defer tr.CloseIdleConnections()

Expand Down
55 changes: 51 additions & 4 deletions redistest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"time"

"github.com/dolab/redis-go"
"github.com/dolab/redis-go/metrics"

"k8s.io/apimachinery/pkg/util/wait"
)

Expand Down Expand Up @@ -119,22 +121,67 @@ func FakeServer(handler redis.Handler) (srv *redis.Server, url string) {
return FakeTimeoutServer(handler, 1000*time.Millisecond)
}

func FakeTimeoutServer(handler redis.Handler, timeout time.Duration) (serv *redis.Server, addr string) {
func FakeTimeoutServer(handler redis.Handler, timeout time.Duration) (srv *redis.Server, addr string) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}

srv = &redis.Server{
Handler: handler,
ReadTimeout: 3 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: timeout,
ErrorLog: log.New(os.Stdout, "[Server Timeout] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND),
}

go func() {
err := srv.Serve(l)
if err != redis.ErrServerClosed {
log.Fatalf("[Server] %v", err)
}
}()

addr = l.Addr().String()

stopCh := make(chan struct{})
wait.Until(func() {
client := redis.Client{
Addr: addr,
Timeout: 10 * time.Millisecond,
}

err := client.Exec(context.Background(), "PING")
if err == nil {
close(stopCh)
}
}, 10*time.Millisecond, stopCh)
<-stopCh

return
}

func FakeMetricsServer(handler redis.Handler, timeout time.Duration) (srv *redis.Server, addr string) {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}

serv = &redis.Server{
srv = &redis.Server{
Handler: handler,
ReadTimeout: 3 * time.Second,
WriteTimeout: 5 * time.Second,
IdleTimeout: timeout,
ErrorLog: log.New(os.Stdout, "[Test Server] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND),
ErrorLog: log.New(os.Stdout, "[Server Metrics] ", os.O_CREATE|os.O_WRONLY|os.O_APPEND),
}
srv.WithMetrics(metrics.Options{
Subsystem: "proxy",
Labels: nil,
EnableServerMetrics: true,
})

go func() {
err := serv.Serve(l)
err := srv.Serve(l)
if err != redis.ErrServerClosed {
log.Fatalf("[Server] %v", err)
}
Expand Down
3 changes: 3 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,9 @@ func (s *Server) servePipeline(c *Conn, addr string, cmds []Command, config serv
}

if len(pipeCmds) > 0 {
// TODO: This is for temporary solution and it should refactor to pipeline way!
c.setTimeout(config.readTimeout)

err = s.serveCommands(c, addr, pipeCmds, config)
}

Expand Down
5 changes: 3 additions & 2 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"time"

"github.com/dolab/objconv/resp"
"github.com/dolab/redis-go"
"github.com/dolab/redis-go/redistest"
goredis "github.com/go-redis/redis"
fuzz "github.com/google/gofuzz"
"github.com/google/uuid"

"github.com/dolab/redis-go"
"github.com/dolab/redis-go/redistest"
)

func TestServer(t *testing.T) {
Expand Down

0 comments on commit 917f71c

Please sign in to comment.