Skip to content

Commit

Permalink
Add a simple cache for objects stored in etcd.
Browse files Browse the repository at this point in the history
  • Loading branch information
fgrzadkowski committed Apr 27, 2015
1 parent 9f753c2 commit 0bef09e
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 30 deletions.
3 changes: 2 additions & 1 deletion pkg/master/master_test.go
Expand Up @@ -30,7 +30,8 @@ func TestGetServersToValidate(t *testing.T) {
config := Config{}
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
config.EtcdHelper = tools.EtcdHelper{fakeClient, latest.Codec, nil}
config.EtcdHelper = tools.NewEtcdHelper(fakeClient, latest.Codec)
config.EtcdHelper.Versioner = nil

master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{})

Expand Down
78 changes: 70 additions & 8 deletions pkg/tools/etcd_helper.go
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"os/exec"
"reflect"
"sync"

"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
Expand All @@ -38,6 +39,15 @@ type EtcdHelper struct {
Codec runtime.Codec
// optional, no atomic operations can be performed without this interface
Versioner EtcdVersioner

// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion.
// This depends on etcd's indexes being globally unique across all objects/types. This will
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
// support multi-object transaction that will result in many objects with the same index.
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
cache map[uint64]interface{}
mutex sync.RWMutex
}

// NewEtcdHelper creates a helper that works against objects that use the internal
Expand All @@ -47,6 +57,7 @@ func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec) EtcdHelper {
Client: client,
Codec: codec,
Versioner: APIObjectVersioner{},
cache: make(map[uint64]interface{}),
}
}

Expand Down Expand Up @@ -116,19 +127,70 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
}
continue
}
obj := reflect.New(v.Type().Elem())
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
return err
}
if h.Versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
if obj, found := h.getFromCache(node.ModifiedIndex); found {
v.Set(reflect.Append(v, reflect.ValueOf(obj)))
} else {
obj := reflect.New(v.Type().Elem())
if err := h.Codec.DecodeInto([]byte(node.Value), obj.Interface().(runtime.Object)); err != nil {
return err
}
if h.Versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
}
v.Set(reflect.Append(v, obj.Elem()))
if node.ModifiedIndex != 0 {
h.addToCache(node.ModifiedIndex, obj.Interface())
}
}
v.Set(reflect.Append(v, obj.Elem()))
}
return nil
}

// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
getFromCache(index uint64) (interface{}, bool)
addToCache(index uint64, obj interface{})
}

const maxEtcdCacheEntries int = 100000

func (h *EtcdHelper) getFromCache(index uint64) (interface{}, bool) {
var obj interface{}
func() {
h.mutex.RLock()
defer h.mutex.RUnlock()
obj = h.cache[index]
}()
if obj != nil {
// We should not return the object itself to avoid poluting the cache if someone
// modifies returned values.
objCopy, err := conversion.DeepCopy(obj)
if err != nil {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return nil, false
}
return objCopy, true
} else {
return nil, false
}
}

func (h *EtcdHelper) addToCache(index uint64, obj interface{}) {
h.mutex.Lock()
defer h.mutex.Unlock()
h.cache[index] = obj
if len(h.cache) > maxEtcdCacheEntries {
var randomKey uint64
for randomKey = range h.cache {
break
}
delete(h.cache, randomKey)
}
}

// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
// definition) and extracts a go object per etcd node into a slice with the resource version.
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
Expand Down
10 changes: 6 additions & 4 deletions pkg/tools/etcd_helper_test.go
Expand Up @@ -169,7 +169,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
Key: "/baz",
Value: getEncodedPod("baz"),
Dir: false,
ModifiedIndex: 1,
ModifiedIndex: 3,
},
},
},
Expand All @@ -194,7 +194,7 @@ func TestExtractToListAcrossDirectories(t *testing.T) {
Items: []api.Pod{
// We expect list to be sorted by directory (e.g. namespace) first, then by name.
{
ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "1"},
ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "3"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Expand Down Expand Up @@ -470,7 +470,8 @@ func TestSetObjWithVersion(t *testing.T) {
func TestSetObjWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, testapi.Codec(), nil}
helper := NewEtcdHelper(fakeClient, testapi.Codec())
helper.Versioner = nil
returnedObj := &api.Pod{}
err := helper.SetObj("/some/key", obj, returnedObj, 3)
if err != nil {
Expand All @@ -496,7 +497,8 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
func TestSetObjNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, testapi.Codec(), nil}
helper := NewEtcdHelper(fakeClient, testapi.Codec())
helper.Versioner = nil
err := helper.SetObj("/some/key", obj, nil, 3)
if err != nil {
t.Errorf("Unexpected error %#v", err)
Expand Down
18 changes: 14 additions & 4 deletions pkg/tools/etcd_helper_watch.go
Expand Up @@ -71,7 +71,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil)
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
Expand All @@ -80,7 +80,7 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
// API objects and sent down the returned watch.Interface.
// Errors will be sent down the channel.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil)
w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
Expand All @@ -102,7 +102,7 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc
//
// Errors will be sent down the channel.
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform)
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h)
go w.etcdWatch(h.Client, key, resourceVersion)
return w
}
Expand Down Expand Up @@ -142,14 +142,16 @@ type etcdWatcher struct {

// Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event)

cache etcdCache
}

