Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

What: refactor for redis proxy #7

Merged
merged 1 commit into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions commander.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package redis

// The ServerCommander interface is an abstraction used to extend redis command.
type ServerCommander interface {
LookupCommanders() map[string]Commander
}

// A Commander responds to a Redis CMD.
//
// Except for reading the argument list, handlers should not modify the provided
// Request.
type Commander interface {
ServeCommand(tripper RoundTripper, hashing ServerRing, w ResponseWriter, r *Request)
}

// The CommanderFunc type is an adapter to allow the use of ordinary functions as
// redis commanders. If fn is a function with the appropriate signature.
type CommanderFunc func(tripper RoundTripper, hashing ServerRing, w ResponseWriter, r *Request)

func (fn CommanderFunc) ServeCommand(tripper RoundTripper, hashing ServerRing, w ResponseWriter, r *Request) {
fn(tripper, hashing, w, r)
}
17 changes: 17 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package redis

import "errors"

var (
// ErrServerClosed is returned by Server.Serve when the server is closed.
ErrNil = errors.New("redis: nil")
ErrNilArgs = errors.New("cannot parse values from a nil argument list")
ErrServerClosed = errors.New("redis: Server closed")
ErrNegativeStreamCount = errors.New("invalid call to redis.ResponseWriter.WriteStream with a negative value")
ErrWriteStreamCalledAfterWrite = errors.New("invalid call to redis.ResponseWriter.WriteStream after redis.ResponseWriter.Write was called")
ErrWriteStreamCalledTooManyTimes = errors.New("multiple calls to ResponseWriter.WriteStream")
ErrWriteCalledTooManyTimes = errors.New("too many calls to redis.ResponseWriter.Write")
ErrWriteCalledNotEnoughTimes = errors.New("not enough calls to redis.ResponseWriter.Write")
ErrHijacked = errors.New("invalid use of a hijacked redis.ResponseWriter")
ErrNotHijackable = errors.New("the response writer is not hijackable")
)
29 changes: 29 additions & 0 deletions handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package redis

// The ServerHandler interface is an abstraction used to extend redis handler.
type ServerHandler interface {
LookupHandlers() map[string]Handler
}

// A Handler responds to a Redis request.
//
// ServeRedis should write reply headers and data to the ResponseWriter and then
// return. Returning signals that the request is finished; it is not valid to
// use the ResponseWriter or read from the Request.Args after or concurrently with
// the completion of the ServeRedis call.
//
// Except for reading the argument list, handlers should not modify the provided
// Request.
type Handler interface {
// ServeRedis is called by a Redis server to handle requests.
ServeRedis(ResponseWriter, *Request)
}

// The HandlerFunc type is an adapter to allow the use of ordinary functions as
// Redis handlers. If f is a function with the appropriate signature.
type HandlerFunc func(ResponseWriter, *Request)

