/
cassandra.go
261 lines (215 loc) · 7.17 KB
/
cassandra.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package cassandra
import (
"encoding/json"
"fmt"
"strconv"
"sync"
"time"
log "github.com/cihub/seelog"
"github.com/hailo-platform/H2O/service/config"
"github.com/hailo-platform/H2O/service/dns"
"github.com/hailo-platform/H2O/gossie/src/gossie"
)
// Fallback connection settings used when neither config nor DNS yields a
// usable value.

// defaultPort is the port assumed when config does not supply a thrift port.
var defaultPort = 9160

// defaultTier is the cluster tier assumed when config does not name one.
var defaultTier = "general"

// defaultHosts is the host list returned when no hosts can be discovered.
var defaultHosts = []string{"localhost:" + strconv.Itoa(defaultPort)}
// Synchronisation for the package-level state declared below.
var (
	// mtx is held around reads and writes of the cached settings and pools.
	mtx sync.RWMutex
	// once makes sure setup runs a single time, on the first ConnectionPool call.
	once sync.Once
)

// Settings and pools cached from config; rewritten by reconnectDefault.
var (
	// pools holds the connection pools built so far, keyed by keyspace.
	pools map[string]gossie.ConnectionPool

	retries int
	readCl  string
	writeCl string
	timeout time.Duration
	nodes   []string

	// auth maps a keyspace name to its authentication options.
	auth map[string]*authenticationOptions
)

// DefaultPoolConstructor is the gossie constructor that PoolConstructor starts
// out pointing at.
var DefaultPoolConstructor func([]string, string, gossie.PoolOptions) (gossie.ConnectionPool, error) = gossie.NewConnectionPool

// PoolConstructor is the constructor used to build connection pools. It can be
// replaced to switch out the pool implementation when mocking.
var PoolConstructor func([]string, string, gossie.PoolOptions) (gossie.ConnectionPool, error) = DefaultPoolConstructor
// authenticationOptions carries the credentials configured for one keyspace.
type authenticationOptions struct {
	username string
	password string
}

