forked from hashicorp/consul
/
kvs_endpoint.go
135 lines (122 loc) · 3.33 KB
/
kvs_endpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package consul
import (
"fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"time"
)
// KVS endpoint is used to manipulate the Key-Value store
type KVS struct {
srv *Server
}
// Apply is used to apply a KVS request to the data store. This should
// only be used for operations that modify the data
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
// Verify the args
if args.DirEnt.Key == "" && args.Op != structs.KVSDeleteTree {
return fmt.Errorf("Must provide key")
}
// Apply the update
resp, err := k.srv.raftApply(structs.KVSRequestType, args)
if err != nil {
k.srv.logger.Printf("[ERR] consul.kvs: Apply failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Check if the return type is a bool
if respBool, ok := resp.(bool); ok {
*reply = respBool
}
return nil
}
// Get is used to lookup a single key
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.Get", args, args, reply); done {
return err
}
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSGet"),
func() error {
index, ent, err := state.KVSGet(args.Key)
if err != nil {
return err
}
if ent == nil {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
reply.Index = 1
} else {
reply.Index = index
}
reply.Entries = nil
} else {
reply.Index = ent.ModifyIndex
reply.Entries = structs.DirEntries{ent}
}
return nil
})
}
// List is used to list all keys with a given prefix
func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.List", args, args, reply); done {
return err
}
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSList"),
func() error {
index, ent, err := state.KVSList(args.Key)
if err != nil {
return err
}
if len(ent) == 0 {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
reply.Index = 1
} else {
reply.Index = index
}
reply.Entries = nil
} else {
// Determine the maximum affected index
var maxIndex uint64
for _, e := range ent {
if e.ModifyIndex > maxIndex {
maxIndex = e.ModifyIndex
}
}
reply.Index = maxIndex
reply.Entries = ent
}
return nil
})
}
// ListKeys is used to list all keys with a given prefix to a seperator
func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error {
if done, err := k.srv.forward("KVS.ListKeys", args, args, reply); done {
return err
}
// Get the local state
state := k.srv.fsm.State()
return k.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("KVSListKeys"),
func() error {
var err error
reply.Index, reply.Keys, err = state.KVSListKeys(args.Prefix, args.Seperator)
return err
})
}