Skip to content

Commit

Permalink
Merge pull request #85 from feiguoL/master
Browse files Browse the repository at this point in the history
fix: fix cacheManager use error with redis and pegasus
  • Loading branch information
eko committed May 3, 2021
2 parents 1a9de67 + 4699aca commit ade5d74
Show file tree
Hide file tree
Showing 19 changed files with 201 additions and 162 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ freecacheStore := store.NewFreecache(freecache.NewCache(1000), &Options{
cacheManager := cache.New(freecacheStore)
err := cacheManager.Set("by-key", []byte("my-value"), opts)
if err != nil {
panic(err)
panic(err)
}

value := cacheManager.Get("my-key")
Expand All @@ -152,17 +152,17 @@ pegasusStore, err := store.NewPegasus(&store.OptionsPegasus{
MetaServers: []string{"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"},
})

if err != nil{
if err != nil {
fmt.Println(err)
return
}

cacheManager := cache.New(pegasusStore)
err = cacheManager.Set("my-key", "my-value"), store.Options{
err = cacheManager.Set("my-key", "my-value", &store.Options{
Expiration: 10 * time.Second,
})
if err != nil {
panic(err)
panic(err)
}

value, _ := cacheManager.Get("my-key")
Expand Down
2 changes: 1 addition & 1 deletion store/bigcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"errors"
"fmt"
"strings"
time "time"
"time"
)

// BigcacheClientInterface represents a allegro/bigcache client
Expand Down
2 changes: 1 addition & 1 deletion store/bigcache_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"math"
"testing"
time "time"
"time"

"github.com/allegro/bigcache/v2"
)
Expand Down
2 changes: 1 addition & 1 deletion store/go_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package store
import (
"errors"
"fmt"
time "time"
"time"
)

const (
Expand Down
1 change: 1 addition & 0 deletions store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"time"
)

// StoreInterface is the interface for all available stores
type StoreInterface interface {
Get(key interface{}) (interface{}, error)
Expand Down
2 changes: 1 addition & 1 deletion store/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"errors"
"fmt"
"strings"
time "time"
"time"

"github.com/bradfitz/gomemcache/memcache"
)
Expand Down
2 changes: 1 addition & 1 deletion store/memcache_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"math"
"testing"
time "time"
"time"

"github.com/bradfitz/gomemcache/memcache"
)
Expand Down
9 changes: 9 additions & 0 deletions store/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"context"
"time"
)

Expand All @@ -15,6 +16,9 @@ type Options struct {

// Tags allows to specify associated tags to the current value
Tags []string

// Ctx pass context for control timeout for all operations
Ctx context.Context
}

