forked from etcd-io/etcd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
consistent_watchable_store.go
141 lines (121 loc) · 4.35 KB
/
consistent_watchable_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"encoding/binary"
"log"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/storage/backend"
"github.com/coreos/etcd/storage/storagepb"
)
var (
consistentIndexKeyName = []byte("consistent_index")
)
// ConsistentIndexGetter is an interface that wraps the Get method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64
}
type consistentWatchableStore struct {
*watchableStore
// The field is used to get the consistent index of current
// executing entry.
// When the store finishes executing current entry, it will
// put the index got from ConsistentIndexGetter into the
// underlying backend. This helps to recover consistent index
// when restoring.
ig ConsistentIndexGetter
skip bool // indicate whether or not to skip an operation
}
func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
return newConsistentWatchableStore(b, le, ig)
}
// newConsistentWatchableStore creates a new consistentWatchableStore with the give
// backend.
func newConsistentWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *consistentWatchableStore {
return &consistentWatchableStore{
watchableStore: newWatchableStore(b, le),
ig: ig,
}
}
func (s *consistentWatchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
id := s.TxnBegin()
rev, err := s.TxnPut(id, key, value, lease)
if err != nil {
log.Panicf("unexpected TxnPut error (%v)", err)
}
if err := s.TxnEnd(id); err != nil {
log.Panicf("unexpected TxnEnd error (%v)", err)
}
return rev
}
func (s *consistentWatchableStore) DeleteRange(key, end []byte) (n, rev int64) {
id := s.TxnBegin()
n, rev, err := s.TxnDeleteRange(id, key, end)
if err != nil {
log.Panicf("unexpected TxnDeleteRange error (%v)", err)
}
if err := s.TxnEnd(id); err != nil {
log.Panicf("unexpected TxnEnd error (%v)", err)
}
return n, rev
}
func (s *consistentWatchableStore) TxnBegin() int64 {
id := s.watchableStore.TxnBegin()
// If the consistent index of executing entry is not larger than store
// consistent index, skip all operations in this txn.
s.skip = s.ig.ConsistentIndex() <= s.consistentIndex()
if !s.skip {
// TODO: avoid this unnecessary allocation
bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, s.ig.ConsistentIndex())
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
s.watchableStore.store.tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}
return id
}
func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
if s.skip {
return nil, 0, nil
}
return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
}
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
if s.skip {
return 0, nil
}
return s.watchableStore.TxnPut(txnID, key, value, lease)
}
func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
if s.skip {
return 0, 0, nil
}
return s.watchableStore.TxnDeleteRange(txnID, key, end)
}
func (s *consistentWatchableStore) TxnEnd(txnID int64) error {
// reset skip var
s.skip = false
return s.watchableStore.TxnEnd(txnID)
}
func (s *consistentWatchableStore) consistentIndex() uint64 {
// get the index
// tx has been locked in TxnBegin, so there is no need to lock it again
_, vs := s.watchableStore.store.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
return binary.BigEndian.Uint64(vs[0])
}