// watchWaitDuration is the amount of time to wait for an error from watch.
const watchWaitDuration = 100 * time.Millisecond

// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc) *etcdWatcher {
func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner EtcdVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
Expand All @@ -162,6 +164,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding
etcdStop: make(chan bool),
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),
cache: cache,
}
w.emit = func(e watch.Event) { w.outgoing <- e }
go w.translate()
Expand Down Expand Up @@ -253,6 +256,10 @@ func (w *etcdWatcher) translate() {
}

func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
if obj, found := w.cache.getFromCache(node.ModifiedIndex); found {
return obj.(runtime.Object), nil
}

obj, err := w.encoding.Decode([]byte(node.Value))
if err != nil {
return nil, err
Expand All @@ -274,6 +281,9 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
}
}

if node.ModifiedIndex != 0 {
w.cache.addToCache(node.ModifiedIndex, obj)
}
return obj, nil
}

Expand Down
36 changes: 23 additions & 13 deletions pkg/tools/etcd_helper_watch_test.go
Expand Up @@ -31,6 +31,16 @@ import (

var versioner = APIObjectVersioner{}

// Implements etcdCache interface as empty methods (i.e. does not cache any objects)
type fakeEtcdCache struct{}

func (f *fakeEtcdCache) getFromCache(index uint64) (interface{}, bool) {
return nil, false
}

func (f *fakeEtcdCache) addToCache(index uint64, obj interface{}) {
}

func TestWatchInterpretations(t *testing.T) {
codec := latest.Codec
// Declare some pods to make the test cases compact.
Expand Down Expand Up @@ -114,7 +124,7 @@ func TestWatchInterpretations(t *testing.T) {

for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil)
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
Expand Down Expand Up @@ -152,7 +162,7 @@ func TestWatchInterpretations(t *testing.T) {
}

func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand All @@ -166,7 +176,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand All @@ -180,7 +190,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
func TestWatchInterpretation_ResponseBadData(t *testing.T) {
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil)
w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand All @@ -205,7 +215,7 @@ func TestWatchEtcdError(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
fakeClient.WatchImmediateError = fmt.Errorf("immediate error")
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)

watching, err := h.Watch("/some/key", 4, Everything)
if err != nil {
Expand Down Expand Up @@ -233,7 +243,7 @@ func TestWatch(t *testing.T) {
codec := latest.Codec
fakeClient := NewFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)

watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
Expand Down Expand Up @@ -406,7 +416,7 @@ func TestWatchEtcdState(t *testing.T) {
for key, value := range testCase.Initial {
fakeClient.Data[key] = value
}
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)
watching, err := h.Watch("/somekey/foo", testCase.From, Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand Down Expand Up @@ -477,7 +487,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
for k, testCase := range testCases {
fakeClient := NewFakeEtcdClient(t)
fakeClient.Data["/some/key"] = testCase.Response
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)

watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
Expand Down Expand Up @@ -537,7 +547,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
EtcdIndex: 3,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)

watching, err := h.WatchList("/some/key", 0, Everything)
if err != nil {
Expand Down Expand Up @@ -575,7 +585,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}

fakeClient := NewFakeEtcdClient(t)
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)

watching, err := h.WatchList("/some/key", 1, Everything)
if err != nil {
Expand Down Expand Up @@ -626,7 +636,7 @@ func TestWatchFromNotFound(t *testing.T) {
ErrorCode: 100,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)

watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
Expand All @@ -652,7 +662,7 @@ func TestWatchFromOtherError(t *testing.T) {
ErrorCode: 101,
},
}
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)

watching, err := h.Watch("/some/key", 0, Everything)
if err != nil {
Expand Down Expand Up @@ -683,7 +693,7 @@ func TestWatchFromOtherError(t *testing.T) {

func TestWatchPurposefulShutdown(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
h := EtcdHelper{fakeClient, codec, versioner}
h := NewEtcdHelper(fakeClient, codec)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}

// Test purposeful shutdown
Expand Down

0 comments on commit 0bef09e

Please sign in to comment.