// String renders the options for logging. The username is shown as-is; the
// password is always replaced by a fixed mask so it never reaches the output.
func (a *authenticationOptions) String() string {
	const masked = "username: %s, password: xxxxxxxx"
	return fmt.Sprintf(masked, a.username)
}
func init() {
pools = make(map[string]gossie.ConnectionPool)
auth = make(map[string]*authenticationOptions)
}
func getHosts() []string {
cassandraHostKey := getCassandraHostConfigKey()
config.WaitUntilLoaded(5 * time.Second)
port := config.AtPath("hailo", "service", "cassandra", "defaults", "thriftPort").AsInt(defaultPort)
if hosts := config.AtPath("hailo", "service", "cassandra", cassandraHostKey).AsHostnameArray(port); len(hosts) > 0 {
return hosts
}
// No hosts returned: try DNS
tier := config.AtPath("hailo", "service", "cassandra", "tier").AsString("premium")
hosts, err := dns.Hosts("cassandra-" + tier)
if err != nil {
log.Errorf("Failed to load Cassandra hosts from dns: %v", err)
return defaultHosts
}
if len(hosts) == 0 {
return defaultHosts
}
// We need to append the port to hosts coming from DNS
for i, host := range hosts {
hosts[i] = host + fmt.Sprintf(":%d", port)
}
return hosts
}
// setup subscribes to config changes, starts a goroutine that reloads the
// Cassandra settings on every change notification, and then performs the
// initial load itself.
func setup() {
	changes := config.SubscribeChanges()

	// Runs for the lifetime of the process: one reload per notification.
	watch := func() {
		for {
			<-changes
			reconnectDefault()
		}
	}
	go watch()

	reconnectDefault()
}
// reconnectDefault reloads the Cassandra settings (retries, consistency
// levels, timeout, nodes and authentication) from config and then discards the
// cached connection pools, so the next lookup for a keyspace builds a pool
// from the new settings. The package write lock is held for the whole reload.
func reconnectDefault() {
	mtx.Lock()
	defer mtx.Unlock()

	log.Infof("Reloading cassandra configuration")

	retries = config.AtPath("hailo", "service", "cassandra", "defaults", "maxRetries").AsInt(5)
	readCl = config.AtPath("hailo", "service", "cassandra", "defaults", "readConsistencyLevel").AsString("ONE")
	writeCl = config.AtPath("hailo", "service", "cassandra", "defaults", "writeConsistencyLevel").AsString("ONE")
	timeout = config.AtPath("hailo", "service", "cassandra", "defaults", "recvTimeout").AsDuration("1s")
	log.Debugf("Setting Cassandra defaults retries:%v, readCl: %v, writeCl: %v, timeout: %v from config", retries, readCl, writeCl, timeout)

	nodes = getHosts()
	log.Debugf("Setting Cassandra nodes %v from config", nodes)

	// Set up authentication if enabled
	if authEnabled := config.AtPath("hailo", "service", "cassandra", "authentication", "enabled").AsBool(); authEnabled {
		// Get config as json as its effectively a map[string]map[string]string
		authconfig := config.AtPath("hailo", "service", "cassandra", "authentication", "keyspaces").AsJson()

		// Parse and set if successful; on failure the previously loaded
		// credentials are kept.
		a, err := parseAuth(authconfig)
		if err == nil {
			auth = a
			log.Debugf("Setting Cassandra authentication from config: %v", a)
		} else {
			log.Warnf("Failed to set Cassandra authentication from config: %v", err)
		}
	} else {
		// Authentication is switched off: drop credentials left over from an
		// earlier reload so new pools are not built with stale logins.
		auth = make(map[string]*authenticationOptions)
	}

	// Reset the pools map
	// NOTE(review): the previous pools are dropped without being closed here —
	// confirm whether they need an explicit shutdown.
	pools = make(map[string]gossie.ConnectionPool)
}
// ConnectionPool returns the connection pool for keyspace ks, building it on
// first use. Call it every time a pool is needed rather than holding on to the
// result: cached pools are discarded when the config changes, so a fresh call
// always yields a pool built from the current settings.
func ConnectionPool(ks string) (gossie.ConnectionPool, error) {
	once.Do(setup)

	pool := getPool(ks)
	if pool == nil {
		// Nothing cached for this keyspace yet.
		return newPool(ks)
	}
	return pool, nil
}
// getPool returns the cached pool for keyspace ks under the read lock, or nil
// when no pool has been cached for it.
func getPool(ks string) gossie.ConnectionPool {
	mtx.RLock()
	defer mtx.RUnlock()

	// Indexing a missing key yields the nil interface value.
	return pools[ks]
}
// newPool builds, caches and returns a connection pool for keyspace ks using
// the currently loaded settings. It takes the package write lock and returns
// the already-cached pool if one appeared for ks before the lock was acquired.
// An error from PoolConstructor is returned as-is and nothing is cached.
func newPool(ks string) (gossie.ConnectionPool, error) {
	mtx.Lock()
	defer mtx.Unlock()

	// double check existence now we have full lock
	if pool, ok := pools[ks]; ok {
		return pool, nil
	}

	opts := gossie.PoolOptions{
		ReadConsistency:  cl(readCl),
		WriteConsistency: cl(writeCl),
		Timeout:          int(timeout.Nanoseconds() / int64(time.Millisecond)), // milliseconds
		Retries:          retries,
	}

	// Log the options BEFORE credentials are attached: printing opts with %v
	// once Authentication is set would write the plaintext password to the log.
	log.Debugf("Initialising Cassandra connection pool for KS %s connecting to %v with options %v", ks, nodes, opts)

	// Add authentication options if set for this keyspace
	if a := auth[ks]; a != nil {
		// a's String method masks the password, so it is safe to log.
		log.Debugf("Using Cassandra authentication for KS %s: %v", ks, a)
		opts.Authentication = map[string]string{
			"keyspace": ks,
			"username": a.username,
			"password": a.password,
		}
	}

	p, err := PoolConstructor(nodes, ks, opts)
	if err != nil {
		return nil, err
	}

	pools[ks] = p
	return p, nil
}
// cl translates a consistency level name from config into the matching gossie
// constant. The match is exact (upper-case names); any other value yields
// gossie.CONSISTENCY_DEFAULT.
func cl(val string) int {
	levels := map[string]int{
		"ONE":          gossie.CONSISTENCY_ONE,
		"LOCAL_QUORUM": gossie.CONSISTENCY_LOCAL_QUORUM,
		"QUORUM":       gossie.CONSISTENCY_QUORUM,
		"EACH_QUORUM":  gossie.CONSISTENCY_EACH_QUORUM,
		"ALL":          gossie.CONSISTENCY_ALL,
		"TWO":          gossie.CONSISTENCY_TWO,
		"THREE":        gossie.CONSISTENCY_THREE,
	}
	if level, known := levels[val]; known {
		return level
	}
	return gossie.CONSISTENCY_DEFAULT
}
// getCassandraHostConfigKey returns the config key under which the Cassandra
// host list is stored for the configured tier, to support multiple 'tiered'
// clusters. The "general" tier maps to "hosts"; any other tier maps to
// "<tier>Hosts". A missing or empty tier falls back to defaultTier.
func getCassandraHostConfigKey() string {
	tier := config.AtPath("hailo", "service", "cassandra", "tier").AsString(defaultTier)
	if tier == "" {
		tier = defaultTier
	}

	log.Debugf("Attempting to connect to the %v Cassandra cluster", tier)

	if tier == "general" {
		return "hosts"
	}
	return fmt.Sprintf("%sHosts", tier)
}
// parseAuth decodes the JSON authentication config (an object keyed by
// keyspace, each value an object with "username" and "password") into a map of
// authentication options per keyspace.
//
// Empty input means "no authentication" and yields an empty map with a nil
// error. If the JSON cannot be decoded, the error is logged and returned
// together with an empty map.
func parseAuth(js []byte) (map[string]*authenticationOptions, error) {
	parsed := map[string]*authenticationOptions{}

	// Blank config = no auth
	if len(js) == 0 {
		return parsed, nil
	}

	// Decode into an intermediate keyspace -> field -> value structure.
	raw := map[string]map[string]string{}
	err := json.Unmarshal(js, &raw)
	if err != nil {
		// The payload is deliberately left out of the message so passwords are not logged.
		log.Warnf("Failed to unmarshal cassandra authentication configuration: %v", err)
		return parsed, err
	}

	// One entry per configured keyspace; missing fields come out as "".
	for keyspace, fields := range raw {
		opts := &authenticationOptions{
			username: fields["username"],
			password: fields["password"],
		}
		parsed[keyspace] = opts
		log.Debugf("[Cassandra] Authentication options for keyspace '%s' loaded: %v", keyspace, opts)
	}

	return parsed, nil
}