Skip to content

Commit

Permalink
Merge 8699b1f into 93ab0e6
Browse files Browse the repository at this point in the history
  • Loading branch information
jlhawn committed Dec 18, 2017
2 parents 93ab0e6 + 8699b1f commit 9995660
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 103 deletions.
4 changes: 3 additions & 1 deletion store/consul/consul_test.go
Expand Up @@ -47,13 +47,15 @@ func TestConsulStore(t *testing.T) {
lockKV := makeConsulClient(t)
ttlKV := makeConsulClient(t)

defer testutils.RunCleanup(t, kv)

testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestLockTTL(t, kv, lockKV)
testutils.RunTestLockWait(t, kv, lockKV)
testutils.RunTestTTL(t, kv, ttlKV)
testutils.RunCleanup(t, kv)
}

func TestGetActiveSession(t *testing.T) {
Expand Down
174 changes: 86 additions & 88 deletions store/etcd/etcd.go
@@ -1,6 +1,7 @@
package etcd

import (
"context"
"crypto/tls"
"errors"
"log"
Expand All @@ -9,8 +10,6 @@ import (
"strings"
"time"

"golang.org/x/net/context"

etcd "github.com/coreos/etcd/client"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
Expand All @@ -30,13 +29,29 @@ type Etcd struct {
}

type etcdLock struct {
client etcd.KeysAPI
stopLock chan struct{}
client etcd.KeysAPI
key string
value string
ttl time.Duration

// Closed when the caller wants to stop renewing the lock. I'm not sure
// why this is even used - you could just call the Unlock() method.
stopRenew chan struct{}
key string
value string
last *etcd.Response
ttl time.Duration
// When the lock is held, this is the last modified index of the key.
// Used for conditional updates when extending the lock TTL and when
// conditionall deleteing when Unlock() is called.
lastIndex uint64
// When the lock is held, this function will cancel the locked context.
// This is called both by the Unlock() method in order to stop the
// background holding goroutine and in a deferred call in that background
// holding goroutine in case the lock is lost due to an error or the
// stopRenew channel is closed. Calling this function also closes the chan
// returned by the Lock() method.
cancel context.CancelFunc
// Used to sync the Unlock() call with the background holding goroutine.
// This channel is closed when that background goroutine exits, signalling
// that it is okay to conditionally delete the key.
doneHolding chan struct{}
}

const (
Expand Down Expand Up @@ -472,132 +487,115 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {

// Lock holder channel
lockHeld := make(chan struct{})
stopLocking := l.stopRenew

// Conditional Set - only if the key does not exist.
setOpts := &etcd.SetOptions{
TTL: l.ttl,
TTL: l.ttl,
PrevExist: etcd.PrevNoExist,
}

for {
setOpts.PrevExist = etcd.PrevNoExist
resp, err := l.client.Set(context.Background(), l.key, l.value, setOpts)
if err != nil {
if etcdError, ok := err.(etcd.Error); ok {
if etcdError.Code != etcd.ErrorCodeNodeExist {
return nil, err
}
setOpts.PrevIndex = ^uint64(0)
}
} else {
setOpts.PrevIndex = resp.Node.ModifiedIndex
}

setOpts.PrevExist = etcd.PrevExist
l.last, err = l.client.Set(context.Background(), l.key, l.value, setOpts)

if err == nil {
// Leader section
l.stopLock = stopLocking
go l.holdLock(l.key, lockHeld, stopLocking)
break
} else {
// If this is a legitimate error, return
if etcdError, ok := err.(etcd.Error); ok {
if etcdError.Code != etcd.ErrorCodeTestFailed {
return nil, err
}
}
// Acquired the lock!
l.lastIndex = resp.Node.ModifiedIndex
lockedCtx, cancel := context.WithCancel(context.Background())
l.cancel = cancel
l.doneHolding = make(chan struct{})

// Seeker section
errorCh := make(chan error)
chWStop := make(chan bool)
free := make(chan bool)
go l.holdLock(lockedCtx)

go l.waitLock(l.key, errorCh, chWStop, free)
return lockedCtx.Done(), nil
}

// Wait for the key to be available or for
// a signal to stop trying to lock the key
select {
case <-free:
break
case err := <-errorCh:
return nil, err
case <-stopChan:
return nil, ErrAbortTryLock
}
etcdErr, ok := err.(etcd.Error)
if !ok || etcdErr.Code != etcd.ErrorCodeNodeExist {
return nil, err // Unexpected error.
}

// Delete or Expire event occurred
// Retry
// Need to wait for the lock key to expire or be deleted.
if err := l.waitLock(stopChan, etcdErr.Index); err != nil {
return nil, err
}
}

return lockHeld, nil
// Delete or Expire event occurred.
// Retry
}
}

// Hold the lock as long as we can
// Hold the lock as long as we can.
// Updates the key ttl periodically until we receive
// an explicit stop signal from the Unlock method
func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) {
defer close(lockHeld)
// an explicit stop signal from the Unlock method OR
// the stopRenew channel is closed.
func (l *etcdLock) holdLock(ctx context.Context) {
defer close(l.doneHolding)
defer l.cancel()

update := time.NewTicker(l.ttl / 3)
defer update.Stop()

var err error
setOpts := &etcd.SetOptions{TTL: l.ttl}

for {
select {
case <-update.C:
setOpts.PrevIndex = l.last.Node.ModifiedIndex
l.last, err = l.client.Set(context.Background(), key, l.value, setOpts)
setOpts.PrevIndex = l.lastIndex
resp, err := l.client.Set(ctx, l.key, l.value, setOpts)
if err != nil {
return
}

case <-stopLocking:
l.lastIndex = resp.Node.ModifiedIndex
case <-l.stopRenew:
return
case <-ctx.Done():
return
}
}
}

// WaitLock simply waits for the key to be available for creation
func (l *etcdLock) waitLock(key string, errorCh chan error, stopWatchCh chan bool, free chan<- bool) {
opts := &etcd.WatcherOptions{Recursive: false}
watcher := l.client.Watcher(key, opts)
// WaitLock simply waits for the key to be available for creation.
func (l *etcdLock) waitLock(stopWait <-chan struct{}, afterIndex uint64) error {
waitCtx, waitCancel := context.WithCancel(context.Background())
defer waitCancel()
go func() {
select {
case <-stopWait:
// If the caller closes the stopWait, cancel the wait context.
waitCancel()
case <-waitCtx.Done():
// No longer waiting.
}
}()

watcher := l.client.Watcher(l.key, &etcd.WatcherOptions{AfterIndex: afterIndex})
for {
event, err := watcher.Next(context.Background())
event, err := watcher.Next(waitCtx)
if err != nil {
errorCh <- err
return
if err == context.Canceled {
return ErrAbortTryLock
}
return err
}
if event.Action == "delete" || event.Action == "expire" {
free <- true
return
switch event.Action {
case "delete", "compareAndDelete", "expire":
return nil // The key has been deleted or expired.
}
}
}

// Unlock the "key". Calling unlock while
// not holding the lock will throw an error
func (l *etcdLock) Unlock() error {
if l.stopLock != nil {
l.stopLock <- struct{}{}
}
if l.last != nil {
l.cancel() // Will signal the holdLock goroutine to exit.
<-l.doneHolding // Wait for the holdLock goroutine to exit.

var err error
if l.lastIndex != 0 {
delOpts := &etcd.DeleteOptions{
PrevIndex: l.last.Node.ModifiedIndex,
}
_, err := l.client.Delete(context.Background(), l.key, delOpts)
if err != nil {
return err
PrevIndex: l.lastIndex,
}
_, err = l.client.Delete(context.Background(), l.key, delOpts)
}
return nil
return err
}

// Close closes the client connection
Expand Down
4 changes: 3 additions & 1 deletion store/etcd/etcd_test.go
Expand Up @@ -48,11 +48,13 @@ func TestEtcdStore(t *testing.T) {
lockKV := makeEtcdClient(t)
ttlKV := makeEtcdClient(t)

defer testutils.RunCleanup(t, kv)

testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestLockTTL(t, kv, lockKV)
testutils.RunTestLockWait(t, kv, lockKV)
testutils.RunTestTTL(t, kv, ttlKV)
testutils.RunCleanup(t, kv)
}
3 changes: 2 additions & 1 deletion store/zookeeper/zookeeper_test.go
Expand Up @@ -45,10 +45,11 @@ func TestZkStore(t *testing.T) {
kv := makeZkClient(t)
ttlKV := makeZkClient(t)

defer testutils.RunCleanup(t, kv)

testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestTTL(t, kv, ttlKV)
testutils.RunCleanup(t, kv)
}

0 comments on commit 9995660

Please sign in to comment.