Skip to content

Commit

Permalink
Add local cache
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Jul 11, 2018
1 parent 61ff342 commit 7c4c14d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ install:
- go get github.com/iron-io/iron_go3/mq
- go get github.com/aws/aws-sdk-go/service/sqs
- go get github.com/golang/snappy
- go get github.com/hashicorp/golang-lru/simplelru
2 changes: 1 addition & 1 deletion message.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func timeSlot(period time.Duration) int64 {

func hashArgs(args []interface{}) []byte {
b, _ := msgpack.Marshal(args...)
if len(b) <= 128 {
if len(b) <= 64 {
return b
}

Expand Down
44 changes: 41 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package msgqueue

import (
"runtime"
"sync"
"time"

"github.com/go-redis/redis"
"github.com/go-redis/redis_rate"
"github.com/hashicorp/golang-lru/simplelru"
"golang.org/x/time/rate"
)

Expand All @@ -27,13 +29,23 @@ type Storage interface {
}

type redisStorage struct {
Redis
redis Redis
}

var _ Storage = (*redisStorage)(nil)

func newRedisStorage(redis Redis) redisStorage {
return redisStorage{
redis: redis,
}
}

func (s redisStorage) Exists(key string) bool {
val, err := s.SetNX(key, "", 24*time.Hour).Result()
if localCacheExists(key) {
return true
}

val, err := s.redis.SetNX(key, "", 24*time.Hour).Result()
if err != nil {
return true
}
Expand Down Expand Up @@ -167,7 +179,7 @@ func (opt *Options) Init() {
}

if opt.Storage == nil {
opt.Storage = redisStorage{opt.Redis}
opt.Storage = newRedisStorage(opt.Redis)
}

if opt.RateLimit != rate.Inf && opt.RateLimiter == nil && opt.Redis != nil {
Expand All @@ -176,3 +188,29 @@ func (opt *Options) Init() {
opt.RateLimiter = limiter
}
}

var (
mu sync.Mutex
cache *simplelru.LRU
)

func localCacheExists(key string) bool {
mu.Lock()
defer mu.Unlock()

if cache == nil {
var err error
cache, err = simplelru.NewLRU(128000, nil)
if err != nil {
panic(err)
}
}

_, ok := cache.Get(key)
if ok {
return true
}

cache.Add(key, nil)
return false
}

0 comments on commit 7c4c14d

Please sign in to comment.