forked from coreos/fleet
/
lock.go
55 lines (45 loc) · 1.56 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package registry
import (
"fmt"
"path"
"github.com/coreos/fleet/third_party/github.com/coreos/go-etcd/etcd"
log "github.com/coreos/fleet/third_party/github.com/golang/glog"
)
const (
// ResourceMutexTTL is the number of seconds to allow a mutex to be held on a resource.
// lockResource passes it as the TTL when it creates the mutex key.
ResourceMutexTTL = 10
// mutexPrefix is the path segment that lockResource joins between keyPrefix
// and the mutex name to build the key under which a mutex is stored.
mutexPrefix = "/mutex/"
)
// lockResource attempts to acquire the mutex guarding the resource identified
// by class and id. The mutex is a key created through the Registry's etcd
// client at keyPrefix/mutexPrefix/<class>-<id> with a TTL of ResourceMutexTTL
// seconds. The context string is stored as the key's value to record by whom
// the mutex is currently held.
//
// It returns a TimedResourceMutex wrapping the created node on success. It
// returns nil if the key could not be created or if the response did not
// carry a node; the cause is logged at verbosity level 2.
func (r *Registry) lockResource(class, id, context string) *TimedResourceMutex {
	mutexName := fmt.Sprintf("%s-%s", class, id)
	log.V(2).Infof("Attempting to acquire mutex on %s", mutexName)

	key := path.Join(keyPrefix, mutexPrefix, mutexName)
	resp, err := r.etcd.Create(key, context, uint64(ResourceMutexTTL))
	if err != nil {
		// Log the underlying error so that an already-held mutex can be
		// told apart from other failures when debugging.
		log.V(2).Infof("Failed to acquire mutex on %s: %v", mutexName, err)
		return nil
	}

	// The node is dereferenced below; guard against a response without one
	// so an unexpected reply results in a failed lock rather than a panic.
	if resp == nil || resp.Node == nil {
		log.V(2).Infof("Failed to acquire mutex on %s: response contained no node", mutexName)
		return nil
	}

	log.V(2).Infof("Successfully acquired mutex on %s", mutexName)
	return &TimedResourceMutex{r.etcd, *resp.Node}
}
// TimedResourceMutex is a proxy to an auto-expiring mutex
// stored in the Registry. It assumes the mutex creator has
// initialized a timer (lockResource creates the underlying key
// with a TTL of ResourceMutexTTL seconds).
type TimedResourceMutex struct {
// etcd is the client through which Unlock deletes the mutex key.
etcd *etcd.Client
// node is a copy of the node returned when the mutex key was created;
// Unlock reads its Key and CreatedIndex.
node etcd.Node
}
// Unlock tries to release the mutex by deleting its key from the Registry.
// The delete is issued as a compare-and-delete with an empty previous value
// and the CreatedIndex recorded when the mutex was acquired, presumably so
// that a key re-created by another holder after expiry is left untouched.
//
// On failure the underlying error is wrapped, logged at verbosity level 2
// and returned; on success Unlock returns nil.
func (t *TimedResourceMutex) Unlock() error {
	if _, err := t.etcd.CompareAndDelete(t.node.Key, "", t.node.CreatedIndex); err != nil {
		wrapped := fmt.Errorf("Received error while unlocking mutex: %v", err)
		log.V(2).Info(wrapped)
		return wrapped
	}
	return nil
}