From 1fa09b22401c8ad774566d0fc614decca602eae1 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Mon, 10 Feb 2020 18:33:34 -0800 Subject: [PATCH 1/6] Add support for LRU cached maps --- go.mod | 1 + go.sum | 2 + pkg/client/map/cache.go | 240 +++++++++++++++++++++++++++++++++++ pkg/client/map/cache_test.go | 226 +++++++++++++++++++++++++++++++++ pkg/client/map/delegate.go | 72 +++++++++++ pkg/client/map/map.go | 6 +- pkg/client/map/options.go | 33 +++++ pkg/client/map/partition.go | 19 ++- 8 files changed, 593 insertions(+), 6 deletions(-) create mode 100644 pkg/client/map/cache.go create mode 100644 pkg/client/map/cache_test.go create mode 100644 pkg/client/map/delegate.go diff --git a/go.mod b/go.mod index f2d1242..e29edf8 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 github.com/google/uuid v1.1.1 + github.com/hashicorp/golang-lru v0.5.4 github.com/stretchr/testify v1.4.0 google.golang.org/grpc v1.27.0 ) diff --git a/go.sum b/go.sum index efb40ba..3a3e140 100644 --- a/go.sum +++ b/go.sum @@ -174,6 +174,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xC github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= diff --git a/pkg/client/map/cache.go b/pkg/client/map/cache.go new file mode 100644 index 0000000..9ce97bd --- /dev/null +++ b/pkg/client/map/cache.go @@ -0,0 +1,240 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package _map + +import ( + "context" + "errors" + "github.com/hashicorp/golang-lru" + "sync" +) + +// newCachingMap returns a decorated map that caches updates to the given map +func newCachingMap(_map Map, size int) (Map, error) { + cache, err := lru.New(size) + if err != nil { + return nil, err + } + cachingMap := &cachingMap{ + delegatingMap: newDelegatingMap(_map), + cache: cache, + state: &cacheState{ + waiters: make(map[int64]*sync.Cond), + mu: &sync.RWMutex{}, + }, + } + if err := cachingMap.open(); err != nil { + return nil, err + } + return cachingMap, nil +} + +// cachingMap is an implementation of the Map interface that caches entries +type cachingMap struct { + *delegatingMap + cancel context.CancelFunc + cache *lru.Cache + state *cacheState +} + +// open opens the map listeners +func (m *cachingMap) open() error { + ch := make(chan *Event) + ctx, cancel := context.WithCancel(context.Background()) + m.state.Lock() + m.cancel = cancel + m.state.Unlock() + if err := m.delegatingMap.Watch(ctx, ch, WithReplay()); err != nil { + return err + } + go func() { + for event := range ch { + switch event.Type { + case EventNone: + m.cache.Add(event.Entry.Key, event.Entry) + case EventInserted: + m.cache.Add(event.Entry.Key, event.Entry) + case EventUpdated: + m.cache.Add(event.Entry.Key, event.Entry) + case EventRemoved: + m.cache.Remove(event.Entry.Key) + } + + // Wake up goroutines waiting for this update + m.state.setMaxUpdated(event.Entry.Version) + } + }() + return nil +} + +func (m *cachingMap) Put(ctx context.Context, key string, value []byte, opts ...PutOption) (*Entry, error) { + // Put the entry in the map using the underlying map delegate + entry, err := m.delegatingMap.Put(ctx, key, value, opts...) + if err != nil { + return nil, err + } + + // If the update is successful, record the max seen version + m.state.setMaxSeen(entry.Version) + return entry, nil +} + +func (m *cachingMap) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { + // Remove the entry from the map using the underlying map delegate + entry, err := m.delegatingMap.Remove(ctx, key, opts...) + if err != nil { + return nil, err + } + + // If the update is successful, update the max seen version for read-your-writes consistency + if entry != nil { + m.state.setMaxSeen(entry.Version) + } + return entry, nil +} + +func (m *cachingMap) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, error) { + // Get the current cache state + m.state.RLock() + closed := m.state.closed + current := m.state.isCurrent() + m.state.RUnlock() + + // If the cache is closed, return an error + if closed { + return nil, errors.New("cache closed") + } + + // If the client write a value at a later point than the current cache point, wait for updates + // to be propagated to the cache + if !current { + // Acquire a write lock again (double checked lock) + m.state.Lock() + + // Check whether the cache is closed again + if m.state.closed { + return nil, errors.New("cache closed") + } + + // Check the current cache point again before creating a condition + if !m.state.isCurrent() { + m.state.awaitUpdate() + } + + // Check that the cache is not closed once more - it could have been closed during a awaitUpdate() call + if m.state.closed { + return nil, errors.New("cache closed") + } + + // Release the write lock + m.state.Unlock() + } + + // If the entry is present in the cache, return it + if entry, ok := m.cache.Get(key); ok { + return entry.(*Entry), nil + } + + // Otherwise, fetch the entry from the underlying map and cache it + entry, err := m.delegatingMap.Get(ctx, key, opts...) + if err != nil { + return nil, err + } + m.cache.Add(key, entry) + return entry, nil +} + +func (m *cachingMap) Close(ctx context.Context) error { + m.state.Lock() + if m.cancel != nil { + m.cancel() + } + m.state.closed = true + m.state.Unlock() + return m.delegatingMap.Close(ctx) +} + +// cacheState contains the state of the cache +type cacheState struct { + maxSeen int64 + maxUpdated int64 + maxComplete int64 + waiters map[int64]*sync.Cond + mu *sync.RWMutex + closed bool +} + +// Lock locks the cache state +func (s *cacheState) Lock() { + s.mu.Lock() +} + +// Unlock unlocks the cache state +func (s *cacheState) Unlock() { + s.mu.Unlock() +} + +// RLock read locks the cache state +func (s *cacheState) RLock() { + s.mu.RLock() +} + +// rUnlock read unlocks the cache state +func (s *cacheState) RUnlock() { + s.mu.RUnlock() +} + +// setMaxSeen sets the max seen version +func (s *cacheState) setMaxSeen(seenVersion int64) { + s.mu.Lock() + if seenVersion > s.maxSeen { + s.maxSeen = seenVersion + } + s.mu.Unlock() +} + +// setMaxUpdated sets the max updated version and awakens waiters waiting for the update +func (s *cacheState) setMaxUpdated(updateVersion int64) { + s.mu.Lock() + if updateVersion > s.maxUpdated { + s.maxUpdated = updateVersion + for version := s.maxComplete; version <= updateVersion; version++ { + waiter, ok := s.waiters[version] + if ok { + waiter.Broadcast() + } + s.maxComplete = version + } + } + s.mu.Unlock() +} + +// awaitUpdate waits for the cache state to be propagated +func (s *cacheState) awaitUpdate() { + maxSeen := s.maxSeen + waiter, ok := s.waiters[maxSeen] + if !ok { + waiter = sync.NewCond(s.mu) + s.waiters[maxSeen] = waiter + } + for s.closed || maxSeen <= s.maxUpdated { + waiter.Wait() + } +} + +// isCurrent returns a boolean indicating whether the cache is current +func (s *cacheState) isCurrent() bool { + return s.maxSeen <= s.maxUpdated +} diff --git a/pkg/client/map/cache_test.go b/pkg/client/map/cache_test.go new file mode 100644 index 0000000..6f90029 --- /dev/null +++ b/pkg/client/map/cache_test.go @@ -0,0 +1,226 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package _map //nolint:golint + +import ( + "context" + "github.com/atomix/go-client/pkg/client/primitive" + "github.com/atomix/go-client/pkg/client/test" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestCachedMapOperations(t *testing.T) { + partitions, closers := test.StartTestPartitions(3) + defer test.StopTestPartitions(closers) + + sessions, err := test.OpenSessions(partitions) + assert.NoError(t, err) + defer test.CloseSessions(sessions) + + name := primitive.NewName("default", "test", "default", "test") + _map, err := New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + kv, err := _map.Get(context.Background(), "foo") + assert.NoError(t, err) + assert.Nil(t, kv) + + size, err := _map.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + kv, err = _map.Put(context.Background(), "foo", []byte("bar")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", string(kv.Value)) + + kv, err = _map.Get(context.Background(), "foo") + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "foo", kv.Key) + assert.Equal(t, "bar", string(kv.Value)) + version := kv.Version + + size, err = _map.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 1, size) + + kv, err = _map.Remove(context.Background(), "foo") + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "foo", kv.Key) + assert.Equal(t, "bar", string(kv.Value)) + assert.Equal(t, version, kv.Version) + + size, err = _map.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + kv, err = _map.Put(context.Background(), "foo", []byte("bar")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", string(kv.Value)) + + kv, err = _map.Put(context.Background(), "bar", []byte("baz")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "baz", string(kv.Value)) + + kv, err = _map.Put(context.Background(), "foo", []byte("baz")) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "baz", string(kv.Value)) + + err = _map.Clear(context.Background()) + assert.NoError(t, err) + + size, err = _map.Len(context.Background()) + assert.NoError(t, err) + assert.Equal(t, 0, size) + + kv, err = _map.Put(context.Background(), "foo", []byte("bar")) + assert.NoError(t, err) + assert.NotNil(t, kv) + + kv1, err := _map.Get(context.Background(), "foo") + assert.NoError(t, err) + assert.NotNil(t, kv) + + _, err = _map.Put(context.Background(), "foo", []byte("baz"), IfVersion(1)) + assert.Error(t, err) + + kv2, err := _map.Put(context.Background(), "foo", []byte("baz"), IfVersion(kv1.Version)) + assert.NoError(t, err) + assert.NotEqual(t, kv1.Version, kv2.Version) + assert.Equal(t, "baz", string(kv2.Value)) + + _, err = _map.Remove(context.Background(), "foo", IfVersion(1)) + assert.Error(t, err) + + removed, err := _map.Remove(context.Background(), "foo", IfVersion(kv2.Version)) + assert.NoError(t, err) + assert.NotNil(t, removed) + assert.Equal(t, kv2.Version, removed.Version) +} + +func TestCachedMapStreams(t *testing.T) { + partitions, closers := test.StartTestPartitions(3) + defer test.StopTestPartitions(closers) + + sessions, err := test.OpenSessions(partitions) + assert.NoError(t, err) + defer test.CloseSessions(sessions) + + name := primitive.NewName("default", "test", "default", "test") + _map, err := New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + kv, err := _map.Put(context.Background(), "foo", []byte{1}) + assert.NoError(t, err) + assert.NotNil(t, kv) + + c := make(chan *Event) + latch := make(chan struct{}) + go func() { + e := <-c + assert.Equal(t, "foo", e.Entry.Key) + assert.Equal(t, byte(2), e.Entry.Value[0]) + e = <-c + assert.Equal(t, "bar", e.Entry.Key) + assert.Equal(t, byte(3), e.Entry.Value[0]) + e = <-c + assert.Equal(t, "baz", e.Entry.Key) + assert.Equal(t, byte(4), e.Entry.Value[0]) + e = <-c + assert.Equal(t, "foo", e.Entry.Key) + assert.Equal(t, byte(5), e.Entry.Value[0]) + latch <- struct{}{} + }() + + err = _map.Watch(context.Background(), c) + assert.NoError(t, err) + + keyCh := make(chan *Event) + err = _map.Watch(context.Background(), keyCh, WithFilter(Filter{ + Key: "foo", + })) + assert.NoError(t, err) + + kv, err = _map.Put(context.Background(), "foo", []byte{2}) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "foo", kv.Key) + assert.Equal(t, byte(2), kv.Value[0]) + + event := <-keyCh + assert.NotNil(t, event) + assert.Equal(t, "foo", event.Entry.Key) + assert.Equal(t, kv.Version, event.Entry.Version) + + kv, err = _map.Put(context.Background(), "bar", []byte{3}) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "bar", kv.Key) + assert.Equal(t, byte(3), kv.Value[0]) + + kv, err = _map.Put(context.Background(), "baz", []byte{4}) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "baz", kv.Key) + assert.Equal(t, byte(4), kv.Value[0]) + + kv, err = _map.Put(context.Background(), "foo", []byte{5}) + assert.NoError(t, err) + assert.NotNil(t, kv) + assert.Equal(t, "foo", kv.Key) + assert.Equal(t, byte(5), kv.Value[0]) + + event = <-keyCh + assert.NotNil(t, event) + assert.Equal(t, "foo", event.Entry.Key) + assert.Equal(t, kv.Version, event.Entry.Version) + + <-latch + + err = _map.Close(context.Background()) + assert.NoError(t, err) + + map1, err := New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + map2, err := New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + size, err := map1.Len(context.TODO()) + assert.NoError(t, err) + assert.Equal(t, 3, size) + + err = map1.Close(context.Background()) + assert.NoError(t, err) + + err = map1.Delete(context.Background()) + assert.NoError(t, err) + + err = map2.Delete(context.Background()) + assert.NoError(t, err) + + _map, err = New(context.TODO(), name, sessions, WithCache(1)) + assert.NoError(t, err) + + size, err = _map.Len(context.TODO()) + assert.NoError(t, err) + assert.Equal(t, 0, size) +} diff --git a/pkg/client/map/delegate.go b/pkg/client/map/delegate.go new file mode 100644 index 0000000..dc77890 --- /dev/null +++ b/pkg/client/map/delegate.go @@ -0,0 +1,72 @@ +// Copyright 2019-present Open Networking Foundation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package _map + +import ( + "context" + "github.com/atomix/go-client/pkg/client/primitive" +) + +// newDelegatingMap returns a Map that delegates all method calls to the given Map +func newDelegatingMap(_map Map) *delegatingMap { + return &delegatingMap{ + delegate: _map, + } +} + +// delegatingMap is a Map that delegates method calls to an underlying Map +type delegatingMap struct { + delegate Map +} + +func (m *delegatingMap) Name() primitive.Name { + return m.delegate.Name() +} + +func (m *delegatingMap) Put(ctx context.Context, key string, value []byte, opts ...PutOption) (*Entry, error) { + return m.delegate.Put(ctx, key, value, opts...) +} + +func (m *delegatingMap) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, error) { + return m.delegate.Get(ctx, key, opts...) +} + +func (m *delegatingMap) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { + return m.delegate.Remove(ctx, key, opts...) +} + +func (m *delegatingMap) Len(ctx context.Context) (int, error) { + return m.delegate.Len(ctx) +} + +func (m *delegatingMap) Clear(ctx context.Context) error { + return m.delegate.Clear(ctx) +} + +func (m *delegatingMap) Entries(ctx context.Context, ch chan<- *Entry) error { + return m.delegate.Entries(ctx, ch) +} + +func (m *delegatingMap) Watch(ctx context.Context, ch chan<- *Event, opts ...WatchOption) error { + return m.delegate.Watch(ctx, ch, opts...) +} + +func (m *delegatingMap) Close(ctx context.Context) error { + return m.delegate.Close(ctx) +} + +func (m *delegatingMap) Delete(ctx context.Context) error { + return m.delegate.Delete(ctx) +} diff --git a/pkg/client/map/map.go b/pkg/client/map/map.go index ba88702..9838beb 100644 --- a/pkg/client/map/map.go +++ b/pkg/client/map/map.go @@ -29,7 +29,7 @@ const Type primitive.Type = "Map" // Client provides an API for creating Maps type Client interface { // GetMap gets the Map instance of the given name - GetMap(ctx context.Context, name string) (Map, error) + GetMap(ctx context.Context, name string, opts ...Option) (Map, error) } // Map is a distributed set of keys and values @@ -112,9 +112,9 @@ type Event struct { } // New creates a new partitioned Map -func New(ctx context.Context, name primitive.Name, sessions []*primitive.Session) (Map, error) { +func New(ctx context.Context, name primitive.Name, sessions []*primitive.Session, opts ...Option) (Map, error) { results, err := util.ExecuteOrderedAsync(len(sessions), func(i int) (interface{}, error) { - return newPartition(ctx, name, sessions[i]) + return newPartition(ctx, name, sessions[i], opts...) }) if err != nil { return nil, err diff --git a/pkg/client/map/options.go b/pkg/client/map/options.go index 4ea4ae0..03c7ec4 100644 --- a/pkg/client/map/options.go +++ b/pkg/client/map/options.go @@ -18,6 +18,39 @@ import ( api "github.com/atomix/api/proto/atomix/map" ) +// Option is an option for a Map instance +type Option interface { + apply(options *options) +} + +// options is a set of map options +type options struct { + cached bool + cacheSize int +} + +// WithCache returns an option that enables caching for a Map +func WithCache(size int) Option { + if size <= 0 { + panic("cache size must be positive") + } + return &cacheOption{ + enabled: true, + size: size, + } +} + +// cacheOption is a cache enable option +type cacheOption struct { + enabled bool + size int +} + +func (o *cacheOption) apply(options *options) { + options.cached = o.enabled + options.cacheSize = o.size +} + // PutOption is an option for the Put method type PutOption interface { beforePut(request *api.PutRequest) diff --git a/pkg/client/map/partition.go b/pkg/client/map/partition.go index fbb57df..90e9ca0 100644 --- a/pkg/client/map/partition.go +++ b/pkg/client/map/partition.go @@ -23,15 +23,28 @@ import ( "google.golang.org/grpc" ) -func newPartition(ctx context.Context, name primitive.Name, session *primitive.Session) (Map, error) { +func newPartition(ctx context.Context, name primitive.Name, session *primitive.Session, opts ...Option) (Map, error) { + options := &options{} + for _, opt := range opts { + opt.apply(options) + } + instance, err := primitive.NewInstance(ctx, name, session, &primitiveHandler{}) if err != nil { return nil, err } - return &mapPartition{ + var partition Map = &mapPartition{ name: name, instance: instance, - }, nil + } + if options.cached { + cached, err := newCachingMap(partition, options.cacheSize) + if err != nil { + return nil, err + } + partition = cached + } + return partition, nil } type mapPartition struct { From 0363aa00c50e4b308308380285c7ed532ab2fdf2 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Mon, 10 Feb 2020 18:35:27 -0800 Subject: [PATCH 2/6] Add LRU cached map support --- pkg/client/map/cache.go | 2 +- pkg/client/map/delegate.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/client/map/cache.go b/pkg/client/map/cache.go index 9ce97bd..e8e277f 100644 --- a/pkg/client/map/cache.go +++ b/pkg/client/map/cache.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package _map +package _map //nolint:golint import ( "context" diff --git a/pkg/client/map/delegate.go b/pkg/client/map/delegate.go index dc77890..b1361ff 100644 --- a/pkg/client/map/delegate.go +++ b/pkg/client/map/delegate.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package _map +package _map //nolint:golint import ( "context" From 9bc3f5bf28a87242de9234f7e9da92841e8ba7ca Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Mon, 10 Feb 2020 18:37:08 -0800 Subject: [PATCH 3/6] Add map options to client API --- pkg/client/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 28703bf..b3d494f 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -380,8 +380,8 @@ func (d *Database) GetLog(ctx context.Context, name string) (log.Log, error) { } // GetMap gets or creates a Map with the given name -func (d *Database) GetMap(ctx context.Context, name string) (_map.Map, error) { - return _map.New(ctx, primitive.NewName(d.Namespace, d.Name, d.scope, name), d.sessions) +func (d *Database) GetMap(ctx context.Context, name string, opts ..._map.Option) (_map.Map, error) { + return _map.New(ctx, primitive.NewName(d.Namespace, d.Name, d.scope, name), d.sessions, opts...) } // GetSet gets or creates a Set with the given name @@ -442,8 +442,8 @@ func (g *PartitionGroup) GetLog(ctx context.Context, name string) (log.Log, erro } // GetMap gets or creates a Map with the given name -func (g *PartitionGroup) GetMap(ctx context.Context, name string) (_map.Map, error) { - return _map.New(ctx, primitive.NewName(g.Namespace, g.Name, g.scope, name), g.sessions) +func (g *PartitionGroup) GetMap(ctx context.Context, name string, opts ..._map.Option) (_map.Map, error) { + return _map.New(ctx, primitive.NewName(g.Namespace, g.Name, g.scope, name), g.sessions, opts...) } // GetSet gets or creates a Set with the given name From 7ef511fdda10cc15dc90e7bd00ba0ad9a28d4496 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Mon, 10 Feb 2020 18:47:26 -0800 Subject: [PATCH 4/6] Simplify cached map using session guarantees --- pkg/client/map/cache.go | 162 +++++++--------------------------------- 1 file changed, 29 insertions(+), 133 deletions(-) diff --git a/pkg/client/map/cache.go b/pkg/client/map/cache.go index e8e277f..ec95a65 100644 --- a/pkg/client/map/cache.go +++ b/pkg/client/map/cache.go @@ -16,7 +16,6 @@ package _map //nolint:golint import ( "context" - "errors" "github.com/hashicorp/golang-lru" "sync" ) @@ -30,10 +29,6 @@ func newCachingMap(_map Map, size int) (Map, error) { cachingMap := &cachingMap{ delegatingMap: newDelegatingMap(_map), cache: cache, - state: &cacheState{ - waiters: make(map[int64]*sync.Cond), - mu: &sync.RWMutex{}, - }, } if err := cachingMap.open(); err != nil { return nil, err @@ -46,16 +41,16 @@ type cachingMap struct { *delegatingMap cancel context.CancelFunc cache *lru.Cache - state *cacheState + mu sync.RWMutex } // open opens the map listeners func (m *cachingMap) open() error { ch := make(chan *Event) ctx, cancel := context.WithCancel(context.Background()) - m.state.Lock() + m.mu.Lock() m.cancel = cancel - m.state.Unlock() + m.mu.Unlock() if err := m.delegatingMap.Watch(ctx, ch, WithReplay()); err != nil { return err } @@ -71,9 +66,6 @@ func (m *cachingMap) open() error { case EventRemoved: m.cache.Remove(event.Entry.Key) } - - // Wake up goroutines waiting for this update - m.state.setMaxUpdated(event.Entry.Version) } }() return nil @@ -86,8 +78,14 @@ func (m *cachingMap) Put(ctx context.Context, key string, value []byte, opts ... return nil, err } - // If the update is successful, record the max seen version - m.state.setMaxSeen(entry.Version) + // If the entry in the cache is still older than the updated entry, update the entry + // This check is performed because a concurrent event could update the cached entry + m.mu.Lock() + prevEntry, ok := m.cache.Get(key) + if !ok || prevEntry.(*Entry).Version < entry.Version { + m.cache.Add(key, entry) + } + m.mu.Unlock() return entry, nil } @@ -98,143 +96,41 @@ func (m *cachingMap) Remove(ctx context.Context, key string, opts ...RemoveOptio return nil, err } - // If the update is successful, update the max seen version for read-your-writes consistency - if entry != nil { - m.state.setMaxSeen(entry.Version) + // If the entry in the cache is still older than the removed entry, remove the entry + // This check is performed because a concurrent event could update the cached entry + m.mu.Lock() + prevEntry, ok := m.cache.Get(key) + if ok && prevEntry.(*Entry).Version <= entry.Version { + m.cache.Remove(key) } + m.mu.Unlock() return entry, nil } func (m *cachingMap) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, error) { - // Get the current cache state - m.state.RLock() - closed := m.state.closed - current := m.state.isCurrent() - m.state.RUnlock() - - // If the cache is closed, return an error - if closed { - return nil, errors.New("cache closed") + // If the entry is already in the cache, return it + m.mu.RLock() + cachedEntry, ok := m.cache.Get(key) + m.mu.RUnlock() + if ok { + return cachedEntry.(*Entry), nil } - // If the client write a value at a later point than the current cache point, wait for updates - // to be propagated to the cache - if !current { - // Acquire a write lock again (double checked lock) - m.state.Lock() - - // Check whether the cache is closed again - if m.state.closed { - return nil, errors.New("cache closed") - } - - // Check the current cache point again before creating a condition - if !m.state.isCurrent() { - m.state.awaitUpdate() - } - - // Check that the cache is not closed once more - it could have been closed during a awaitUpdate() call - if m.state.closed { - return nil, errors.New("cache closed") - } - - // Release the write lock - m.state.Unlock() - } - - // If the entry is present in the cache, return it - if entry, ok := m.cache.Get(key); ok { - return entry.(*Entry), nil - } - - // Otherwise, fetch the entry from the underlying map and cache it + // Otherwise, fetch the entry from the underlying map + // Note that we do not cache the entry here since it could have been removed, in which + // case we'd have to maintain tombstones in the cache to compare versions. entry, err := m.delegatingMap.Get(ctx, key, opts...) if err != nil { return nil, err } - m.cache.Add(key, entry) return entry, nil } func (m *cachingMap) Close(ctx context.Context) error { - m.state.Lock() + m.mu.Lock() if m.cancel != nil { m.cancel() } - m.state.closed = true - m.state.Unlock() + m.mu.Unlock() return m.delegatingMap.Close(ctx) } - -// cacheState contains the state of the cache -type cacheState struct { - maxSeen int64 - maxUpdated int64 - maxComplete int64 - waiters map[int64]*sync.Cond - mu *sync.RWMutex - closed bool -} - -// Lock locks the cache state -func (s *cacheState) Lock() { - s.mu.Lock() -} - -// Unlock unlocks the cache state -func (s *cacheState) Unlock() { - s.mu.Unlock() -} - -// RLock read locks the cache state -func (s *cacheState) RLock() { - s.mu.RLock() -} - -// rUnlock read unlocks the cache state -func (s *cacheState) RUnlock() { - s.mu.RUnlock() -} - -// setMaxSeen sets the max seen version -func (s *cacheState) setMaxSeen(seenVersion int64) { - s.mu.Lock() - if seenVersion > s.maxSeen { - s.maxSeen = seenVersion - } - s.mu.Unlock() -} - -// setMaxUpdated sets the max updated version and awakens waiters waiting for the update -func (s *cacheState) setMaxUpdated(updateVersion int64) { - s.mu.Lock() - if updateVersion > s.maxUpdated { - s.maxUpdated = updateVersion - for version := s.maxComplete; version <= updateVersion; version++ { - waiter, ok := s.waiters[version] - if ok { - waiter.Broadcast() - } - s.maxComplete = version - } - } - s.mu.Unlock() -} - -// awaitUpdate waits for the cache state to be propagated -func (s *cacheState) awaitUpdate() { - maxSeen := s.maxSeen - waiter, ok := s.waiters[maxSeen] - if !ok { - waiter = sync.NewCond(s.mu) - s.waiters[maxSeen] = waiter - } - for s.closed || maxSeen <= s.maxUpdated { - waiter.Wait() - } -} - -// isCurrent returns a boolean indicating whether the cache is current -func (s *cacheState) isCurrent() bool { - return s.maxSeen <= s.maxUpdated -} From ae8f370108358da56842ae43c3c2b18e947ed5ba Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Mon, 10 Feb 2020 18:50:12 -0800 Subject: [PATCH 5/6] Cache entire map to ensure consistent cache size --- pkg/client/map/map.go | 19 ++++++++++++++++--- pkg/client/map/partition.go | 19 +++---------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/client/map/map.go b/pkg/client/map/map.go index 9838beb..89a7971 100644 --- a/pkg/client/map/map.go +++ b/pkg/client/map/map.go @@ -113,8 +113,13 @@ type Event struct { // New creates a new partitioned Map func New(ctx context.Context, name primitive.Name, sessions []*primitive.Session, opts ...Option) (Map, error) { + options := &options{} + for _, opt := range opts { + opt.apply(options) + } + results, err := util.ExecuteOrderedAsync(len(sessions), func(i int) (interface{}, error) { - return newPartition(ctx, name, sessions[i], opts...) + return newPartition(ctx, name, sessions[i]) }) if err != nil { return nil, err @@ -125,10 +130,18 @@ func New(ctx context.Context, name primitive.Name, sessions []*primitive.Session maps[i] = result.(Map) } - return &_map{ + var m Map = &_map{ name: name, partitions: maps, - }, nil + } + if options.cached { + cached, err := newCachingMap(m, options.cacheSize) + if err != nil { + return nil, err + } + m = cached + } + return m, nil } // _map is the default single-partition implementation of Map diff --git a/pkg/client/map/partition.go b/pkg/client/map/partition.go index 90e9ca0..fbb57df 100644 --- a/pkg/client/map/partition.go +++ b/pkg/client/map/partition.go @@ -23,28 +23,15 @@ import ( "google.golang.org/grpc" ) -func newPartition(ctx context.Context, name primitive.Name, session *primitive.Session, opts ...Option) (Map, error) { - options := &options{} - for _, opt := range opts { - opt.apply(options) - } - +func newPartition(ctx context.Context, name primitive.Name, session *primitive.Session) (Map, error) { instance, err := primitive.NewInstance(ctx, name, session, &primitiveHandler{}) if err != nil { return nil, err } - var partition Map = &mapPartition{ + return &mapPartition{ name: name, instance: instance, - } - if options.cached { - cached, err := newCachingMap(partition, options.cacheSize) - if err != nil { - return nil, err - } - partition = cached - } - return partition, nil + }, nil } type mapPartition struct { From 04df48ffaf9fca09fa2fe9bd00c0dae55071ae3c Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Thu, 13 Feb 2020 12:55:31 -0800 Subject: [PATCH 6/6] Update cached map implementation to match spec --- pkg/client/map/cache.go | 157 ++++++++++++++++++++++++++---------- pkg/client/map/map.go | 24 +++--- pkg/client/map/partition.go | 33 ++++++-- 3 files changed, 154 insertions(+), 60 deletions(-) diff --git a/pkg/client/map/cache.go b/pkg/client/map/cache.go index ec95a65..c79c98e 100644 --- a/pkg/client/map/cache.go +++ b/pkg/client/map/cache.go @@ -28,6 +28,7 @@ func newCachingMap(_map Map, size int) (Map, error) { } cachingMap := &cachingMap{ delegatingMap: newDelegatingMap(_map), + pending: make(map[string]*cachedEntry), cache: cache, } if err := cachingMap.open(); err != nil { @@ -39,9 +40,11 @@ func newCachingMap(_map Map, size int) (Map, error) { // cachingMap is an implementation of the Map interface that caches entries type cachingMap struct { *delegatingMap - cancel context.CancelFunc - cache *lru.Cache - mu sync.RWMutex + cancel context.CancelFunc + pending map[string]*cachedEntry + cache *lru.Cache + cacheVersion int64 + mu sync.RWMutex } // open opens the map listeners @@ -56,73 +59,133 @@ func (m *cachingMap) open() error { } go func() { for event := range ch { - switch event.Type { - case EventNone: - m.cache.Add(event.Entry.Key, event.Entry) - case EventInserted: - m.cache.Add(event.Entry.Key, event.Entry) - case EventUpdated: - m.cache.Add(event.Entry.Key, event.Entry) - case EventRemoved: - m.cache.Remove(event.Entry.Key) - } + m.cacheUpdate(event.Entry, event.Type == EventRemoved) } }() return nil } -func (m *cachingMap) Put(ctx context.Context, key string, value []byte, opts ...PutOption) (*Entry, error) { - // Put the entry in the map using the underlying map delegate - entry, err := m.delegatingMap.Put(ctx, key, value, opts...) +// cacheUpdate caches the given updated entry +func (m *cachingMap) cacheUpdate(update *Entry, tombstone bool) { + m.mu.Lock() + defer m.mu.Unlock() + + // If the update version is less than the cache version, the cache contains + // more recent updates. Ignore the update. + if update.Version <= m.cacheVersion { + return + } + + // If the pending entry is newer than the update entry, the update can be ignored. + // Otherwise, remove the entry from the pending cache if present. + if pending, ok := m.pending[update.Key]; ok { + if pending.Version > update.Version { + return + } + delete(m.pending, update.Key) + } + + // If the entry is a tombstone, remove it from the cache, otherwise insert it. + if tombstone { + m.cache.Remove(update.Key) + } else { + m.cache.Add(update.Key, update) + } + + // Update the cache version. + m.cacheVersion = update.Version +} + +// cacheRead caches the given read entry +func (m *cachingMap) cacheRead(read *Entry, tombstone bool) { + m.mu.Lock() + defer m.mu.Unlock() + + // If the entry version is less than the cache version, ignore the update. The entry will + // have been cached as an update. + if read.Version <= m.cacheVersion { + return + } + + // The pending cache contains the most recent known state for the entry. + // If the read entry is newer than the pending entry for the key, update + // the pending cache. + if pending, ok := m.pending[read.Key]; !ok || read.Version > pending.Version { + m.pending[read.Key] = &cachedEntry{ + Entry: read, + tombstone: tombstone, + } + } +} + +// getCache gets a cached entry +func (m *cachingMap) getCache(key string) (*Entry, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + // The pending cache contains the most recent known states. If the entry is present + // in the pending cache, return it rather than using the LRU cache. + if entry, ok := m.pending[key]; ok { + if entry.tombstone { + return nil, true + } + return entry.Entry, true + } + + // If the entry is present in the LRU cache, return it. + if entry, ok := m.cache.Get(key); ok { + return entry.(*Entry), true + } + return nil, false +} + +func (m *cachingMap) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, error) { + // If the entry is already in the cache, return it + if entry, ok := m.getCache(key); ok { + return entry, nil + } + + // Otherwise, fetch the entry from the underlying map + entry, err := m.delegatingMap.Get(ctx, key, opts...) if err != nil { return nil, err } - // If the entry in the cache is still older than the updated entry, update the entry - // This check is performed because a concurrent event could update the cached entry - m.mu.Lock() - prevEntry, ok := m.cache.Get(key) - if !ok || prevEntry.(*Entry).Version < entry.Version { - m.cache.Add(key, entry) + // Update the cache if necessary + if err != nil { + return nil, err } - m.mu.Unlock() + m.cacheRead(entry, entry.Value == nil) return entry, nil } -func (m *cachingMap) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { - // Remove the entry from the map using the underlying map delegate - entry, err := m.delegatingMap.Remove(ctx, key, opts...) +func (m *cachingMap) Put(ctx context.Context, key string, value []byte, opts ...PutOption) (*Entry, error) { + // Put the entry in the map using the underlying map delegate + entry, err := m.delegatingMap.Put(ctx, key, value, opts...) if err != nil { return nil, err } - // If the entry in the cache is still older than the removed entry, remove the entry - // This check is performed because a concurrent event could update the cached entry - m.mu.Lock() - prevEntry, ok := m.cache.Get(key) - if ok && prevEntry.(*Entry).Version <= entry.Version { - m.cache.Remove(key) + // Update the cache if necessary + if err != nil { + return nil, err } - m.mu.Unlock() + m.cacheRead(entry, false) return entry, nil } -func (m *cachingMap) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, error) { - // If the entry is already in the cache, return it - m.mu.RLock() - cachedEntry, ok := m.cache.Get(key) - m.mu.RUnlock() - if ok { - return cachedEntry.(*Entry), nil +func (m *cachingMap) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { + // Remove the entry from the map using the underlying map delegate + entry, err := m.delegatingMap.Remove(ctx, key, opts...) + if err != nil { + return nil, err } - // Otherwise, fetch the entry from the underlying map - // Note that we do not cache the entry here since it could have been removed, in which - // case we'd have to maintain tombstones in the cache to compare versions. - entry, err := m.delegatingMap.Get(ctx, key, opts...) + // Update the cache if necessary if err != nil { return nil, err } + m.cacheRead(entry, true) return entry, nil } @@ -134,3 +197,9 @@ func (m *cachingMap) Close(ctx context.Context) error { m.mu.Unlock() return m.delegatingMap.Close(ctx) } + +// cachedEntry is a cached entry +type cachedEntry struct { + *Entry + tombstone bool +} diff --git a/pkg/client/map/map.go b/pkg/client/map/map.go index 89a7971..b762f0f 100644 --- a/pkg/client/map/map.go +++ b/pkg/client/map/map.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/atomix/go-client/pkg/client/primitive" "github.com/atomix/go-client/pkg/client/util" + "math" "sync" "time" ) @@ -119,6 +120,9 @@ func New(ctx context.Context, name primitive.Name, sessions []*primitive.Session } results, err := util.ExecuteOrderedAsync(len(sessions), func(i int) (interface{}, error) { + if options.cached { + return newPartition(ctx, name, sessions[i], WithCache(int(math.Max(float64(options.cacheSize/len(sessions)), 1)))) + } return newPartition(ctx, name, sessions[i]) }) if err != nil { @@ -130,18 +134,10 @@ func New(ctx context.Context, name primitive.Name, sessions []*primitive.Session maps[i] = result.(Map) } - var m Map = &_map{ + return &_map{ name: name, partitions: maps, - } - if options.cached { - cached, err := newCachingMap(m, options.cacheSize) - if err != nil { - return nil, err - } - m = cached - } - return m, nil + }, nil } // _map is the default single-partition implementation of Map @@ -175,7 +171,13 @@ func (m *_map) Get(ctx context.Context, key string, opts ...GetOption) (*Entry, if err != nil { return nil, err } - return session.Get(ctx, key, opts...) + entry, err := session.Get(ctx, key, opts...) + if err != nil { + return nil, err + } else if entry.Value == nil { + return nil, nil + } + return entry, nil } func (m *_map) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { diff --git a/pkg/client/map/partition.go b/pkg/client/map/partition.go index fbb57df..f3f3cf5 100644 --- a/pkg/client/map/partition.go +++ b/pkg/client/map/partition.go @@ -23,15 +23,28 @@ import ( "google.golang.org/grpc" ) -func newPartition(ctx context.Context, name primitive.Name, session *primitive.Session) (Map, error) { +func newPartition(ctx context.Context, name primitive.Name, session *primitive.Session, opts ...Option) (Map, error) { + options := &options{} + for _, opt := range opts { + opt.apply(options) + } + instance, err := primitive.NewInstance(ctx, name, session, &primitiveHandler{}) if err != nil { return nil, err } - return &mapPartition{ + var partition Map = &mapPartition{ name: name, instance: instance, - }, nil + } + if options.cached { + cached, err := newCachingMap(partition, options.cacheSize) + if err != nil { + return nil, err + } + partition = cached + } + return partition, nil } type mapPartition struct { @@ -122,7 +135,12 @@ func (m *mapPartition) Get(ctx context.Context, key string, opts ...GetOption) ( Updated: response.Updated, }, nil } - return nil, nil + + // Return a non-empty nil-value Entry when response version is 0 + return &Entry{ + Key: key, + Version: int64(response.Header.Index), + }, nil } func (m *mapPartition) Remove(ctx context.Context, key string, opts ...RemoveOption) (*Entry, error) { @@ -259,23 +277,28 @@ func (m *mapPartition) Watch(ctx context.Context, ch chan<- *Event, opts ...Watc defer close(ch) for event := range stream { response := event.(*api.EventResponse) + var version int64 var t EventType switch response.Type { case api.EventResponse_NONE: t = EventNone + version = response.Version case api.EventResponse_INSERTED: t = EventInserted + version = response.Version case api.EventResponse_UPDATED: t = EventUpdated + version = response.Version case api.EventResponse_REMOVED: t = EventRemoved + version = int64(response.Header.Index) } ch <- &Event{ Type: t, Entry: &Entry{ Key: response.Key, Value: response.Value, - Version: response.Version, + Version: version, Created: response.Created, Updated: response.Updated, },