Skip to content

Commit

Permalink
etcd: remove mutex side keys from results in List
Browse files Browse the repository at this point in the history
This PR completes the work done on aligning the
results returned by `List` by filtering `*_lock`
keys from the results.

Additionally, we test for this case for both etcd
v2 and etcd v3 and we change the lock suffix to be
`___lock` instead of `_lock` to avoid collision with
similar key names following the pattern of using a
simple underscore (might brake backward compatibility
of `List` results for client the using master version
of libkv).

Signed-off-by: Alexandre Beslic <abeslic@abronan.com>
  • Loading branch information
abronan committed Oct 16, 2017
1 parent eb8d775 commit 5e4bb28
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 37 deletions.
23 changes: 12 additions & 11 deletions store/etcd/v2/etcd.go
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/docker/libkv/store"
)

const (
lockSuffix = "___lock"
)

var (
// ErrAbortTryLock is thrown when a user stops trying to seek the lock
// by sending a signal to the stop chan, this is used to verify if the
Expand All @@ -27,9 +31,7 @@ var (
// Etcd is the receiver type for the
// Store interface
type Etcd struct {
client etcd.KeysAPI
mutexKey string // mutexKey is the key to write appended with a "_lock" suffix
writeKey string // writeKey is the actual key to update protected by the mutexKey
client etcd.KeysAPI
}

type etcdLock struct {
Expand Down Expand Up @@ -445,6 +447,11 @@ func (s *Etcd) List(directory string, opts *store.ReadOptions) ([]*store.KVPair,
kv = append(kv, pairs...)
}

// Filter out etcd mutex side keys with `___lock` suffix
if strings.Contains(string(n.Key), lockSuffix) {
continue
}

kv = append(kv, &store.KVPair{
Key: n.Key,
Value: []byte(n.Value),
Expand Down Expand Up @@ -487,22 +494,16 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke
}
}

mutexKey := s.normalize(key + "_lock")
writeKey := s.normalize(key)

// Create lock object
lock = &etcdLock{
client: s.client,
stopRenew: renewCh,
mutexKey: mutexKey,
writeKey: writeKey,
mutexKey: s.normalize(key + lockSuffix),
writeKey: s.normalize(key),
value: value,
ttl: ttl,
}

s.mutexKey = mutexKey
s.writeKey = writeKey

return lock, nil
}

Expand Down
1 change: 1 addition & 0 deletions store/etcd/v2/etcd_test.go
Expand Up @@ -53,6 +53,7 @@ func TestEtcdStore(t *testing.T) {
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestLockTTL(t, kv, lockKV)
testutils.RunTestListLock(t, kv)
testutils.RunTestTTL(t, kv, ttlKV)
testutils.RunCleanup(t, kv)
}
6 changes: 3 additions & 3 deletions store/etcd/v3/etcd.go
Expand Up @@ -16,7 +16,7 @@ import (
const (
defaultLockTTL = 20 * time.Second
etcdDefaultTimeout = 5 * time.Second
lockSuffix = "_lock"
lockSuffix = "___lock"
)

// EtcdV3 is the receiver type for the
Expand Down Expand Up @@ -518,8 +518,8 @@ func (s *EtcdV3) list(directory string, opts *store.ReadOptions) (int64, []*stor
continue
}

// Allow to filter the _lock entry when the mutex key directory is given
if strings.Contains(string(n.Key), lockSuffix+"/") {
// Filter out etcd mutex side keys with `___lock` suffix
if strings.Contains(string(n.Key), lockSuffix) {
continue
}

Expand Down
24 changes: 1 addition & 23 deletions store/etcd/v3/etcd_test.go
Expand Up @@ -53,29 +53,7 @@ func TestEtcdV3Store(t *testing.T) {
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestLockTTL(t, kv, lockKV)
testutils.RunTestListLock(t, kv)
testutils.RunTestTTL(t, kv, ttlKV)
testutils.RunCleanup(t, kv)
}

func TestEtcdListLockKey(t *testing.T) {
lockKV := makeEtcdV3Client(t)

key := "testLocketcdv3"
value := []byte("bar")

// We should be able to create a new lock on key
lock, err := lockKV.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second})
assert.NoError(t, err)
assert.NotNil(t, lock)

// Lock should successfully succeed or block
lockChan, err := lock.Lock(nil)
assert.NoError(t, err)
assert.NotNil(t, lockChan)

pairs, err := lockKV.List(key)
assert.NoError(t, err)
assert.NotNil(t, pairs)
assert.Equal(t, 1, len(pairs))
assert.Equal(t, key, pairs[0].Key)
}
48 changes: 48 additions & 0 deletions testutils/utils.go
Expand Up @@ -3,6 +3,7 @@ package testutils
import (
"fmt"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -18,6 +19,12 @@ func RunTestCommon(t *testing.T, kv store.Store) {
testDeleteTree(t, kv)
}

// RunTestListLock tests the list output for mutexes
// and checks that internal side keys are not listed
func RunTestListLock(t *testing.T, kv store.Store) {
testListLockKey(t, kv)
}

// RunTestAtomic tests the Atomic operations by the K/V
// backends
func RunTestAtomic(t *testing.T, kv store.Store) {
Expand Down Expand Up @@ -555,6 +562,45 @@ func testList(t *testing.T, kv store.Store) {
assert.Nil(t, pairs)
}

func testListLockKey(t *testing.T, kv store.Store) {
listKey := "testListLockSide"

err := kv.Put(listKey, []byte("val"), &store.WriteOptions{IsDir: true})
assert.NoError(t, err)

err = kv.Put(listKey+"/subfolder", []byte("val"), &store.WriteOptions{IsDir: true})
assert.NoError(t, err)

// Put keys under subfolder.
for i := 1; i <= 3; i++ {
key := listKey + "/subfolder/key" + strconv.Itoa(i)
err := kv.Put(key, []byte("val"), nil)
assert.NoError(t, err)

// We lock the child key
lock, err := kv.NewLock(key, &store.LockOptions{Value: []byte("locked"), TTL: 2 * time.Second})
assert.NoError(t, err)
assert.NotNil(t, lock)

lockChan, err := lock.Lock(nil)
assert.NoError(t, err)
assert.NotNil(t, lockChan)
}

// List children of the root directory (`listKey`), this should
// not output any `___lock` entries and must contain 4 results.
pairs, err := kv.List(listKey, nil)
assert.NoError(t, err)
assert.NotNil(t, pairs)
assert.Equal(t, 4, len(pairs))

for _, pair := range pairs {
if strings.Contains(string(pair.Key), "___lock") {
assert.FailNow(t, "tesListLockKey: found a key containing lock suffix '___lock'")
}
}
}

func testDeleteTree(t *testing.T, kv store.Store) {
prefix := "testDeleteTree"

Expand Down Expand Up @@ -615,6 +661,8 @@ func RunCleanup(t *testing.T, kv store.Store) {
"testPutTTL",
"testList/subfolder",
"testList",
"testListLockSide/subfolder",
"testListLockSide",
"testDeleteTree",
} {
err := kv.DeleteTree(key)
Expand Down

0 comments on commit 5e4bb28

Please sign in to comment.