-
Notifications
You must be signed in to change notification settings - Fork 473
/
consul.go
150 lines (122 loc) · 3.52 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------
package consul
import (
"encoding/json"
"fmt"
"github.com/agrea/ptr"
"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
// Consul is a state store implementation for HashiCorp Consul.
type Consul struct {
state.DefaultBulkStore
client *api.Client
keyPrefixPath string
logger logger.Logger
}
type consulConfig struct {
Datacenter string `json:"datacenter"`
HTTPAddr string `json:"httpAddr"`
ACLToken string `json:"aclToken"`
Scheme string `json:"scheme"`
KeyPrefixPath string `json:"keyPrefixPath"`
}
// NewConsulStateStore returns a new consul state store.
func NewConsulStateStore(logger logger.Logger) *Consul {
s := &Consul{logger: logger}
s.DefaultBulkStore = state.NewDefaultBulkStore(s)
return s
}
// Init does metadata and config parsing and initializes the
// Consul client
func (c *Consul) Init(metadata state.Metadata) error {
consulConfig, err := metadataToConfig(metadata.Properties)
if err != nil {
return fmt.Errorf("couldn't convert metadata properties: %s", err)
}
var keyPrefixPath string
if consulConfig.KeyPrefixPath == "" {
keyPrefixPath = "dapr"
}
config := &api.Config{
Datacenter: consulConfig.Datacenter,
Address: consulConfig.HTTPAddr,
Token: consulConfig.ACLToken,
Scheme: consulConfig.Scheme,
}
client, err := api.NewClient(config)
if err != nil {
return errors.Wrap(err, "initializing consul client")
}
c.client = client
c.keyPrefixPath = keyPrefixPath
return nil
}
// Features returns the features available in this state store
func (c *Consul) Features() []state.Feature {
// Etag is just returned and not handled in set or delete operations.
return nil
}
func metadataToConfig(connInfo map[string]string) (*consulConfig, error) {
b, err := json.Marshal(connInfo)
if err != nil {
return nil, err
}
var config consulConfig
err = json.Unmarshal(b, &config)
if err != nil {
return nil, err
}
return &config, nil
}
// Get retrieves a Consul KV item
func (c *Consul) Get(req *state.GetRequest) (*state.GetResponse, error) {
queryOpts := &api.QueryOptions{}
if req.Options.Consistency == state.Strong {
queryOpts.RequireConsistent = true
}
resp, queryMeta, err := c.client.KV().Get(fmt.Sprintf("%s/%s", c.keyPrefixPath, req.Key), queryOpts)
if err != nil {
return nil, err
}
if resp == nil {
return &state.GetResponse{}, nil
}
return &state.GetResponse{
Data: resp.Value,
ETag: ptr.String(queryMeta.LastContentHash),
}, nil
}
// Set saves a Consul KV item
func (c *Consul) Set(req *state.SetRequest) error {
var reqValByte []byte
b, ok := req.Value.([]byte)
if ok {
reqValByte = b
} else {
reqValByte, _ = json.Marshal(req.Value)
}
keyWithPath := fmt.Sprintf("%s/%s", c.keyPrefixPath, req.Key)
_, err := c.client.KV().Put(&api.KVPair{
Key: keyWithPath,
Value: reqValByte,
}, nil)
if err != nil {
return fmt.Errorf("couldn't set key %s: %s", keyWithPath, err)
}
return nil
}
// Delete performes a Consul KV delete operation
func (c *Consul) Delete(req *state.DeleteRequest) error {
keyWithPath := fmt.Sprintf("%s/%s", c.keyPrefixPath, req.Key)
_, err := c.client.KV().Delete(keyWithPath, nil)
if err != nil {
return fmt.Errorf("couldn't delete key %s: %s", keyWithPath, err)
}
return nil
}