acquire_handler.go
package v2

import (
	"net/http"
	"path"
	"strconv"
	"time"

	"github.com/coreos/go-etcd/etcd"
	"github.com/gorilla/mux"
)
// acquireHandler attempts to acquire a lock on the given key.
// The "key" parameter specifies the resource to lock.
// The "ttl" parameter specifies how long the lock will persist for.
// The "timeout" parameter specifies how long the request should wait for the lock.
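//
// A hypothetical example request, assuming the lock module's usual mount
// point of /mod/v2/lock (the exact route depends on how this handler is
// registered):
//
//	curl -X POST "http://127.0.0.1:4001/mod/v2/lock/customer1?ttl=60&timeout=30"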
func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
	h.client.SyncCluster()

	// Setup connection watcher. If the ResponseWriter does not support
	// CloseNotifier, closeChan stays nil and simply never fires.
	var closeChan <-chan bool
	if closeNotifier, ok := w.(http.CloseNotifier); ok {
		closeChan = closeNotifier.CloseNotify()
	}
	// Parse "key" and "ttl" query parameters.
	vars := mux.Vars(req)
	keypath := path.Join(prefix, vars["key"])
	ttl, err := strconv.Atoi(req.FormValue("ttl"))
	if err != nil {
		http.Error(w, "invalid ttl: "+err.Error(), http.StatusBadRequest)
		return
	}
	// Parse "timeout" parameter; -1 means wait indefinitely.
	var timeout int
	if len(req.FormValue("timeout")) == 0 {
		timeout = -1
	} else if timeout, err = strconv.Atoi(req.FormValue("timeout")); err != nil {
		http.Error(w, "invalid timeout: "+err.Error(), http.StatusBadRequest)
		return
	}

	// Set up the timeout channel. A nil channel never fires, so leaving it
	// nil when no timeout was given makes the request wait indefinitely.
	var timeoutChan <-chan time.Time
	if timeout >= 0 {
		timeoutChan = time.After(time.Duration(timeout) * time.Second)
	}
	// Create an in-order key for this lock request. The key's final path
	// segment is an incrementing index that defines our place in the queue.
	resp, err := h.client.AddChild(keypath, "-", uint64(ttl))
	if err != nil {
		http.Error(w, "add lock index error: "+err.Error(), http.StatusInternalServerError)
		return
	}
	indexpath := resp.Node.Key

	// Keep updating the TTL so the lock request does not expire before acquisition.
	stop := make(chan bool)
	go h.ttlKeepAlive(indexpath, ttl, stop)
	// Monitor for a broken connection or a timeout; either one stops the
	// watch in the acquisition loop below.
	stopWatchChan := make(chan bool)
	go func() {
		select {
		case <-closeChan:
			stopWatchChan <- true
		case <-timeoutChan:
			stopWatchChan <- true
		case <-stop:
			// Lock was resolved; stop watching for disconnect or timeout.
		}
	}()
	// Extract the lock index.
	index, _ := strconv.Atoi(path.Base(resp.Node.Key))

	// Wait until we successfully get a lock or we get a failure.
	var success bool
	for {
		// Read all indices.
		resp, err = h.client.Get(keypath, true, true)
		if err != nil {
			http.Error(w, "lock children lookup error: "+err.Error(), http.StatusInternalServerError)
			break
		}
		indices := extractResponseIndices(resp)
		waitIndex := resp.Node.ModifiedIndex
		prevIndex := findPrevIndex(indices, index)

		// If there is no previous index then we have the lock.
		if prevIndex == 0 {
			success = true
			break
		}
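		// Watching only the immediate predecessor, rather than the whole
		// lock directory, avoids a thundering herd: each waiter wakes only
		// when the key directly ahead of it is deleted or expires.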
		// Otherwise watch previous index until it's gone.
		_, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
		if err == etcd.ErrWatchStoppedByUser {
			break
		} else if err != nil {
			http.Error(w, "lock watch error: "+err.Error(), http.StatusInternalServerError)
			break
		}
	}
	// Check for a disconnect or timeout before we write the lock index.
	select {
	case <-stopWatchChan:
		success = false
	default:
	}

	// Stop the TTL keep-alive.
	close(stop)

	if success {
		// Write the lock index to the response body if we acquired the lock.
		h.client.Update(indexpath, "-", uint64(ttl))
		w.Write([]byte(strconv.Itoa(index)))
	} else {
		// Make sure the key is deleted if we couldn't acquire the lock.
		h.client.Delete(indexpath, false)
	}
}
// ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
func (h *handler) ttlKeepAlive(k string, ttl int, stop chan bool) {
	for {
		select {
		case <-time.After(time.Duration(ttl/2) * time.Second):
			// Refresh at half the TTL so the key cannot expire between updates.
			h.client.Update(k, "-", uint64(ttl))
		case <-stop:
			return
		}
	}
}