-
Notifications
You must be signed in to change notification settings - Fork 2
/
reservation.go
108 lines (85 loc) · 2.53 KB
/
reservation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package sync
import (
"errors"
"fmt"
"time"
log "github.com/cihub/seelog"
"github.com/hailo-platform/H2O/service/zookeeper"
gozk "github.com/hailo-platform/H2O/go-zookeeper/zk"
)
var (
ErrReserved = errors.New("[Reservation] Item already reserved")
)
// Reservation will reserve an item given an id. The reservation must expire and may be released
// at any time by any actor
type Reservation interface {
Reserve(time.Duration) error
Release() error
}
type DefaultReservation struct {
Ttl time.Duration
path string
id string
acl []gozk.ACL
}
type ReservationData struct {
Expires time.Time
}
// NewReservation creates a default reservation
func NewReservation(path, id string, acl []gozk.ACL) Reservation {
return &DefaultReservation{
path: path,
acl: acl,
id: id,
}
}
// Reserve will reserve an item with the id in the DefaultReservation for the given amount of
// time.
func (dr *DefaultReservation) Reserve(d time.Duration) error {
log.Debugf("[Sync:Reservation] Attempting to reserve '%s' for %s...", dr.id, d)
lockpath := constructPath(dr.path, dr.id)
// Check if the reservation already exists
b, _, err := zookeeper.Get(lockpath)
if err == nil {
// It exists, check if expired
var expired time.Time
expired.GobDecode(b)
log.Debugf("[Sync:Reservation] Read existing node for '%s', expires at %s", dr.id, expired)
if expired.Before(time.Now()) {
log.Debugf("[Sync:Reservation] Deleting expired lock '%s'", dr.id)
// It has expired, delete the node and get the reservation as usual
zookeeper.Delete(lockpath, -1)
} else {
return ErrReserved
}
}
expires := time.Now().Add(d)
expiresBytes, err := expires.GobEncode()
if err != nil {
return err
}
for {
_, err = zookeeper.Create(lockpath, expiresBytes, 0, dr.acl)
if err == gozk.ErrNoNode {
createParents(dr.path)
} else if err == nil {
break
} else {
log.Warnf("[Reservation] ZK error creating ephemeral lock node for reservation: %v", err)
return err
}
}
log.Debugf("[Sync:Reservation] Created lock node for '%s', expires at %s", dr.id, expires)
return nil
}
// Release will release the reservation in the DefaultReservation
func (dr *DefaultReservation) Release() error {
return zookeeper.Delete(constructPath(dr.path, dr.id), -1)
}
// AnonymousRelease will release the reservation of the item with the given id and path
func AnonymousRelease(path, id string) error {
return zookeeper.Delete(constructPath(path, id), -1)
}
func constructPath(path, id string) string {
return fmt.Sprintf("%s/reserve-%s", path, id)
}