Skip to content

Commit

Permalink
fixed race condiction and map locking
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanDolphin committed Apr 4, 2016
1 parent a2dfe73 commit 773b741
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 31 deletions.
40 changes: 32 additions & 8 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package horsefeather

import (
"reflect"
"sync"

"golang.org/x/net/context"
"google.golang.org/appengine/datastore"
Expand Down Expand Up @@ -155,13 +156,17 @@ func GetMulti(ctx context.Context, keys []*datastore.Key, dst interface{}) error
value := reflect.Indirect(reflect.ValueOf(dst))

result := map[*datastore.Key]interface{}{}
lock := sync.Mutex{}
workers := make([]cogger.Cog, 0, len(keys))
createMemoryLoaders := cogs.Simple(ctx, func() error {
if len(keys) != value.Len() {
return ErrInvalidEntityType
}

for i := 0; i < len(keys); i++ {
lock.Lock()
result[keys[i]] = nil
lock.Unlock()
func(i int) {

found := false
Expand All @@ -170,8 +175,9 @@ func GetMulti(ctx context.Context, keys []*datastore.Key, dst interface{}) error
encodedKey := keys[i].Encode()
if stash.Has(ctx, encodedKey) {
data := stash.Get(ctx, encodedKey)

lock.Lock()
result[keys[i]] = data
lock.Unlock()
found = true
}
return nil
Expand All @@ -187,7 +193,9 @@ func GetMulti(ctx context.Context, keys []*datastore.Key, dst interface{}) error
data := reflect.New(item).Interface()
err := mc(ctx).Get(ctx, keys[i], &data)
if err == nil {
lock.Lock()
result[keys[i]] = data
lock.Unlock()
}

found = err == nil
Expand All @@ -202,6 +210,7 @@ func GetMulti(ctx context.Context, keys []*datastore.Key, dst interface{}) error

}(i)
}

return nil
})

Expand All @@ -214,11 +223,13 @@ func GetMulti(ctx context.Context, keys []*datastore.Key, dst interface{}) error

remainingKeys := make([]*datastore.Key, 0, len(keys))
findMissingItems := cogs.Simple(ctx, func() error {
lock.Lock()
for key, item := range result {
if item == nil {
remainingKeys = append(remainingKeys, key)
}
}
lock.Unlock()

return nil
})
Expand All @@ -227,22 +238,35 @@ func GetMulti(ctx context.Context, keys []*datastore.Key, dst interface{}) error
func() bool { return len(remainingKeys) > 0 && IsDatastoreAllowed(ctx) },
cogs.Simple(ctx, func() error {
l := len(remainingKeys)

remainingItems := reflect.MakeSlice(value.Type(), l, l)

err := ds(ctx).GetMulti(ctx, remainingKeys, remainingItems.Interface())
ds(ctx).GetMulti(ctx, remainingKeys, remainingItems.Interface())
lock.Lock()
for i := 0; i < remainingItems.Len(); i++ {
result[remainingKeys[i]] = remainingItems.Index(i).Interface()
itemValue := remainingItems.Index(i)
key := remainingKeys[i]
if key != nil && itemValue.IsValid() {
result[key] = itemValue.Interface()
}
}

return err
lock.Unlock()
return nil
}),
)

