diff --git a/go.mod b/go.mod index f2d1242..e29edf8 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 github.com/google/uuid v1.1.1 + github.com/hashicorp/golang-lru v0.5.4 github.com/stretchr/testify v1.4.0 google.golang.org/grpc v1.27.0 ) diff --git a/go.sum b/go.sum index efb40ba..3a3e140 100644 --- a/go.sum +++ b/go.sum @@ -174,6 +174,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xC github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= diff --git a/pkg/client/client.go b/pkg/client/client.go index 28703bf..b3d494f 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -380,8 +380,8 @@ func (d *Database) GetLog(ctx context.Context, name string) (log.Log, error) { } // GetMap gets or creates a Map with the given name -func (d *Database) GetMap(ctx context.Context, name string) (_map.Map, error) { - return _map.New(ctx, primitive.NewName(d.Namespace, d.Name, d.scope, name), d.sessions) +func (d *Database) GetMap(ctx context.Context, name string, opts ..._map.Option) (_map.Map, error) { + return _map.New(ctx, primitive.NewName(d.Namespace, d.Name, d.scope, name), d.sessions, opts...) } // GetSet gets or creates a Set with the given name @@ -442,8 +442,8 @@ func (g *PartitionGroup) GetLog(ctx context.Context, name string) (log.Log, erro } // GetMap gets or creates a Map with the given name -func (g *PartitionGroup) GetMap(ctx context.Context, name string) (_map.Map, error) { - return _map.New(ctx, primitive.NewName(g.Namespace, g.Name, g.scope, name), g.sessions) +func (g *PartitionGroup) GetMap(ctx context.Context, name string, opts ..._map.Option) (_map.Map, error) { + return _map.New(ctx, primitive.NewName(g.Namespace, g.Name, g.scope, name), g.sessions, opts...) } // GetSet gets or creates a Set with the given name diff --git a/pkg/client/map/cache.go b/pkg/client/map/cache.go new file mode 100644 index 0000000..c79c98e --- /dev/null +++ b/pkg/client/map/cache.go @@ -0,0 +1,205 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package _map //nolint:golint + +import ( + "context" + "github.com/hashicorp/golang-lru" + "sync" +) + +// newCachingMap returns a decorated map that caches updates to the given map +func newCachingMap(_map Map, size int) (Map, error) { + cache, err := lru.New(size) + if err != nil { + return nil, err + } + cachingMap := &cachingMap{ + delegatingMap: newDelegatingMap(_map), + pending: make(map[string]*cachedEntry), + cache: cache, + } + if err := cachingMap.open(); err != nil { + return nil, err + } + return cachingMap, nil +} + +// cachingMap is an implementation of the Map interface that caches entries +type cachingMap struct { + *delegatingMap + cancel context.CancelFunc + pending map[string]*cachedEntry + cache *lru.Cache + cacheVersion int64 + mu sync.RWMutex +} + +// open opens the map listeners +func (m *cachingMap) open() error { + ch := make(chan *Event) + ctx, cancel := context.WithCancel(context.Background()) + m.mu.Lock() + m.cancel = cancel + m.mu.Unlock() + if err := m.delegatingMap.Watch(ctx, ch, WithReplay()); err != nil { + return err + } + go func() { + for event := range ch { + m.cacheUpdate(event.Entry, event.Type == EventRemoved) + } + }() + return nil +} + +// cacheUpdate caches the given updated entry +func (m *cachingMap) cacheUpdate(update *Entry, tombstone bool) { + m.mu.Lock() + defer m.mu.Unlock() + + // If the update version is less than the cache version, the cache contains + // more recent updates. Ignore the update. + if update.Version <= m.cacheVersion { + return + } + + // If the pending entry is newer than the update entry, the update can be ignored. + // Otherwise, remove the entry from the pending cache if present. + if pending, ok := m.pending[update.Key]; ok { + if pending.Version > update.Version { + return + } + delete(m.pending, update.Key) + } + + // If the entry is a tombstone, remove it from the cache, otherwise insert it. + if tombstone { + m.cache.Remove(update.Key) + } else { + m.cache.Add(update.Key, update) + } + + // Update the cache version. + m.cacheVersion = update.Version +} + +// cacheRead caches the given read entry +func (m *cachingMap) cacheRead(read *Entry, tombstone bool) { + m.mu.Lock() + defer m.mu.Unlock() + + // If the entry version is less than the cache version, ignore the update. The entry will + // have been cached as an update. + if read.Version <= m.cacheVersion { + return + } + + // The pending cache contains the most recent known state for the entry. + // If the read entry is newer than the pending entry for the key, update + // the pending cache. + if pending, ok := m.pending[read.Key]; !ok || read.Version > pending.Version { + m.pending[read.Key] = &cachedEntry{ + Entry: read, + tombstone: tombstone, + } + } +} + +// getCache gets a cached entry +func (m *cachingMap) getCache(key string) (*Entry, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + // The pending cache contains the most recent known states. If the entry is present + // in the pending cache, return it rather than using the LRU cache. + if entry, ok := m.pending[key]; ok { + if entry.tombstone { + return nil, true + } + return entry.Entry, true + } + + // If the entry is present in the LRU cache, return it. + if entry, ok := m.cache.Get(key); ok { + return entry.(*Entry), true + } + return nil, false +} + +func (m *cachingMap) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, error) { + // If the entry is already in the cache, return it + if entry, ok := m.getCache(key); ok { + return entry, nil + } + + // Otherwise, fetch the entry from the underlying map + entry, err := m.delegatingMap.Get(ctx, key, opts...) + if err != nil { + return nil, err + } + + // Update the cache if necessary + if err != nil { + return nil, err + } + m.cacheRead(entry, entry.Value == nil) + return entry, nil +} + +func (m *cachingMap) Put(ctx context.Context, key string, value []byte, opts ...PutOption) (*Entry, error) { + // Put the entry in the map using the underlying map delegate + entry, err := m.delegatingMap.Put(ctx, key, value, opts...) + if err != nil { + return nil, err + } + + // Update the cache if necessary + if err != nil { + return nil, err + } + m.cacheRead(entry, false) + return entry, nil +} + +func (m *cachingMap) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { + // Remove the entry from the map using the underlying map delegate + entry, err := m.delegatingMap.Remove(ctx, key, opts...) + if err != nil { + return nil, err + } + + // Update the cache if necessary + if err != nil { + return nil, err + } + m.cacheRead(entry, true) + return entry, nil +} + +func (m *cachingMap) Close(ctx context.Context) error { + m.mu.Lock() + if m.cancel != nil { + m.cancel() + } + m.mu.Unlock() + return m.delegatingMap.Close(ctx) +} + +// cachedEntry is a cached entry +type cachedEntry struct { + *Entry + tombstone bool +} diff --git a/pkg/client/map/cache_test.go b/pkg/client/map/cache_test.go new file mode 100644 index 0000000..6f90029 --- /dev/null +++ b/pkg/client/map/cache_test.go @@ -0,0 +1,226 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package _map //nolint:golint + +import ( + "context" + "github.com/atomix/go-client/pkg/client/primitive" + "github.com/atomix/go-client/pkg/client/test" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestCachedMapOperations(t *testing.T) { + partitions, closers := test.StartTestPartitions(3) + defer test.StopTestPartitions(closers) + + sessions, err := test.OpenSessions(partitions) + assert.NoError(t, err) + defer test.CloseSessions(sessions) + + name := primitive.NewName("default", "test", "default", "test") + _map, err := New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + kv, err := _map.Get(context.Background(), "foo") + assert.NoError(t, err) + assert.Nil(t, kv) + + size, err := _map.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + kv, err = _map.Put(context.Background(), "foo", []byte("bar")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", string(kv.Value)) + + kv, err = _map.Get(context.Background(), "foo") + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "foo", kv.Key) + assert.Equal(t, "bar", string(kv.Value)) + version := kv.Version + + size, err = _map.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 1, size) + + kv, err = _map.Remove(context.Background(), "foo") + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "foo", kv.Key) + assert.Equal(t, "bar", string(kv.Value)) + assert.Equal(t, version, kv.Version) + + size, err = _map.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + kv, err = _map.Put(context.Background(), "foo", []byte("bar")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", string(kv.Value)) + + kv, err = _map.Put(context.Background(), "bar", []byte("baz")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "baz", string(kv.Value)) + + kv, err = _map.Put(context.Background(), "foo", []byte("baz")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "baz", string(kv.Value)) + + err = _map.Clear(context.Background()) + assert.NoError(t, err) + + size, err = _map.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + kv, err = _map.Put(context.Background(), "foo", []byte("bar")) + assert.NoError(t, err) + assert.NotNil(t, kv) + + kv1, err := _map.Get(context.Background(), "foo") + assert.NoError(t, err) + assert.NotNil(t, kv) + + _, err = _map.Put(context.Background(), "foo", []byte("baz"), IfVersion(1)) + assert.Error(t, err) + + kv2, err := _map.Put(context.Background(), "foo", []byte("baz"), IfVersion(kv1.Version)) + assert.NoError(t, err) + assert.NotEqual(t, kv1.Version, kv2.Version) + assert.Equal(t, "baz", string(kv2.Value)) + + _, err = _map.Remove(context.Background(), "foo", IfVersion(1)) + assert.Error(t, err) + + removed, err := _map.Remove(context.Background(), "foo", IfVersion(kv2.Version)) + assert.NoError(t, err) + assert.NotNil(t, removed) + assert.Equal(t, kv2.Version, removed.Version) +} + +func TestCachedMapStreams(t *testing.T) { + partitions, closers := test.StartTestPartitions(3) + defer test.StopTestPartitions(closers) + + sessions, err := test.OpenSessions(partitions) + assert.NoError(t, err) + defer test.CloseSessions(sessions) + + name := primitive.NewName("default", "test", "default", "test") + _map, err := New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + kv, err := _map.Put(context.Background(), "foo", []byte{1}) + assert.NoError(t, err) + assert.NotNil(t, kv) + + c := make(chan *Event) + latch := make(chan struct{}) + go func() { + e := <-c + assert.Equal(t, "foo", e.Entry.Key) + assert.Equal(t, byte(2), e.Entry.Value[0]) + e = <-c + assert.Equal(t, "bar", e.Entry.Key) + assert.Equal(t, byte(3), e.Entry.Value[0]) + e = <-c + assert.Equal(t, "baz", e.Entry.Key) + assert.Equal(t, byte(4), e.Entry.Value[0]) + e = <-c + assert.Equal(t, "foo", e.Entry.Key) + assert.Equal(t, byte(5), e.Entry.Value[0]) + latch <- struct{}{} + }() + + err = _map.Watch(context.Background(), c) + assert.NoError(t, err) + + keyCh := make(chan *Event) + err = _map.Watch(context.Background(), keyCh, WithFilter(Filter{ + Key: "foo", + })) + assert.NoError(t, err) + + kv, err = _map.Put(context.Background(), "foo", []byte{2}) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "foo", kv.Key) + assert.Equal(t, byte(2), kv.Value[0]) + + event := <-keyCh + assert.NotNil(t, event) + assert.Equal(t, "foo", event.Entry.Key) + assert.Equal(t, kv.Version, event.Entry.Version) + + kv, err = _map.Put(context.Background(), "bar", []byte{3}) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", kv.Key) + assert.Equal(t, byte(3), kv.Value[0]) + + kv, err = _map.Put(context.Background(), "baz", []byte{4}) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "baz", kv.Key) + assert.Equal(t, byte(4), kv.Value[0]) + + kv, err = _map.Put(context.Background(), "foo", []byte{5}) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "foo", kv.Key) + assert.Equal(t, byte(5), kv.Value[0]) + + event = <-keyCh + assert.NotNil(t, event) + assert.Equal(t, "foo", event.Entry.Key) + assert.Equal(t, kv.Version, event.Entry.Version) + + <-latch + + err = _map.Close(context.Background()) + assert.NoError(t, err) + + map1, err := New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + map2, err := New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + size, err := map1.Len(context.TODO()) + assert.NoError(t, err) + assert.Equal(t, 3, size) + + err = map1.Close(context.Background()) + assert.NoError(t, err) + + err = map1.Delete(context.Background()) + assert.NoError(t, err) + + err = map2.Delete(context.Background()) + assert.NoError(t, err) + + _map, err = New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + size, err = _map.Len(context.TODO()) + assert.NoError(t, err) + assert.Equal(t, 0, size) +} diff --git a/pkg/client/map/delegate.go b/pkg/client/map/delegate.go new file mode 100644 index 0000000..b1361ff --- /dev/null +++ b/pkg/client/map/delegate.go @@ -0,0 +1,72 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package _map //nolint:golint + +import ( + "context" + "github.com/atomix/go-client/pkg/client/primitive" +) + +// newDelegatingMap returns a Map that delegates all method calls to the given Map +func newDelegatingMap(_map Map) *delegatingMap { + return &delegatingMap{ + delegate: _map, + } +} + +// delegatingMap is a Map that delegates method calls to an underlying Map +type delegatingMap struct { + delegate Map +} + +func (m *delegatingMap) Name() primitive.Name { + return m.delegate.Name() +} + +func (m *delegatingMap) Put(ctx context.Context, key string, value []byte, opts ...PutOption) (*Entry, error) { + return m.delegate.Put(ctx, key, value, opts...) +} + +func (m *delegatingMap) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, error) { + return m.delegate.Get(ctx, key, opts...) +} + +func (m *delegatingMap) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { + return m.delegate.Remove(ctx, key, opts...) +} + +func (m *delegatingMap) Len(ctx context.Context) (int, error) { + return m.delegate.Len(ctx) +} + +func (m *delegatingMap) Clear(ctx context.Context) error { + return m.delegate.Clear(ctx) +} + +func (m *delegatingMap) Entries(ctx context.Context, ch chan<- *Entry) error { + return m.delegate.Entries(ctx, ch) +} + +func (m *delegatingMap) Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) error { + return m.delegate.Watch(ctx, ch, opts...) +} + +func (m *delegatingMap) Close(ctx context.Context) error { + return m.delegate.Close(ctx) +} + +func (m *delegatingMap) Delete(ctx context.Context) error { + return m.delegate.Delete(ctx) +} diff --git a/pkg/client/map/map.go b/pkg/client/map/map.go index ba88702..b762f0f 100644 --- a/pkg/client/map/map.go +++ b/pkg/client/map/map.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/atomix/go-client/pkg/client/primitive" "github.com/atomix/go-client/pkg/client/util" + "math" "sync" "time" ) @@ -29,7 +30,7 @@ const Type primitive.Type = "Map" // Client provides an API for creating Maps type Client interface { // GetMap gets the Map instance of the given name - GetMap(ctx context.Context, name string) (Map, error) + GetMap(ctx context.Context, name string, opts ...Option) (Map, error) } // Map is a distributed set of keys and values @@ -112,8 +113,16 @@ type Event struct { } // New creates a new partitioned Map -func New(ctx context.Context, name primitive.Name, sessions []*primitive.Session) (Map, error) { +func New(ctx context.Context, name primitive.Name, sessions []*primitive.Session, opts ...Option) (Map, error) { + options := &options{} + for _, opt := range opts { + opt.apply(options) + } + results, err := util.ExecuteOrderedAsync(len(sessions), func(i int) (interface{}, error) { + if options.cached { + return newPartition(ctx, name, sessions[i], WithCache(int(math.Max(float64(options.cacheSize/len(sessions)), 1)))) + } return newPartition(ctx, name, sessions[i]) }) if err != nil { @@ -162,7 +171,13 @@ func (m *_map) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, if err != nil { return nil, err } - return session.Get(ctx, key, opts...) + entry, err := session.Get(ctx, key, opts...) + if err != nil { + return nil, err + } else if entry.Value == nil { + return nil, nil + } + return entry, nil } func (m *_map) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { diff --git a/pkg/client/map/options.go b/pkg/client/map/options.go index 4ea4ae0..03c7ec4 100644 --- a/pkg/client/map/options.go +++ b/pkg/client/map/options.go @@ -18,6 +18,39 @@ import ( api "github.com/atomix/api/proto/atomix/map" ) +// Option is an option for a Map instance +type Option interface { + apply(options *options) +} + +// options is a set of map options +type options struct { + cached bool + cacheSize int +} + +// WithCache returns an option that enables caching for a Map +func WithCache(size int) Option { + if size <= 0 { + panic("cache size must be positive") + } + return &cacheOption{ + enabled: true, + size: size, + } +} + +// cacheOption is a cache enable option +type cacheOption struct { + enabled bool + size int +} + +func (o *cacheOption) apply(options *options) { + options.cached = o.enabled + options.cacheSize = o.size +} + // PutOption is an option for the Put method type PutOption interface { beforePut(request *api.PutRequest) diff --git a/pkg/client/map/partition.go b/pkg/client/map/partition.go index fbb57df..f3f3cf5 100644 --- a/pkg/client/map/partition.go +++ b/pkg/client/map/partition.go @@ -23,15 +23,28 @@ import ( "google.golang.org/grpc" ) -func newPartition(ctx context.Context, name primitive.Name, session *primitive.Session) (Map, error) { +func newPartition(ctx context.Context, name primitive.Name, session *primitive.Session, opts ...Option) (Map, error) { + options := &options{} + for _, opt := range opts { + opt.apply(options) + } + instance, err := primitive.NewInstance(ctx, name, session, &primitiveHandler{}) if err != nil { return nil, err } - return &mapPartition{ + var partition Map = &mapPartition{ name: name, instance: instance, - }, nil + } + if options.cached { + cached, err := newCachingMap(partition, options.cacheSize) + if err != nil { + return nil, err + } + partition = cached + } + return partition, nil } type mapPartition struct { @@ -122,7 +135,12 @@ func (m *mapPartition) Get(ctx context.Context, key string, opts ...GetOption) ( Updated: response.Updated, }, nil } - return nil, nil + + // Return a non-empty nil-value Entry when response version is 0 + return &Entry{ + Key: key, + Version: int64(response.Header.Index), + }, nil } func (m *mapPartition) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { @@ -259,23 +277,28 @@ func (m *mapPartition) Watch(ctx context.Context, ch chan<- *Event, opts ...Watc defer close(ch) for event := range stream { response := event.(*api.EventResponse) + var version int64 var t EventType switch response.Type { case api.EventResponse_NONE: t = EventNone + version = response.Version case api.EventResponse_INSERTED: t = EventInserted + version = response.Version case api.EventResponse_UPDATED: t = EventUpdated + version = response.Version case api.EventResponse_REMOVED: t = EventRemoved + version = int64(response.Header.Index) } ch <- &Event{ Type: t, Entry: &Entry{ Key: response.Key, Value: response.Value, - Version: response.Version, + Version: version, Created: response.Created, Updated: response.Updated, },