// ServeRedis implements the Handler interface, calling f.
func (fn HandlerFunc) ServeRedis(res ResponseWriter, req *Request) {
fn(res, req)
}
1 change: 1 addition & 0 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (c *connList) len() int {
func (c *connList) pop() (conn *Conn) {
if len(c.popList) == 0 {
c.popList, c.pushList = c.pushList, c.popList

reverse(c.popList)
}

Expand Down
2 changes: 1 addition & 1 deletion proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func (proxy *ReverseProxy) ServeRedis(w ResponseWriter, r *Request) {
}

func (proxy *ReverseProxy) serveRequest(w ResponseWriter, req *Request) {
keys := make([]string, 0, 10)
cmds := req.Cmds
keys := make([]string, 0, 10)

for i := range cmds {
keys = cmds[i].getKeys(keys)
Expand Down
20 changes: 7 additions & 13 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,32 +149,26 @@ func TestReverseProxy_ServeRedisWithOneshot(t *testing.T) {
it.Zero(response.shots)
}

var benchmarkReverseProxyOnce sync.Once

func BenchmarkReverseProxy_ServeRedis(b *testing.B) {
transport := &redis.Transport{}

validServers, _, _ := makeServerList()

benchmarkReverseProxyOnce.Do(func() {
<-redistest.TestServer(validServers)
})
<-redistest.TestServer(validServers)

proxy := &redis.ReverseProxy{
Transport: transport,
Transport: redis.DefaultTransport,
Registry: validServers,
ErrorLog: log.New(os.Stderr, "[Proxy Hash Bench] ==> ", 0),
}

request := redis.NewRequest(validServers[0].Addr, "SET", redis.List("key", "value"))
request.Context = context.TODO()

response := &responseWriter{}

b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
request := redis.NewRequest("", "SET", redis.List(uuid.New().String(), "value"))
request.Context = context.TODO()

response := &responseWriter{}

proxy.ServeRedis(response, request)
}
})
Expand Down
2 changes: 1 addition & 1 deletion redistest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestServer(serverList redis.ServerList, handlers ...func(w redis.ResponseWr
allServers[endpoint.Addr] = true

go func(addr string) {
log.Println("Starting server ", addr)
// log.Println("Starting server ", addr)

handler := TestServerHandler()
if len(handlers) > 0 {
Expand Down
3 changes: 1 addition & 2 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ func (list ServerList) LookupServers(ctx context.Context) (ServerRing, error) {
// TODO: rebuilding the hash ring for every request is not efficient, we should cache and reuse the state.
ring := NewHashRing(endpoints...)

endpoint := ring.LookupServer(key)
return endpoint
return ring.LookupServer(key)
}), nil
}
}
70 changes: 23 additions & 47 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package redis
import (
"bufio"
"context"
"errors"
"fmt"
"log"
"net"
Expand Down Expand Up @@ -58,29 +57,6 @@ type Hijacker interface {
Hijack() (net.Conn, *bufio.ReadWriter, error)
}

// A Handler responds to a Redis request.
//
// ServeRedis should write reply headers and data to the ResponseWriter and then
// return. Returning signals that the request is finished; it is not valid to
// use the ResponseWriter or read from the Request.Args after or concurrently with
// the completion of the ServeRedis call.
//
// Except for reading the argument list, handlers should not modify the provided
// Request.
type Handler interface {
// ServeRedis is called by a Redis server to handle requests.
ServeRedis(ResponseWriter, *Request)
}

// The HandlerFunc type is an adapter to allow the use of ordinary functions as
// Redis handlers. If f is a function with the appropriate signature.
type HandlerFunc func(ResponseWriter, *Request)

// ServeRedis implements the Handler interface, calling f.
func (f HandlerFunc) ServeRedis(res ResponseWriter, req *Request) {
f(res, req)
}

// A Server defines parameters for running a Redis server.
type Server struct {
// The address to listen on, ":6379" if empty.
Expand Down Expand Up @@ -169,8 +145,10 @@ func (s *Server) Close() error {
// to idle and then shut down. If the provided context expires before the shutdown
// is complete, then the context's error is returned.
func (s *Server) Shutdown(ctx context.Context) error {
const maxPollInterval = 500 * time.Millisecond
const minPollInterval = 10 * time.Millisecond
const (
minPollInterval = 10 * time.Millisecond
maxPollInterval = 500 * time.Millisecond
)

s.mutex.Lock()

Expand Down Expand Up @@ -201,14 +179,15 @@ func (s *Server) Shutdown(ctx context.Context) error {
// Serve always returns a non-nil error. After Shutdown or Close, the returned
// error is ErrServerClosed.
func (s *Server) Serve(l net.Listener) error {
const maxBackoffDelay = 1 * time.Second
const minBackoffDelay = 10 * time.Millisecond
const (
minBackoffDelay = 10 * time.Millisecond
maxBackoffDelay = 1000 * time.Millisecond
)

defer l.Close()
defer s.untrackListener(l)

s.trackListener(l)
attempt := 0

config := serverConfig{
idleTimeout: s.IdleTimeout,
Expand All @@ -220,6 +199,8 @@ func (s *Server) Serve(l net.Listener) error {
config.idleTimeout = config.readTimeout
}

attempt := 0

for {
conn, err := l.Accept()

Expand All @@ -229,15 +210,18 @@ func (s *Server) Serve(l net.Listener) error {
case <-s.context.Done():
return ErrServerClosed
}

switch {
case isTimeout(err):
continue
case isTemporary(err):
attempt++

select {
case <-time.After(backoff(attempt, minBackoffDelay, maxBackoffDelay)):
case <-s.context.Done():
}

continue
default:
return err
Expand Down Expand Up @@ -344,6 +328,7 @@ func (s *Server) serveCommands(c *Conn, addr string, cmds []Command, config serv
}

err = s.serveRequest(res, req)

req.Close()
cancel()
return
Expand Down Expand Up @@ -379,11 +364,12 @@ func (s *Server) serveRequest(res *responseWriter, req *Request) (err error) {

if preparedRes != nil {
w = preparedRes

w.WriteStream(len(req.Cmds) + len(preparedRes.responses))
}

if req.Cmds = req.Cmds[:i]; len(req.Cmds) != 0 {
s.serveRedis(w, req)
err = s.serveRedis(w, req)
}

if err == nil && preparedRes != nil {
Expand All @@ -403,17 +389,18 @@ func (s *Server) serveRedis(res ResponseWriter, req *Request) (err error) {
err = convertPanicToError(v)
}
}()

s.Handler.ServeRedis(res, req)
return
}

func (s *Server) log(err error) {
if err != ErrHijacked {
print := log.Print
if logger := s.ErrorLog; logger != nil {
print = logger.Print
lprint := log.Print
if s.ErrorLog != nil {
lprint = s.ErrorLog.Print
}
print(err)
lprint(err)
}
}

Expand Down Expand Up @@ -505,6 +492,7 @@ func backoff(attempt int, minDelay time.Duration, maxDelay time.Duration) time.D
if d > maxDelay {
d = maxDelay
}

return d
}

Expand Down Expand Up @@ -668,6 +656,7 @@ func (res *preparedResponseWriter) Hijack() (c net.Conn, rw *bufio.ReadWriter, e
} else {
err = ErrNotHijackable
}

return
}

Expand All @@ -680,16 +669,3 @@ func (res *preparedResponseWriter) writeRemainingValues() (err error) {
res.responses = nil
return
}

var (
// ErrServerClosed is returned by Server.Serve when the server is closed.
ErrNilArgs = errors.New("cannot parse values from a nil argument list")
ErrServerClosed = errors.New("redis: Server closed")
ErrNegativeStreamCount = errors.New("invalid call to redis.ResponseWriter.WriteStream with a negative value")
ErrWriteStreamCalledAfterWrite = errors.New("invalid call to redis.ResponseWriter.WriteStream after redis.ResponseWriter.Write was called")
ErrWriteStreamCalledTooManyTimes = errors.New("multiple calls to ResponseWriter.WriteStream")
ErrWriteCalledTooManyTimes = errors.New("too many calls to redis.ResponseWriter.Write")
ErrWriteCalledNotEnoughTimes = errors.New("not enough calls to redis.ResponseWriter.Write")
ErrHijacked = errors.New("invalid use of a hijacked redis.ResponseWriter")
ErrNotHijackable = errors.New("the response writer is not hijackable")
)
Loading