Skip to content

Commit

Permalink
Merge 154fefd into 1ae9442
Browse files Browse the repository at this point in the history
  • Loading branch information
ldez committed Jan 21, 2020
2 parents 1ae9442 + 154fefd commit 7898a90
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 31 deletions.
40 changes: 40 additions & 0 deletions store/redis/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package redis

import (
"encoding/json"

"github.com/abronan/valkeyrie/store"
)

type Codec interface {
Encode(kv *store.KVPair) (string, error)
Decode(b []byte, kv *store.KVPair) error
}

type RawCodec struct{}

func (c RawCodec) Encode(kv *store.KVPair) (string, error) {
if kv == nil {
return "", nil
}

return string(kv.Value), nil
}

func (c RawCodec) Decode(b []byte, kv *store.KVPair) error {
// kv.Key = "" // FIXME
kv.Value = b

return nil
}

type JSONCodec struct{}

func (c JSONCodec) Encode(kv *store.KVPair) (string, error) {
b, err := json.Marshal(kv)
return string(b), err
}

func (c JSONCodec) Decode(b []byte, kv *store.KVPair) error {
return json.Unmarshal(b, kv)
}
66 changes: 36 additions & 30 deletions store/redis/redis.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package redis

import (
"encoding/json"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -36,21 +35,23 @@ func Register() {
// New creates a new Redis client given a list
// of endpoints and optional tls config
func New(endpoints []string, options *store.Config) (store.Store, error) {
var password string
if len(endpoints) > 1 {
return nil, ErrMultipleEndpointsUnsupported
}
if options != nil && options.TLS != nil {
return nil, ErrTLSUnsupported
}

var password string
if options != nil && options.Password != "" {
password = options.Password
}
return newRedis(endpoints, password)

return newRedis(endpoints, password, &RawCodec{})
}

func newRedis(endpoints []string, password string) (*Redis, error) {
// TODO: use *redis.ClusterClient if we support miltiple endpoints
func newRedis(endpoints []string, password string, codec Codec) (*Redis, error) {
// TODO: use *redis.ClusterClient if we support multiple endpoints
client := redis.NewClient(&redis.Options{
Addr: endpoints[0],
DialTimeout: 5 * time.Second,
Expand All @@ -62,29 +63,23 @@ func newRedis(endpoints []string, password string) (*Redis, error) {
// Listen to Keyspace events
client.ConfigSet("notify-keyspace-events", "KEA")

var c Codec = &JSONCodec{}
if codec != nil {
c = codec
}

return &Redis{
client: client,
script: redis.NewScript(luaScript()),
codec: defaultCodec{},
codec: c,
}, nil
}

type defaultCodec struct{}

func (c defaultCodec) encode(kv *store.KVPair) (string, error) {
b, err := json.Marshal(kv)
return string(b), err
}

func (c defaultCodec) decode(b string, kv *store.KVPair) error {
return json.Unmarshal([]byte(b), kv)
}

// Redis implements valkeyrie.Store interface with redis backend
type Redis struct {
client *redis.Client
script *redis.Script
codec defaultCodec
codec Codec
}

const (
Expand All @@ -107,7 +102,7 @@ func (r *Redis) Put(key string, value []byte, options *store.WriteOptions) error
}

func (r *Redis) setTTL(key string, val *store.KVPair, ttl time.Duration) error {
valStr, err := r.codec.encode(val)
valStr, err := r.codec.Encode(val)
if err != nil {
return err
}
Expand All @@ -129,9 +124,14 @@ func (r *Redis) get(key string) (*store.KVPair, error) {
return nil, err
}
val := store.KVPair{}
if err := r.codec.decode(string(reply), &val); err != nil {
if err := r.codec.Decode(reply, &val); err != nil {
return nil, err
}

if val.Key == "" {
val.Key = key
}

return &val, nil
}

Expand Down Expand Up @@ -505,7 +505,7 @@ func (r *Redis) mget(directory string, keys ...string) ([]*store.KVPair, error)
}

pairs := []*store.KVPair{}
for _, reply := range replies {
for i, reply := range replies {
var sreply string
if _, ok := reply.(string); ok {
sreply = reply.(string)
Expand All @@ -515,12 +515,17 @@ func (r *Redis) mget(directory string, keys ...string) ([]*store.KVPair, error)
continue
}

newkv := &store.KVPair{}
if err := r.codec.decode(sreply, newkv); err != nil {
pair := &store.KVPair{}
if err := r.codec.Decode([]byte(sreply), pair); err != nil {
return nil, err
}
if normalize(newkv.Key) != directory {
pairs = append(pairs, newkv)

if pair.Key == "" {
pair.Key = keys[i]
}

if normalize(pair.Key) != directory {
pairs = append(pairs, pair)
}
}
return pairs, nil
Expand Down Expand Up @@ -575,7 +580,7 @@ func (r *Redis) AtomicPut(key string, value []byte, previous *store.KVPair, opti
}

func (r *Redis) setNX(key string, val *store.KVPair, expirationAfter time.Duration) error {
valBlob, err := r.codec.encode(val)
valBlob, err := r.codec.Encode(val)
if err != nil {
return err
}
Expand All @@ -587,12 +592,12 @@ func (r *Redis) setNX(key string, val *store.KVPair, expirationAfter time.Durati
}

func (r *Redis) cas(key string, old, new *store.KVPair, secInStr string) error {
newVal, err := r.codec.encode(new)
newVal, err := r.codec.Encode(new)
if err != nil {
return err
}

oldVal, err := r.codec.encode(old)
oldVal, err := r.codec.Encode(old)
if err != nil {
return err
}
Expand All @@ -616,7 +621,7 @@ func (r *Redis) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
}

func (r *Redis) cad(key string, old *store.KVPair) error {
oldVal, err := r.codec.encode(old)
oldVal, err := r.codec.Encode(old)
if err != nil {
return err
}
Expand Down Expand Up @@ -653,7 +658,8 @@ func (r *Redis) runScript(args ...interface{}) error {
}

func normalize(key string) string {
return store.Normalize(key)
key = store.Normalize(key)
return strings.TrimPrefix(key, "/")
}

func formatSec(dur time.Duration) string {
Expand Down
2 changes: 1 addition & 1 deletion store/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
)

func makeRedisClient(t *testing.T) store.Store {
kv, err := newRedis([]string{client}, "")
kv, err := newRedis([]string{client}, "", nil)
if err != nil {
t.Fatalf("cannot create store: %v", err)
}
Expand Down

0 comments on commit 7898a90

Please sign in to comment.