Skip to content

Commit

Permalink
Add Get State Common (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuangTung97 committed Oct 11, 2023
1 parent a2abedc commit 20f04b1
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 63 deletions.
189 changes: 126 additions & 63 deletions item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,11 @@ func New[T Value, K Key](
options ...Option,
) *Item[T, K] {
return &Item[T, K]{
options: computeOptions(options),
sess: pipeline.LowerSession(),
pipeline: pipeline,
common: itemCommon{
options: computeOptions(options),
sess: pipeline.LowerSession(),
pipeline: pipeline,
},

unmarshaler: unmarshaler,
filler: filler,
Expand All @@ -213,25 +215,31 @@ func New[T Value, K Key](
// Item is NOT thread safe and, it contains a cached keys
// once a key is cached in memory, it will return the same value unless call **Reset**
type Item[T Value, K Key] struct {
options *itemOptions
sess memproxy.Session
pipeline memproxy.Pipeline
unmarshaler Unmarshaler[T]
filler Filler[T, K]

getKeys map[K]*getResultType[T]

stats Stats
common itemCommon
}

func (i *Item[T, K]) addNextCall(fn func(obj unsafe.Pointer)) {
type itemCommon struct {
options *itemOptions
sess memproxy.Session
pipeline memproxy.Pipeline
stats Stats

unmarshalAndSet func(data []byte)
}

func (i *itemCommon) addNextCall(fn func(obj unsafe.Pointer)) {
i.sess.AddNextCall(memproxy.CallbackFunc{
Object: nil,
Func: fn,
})
}

func (i *Item[T, K]) addDelayedCall(d time.Duration, fn func(obj unsafe.Pointer)) {
func (i *itemCommon) addDelayedCall(d time.Duration, fn func(obj unsafe.Pointer)) {
i.sess.AddDelayedCall(d, memproxy.CallbackFunc{
Object: nil,
Func: fn,
Expand All @@ -244,13 +252,18 @@ type getResultType[T any] struct {
}

func (s *GetState[T, K]) handleLeaseGranted(cas uint64) {
fillFn := s.it.filler(s.ctx, s.key)
s.it.addNextCall(func(_ unsafe.Pointer) {
it := s.getItem()

fillFn := it.filler(s.common.ctx, s.key)

it.common.addNextCall(func(_ unsafe.Pointer) {
it := s.common.item

fillResp, err := fillFn()

if err == ErrNotFound {
s.setResponse(fillResp)
s.it.pipeline.Delete(s.keyStr, memproxy.DeleteOptions{})
it.pipeline.Delete(s.common.keyStr, memproxy.DeleteOptions{})
return
}

Expand All @@ -267,54 +280,98 @@ func (s *GetState[T, K]) handleLeaseGranted(cas uint64) {
s.setResponse(fillResp)

if cas > 0 {
_ = s.it.pipeline.LeaseSet(s.keyStr, data, cas, memproxy.LeaseSetOptions{})
s.it.addNextCall(func(obj unsafe.Pointer) {
s.it.pipeline.Execute()
_ = it.pipeline.LeaseSet(s.common.keyStr, data, cas, memproxy.LeaseSetOptions{})
it.addNextCall(func(obj unsafe.Pointer) {
s.common.item.pipeline.Execute()
})
}
})
}

// GetState store intermediate state when getting item
type GetState[T Value, K Key] struct {
type getStateMethods interface {
setResponseError(err error)
doFillFunc(cas uint64)
unmarshalAndSet(data []byte)
}

type getStateCommon struct {
ctx context.Context
key K

it *Item[T, K]
itemRoot unsafe.Pointer
item *itemCommon

retryCount int
keyStr string

leaseGetResult memproxy.LeaseGetResult

methods getStateMethods
}

// GetState store intermediate state when getting item
type GetState[T Value, K Key] struct {
common *getStateCommon
key K
result getResultType[T]
}

func (s *GetState[T, K]) getItem() *Item[T, K] {
return (*Item[T, K])(s.common.itemRoot)
}

func (s *GetState[T, K]) unmarshalAndSet(data []byte) {
it := s.getItem()
resp, err := it.unmarshaler(data)

memcache.ReleaseGetResponseData(data)

if err != nil {
s.setResponseError(err)
return
}
s.setResponse(resp)
}

func (s *GetState[T, K]) setResponseError(err error) {
s.it.options.errorLogger(err)
s.it.getKeys[s.key].err = err
it := s.getItem()

s.common.item.options.errorLogger(err)
it.getKeys[s.key].err = err
}

func (s *GetState[T, K]) setResponse(resp T) {
s.it.getKeys[s.key].resp = resp
it := s.getItem()
it.getKeys[s.key].resp = resp
}

func (s *GetState[T, K]) doFillFunc(cas uint64) {
s.it.stats.FillCount++
s.common.item.stats.FillCount++
s.handleLeaseGranted(cas)
}

func (s *GetState[T, K]) handleCacheError(err error) {
s.it.stats.LeaseGetError++
if s.it.options.fillingOnCacheError {
s.it.options.errorLogger(err)
s.doFillFunc(0)
func (s *getStateCommon) handleCacheError(err error) {
s.item.stats.LeaseGetError++
if s.item.options.fillingOnCacheError {
s.item.options.errorLogger(err)
s.methods.doFillFunc(0)
} else {
s.setResponseError(err)
s.methods.setResponseError(err)
}
}

func (s *GetState[T, K]) nextFunc(_ unsafe.Pointer) {
func (s *getStateCommon) newNextCallback() memproxy.CallbackFunc {
return memproxy.CallbackFunc{
Object: unsafe.Pointer(s),
Func: stateCommonNextCallback,
}
}

func stateCommonNextCallback(obj unsafe.Pointer) {
s := (*getStateCommon)(obj)
s.nextFunc()
}

func (s *getStateCommon) nextFunc() {
leaseGetResp, err := s.leaseGetResult.Result()

s.leaseGetResult = nil
Expand All @@ -324,46 +381,40 @@ func (s *GetState[T, K]) nextFunc(_ unsafe.Pointer) {
return
}

if leaseGetResp.Status == memproxy.LeaseGetStatusFound {
s.it.stats.HitCount++
s.it.stats.TotalBytesRecv += uint64(len(leaseGetResp.Data))

resp, err := s.it.unmarshaler(leaseGetResp.Data)
it := s.item

memcache.ReleaseGetResponseData(leaseGetResp.Data)
if leaseGetResp.Status == memproxy.LeaseGetStatusFound {
it.stats.HitCount++
it.stats.TotalBytesRecv += uint64(len(leaseGetResp.Data))

if err != nil {
s.setResponseError(err)
return
}
s.setResponse(resp)
s.methods.unmarshalAndSet(leaseGetResp.Data)
return
}

if leaseGetResp.Status == memproxy.LeaseGetStatusLeaseGranted {
s.doFillFunc(leaseGetResp.CAS)
s.methods.doFillFunc(leaseGetResp.CAS)
return
}

if leaseGetResp.Status == memproxy.LeaseGetStatusLeaseRejected {
s.it.increaseRejectedCount(s.retryCount)
it.increaseRejectedCount(s.retryCount)

if s.retryCount < len(s.it.options.sleepDurations) {
s.it.addDelayedCall(s.it.options.sleepDurations[s.retryCount], func(_ unsafe.Pointer) {
if s.retryCount < len(it.options.sleepDurations) {
it.addDelayedCall(it.options.sleepDurations[s.retryCount], func(_ unsafe.Pointer) {
s.retryCount++

s.leaseGetResult = s.it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{})
s.it.addNextCall(s.nextFunc)
s.leaseGetResult = it.pipeline.LeaseGet(s.keyStr, memproxy.LeaseGetOptions{})
it.sess.AddNextCall(s.newNextCallback())
})
return
}

if !s.it.options.errorOnRetryLimit {
s.doFillFunc(leaseGetResp.CAS)
if !it.options.errorOnRetryLimit {
s.methods.doFillFunc(leaseGetResp.CAS)
return
}

s.setResponseError(ErrExceededRejectRetryLimit)
s.methods.setResponseError(ErrExceededRejectRetryLimit)
return
}

Expand All @@ -372,9 +423,13 @@ func (s *GetState[T, K]) nextFunc(_ unsafe.Pointer) {

// Result returns result
func (s *GetState[T, K]) Result() (T, error) {
s.it.sess.Execute()
it := s.getItem()
it.common.sess.Execute()

putGetStateCommon(s.common)
s.common = nil

result := s.it.getKeys[s.key]
result := it.getKeys[s.key]
return result.resp, result.err
}

Expand All @@ -387,30 +442,38 @@ func (i *Item[T, K]) Get(ctx context.Context, key K) func() (T, error) {
func (i *Item[T, K]) GetFast(ctx context.Context, key K) *GetState[T, K] {
keyStr := key.String()

state := &GetState[T, K]{
ctx: ctx,
key: key,
// init get state common
sc := newGetStateCommon()

sc.ctx = ctx

sc.itemRoot = unsafe.Pointer(i)
sc.item = &i.common

it: i,
sc.keyStr = keyStr
// end init get state common

retryCount: 0,
keyStr: keyStr,
state := &GetState[T, K]{
common: sc,
key: key,
}

sc.methods = state

_, existed := i.getKeys[key]
if existed {
return state
}
i.getKeys[key] = &state.result

state.leaseGetResult = i.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})
sc.leaseGetResult = i.common.pipeline.LeaseGet(keyStr, memproxy.LeaseGetOptions{})

i.addNextCall(state.nextFunc)
i.common.sess.AddNextCall(sc.newNextCallback())

return state
}

func (i *Item[T, K]) increaseRejectedCount(retryCount int) {
func (i *itemCommon) increaseRejectedCount(retryCount int) {
i.stats.TotalRejectedCount++

switch retryCount {
Expand All @@ -425,7 +488,7 @@ func (i *Item[T, K]) increaseRejectedCount(retryCount int) {

// LowerSession ...
func (i *Item[T, K]) LowerSession() memproxy.Session {
return i.sess.GetLower()
return i.common.sess.GetLower()
}

// Reset clear in-memory cached values
Expand All @@ -450,5 +513,5 @@ type Stats struct {

// GetStats ...
func (i *Item[T, K]) GetStats() Stats {
return i.stats
return i.common.stats
}
5 changes: 5 additions & 0 deletions item/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"testing"
"time"
"unsafe"

"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -1203,3 +1204,7 @@ func TestItem_WithFakePipeline(t *testing.T) {

assert.Equal(t, 1, fillCalls)
}

func TestSizeOfStateCommon(t *testing.T) {
assert.Equal(t, uintptr(88), unsafe.Sizeof(getStateCommon{}))
}
20 changes: 20 additions & 0 deletions item/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package item

import (
"sync"
)

var getStateCommonPool = sync.Pool{
New: func() any {
return &getStateCommon{}
},
}

func newGetStateCommon() *getStateCommon {
return getStateCommonPool.Get().(*getStateCommon)
}

func putGetStateCommon(s *getStateCommon) {
*s = getStateCommon{}
getStateCommonPool.Put(s)
}
1 change: 1 addition & 0 deletions item/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package item

0 comments on commit 20f04b1

Please sign in to comment.