forked from asonawalla/gazette
-
Notifications
You must be signed in to change notification settings - Fork 3
/
key_values.go
172 lines (149 loc) · 6.61 KB
/
key_values.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package keyspace
import (
"bytes"
"fmt"
"sort"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
)
// KeyValue composes a "raw" Etcd KeyValue with its user-defined,
// decoded representation.
type KeyValue struct {
// Raw is the Etcd KeyValue as received. Its Key determines the ordering
// of a KeyValue within a KeyValues collection.
Raw mvccpb.KeyValue
// Decoded is the user-defined representation of Raw, as produced by a
// KeyValueDecoder.
Decoded interface{}
}
// A KeyValueDecoder decodes raw KeyValue instances into a user-defined
// representation. KeyValueDecoder returns an error if the KeyValue cannot be
// decoded. KeySpace will log decoding errors and not incorporate them into
// the current KeyValues, but will also treat them as recoverable and in all
// cases seek to bring the KeyValues representation to consistency with Etcd.
// In practice, this means bad values written to Etcd which fail to decode
// may be later corrected with valid representations: KeySpace will ignore
// the bad update and then reflect the corrected one once available.
//
// Within this file, a decoder error causes appendKeyValue and
// updateKeyValuesTail to return the KeyValues unmodified, along with
// the decoder's error.
type KeyValueDecoder func(raw *mvccpb.KeyValue) (interface{}, error)
// KeyValues is a collection of KeyValue naturally ordered on keys.
// Search performs a binary search using bytes.Compare over Raw.Key, and
// therefore requires that elements be sorted ascending by Raw.Key.
type KeyValues []KeyValue
// Search returns the index at which |key| is found to be present,
// or should be inserted to maintain ordering. |found| is true only if
// an element having exactly |key| exists at the returned index.
// KeyValues must be sorted ascending on Raw.Key.
func (kv KeyValues) Search(key string) (ind int, found bool) {
	// Convert |key| once up front: the comparator below is invoked O(log n)
	// times, and converting within it would copy |key| on every probe.
	var target = []byte(key)

	ind = sort.Search(len(kv), func(i int) bool {
		return bytes.Compare(target, kv[i].Raw.Key) <= 0
	})
	// sort.Search returns len(kv) if no element orders at or after |key|.
	found = ind < len(kv) && bytes.Equal(target, kv[ind].Raw.Key)
	return ind, found
}
// Range returns the sub-slice of KeyValues spanning range [from, to).
// The result aliases the receiver's backing array. If |to| does not order
// after |from|, the result is empty.
func (kv KeyValues) Range(from, to string) KeyValues {
	// Locate the first element at or after |from|.
	begin, _ := kv.Search(from)
	// Search only the remainder for |to|; the offset is relative to |begin|.
	span, _ := kv[begin:].Search(to)
	return kv[begin : begin+span]
}
// Prefixed returns the sub-slice of KeyValues prefixed by |prefix|.
// It's equivalent to Range over [prefix, end), where |end| is the range end
// which clientv3.GetPrefixRangeEnd derives from |prefix|.
//
// NOTE(review): for an empty or all-0xff |prefix|, GetPrefixRangeEnd is
// believed to return "\x00" rather than an open-ended bound, which would make
// this return an empty result -- confirm against the etcd client before
// relying on those inputs.
func (kv KeyValues) Prefixed(prefix string) KeyValues {
	var end = clientv3.GetPrefixRangeEnd(prefix)
	return kv.Range(prefix, end)
}
// Copy returns a newly allocated KeyValues holding a copy of each element.
// The returned slice does not share its backing array with the receiver, so
// appending to or re-assigning elements of one does not affect the other.
// Elements themselves are copied by value: byte slices such as Raw.Key, and
// the Decoded value, continue to be shared between the original and the copy.
// The result is always non-nil, even if the receiver is nil.
func (kv KeyValues) Copy() KeyValues {
	var out = make(KeyValues, len(kv))
	copy(out, kv)
	return out
}
// appendKeyValue decodes |cur| and appends it to |kv|, returning the extended
// KeyValues. If |decode| fails, |kv| is returned unmodified along with the
// decoding error. The key of |cur| must order strictly after the current last
// key of |kv| (if any), or appendKeyValue panics. Ordering is verified before
// |decode| is invoked.
func appendKeyValue(kv KeyValues, decode KeyValueDecoder, cur *mvccpb.KeyValue) (KeyValues, error) {
	// An equal or lesser key than the current tail violates ordering.
	if n := len(kv); n != 0 && bytes.Compare(kv[n-1].Raw.Key, cur.Key) >= 0 {
		panic("invalid key ordering")
	}

	decoded, err := decode(cur)
	if err != nil {
		return kv, err
	}
	return append(kv, KeyValue{Raw: *cur, Decoded: decoded}), nil
}
// updateKeyValuesTail applies |event| to the tail of |kv| and returns the
// updated KeyValues, and/or an error describing an inconsistency of the event.
// The key of |event| must either equal the key of the current tail of |kv|,
// or order after it; otherwise updateKeyValuesTail panics. It also panics if
// the event type is neither a PUT nor a DELETE.
//
// A returned error does not necessarily mean the event was dropped: several
// inconsistencies are reported but "applied anyway", as a best-effort means of
// keeping KeyValues consistent even though KeyValueDecoder may fail. Values
// crafted in error, which fail to decode, must not break the overall
// consistency of a KeyValues instance which is incrementally updated over time.
// Events which are not applied are: decode failures, deletions of unknown
// keys, and events having a ModRevision not greater than that of the tail.
func updateKeyValuesTail(kv KeyValues, decode KeyValueDecoder, event clientv3.Event) (KeyValues, error) {
	var last = len(kv) - 1

	// An empty KeyValues orders before any event key.
	var order = -1
	if last >= 0 {
		order = bytes.Compare(kv[last].Raw.Key, event.Kv.Key)
	}

	switch {
	case order > 0:
		panic("invalid key ordering (tail is ordered after event key)")

	case order < 0:
		// The event key orders after the tail: it's a key we don't yet know of.
		switch event.Type {
		case clientv3.EventTypeDelete:
			// Reachable if a key having a bad value (which failed to decode and
			// was never applied) is later deleted. Skipping the deletion leaves
			// KeyValues consistent.
			return kv, fmt.Errorf("unexpected deletion of unknown key")
		case clientv3.EventTypePut:
			// Handled below.
		default:
			// Etcd's `mvccpb` package kv.proto defines only DELETE & PUT.
			panic(event.Type)
		}

		var decoded, err = decode(event.Kv)
		if err != nil {
			return kv, err
		}
		// Append the new value, and then report any oddities of its metadata.
		kv = append(kv, KeyValue{Raw: *event.Kv, Decoded: decoded})

		switch {
		case event.Kv.CreateRevision != event.Kv.ModRevision:
			// Creations in Etcd have equal Create & Mod revisions, so a mismatch
			// suggests the key ought to already be present. It may not be if its
			// creation carried a bad value (not applied) which this modification
			// now corrects. Treating the update as a creation restores consistency.
			return kv, fmt.Errorf("unexpected modification of unknown key (applied anyway)")
		case event.Kv.Version != 1:
			// A created key should always have Version 1; not expected in practice.
			return kv, fmt.Errorf("unexpected Version of created key (should be 1; applied anyway)")
		}
		return kv, nil
	}

	// The event key matches that of the current tail.
	var prev = &kv[last].Raw

	if prev.ModRevision >= event.Kv.ModRevision {
		// A replay of an old event. This shouldn't occur, as Events are observed
		// with strictly monotonic Raft Revisions. Applying stale data would make
		// KeyValues inconsistent, so the update is dropped.
		return kv, fmt.Errorf(
			"unexpected ModRevision (it's too small; prev is %s)", prev.String())
	}

	switch event.Type {
	case clientv3.EventTypeDelete:
		// Drop the current tail.
		return kv[:last], nil
	case clientv3.EventTypePut:
		// Handled below.
	default:
		panic(event.Type) // Only DELETE and PUT are defined.
	}

	var decoded, err = decode(event.Kv)
	if err != nil {
		return kv, err
	}

	switch {
	case prev.CreateRevision != event.Kv.CreateRevision:
		// Only possible if an intervening deletion of this key was missed (never
		// sent by the Etcd server), which shouldn't occur. Should it, the event
		// is still applied since it carries the latest value of the key.
		err = fmt.Errorf(
			"unexpected CreateRevision (should be equal; applied anyway; prev is %s)", prev.String())
	case prev.Version+1 != event.Kv.Version:
		// Either an intervening modification was missed (never sent by the Etcd
		// server) or, far more probably, the key was modified with a bad value
		// which was not applied and this event is its correction. Version then
		// appears to skip a value.
		err = fmt.Errorf(
			"unexpected Version (should be monotonic; applied anyway; prev is %s)", prev.String())
	}

	// Replace the existing value at the tail.
	kv[last] = KeyValue{Raw: *event.Kv, Decoded: decoded}
	return kv, err
}