/
kv.go
99 lines (88 loc) · 2.68 KB
/
kv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package etcdserver
import (
"context"
"fmt"
"time"
"github.com/api7/gopkg/pkg/log"
"go.etcd.io/etcd/api/v3/etcdserverpb"
mvccpb "go.etcd.io/etcd/api/v3/mvccpb"
"go.uber.org/zap"
)
// Range serves an etcd Range request from the adapter's backend.
//
// A non-empty r.RangeEnd selects the listing path: backend.List is called with
// r.Key for both of its string arguments and r.Limit as the limit. The value of
// r.RangeEnd is only used as this switch and is not forwarded to the backend.
// Otherwise a single key is looked up with backend.Get; a nil result from Get
// is not an error and yields an empty Kvs slice.
//
// The revision reported by the backend is returned in the response header, and
// Count is the number of key-values actually returned in Kvs. Any backend
// error is returned to the caller with a nil response.
func (k *EtcdServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*etcdserverpb.RangeResponse, error) {
	var rev int64
	// Kept non-nil so the response always carries an (possibly empty) slice.
	etcdKvs := []*mvccpb.KeyValue{}

	if len(r.RangeEnd) > 0 {
		// NOTE(review): the current Unix time is passed where the backend
		// takes a revision argument — confirm this is what the backend expects.
		revision, kvs, err := k.backend.List(ctx, string(r.Key), string(r.Key), r.Limit, time.Now().Unix())
		if err != nil {
			// Previously ignored: a failed listing looked like an empty,
			// successful result at revision 0.
			return nil, err
		}
		etcdKvs = make([]*mvccpb.KeyValue, 0, len(kvs))
		for _, kv := range kvs {
			etcdKvs = append(etcdKvs, &mvccpb.KeyValue{
				Key:            []byte(kv.Key),
				Value:          kv.Value,
				ModRevision:    kv.ModRevision,
				CreateRevision: kv.CreateRevision,
				Lease:          kv.Lease,
			})
		}
		rev = revision
	} else {
		revision, kv, err := k.backend.Get(ctx, string(r.Key), "0", 0, time.Now().Unix())
		if err != nil {
			return nil, err
		}
		// A nil kv means the key was not found; respond with zero results.
		if kv != nil {
			etcdKvs = append(etcdKvs, &mvccpb.KeyValue{
				Key:            []byte(kv.Key),
				Value:          kv.Value,
				ModRevision:    kv.ModRevision,
				CreateRevision: kv.CreateRevision,
				Lease:          kv.Lease,
			})
		}
		rev = revision
	}

	return &etcdserverpb.RangeResponse{
		Header: &etcdserverpb.ResponseHeader{
			Revision: rev,
		},
		Kvs:   etcdKvs,
		Count: int64(len(etcdKvs)),
	}, nil
}
// Put stores r.Value under r.Key.
//
// The key is first looked up with backend.Get. If it exists, backend.Update is
// called with the existing entry's ModRevision; otherwise backend.Create is
// called. Both writes pass 0 as the lease argument. The revision returned by
// the write is reported in the response header.
//
// Any backend error (from the lookup or from the write) is returned with a nil
// response.
func (k *EtcdServer) Put(ctx context.Context, r *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) {
	key := string(r.Key)

	// A failed lookup must not be mistaken for "key absent": that would send
	// the request down the Create path for a key that may already exist.
	_, kv, getErr := k.backend.Get(ctx, key, "0", 0, time.Now().Unix())
	if getErr != nil {
		return nil, getErr
	}

	var (
		rev int64
		err error
	)
	if kv != nil {
		// NOTE(review): Update's second and third results are discarded here;
		// if the third signals a revision conflict it is not surfaced — confirm.
		revision, _, _, rerr := k.backend.Update(ctx, key, r.Value, kv.ModRevision, 0)
		log.Debugw("update", zap.String("key", key), zap.String("value", string(r.Value)), zap.Int64("revision", revision), zap.Error(rerr))
		rev, err = revision, rerr
	} else {
		revision, rerr := k.backend.Create(ctx, key, r.Value, 0)
		log.Debugw("create", zap.String("key", key), zap.String("value", string(r.Value)), zap.Int64("revision", revision), zap.Error(rerr))
		rev, err = revision, rerr
	}
	if err != nil {
		return nil, err
	}

	return &etcdserverpb.PutResponse{
		Header: &etcdserverpb.ResponseHeader{
			Revision: rev,
		},
	}, nil
}
// DeleteRange deletes the single key r.Key.
//
// Only single-key deletion is supported: a request with a non-empty
// r.RangeEnd is rejected with an error.
// TODO: support delete range
//
// The key is looked up with backend.Get first so that its ModRevision can be
// passed to backend.Delete. If the key does not exist, nothing is deleted and
// the response reports Deleted: 0 with the revision returned by the lookup.
// Backend errors from either call are returned with a nil response.
func (k *EtcdServer) DeleteRange(ctx context.Context, r *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) {
	// Use len() rather than a nil check so an empty, non-nil RangeEnd is
	// treated as "no range", matching how Range interprets the field.
	if len(r.RangeEnd) > 0 {
		return nil, fmt.Errorf("delete range is not supported")
	}

	key := string(r.Key)

	// NOTE(review): this lookup passes 0 as the last argument, whereas Range
	// and Put pass the current Unix time — confirm which the backend expects.
	getRev, prevKV, getErr := k.backend.Get(ctx, key, "0", 0, 0)
	if getErr != nil {
		return nil, getErr
	}
	if prevKV == nil {
		// Key not found: dereferencing prevKV below would panic, and there is
		// nothing to delete.
		return &etcdserverpb.DeleteRangeResponse{
			Header: &etcdserverpb.ResponseHeader{
				Revision: getRev,
			},
		}, nil
	}

	// NOTE(review): Delete's second and third results are discarded; if the
	// third reports whether the delete took effect, Deleted below may
	// over-report on a revision conflict — confirm.
	rev, _, _, delErr := k.backend.Delete(ctx, key, prevKV.ModRevision)
	if delErr != nil {
		return nil, delErr
	}

	return &etcdserverpb.DeleteRangeResponse{
		Header: &etcdserverpb.ResponseHeader{
			Revision: rev,
		},
		Deleted: 1,
	}, nil
}