/
or-set.go
397 lines (307 loc) · 9.95 KB
/
or-set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
package crdt
import (
"fmt"
"os"
"sort"
"strings"
"sync"
"encoding/base64"
"io/ioutil"
"github.com/satori/go.uuid"
)
// Structs
// ORSet conforms to the specification of an observed-
// removed set defined by Shapiro, Preguiça, Baquero,
// and Zawirski. It consists of unique IDs and data items.
type ORSet struct {
lock *sync.RWMutex
file *os.File
elements map[string]string
}
// sendFunc is used as a parameter to below defined
// AddSendDownstream function that broadcasts an update
// payload to all other replicas.
type sendFunc func(string)
// Functions
// InitORSet returns an empty initialized new
// observed-removed set.
func InitORSet() *ORSet {
return &ORSet{
lock: &sync.RWMutex{},
elements: make(map[string]string),
}
}
// InitORSetWithFile takes in a file name and initializes
// a new ORSet with opened file handler to that name as
// designated log file.
func InitORSetWithFile(fileName string) (*ORSet, error) {
// Attempt to create a new CRDT file.
f, err := os.Create(fileName)
if err != nil {
return nil, fmt.Errorf("opening CRDT file '%s' failed with: %v", fileName, err)
}
// Change permissions.
err = f.Chmod(0600)
if err != nil {
return nil, fmt.Errorf("changing permissions of CRDT file '%s' failed with: %v", fileName, err)
}
// Init an empty ORSet.
s := InitORSet()
s.file = f
// Write newly created CRDT file to stable storage.
if err = s.WriteORSetToFile(false); err != nil {
return nil, fmt.Errorf("error during CRDT file write-back: %v", err)
}
return s, nil
}
// InitORSetFromFile parses an ORSet found in the
// supplied file and returns it, initialized with
// elements saved in file.
func InitORSetFromFile(fileName string) (*ORSet, error) {
// Attempt to open CRDT file and assign to set afterwards.
f, err := os.OpenFile(fileName, os.O_RDWR, 0600)
if err != nil {
return nil, fmt.Errorf("opening CRDT file '%s' failed with: %v", fileName, err)
}
// Init an empty ORSet.
s := InitORSet()
s.file = f
// Parse contained CRDT state from file.
contentsRaw, err := ioutil.ReadAll(s.file)
if err != nil {
return nil, fmt.Errorf("reading all contents from CRDT file '%s' failed with: %v", fileName, err)
}
contents := string(contentsRaw)
// Account for an empty CRDT set which is valid.
if contents == "" {
return s, nil
}
// Split content at each ';' (semicolon).
parts := strings.Split(contents, ";")
// Check even number of elements.
if (len(parts) % 2) != 0 {
return nil, fmt.Errorf("odd number of elements in CRDT file '%s'", fileName)
}
// Range over all value-tag-pairs.
for value := 0; value < len(parts); value += 2 {
tag := value + 1
// Decode string from base64.
decValue, err := base64.StdEncoding.DecodeString(parts[value])
if err != nil {
return nil, fmt.Errorf("decoding base64 string in CRDT file '%s' failed: %v", fileName, err)
}
// Assign decoded value to corresponding
// tag in elements set.
s.elements[parts[tag]] = string(decValue)
}
return s, nil
}
// WriteORSetToFile saves an active ORSet onto
// stable storage at location from initialization.
// This allows for a CRDT ORSet to be made persistent
// and later be resumed from prior state.
func (s *ORSet) WriteORSetToFile(needsLocking bool) error {
if needsLocking {
// Write-lock the set and unlock on any exit.
s.lock.Lock()
defer s.lock.Unlock()
}
marshalled := ""
for tag, valueRaw := range s.elements {
// Encode value in base64 encoding.
value := base64.StdEncoding.EncodeToString([]byte(valueRaw))
// Append value and tag to write-out file.
if marshalled == "" {
marshalled = fmt.Sprintf("%v;%s", value, tag)
} else {
marshalled = fmt.Sprintf("%s;%v;%s", marshalled, value, tag)
}
}
// Reset position in file to beginning.
_, err := s.file.Seek(0, os.SEEK_SET)
if err != nil {
return fmt.Errorf("error while setting head back to beginning in CRDT file '%s': %v", s.file.Name(), err)
}
// Write marshalled set to file.
newNumOfBytes, err := s.file.WriteString(marshalled)
if err != nil {
return fmt.Errorf("failed to write ORSet contents to file '%s': %v", s.file.Name(), err)
}
// Adjust file size to just written length of string.
if err := s.file.Truncate(int64(newNumOfBytes)); err != nil {
return fmt.Errorf("error while truncating CRDT file '%s' to new size: %v", s.file.Name(), err)
}
// Save to stable storage.
if err := s.file.Sync(); err != nil {
return fmt.Errorf("could not synchronise CRDT file '%s' contents to stable storage: %v", s.file.Name(), err)
}
return nil
}
// GetAllValues returns all distinct values
// of a supplied ORSet.
func (s *ORSet) GetAllValues() []string {
// Read-lock set and unlock on exit.
s.lock.RLock()
defer s.lock.RUnlock()
// Make a slice of initial size 0.
allValues := make([]string, 0)
// Also prepare a map to store which elements
// we already considered.
seenValues := make(map[string]bool)
for _, value := range s.elements {
// Check if we did not yet considered this value.
if _, seen := seenValues[value]; seen != true {
// If so, append it and set seen value to true.
allValues = append(allValues, value)
seenValues[value] = true
}
}
// Sort slice of strings.
sort.Strings(allValues)
return allValues
}
// Lookup cycles through elements in ORSet and
// returns true if element e is present and
// false otherwise.
func (s *ORSet) Lookup(e string, needsLocking bool) bool {
if needsLocking {
// Read-lock set and unlock on exit.
s.lock.RLock()
defer s.lock.RUnlock()
}
for _, value := range s.elements {
// When we find the value while iterating
// through set, we return true and end loop
// execution at this point.
if e == value {
return true
}
}
return false
}
// AddEffect is the effect part of an update add operation
// defined by the specification. It is executed by all
// replicas of the data set including the source node. It
// inserts given element and tag into the set representation.
func (s *ORSet) AddEffect(e string, tag string, needsLocking bool, needsWriteBack bool) error {
if needsLocking {
// Write-lock set and unlock on exit.
s.lock.Lock()
defer s.lock.Unlock()
}
// Insert data element e at key tag.
s.elements[tag] = e
if !needsWriteBack {
return nil
}
// Instructed to write changes back to file.
err := s.WriteORSetToFile(false)
if err != nil {
// Error during write-back to stable storage.
// Prepare remove set consistent of just added element.
rSet := make(map[string]string)
rSet[tag] = e
// Revert just made changes.
s.RemoveEffect(rSet, false, false)
return fmt.Errorf("error during writing CRDT file back: %v", err)
}
return nil
}
// Add is a helper function only to be executed at the
// source node of an update. It executes the prepare and
// effect update parts of an add operation. Afterwards,
// the update instruction is send downstream to all other
// replicas via the send function which takes care of the
// reliable causally-ordered broadcast.
func (s *ORSet) Add(e string, send sendFunc) error {
// Create a new unique tag.
tag := uuid.NewV4().String()
// Write-lock set and unlock on exit.
s.lock.Lock()
defer s.lock.Unlock()
// Apply effect part of update add. Do not lock
// structure but write changes back to stable storage.
err := s.AddEffect(e, tag, false, true)
if err != nil {
return err
}
// Send to other involved nodes.
send(fmt.Sprintf("%v;%s", base64.StdEncoding.EncodeToString([]byte(e)), tag))
return nil
}
// RemoveEffect is the effect part of an update remove
// operation defined by the specification. It is executed
// by all replicas of the data set including the source node.
// It removes supplied set of tags from the ORSet's set.
func (s *ORSet) RemoveEffect(rSet map[string]string, needsLocking bool, needsWriteBack bool) error {
if needsLocking {
// Write-lock set and unlock on exit.
s.lock.Lock()
defer s.lock.Unlock()
}
// Range over set of received tags to-be-deleted.
for rTag := range rSet {
// Each time we see such tag in this replica's
// set, we delete it.
if _, found := s.elements[rTag]; found {
delete(s.elements, rTag)
}
}
if !needsWriteBack {
return nil
}
// Instructed to write changes back to file.
err := s.WriteORSetToFile(false)
if err != nil {
// Error during write-back to stable storage.
// Revert just made changes.
for tag, value := range rSet {
s.AddEffect(value, tag, false, false)
}
return fmt.Errorf("error during writing CRDT file back: %v", err)
}
return nil
}
// Remove is a helper function only to be executed
// by the source node of an update remove operation.
// It first handles the prepare part by checking the
// deletion precondition and creating a remove set
// and afterwards executes the effect part locally and
// sends out the remove message to all other replicas.
func (s *ORSet) Remove(e string, send sendFunc) error {
// Write-lock set and unlock on exit.
s.lock.Lock()
defer s.lock.Unlock()
// Check precondition: is element present in set?
if s.Lookup(e, false) != true {
return fmt.Errorf("element to be removed not found in set")
}
// Initialize string to send out.
var msg string
// Initialize set of elements to remove.
rmElements := make(map[string]string)
// Otherwise range over set elements.
for tag, value := range s.elements {
if e == value {
// If we see the element to-be-deleted, we add
// the associated tag into our prepared remove set.
rmElements[tag] = e
// And we also append it to the message that will
// be sent out to other replicas.
if msg == "" {
msg = fmt.Sprintf("%v;%s", base64.StdEncoding.EncodeToString([]byte(e)), tag)
} else {
msg = fmt.Sprintf("%s;%v;%s", msg, base64.StdEncoding.EncodeToString([]byte(e)), tag)
}
}
}
// Execute the effect part of the update remove but do
// not lock the set structure as we already maintain a lock.
// Also, write changes back to stable storage.
if err := s.RemoveEffect(rmElements, false, true); err != nil {
return err
}
// Send message to other replicas.
send(msg)
return nil
}