// CostValue returns the allocated memory capacity
Expand All @@ -31,3 +35,8 @@ func (o Options) ExpirationValue() time.Duration {
func (o Options) TagsValue() []string {
return o.Tags
}

// CtxValue returns the ctx option value
func (o Options) CtxValue() context.Context {
return o.Ctx
}
11 changes: 11 additions & 0 deletions store/options_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -36,3 +37,13 @@ func TestOptionsTagsValue(t *testing.T) {
// When - Then
assert.Equal(t, []string{"tag1", "tag2", "tag3"}, options.TagsValue())
}

func TestOptions_CtxValue(t *testing.T) {
// Given
options := Options{
Ctx: context.Background(),
}

// When - Then
assert.Equal(t, context.Background(), options.Ctx)
}
76 changes: 43 additions & 33 deletions store/pegasus.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ type PegasusStore struct {
}

// NewPegasus creates a new store to pegasus instance(s)
func NewPegasus(ctx context.Context, options *OptionsPegasus) (*PegasusStore, error) {
func NewPegasus(options *OptionsPegasus) (*PegasusStore, error) {
if options == nil {
options = &OptionsPegasus{}
}

if err := createTable(ctx, options); err != nil {
if err := createTable(options); err != nil {
return nil, err
}

client := pegasus.NewClient(pegasus.Config{
MetaServers: options.MetaServers,
})
table, err := client.OpenTable(ctx, options.TableName)
table, err := client.OpenTable(options.Ctx, options.TableName)
defer table.Close()
if err != nil {
return nil, err
Expand All @@ -85,18 +85,21 @@ func validateOptions(options *OptionsPegasus) error {
if options.TableScanNum < 1 {
options.TableScanNum = DefaultScanNum
}
if options.Ctx == nil {
options.Ctx = context.Background()
}

return nil
}

// createTable for create table by options
func createTable(ctx context.Context, options *OptionsPegasus) error {
func createTable(options *OptionsPegasus) error {
if err := validateOptions(options); err != nil {
return err
}

tableClient := admin.NewClient(admin.Config{MetaServers: options.MetaServers})
tableList, err := tableClient.ListTables(ctx)
tableList, err := tableClient.ListTables(options.Ctx)
if err != nil {
return err
}
Expand All @@ -108,13 +111,17 @@ func createTable(ctx context.Context, options *OptionsPegasus) error {
}

// if not found then create table of options
return tableClient.CreateTable(ctx, options.TableName, options.TablePartitionNum)
return tableClient.CreateTable(options.Ctx, options.TableName, options.TablePartitionNum)
}

// dropTable for drop table
func dropTable(ctx context.Context, options *OptionsPegasus) error {
func dropTable(options *OptionsPegasus) error {
if err := validateOptions(options); err != nil {
return err
}

tableClient := admin.NewClient(admin.Config{MetaServers: options.MetaServers})
return tableClient.DropTable(ctx, options.TableName)
return tableClient.DropTable(options.Ctx, options.TableName)
}

// Close when exit store
Expand All @@ -123,14 +130,14 @@ func (p *PegasusStore) Close() error {
}

// Get returns data stored from a given key
func (p *PegasusStore) Get(ctx context.Context, key interface{}) (interface{}, error) {
table, err := p.client.OpenTable(ctx, p.options.TableName)
func (p *PegasusStore) Get(key interface{}) (interface{}, error) {
table, err := p.client.OpenTable(p.options.Ctx, p.options.TableName)
defer table.Close()
if err != nil {
return nil, err
}

value, err := table.Get(ctx, []byte(cast.ToString(key)), empty)
value, err := table.Get(p.options.Ctx, []byte(cast.ToString(key)), empty)
if err != nil {
return nil, err
}
Expand All @@ -139,19 +146,19 @@ func (p *PegasusStore) Get(ctx context.Context, key interface{}) (interface{}, e
}

// GetWithTTL returns data stored from a given key and its corresponding TTL
func (p *PegasusStore) GetWithTTL(ctx context.Context, key interface{}) (interface{}, time.Duration, error) {
table, err := p.client.OpenTable(ctx, p.options.TableName)
func (p *PegasusStore) GetWithTTL(key interface{}) (interface{}, time.Duration, error) {
table, err := p.client.OpenTable(p.options.Ctx, p.options.TableName)
defer table.Close()
if err != nil {
return nil, 0, err
}

value, err := table.Get(ctx, []byte(cast.ToString(key)), empty)
value, err := table.Get(p.options.Ctx, []byte(cast.ToString(key)), empty)
if err != nil {
return nil, 0, err
}

ttl, err := table.TTL(ctx, []byte(cast.ToString(key)), empty)
ttl, err := table.TTL(p.options.Ctx, []byte(cast.ToString(key)), empty)
if err != nil {
return nil, 0, err
}
Expand All @@ -160,36 +167,39 @@ func (p *PegasusStore) GetWithTTL(ctx context.Context, key interface{}) (interfa
}

// Set defines data in Pegasus for given key identifier
func (p *PegasusStore) Set(ctx context.Context, key, value interface{}, options *Options) error {
func (p *PegasusStore) Set(key, value interface{}, options *Options) error {
if options == nil {
options = &Options{}
}
if options.Ctx == nil {
options.Ctx = context.Background()
}

table, err := p.client.OpenTable(ctx, p.options.TableName)
table, err := p.client.OpenTable(options.Ctx, p.options.TableName)
defer table.Close()
if err != nil {
return err
}

err = table.SetTTL(ctx, []byte(cast.ToString(key)), empty, []byte(cast.ToString(value)), options.Expiration)
err = table.SetTTL(options.Ctx, []byte(cast.ToString(key)), empty, []byte(cast.ToString(value)), options.Expiration)
if err != nil {
return err
}

if tags := options.TagsValue(); len(tags) > 0 {
if err = p.setTags(ctx, key, tags); err != nil {
if err = p.setTags(key, tags); err != nil {
return err
}
}
return nil
}

func (p *PegasusStore) setTags(ctx context.Context, key interface{}, tags []string) error {
func (p *PegasusStore) setTags(key interface{}, tags []string) error {
for _, tag := range tags {
var tagKey = fmt.Sprintf(PegasusTagPattern, tag)
var cacheKeys = []string{}

if result, err := p.Get(ctx, tagKey); err == nil {
if result, err := p.Get(tagKey); err == nil {
if bytes, ok := result.([]byte); ok {
cacheKeys = strings.Split(string(bytes), ",")
}
Expand All @@ -207,7 +217,7 @@ func (p *PegasusStore) setTags(ctx context.Context, key interface{}, tags []stri
cacheKeys = append(cacheKeys, key.(string))
}

if err := p.Set(ctx, tagKey, []byte(strings.Join(cacheKeys, ",")), &Options{
if err := p.Set(tagKey, []byte(strings.Join(cacheKeys, ",")), &Options{
Expiration: 720 * time.Hour,
}); err != nil {
return err
Expand All @@ -218,22 +228,22 @@ func (p *PegasusStore) setTags(ctx context.Context, key interface{}, tags []stri
}

// Delete removes data from Pegasus for given key identifier
func (p *PegasusStore) Delete(ctx context.Context, key interface{}) error {
table, err := p.client.OpenTable(ctx, p.options.TableName)
func (p *PegasusStore) Delete(key interface{}) error {
table, err := p.client.OpenTable(p.options.Ctx, p.options.TableName)
defer table.Close()
if err != nil {
return err
}

return table.Del(ctx, []byte(cast.ToString(key)), empty)
return table.Del(p.options.Ctx, []byte(cast.ToString(key)), empty)
}

// Invalidate invalidates some cache data in Pegasus for given options
func (p *PegasusStore) Invalidate(ctx context.Context, options InvalidateOptions) error {
func (p *PegasusStore) Invalidate(options InvalidateOptions) error {
if tags := options.TagsValue(); len(tags) > 0 {
for _, tag := range tags {
var tagKey = fmt.Sprintf(PegasusTagPattern, tag)
result, err := p.Get(ctx, tagKey)
result, err := p.Get(tagKey)
if err != nil {
return nil
}
Expand All @@ -244,7 +254,7 @@ func (p *PegasusStore) Invalidate(ctx context.Context, options InvalidateOptions
}

for _, cacheKey := range cacheKeys {
if err := p.Delete(ctx, cacheKey); err != nil {
if err := p.Delete(cacheKey); err != nil {
return err
}
}
Expand All @@ -255,15 +265,15 @@ func (p *PegasusStore) Invalidate(ctx context.Context, options InvalidateOptions
}

// Clear resets all data in the store
func (p *PegasusStore) Clear(ctx context.Context) error {
table, err := p.client.OpenTable(ctx, p.options.TableName)
func (p *PegasusStore) Clear() error {
table, err := p.client.OpenTable(p.options.Ctx, p.options.TableName)
defer table.Close()
if err != nil {
return err
}

// init full scan
scanners, err := table.GetUnorderedScanners(ctx, p.options.TablePartitionNum, &pegasus.ScannerOptions{
scanners, err := table.GetUnorderedScanners(p.options.Ctx, p.options.TablePartitionNum, &pegasus.ScannerOptions{
BatchSize: p.options.TableScanNum,
// Values can be optimized out during scanning to reduce the workload.
NoValue: true,
Expand All @@ -276,14 +286,14 @@ func (p *PegasusStore) Clear(ctx context.Context) error {
for _, scanner := range scanners {
// Iterates sequentially.
for true {
completed, hashKey, _, _, err := scanner.Next(ctx)
completed, hashKey, _, _, err := scanner.Next(p.options.Ctx)
if err != nil {
return err
}
if completed {
break
}
err = p.Delete(ctx, hashKey)
err = p.Delete(hashKey)
if err != nil {
return err
}
Expand Down

0 comments on commit ade5d74

Please sign in to comment.