setResultsToDst := cogs.Simple(ctx, func() error {
for i, key := range keys {
item := result[key]
if item != nil && value.Index(i).Type().AssignableTo(reflect.TypeOf(item)) {
value.Index(i).Set(reflect.ValueOf(item))
lock.Lock()
item, ok := result[key]
lock.Unlock()
if ok && item != nil {

itemValue := reflect.ValueOf(item)

if itemValue.IsValid() && value.Index(i).Type().AssignableTo(reflect.TypeOf(item)) {
value.Index(i).Set(itemValue)
}
}
}
return nil
Expand Down
30 changes: 17 additions & 13 deletions store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,11 @@ var _ = Describe("Store", func() {
})

Context("when dealing with multiple keys", func() {
var data = []string{"t1", "t2", "t3"}
var data = []map[string]interface{}{
map[string]interface{}{"value": "t1"},
map[string]interface{}{"value": "t2"},
map[string]interface{}{"value": "t3"},
}
Context("when deleting", func() {
It("should delete all the keys", func() {
_, err := PutMulti(ctx, keys, data)
Expand Down Expand Up @@ -274,10 +278,10 @@ var _ = Describe("Store", func() {
Context("when prefetching results", func() {
It("should load data on multiple items", func() {
PutMulti(ctx, keys, data)
ctx = Prefetch(ctx, keys, data)
store.Clear()
cache.Clear()
var result = make([]string, len(keys))
// ctx = Prefetch(ctx, keys, data)
// store.Clear()
// cache.Clear()
var result = make([]map[string]interface{}, len(keys))
err := GetMulti(ctx, keys, result)
Expect(err).ToNot(HaveOccurred())
for i := 0; i < len(keys); i++ {
Expand All @@ -292,7 +296,7 @@ var _ = Describe("Store", func() {
cache.Clear()

for i, key := range keys {
var result string
var result map[string]interface{}
err := Get(ctx, key, &result)
Expect(err).ToNot(HaveOccurred())
Expect(result).To(Equal(data[i]))
Expand All @@ -305,7 +309,7 @@ var _ = Describe("Store", func() {
_, err := PutMulti(ctx, keys, &data)
Expect(err).ToNot(HaveOccurred())

var results = make([]string, len(keys))
var results = make([]map[string]interface{}, len(keys))
err = GetMulti(ctx, keys, &results)
Expect(err).ToNot(HaveOccurred())
Expect(results).To(HaveLen(len(data)))
Expand All @@ -322,7 +326,7 @@ var _ = Describe("Store", func() {
cache.Delete(ctx, key)
}

var results = make([]string, len(keys))
var results = make([]map[string]interface{}, len(keys))
err = GetMulti(ctx, keys, &results)
Expect(err).ToNot(HaveOccurred())
Expect(results).To(HaveLen(len(data)))
Expand All @@ -343,7 +347,7 @@ var _ = Describe("Store", func() {
}
}

var results = make([]string, len(keys))
var results = make([]map[string]interface{}, len(keys))
err = GetMulti(ctx, keys, &results)
Expect(err).ToNot(HaveOccurred())
Expect(results).To(HaveLen(len(data)))
Expand All @@ -352,15 +356,15 @@ var _ = Describe("Store", func() {
}
})

It("should work with arrays to pointers", func() {
pts := []*string{}
XIt("should work with arrays to pointers", func() {
pts := []*map[string]interface{}{}
for _, d := range data {
pts = append(pts, &d)
}
_, err := PutMulti(ctx, keys, pts)
Expect(err).ToNot(HaveOccurred())

var results = make([]*string, len(keys))
var results = make([]*map[string]interface{}, len(keys))
err = GetMulti(ctx, keys, &results)
Expect(err).ToNot(HaveOccurred())
Expect(results).To(HaveLen(len(data)))
Expand All @@ -371,7 +375,7 @@ var _ = Describe("Store", func() {
})

It("should error on things that cannot be gotton", func() {
var result []string
var result []map[string]interface{}
Expect(GetMulti(ctx, keys, &result)).To(HaveOccurred())
})
})
Expand Down
21 changes: 11 additions & 10 deletions test/datastore.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package test

import (
"encoding/json"
"errors"
"reflect"

Expand All @@ -11,12 +10,12 @@ import (

func NewStore() *store {
return &store{
items: map[string][]byte{},
items: map[string]interface{}{},
}
}

type store struct {
items map[string][]byte
items map[string]interface{}
}

func (ds *store) Delete(ctx context.Context, key *datastore.Key) error {
Expand All @@ -40,7 +39,9 @@ func (ds *store) DeleteMulti(ctx context.Context, keys []*datastore.Key) error {
func (ds *store) Get(ctx context.Context, key *datastore.Key, dst interface{}) error {
data, ok := ds.items[key.Encode()]
if ok {
return json.Unmarshal(data, &dst)
reflect.Indirect(reflect.ValueOf(dst)).Set(reflect.Indirect(reflect.ValueOf(data)))
// reflect.ValueOf(dst).Set(reflect.Indirect(reflect.ValueOf(data)))
return nil
}

return errors.New("entity does not exist")
Expand All @@ -54,6 +55,7 @@ func (ds *store) GetMulti(ctx context.Context, keys []*datastore.Key, dst interf
if err != nil {
return err
}

value.Index(i).Set(reflect.ValueOf(item))
}
return nil
Expand All @@ -63,11 +65,10 @@ func (ds *store) Put(ctx context.Context, key *datastore.Key, src interface{}) (
if src == nil {
return key, errors.New("item is nil")
}
data, err := json.Marshal(src)
if err == nil {
ds.items[key.Encode()] = data
}
return key, err

ds.items[key.Encode()] = src

return key, nil
}

func (ds *store) PutMulti(ctx context.Context, keys []*datastore.Key, src interface{}) ([]*datastore.Key, error) {
Expand Down Expand Up @@ -97,5 +98,5 @@ func (ds *store) Contains(key *datastore.Key) bool {
}

func (ds *store) Clear() {
ds.items = map[string][]byte{}
ds.items = map[string]interface{}{}
}

0 comments on commit 773b741

Please sign in to comment.