/
zookeeper.go
193 lines (158 loc) · 4.33 KB
/
zookeeper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package zk
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"
"time"
"github.com/samuel/go-zookeeper/zk"
"github.com/sirupsen/logrus"
"github.com/eremetic-framework/eremetic"
)
// connection is the set of Zookeeper client operations this package relies on.
// It wraps a zk.Conn struct for testability: TaskDB talks to Zookeeper only
// through this interface, so another implementation can be substituted.
type connection interface {
Close()
Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)
Delete(path string, n int32) error
Exists(path string) (bool, *zk.Stat, error)
Get(path string) ([]byte, *zk.Stat, error)
Set(path string, data []byte, version int32) (*zk.Stat, error)
Children(path string) ([]string, *zk.Stat, error)
}
// connector helps create a zookeeper connection. Connect receives the server
// portion of the Zookeeper URL and returns a connection, or an error if one
// could not be established.
type connector interface {
Connect(path string) (connection, error)
}
// TaskDB is a Zookeeper implementation of the task database.
type TaskDB struct {
// conn is the Zookeeper connection used for every read and write.
conn connection
// path is the root node; each task is stored in a child node named after
// the task ID.
path string
}
// defaultConnector is the connector used by NewTaskDB; its Connect method
// opens a connection through zk.Connect.
type defaultConnector struct{}
// Connect splits zksStr on commas into a list of Zookeeper servers and opens a
// connection to them with a one second timeout passed to zk.Connect.
//
// It returns the connection on success. On failure it returns a literal nil
// connection together with the error.
func (z defaultConnector) Connect(zksStr string) (connection, error) {
	servers := strings.Split(zksStr, ",")

	conn, _, err := zk.Connect(servers, time.Second)
	if err != nil {
		// Do not return conn here: storing a nil pointer in the connection
		// interface produces an interface value that compares non-nil.
		return nil, err
	}
	return conn, nil
}
// parsePath splits a Zookeeper URL into its host part and its node path.
//
// The URL is parsed with url.Parse; the first result is the URL's host
// component and the second is the URL's path with all trailing slashes
// removed. If the URL cannot be parsed, both strings are empty and the parse
// error is returned.
func parsePath(zkpath string) (string, string, error) {
	parsed, parseErr := url.Parse(zkpath)
	if parseErr != nil {
		return "", "", parseErr
	}

	servers := parsed.Host
	node := strings.TrimRight(parsed.Path, "/")
	return servers, node, nil
}
// NewTaskDB returns a new Zookeeper TaskDB for the given Zookeeper URL,
// connecting through the package's defaultConnector.
func NewTaskDB(zk string) (*TaskDB, error) {
	var dialer connector = defaultConnector{}
	return newCustomTaskDB(dialer, zk)
}
// newCustomTaskDB builds a TaskDB on top of the given connector.
//
// path is a Zookeeper URL: its host part is handed to c.Connect and its path
// part becomes the root node of the database. If the root node does not exist
// yet it is created with no data and a world-accessible ACL.
//
// An error is returned when path is empty or cannot be parsed, when the
// connection cannot be established, or when the root node cannot be checked
// or created. In the latter cases the freshly opened connection is closed
// before returning, since no TaskDB is handed back that could close it later.
func newCustomTaskDB(c connector, path string) (*TaskDB, error) {
	if path == "" {
		return nil, errors.New("Missing ZK path")
	}

	servers, path, err := parsePath(path)
	if err != nil {
		return nil, err
	}

	conn, err := c.Connect(servers)
	if err != nil {
		return nil, err
	}

	exists, _, err := conn.Exists(path)
	if err != nil {
		conn.Close()
		return nil, err
	}

	if !exists {
		flags := int32(0)
		acl := zk.WorldACL(zk.PermAll)
		if _, err = conn.Create(path, nil, flags, acl); err != nil {
			logrus.WithError(err).Error("Unable to create node.")
			conn.Close()
			return nil, err
		}
	}

	return &TaskDB{
		conn: conn,
		path: path,
	}, nil
}
// Close shuts down the Zookeeper connection held by the TaskDB.
func (z *TaskDB) Close() {
	conn := z.conn
	conn.Close()
}
// Clean removes all tasks from the database.
//
// It lists the children of the database root node and deletes each of them.
// The root node itself is left in place. The first error encountered, either
// while listing or while deleting, is returned and the remaining children are
// left untouched.
func (z *TaskDB) Clean() error {
	children, _, err := z.conn.Children(z.path)
	if err != nil {
		return err
	}

	for _, child := range children {
		// Delete each task node individually: a Zookeeper path may not end in
		// "/", and a node that still has children cannot be deleted in one
		// call. Version -1 is kept from the previous implementation.
		if err := z.conn.Delete(z.path+"/"+child, -1); err != nil {
			return err
		}
	}
	return nil
}
// PutTask writes task to the database under a node named after its ID.
//
// The task is encoded with eremetic.Encode. If a node for the ID already
// exists its data is replaced, using the version reported by the existence
// check; otherwise a new node is created with a world-accessible ACL.
// Encoding errors and errors from the Zookeeper connection are returned.
func (z *TaskDB) PutTask(task *eremetic.Task) error {
	node := fmt.Sprintf("%s/%s", z.path, task.ID)

	payload, encodeErr := eremetic.Encode(task)
	if encodeErr != nil {
		logrus.WithError(encodeErr).Error("Unable to encode task to byte-array.")
		return encodeErr
	}

	found, stat, existsErr := z.conn.Exists(node)
	if existsErr != nil {
		logrus.WithError(existsErr).Error("Unable to check existence of database.")
		return existsErr
	}

	if !found {
		// No node yet for this task: create it.
		_, createErr := z.conn.Create(node, payload, int32(0), zk.WorldACL(zk.PermAll))
		return createErr
	}

	// Node already present: overwrite its data at the observed version.
	_, setErr := z.conn.Set(node, payload, stat.Version)
	return setErr
}
// ReadTask returns the task stored under id with eremetic.ApplyMask applied
// to it, together with any error reported by ReadUnmaskedTask.
func (z *TaskDB) ReadTask(id string) (eremetic.Task, error) {
	result, readErr := z.ReadUnmaskedTask(id)
	// The mask is applied even when the read failed; the error is passed
	// through unchanged.
	eremetic.ApplyMask(&result)
	return result, readErr
}
// ReadUnmaskedTask returns a task with all its environment variables unmasked.
//
// The task is read from the node named id below the database root and decoded
// from JSON. If the node cannot be read, or its contents cannot be decoded,
// a zero Task is returned together with the error.
func (z *TaskDB) ReadUnmaskedTask(id string) (eremetic.Task, error) {
	path := fmt.Sprintf("%s/%s", z.path, id)

	data, _, err := z.conn.Get(path)
	if err != nil {
		logrus.WithError(err).Debug("Unable to Get from zk.")
		// Nothing was read, so there is nothing to decode.
		return eremetic.Task{}, err
	}

	var task eremetic.Task
	if err := json.Unmarshal(data, &task); err != nil {
		// NOTE(review): this logs the raw node contents, which presumably
		// include the unmasked environment values - confirm this is acceptable
		// at debug level.
		logrus.WithError(err).Debugf("Unable to Unmarshal %s.", string(data))
		// Report undecodable data instead of handing back a zero or partially
		// filled task with a nil error.
		return eremetic.Task{}, err
	}
	return task, nil
}
// DeleteTask deletes a task with the matching ID from zookeeper.
//
// The node is looked up first so that it can be deleted at the version that
// was observed. If the lookup fails its error is returned; if the node does
// not exist zk.ErrNoNode is returned; otherwise the result of the delete is
// returned.
func (z *TaskDB) DeleteTask(id string) error {
	path := fmt.Sprintf("%s/%s", z.path, id)

	exists, stat, err := z.conn.Exists(path)
	if err != nil {
		logrus.WithError(err).Error("Unable to check existence of database.")
		return err
	}
	if !exists {
		// There is no node and therefore no meaningful stat to read a version
		// from; report the missing node directly.
		return zk.ErrNoNode
	}

	return z.conn.Delete(path, stat.Version)
}
// ListTasks returns all tasks that match filter.
//
// The children of the database root node are listed and each one is read with
// ReadTask. Tasks that cannot be read are logged and skipped. If the children
// cannot be listed at all, the error is returned instead of an empty result.
// On success the returned slice is non-nil even when no task matches.
func (z *TaskDB) ListTasks(filter *eremetic.TaskFilter) ([]*eremetic.Task, error) {
	ids, _, err := z.conn.Children(z.path)
	if err != nil {
		logrus.WithError(err).Error("Unable to list tasks from database.")
		return nil, err
	}

	tasks := []*eremetic.Task{}
	for _, id := range ids {
		task, err := z.ReadTask(id)
		if err != nil {
			logrus.WithError(err).Error("Unable to read task from database, skipping")
			continue
		}

		// ReadTask has already applied the mask; the extra call is kept so the
		// per-task processing is unchanged.
		eremetic.ApplyMask(&task)
		if filter.Match(&task) {
			// task is declared inside the loop body, so every appended
			// pointer refers to its own copy.
			tasks = append(tasks, &task)
		}
	}
	return tasks, nil
}