-
Notifications
You must be signed in to change notification settings - Fork 0
/
cassandra.go
649 lines (575 loc) · 16.6 KB
/
cassandra.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cassandra
import (
"context"
"reflect"
"time"
"github.com/uber/peloton/pkg/storage/objects/base"
"github.com/uber/peloton/pkg/storage/orm"
"github.com/gocql/gocql"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
"go.uber.org/yarpc/yarpcerrors"
)
// Retry tuning and write-mode constants.
const (
	// _defaultRetryAttempts and _defaultRetryTimeout are retry defaults.
	// They are not referenced in this file; presumably consumed by retry
	// logic elsewhere in the package (TODO confirm).
	_defaultRetryAttempts = 5
	_defaultRetryTimeout  = 50 * time.Millisecond

	// useCasWrite is passed to create() to request an IF NOT EXISTS
	// (compare-and-set) insert; !useCasWrite requests a plain insert.
	useCasWrite = true
)

// Operation names used as the "operation" tag on emitted metrics.
const (
	create  = "create"
	cas     = "cas"
	get     = "get"
	getAll  = "get_all"
	getIter = "get_iter"
	update  = "update"
	del     = "delete"
)

// Row limits handed to buildSelectQuery.
const (
	// _defaultQueryLimit caps single-row reads (Get) at one row.
	_defaultQueryLimit = 1
	// _ignoredQueryLimit (zero) means no LIMIT: every matching row is read.
	_ignoredQueryLimit = 0
)
// cassandraConnector is the Cassandra-backed orm.Connector built by
// NewCassandraConnector.
type cassandraConnector struct {
	// Embedded orm.Connector interface. NewCassandraConnector never sets this
	// field, so it stays nil and calling any interface method that this type
	// does not define itself would panic. NOTE(review): presumably embedded so
	// the type satisfies orm.Connector for methods not implemented in this
	// file — confirm.
	orm.Connector
	// Session is the gocql session created for this connector.
	Session *gocql.Session
	// scope is the storage scope for metrics (the "cql" sub-scope tagged with
	// the store name); latency timers are recorded on it.
	scope tally.Scope
	// executeSuccessScope is scope tagged result=success; success counters
	// are recorded on it.
	executeSuccessScope tally.Scope
	// executeFailScope is scope tagged result=fail; failure counters are
	// recorded on it.
	executeFailScope tally.Scope
	// Conf is the Cassandra connector config for this cluster.
	Conf *Config
}
// NewCassandraConnector opens a gocql session for config.StoreName (the
// keyspace) using config.CassandraConn and wraps it in an orm.Connector.
//
// Metrics go to the "cql" sub-scope of scope, tagged store=<StoreName>.
// Per-query counters are additionally tagged result=success or result=fail.
// An error from session creation is returned unchanged.
func NewCassandraConnector(
	config *Config,
	scope tally.Scope,
) (orm.Connector, error) {
	session, err := CreateStoreSession(config.CassandraConn, config.StoreName)
	if err != nil {
		return nil, err
	}

	storeTags := map[string]string{"store": config.StoreName}
	storeScope := scope.SubScope("cql").Tagged(storeTags)
	successScope := storeScope.Tagged(map[string]string{"result": "success"})
	failScope := storeScope.Tagged(map[string]string{"result": "fail"})

	connector := &cassandraConnector{
		Session:             session,
		scope:               storeScope,
		executeSuccessScope: successScope,
		executeFailScope:    failScope,
		Conf:                config,
	}
	return connector, nil
}

// Compile-time assertion that *cassandraConnector implements orm.Connector.
var _ orm.Connector = (*cassandraConnector)(nil)
// getGocqlErrorTag maps err to a short, fixed string for use as a metric tag.
// err.Error() cannot be used directly because it contains characters such
// as '=' and ':' that M3 rejects.
//
// yarpc AlreadyExists/NotFound errors are recognized first. gocql request
// errors are then matched by their concrete pointer type. Any other error
// value maps to "unknown".
func getGocqlErrorTag(err error) string {
	switch {
	case yarpcerrors.IsAlreadyExists(err):
		return "already_exists"
	case yarpcerrors.IsNotFound(err):
		return "not_found"
	}

	var tag string
	switch err.(type) {
	case *gocql.RequestErrReadFailure:
		tag = "read_failure"
	case *gocql.RequestErrWriteFailure:
		tag = "write_failure"
	case *gocql.RequestErrAlreadyExists:
		tag = "already_exists"
	case *gocql.RequestErrReadTimeout:
		tag = "read_timeout"
	case *gocql.RequestErrWriteTimeout:
		tag = "write_timeout"
	case *gocql.RequestErrUnavailable:
		tag = "unavailable"
	case *gocql.RequestErrFunctionFailure:
		tag = "function_failure"
	case *gocql.RequestErrUnprepared:
		tag = "unprepared"
	default:
		tag = "unknown"
	}
	return tag
}
// buildResultRow allocates one scan destination per entry in columns, to be
// passed to the gocql iterator's Scan when reading a row of e.
//
// Each destination is a pointer to a nil pointer (for example **string). The
// pointee type is chosen from the column's type in e.ColumnToType:
//   - string, *base.OptionalString        -> **string
//   - int, int32, uint32                  -> **int
//   - int64, uint64, *base.OptionalUInt64 -> **int64
//   - bool                                -> **bool
//   - any slice kind                      -> **[]byte
//   - any struct kind                     -> **time.Time
//   - any array kind                      -> **gocql.UUID
//
// Columns with an unsupported type, or with no entry in e.ColumnToType, are
// logged and their destination is left nil. NOTE(review): gocql appears to
// skip nil scan destinations — confirm.
func buildResultRow(e *base.Definition, columns []string) []interface{} {
	results := make([]interface{}, len(columns))

	// Loop-invariant type information, computed once. time.Time is a struct
	// and gocql.UUID is a [16]byte array, so the cases below match every
	// struct / array kind respectively.
	timeKind := reflect.TypeOf(time.Time{}).Kind()
	uuidKind := reflect.TypeOf(gocql.UUID{}).Kind()
	optionalStringType := reflect.TypeOf(&base.OptionalString{})
	optionalUInt64Type := reflect.TypeOf(&base.OptionalUInt64{})

	for i, column := range columns {
		// Look up the field type so the matching destination can be allocated.
		typ := e.ColumnToType[column]
		if typ == nil {
			// A column absent from ColumnToType yields a nil reflect.Type;
			// calling Kind() on it would panic. Treat it like an
			// unsupported type instead.
			log.WithFields(log.Fields{"column": column}).
				Infof("type not found")
			continue
		}

		switch typ.Kind() {
		case reflect.String:
			var value *string
			results[i] = &value
		case reflect.Int32, reflect.Uint32, reflect.Int:
			// C* internally uses int and int64
			var value *int
			results[i] = &value
		case reflect.Int64, reflect.Uint64:
			// C* internally uses int and int64
			var value *int64
			results[i] = &value
		case reflect.Bool:
			var value *bool
			results[i] = &value
		case reflect.Slice:
			var value *[]byte
			results[i] = &value
		case timeKind:
			var value *time.Time
			results[i] = &value
		case uuidKind:
			var value *gocql.UUID
			results[i] = &value
		case reflect.Ptr:
			// Custom optional string: stored as a string in Cassandra and
			// converted to/from the custom type in the ORM layer.
			if typ == optionalStringType {
				var value *string
				results[i] = &value
				break
			}
			// Custom optional uint64: stored as an int64 in Cassandra and
			// converted to/from the custom type in the ORM layer.
			if typ == optionalUInt64Type {
				var value *int64
				results[i] = &value
				break
			}
			// Unrecognized pointer types fall back to the default logging.
			fallthrough
		default:
			// This should only happen if a new cassandra type is used
			// without adding it to the translation layer.
			log.WithFields(log.Fields{"type": typ.Kind(), "column": column}).
				Infof("type not found")
		}
	}
	return results
}
// getRowFromResult converts the scan destinations filled by a gocql Scan
// (allocated by buildResultRow) into base.Column values. columnVals[i] is
// paired with columnNames[i], so columnVals must be at least as long as
// columnNames.
//
// Each Column.Value holds the inner pointer (for example *string), not the
// pointed-to value. A destination of an unrecognized type is logged, and
// its Column gets a nil Value. The e parameter is currently unused.
func getRowFromResult(
	e *base.Definition, columnNames []string, columnVals []interface{},
) []base.Column {
	row := make([]base.Column, 0, len(columnNames))
	for i := range columnNames {
		name := columnNames[i]

		var value interface{}
		switch dst := columnVals[i].(type) {
		case **int:
			value = *dst
		case **int64:
			value = *dst
		case **string:
			value = *dst
		case **gocql.UUID:
			value = *dst
		case **time.Time:
			value = *dst
		case **bool:
			value = *dst
		case **[]byte:
			value = *dst
		default:
			// Only expected if a new cassandra type is used without being
			// added to the translation layer.
			log.WithFields(log.Fields{
				"data":   columnVals[i],
				"column": name}).Infof("type not found")
		}

		row = append(row, base.Column{Name: name, Value: value})
	}
	return row
}
// splitColumnNameValue splits row into two parallel slices: colNames[i] and
// colValues[i] are the name and value of row[i]. Callers rely on this
// ordering, because the names build the CQL statement and the values are
// bound to it positionally. For an empty row both results are nil.
func splitColumnNameValue(row []base.Column) (
	colNames []string, colValues []interface{}) {
	for idx := range row {
		col := &row[idx]
		colNames = append(colNames, col.Name)
		colValues = append(colValues, col.Value)
	}
	return colNames, colValues
}
// CreateIfNotExists inserts row into the table described by e using a CAS
// (IF NOT EXISTS) write. If Cassandra does not apply the write, a yarpc
// AlreadyExists error is returned. Metrics are tagged operation "cas".
func (c *cassandraConnector) CreateIfNotExists(
	ctx context.Context,
	e *base.Definition,
	row []base.Column,
) error {
	return c.create(ctx, e, row, useCasWrite)
}

// Create inserts row into the table described by e using a plain INSERT
// (no IF NOT EXISTS clause). Metrics are tagged operation "create".
func (c *cassandraConnector) Create(
	ctx context.Context,
	e *base.Definition,
	row []base.Column,
) error {
	return c.create(ctx, e, row, !useCasWrite)
}
// create builds and executes an INSERT of row into the table described by e.
//
// When casWrite is true, the statement carries IF NOT EXISTS and is executed
// with MapScanCAS. If the write is not applied, a failure counter is emitted
// and a yarpc AlreadyExists error is returned. Metrics are tagged operation
// "cas" for CAS writes and "create" otherwise. Errors from statement
// construction are returned without emitting metrics.
func (c *cassandraConnector) create(
	ctx context.Context,
	e *base.Definition,
	row []base.Column,
	casWrite bool,
) error {
	// Names build the statement and values are bound positionally, so the
	// two slices must stay in the same order.
	colNames, colValues := splitColumnNameValue(row)

	stmt, err := InsertStmt(
		Table(e.Name),
		Columns(colNames),
		Values(colValues),
		IfNotExist(casWrite),
	)
	if err != nil {
		return err
	}

	operation := create
	if casWrite {
		operation = cas
	}

	q := c.Session.Query(stmt, colValues...).WithContext(ctx)
	if casWrite {
		applied, casErr := q.MapScanCAS(map[string]interface{}{})
		if casErr != nil {
			sendCounters(c.executeFailScope, e.Name, operation, casErr)
			return casErr
		}
		if !applied {
			// A CAS conflict is a failed create. Record it so conflicts are
			// visible in metrics; getGocqlErrorTag maps this error to the
			// "already_exists" tag.
			existsErr := yarpcerrors.AlreadyExistsErrorf("item already exists")
			sendCounters(c.executeFailScope, e.Name, operation, existsErr)
			return existsErr
		}
	} else if execErr := q.Exec(); execErr != nil {
		sendCounters(c.executeFailScope, e.Name, operation, execErr)
		return execErr
	}

	sendLatency(c.scope, e.Name, operation, time.Duration(q.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, operation, nil)
	return nil
}
// buildSelectQuery prepares a SELECT of colNamesToRead from the table
// described by e, conditioned on the names in keyCols, with the key values
// bound positionally and ctx attached.
//
// A non-zero limit is enforced in the statement; a limit of 0 fetches every
// matching row. The returned query has not been run yet (callers run it via
// Iter).
func (c *cassandraConnector) buildSelectQuery(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
	colNamesToRead []string,
	limit int,
) (*gocql.Query, error) {
	condNames, condValues := splitColumnNameValue(keyCols)

	stmt, err := SelectStmt(
		Table(e.Name),
		Columns(colNamesToRead),
		Conditions(condNames),
		Limit(limit),
	)
	if err != nil {
		return nil, err
	}

	query := c.Session.Query(stmt, condValues...)
	return query.WithContext(ctx), nil
}
// Get reads at most one row of the table described by e, matching keyCols.
// It returns that row as a map from column name to value, normalized by
// processDBData.
//
// If colNamesToRead is empty, e.GetColumnsToRead() is used. Get returns
// (nil, nil) when no row matches. It also returns (nil, nil), after logging
// and counting a failure, if more than one row comes back. Query errors are
// wrapped with "SliceMap failed".
func (c *cassandraConnector) Get(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
	colNamesToRead ...string,
) (map[string]interface{}, error) {
	if len(colNamesToRead) == 0 {
		colNamesToRead = e.GetColumnsToRead()
	}

	q, err := c.buildSelectQuery(
		ctx,
		e,
		keyCols,
		colNamesToRead,
		_defaultQueryLimit)
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, get, err)
		return nil, err
	}

	// Execute the query and get an iterator. Close it on every return path,
	// as GetAll does, so the iterator's resources are released.
	cqlIter := q.Iter()
	defer cqlIter.Close()

	result, err := cqlIter.SliceMap()
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, get, err)
		return nil, errors.Wrap(err, "SliceMap failed")
	}

	if len(result) > 1 {
		// Should not happen with a LIMIT of 1; counted as a failed read.
		log.WithField("rows len",
			len(result)).Info("Get SliceMap returns more than 1 row")
		sendCounters(c.executeFailScope, e.Name, get, nil)
		return nil, nil
	}
	if len(result) == 0 {
		return nil, nil
	}

	// At this stage result holds exactly one row.
	c.processDBData(result)
	sendLatency(c.scope, e.Name, get, time.Duration(q.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, get, nil)
	return result[0], nil
}
// GetAll reads every row of the table described by e that matches keyCols.
// No LIMIT is applied. Columns come from e.GetColumnsToRead().
//
// Each row is returned as a map from column name to value, normalized by
// processDBData. Query errors are wrapped with "SliceMap failed". On
// success, latency and a success counter are recorded under operation
// "get_all", matching the other operations in this connector.
func (c *cassandraConnector) GetAll(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
) ([]map[string]interface{}, error) {
	colNamesToRead := e.GetColumnsToRead()
	q, err := c.buildSelectQuery(
		ctx,
		e,
		keyCols,
		colNamesToRead,
		_ignoredQueryLimit)
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, getAll, err)
		return nil, err
	}

	// Execute the query and get an iterator; release it when done.
	cqlIter := q.Iter()
	defer cqlIter.Close()

	result, err := cqlIter.SliceMap()
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, getAll, err)
		return nil, errors.Wrap(err, "SliceMap failed")
	}

	c.processDBData(result)
	sendLatency(c.scope, e.Name, getAll, time.Duration(q.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, getAll, nil)
	return result, nil
}
// processDBData rewrites, in place, values returned by SliceMap into the
// types used by ORM objects:
//   - gocql.UUID becomes its String() form
//   - int becomes uint32
//   - int64 becomes uint64
//
// C* internally uses int and int64, while ORM objects use uint32 and uint64.
// All other values are left untouched. The receiver is not used.
func (c *cassandraConnector) processDBData(
	result []map[string]interface{}) {
	for _, rowMap := range result {
		for key, raw := range rowMap {
			switch val := raw.(type) {
			case gocql.UUID:
				rowMap[key] = val.String()
			case int:
				rowMap[key] = uint32(val)
			case int64:
				rowMap[key] = uint64(val)
			}
		}
	}
}
// GetAllIter runs a SELECT with no LIMIT of e.GetColumnsToRead() from the
// table described by e, matching keyCols. Instead of materializing the rows,
// it returns an orm.Iterator over them.
//
// The query latency is recorded under operation "get_iter" right after the
// query is issued. Success/failure counters are emitted by the iterator when
// iteration ends. Statement-construction errors are returned without
// emitting metrics.
func (c *cassandraConnector) GetAllIter(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
) (orm.Iterator, error) {
	cols := e.GetColumnsToRead()

	query, buildErr := c.buildSelectQuery(ctx, e, keyCols, cols, _ignoredQueryLimit)
	if buildErr != nil {
		return nil, buildErr
	}

	rows := query.Iter()
	sendLatency(c.scope, e.Name, getIter, time.Duration(query.Latency()))

	return newIterator(e, cols, c.executeSuccessScope, c.executeFailScope, rows), nil
}
// Delete issues a DELETE against the table described by e. The statement is
// conditioned on the names in keyCols, with the key values bound
// positionally.
//
// Counters are tagged operation "delete". Latency is recorded only on
// success. Statement-construction errors are returned without emitting
// metrics.
func (c *cassandraConnector) Delete(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
) error {
	condNames, condValues := splitColumnNameValue(keyCols)

	stmt, err := DeleteStmt(Table(e.Name), Conditions(condNames))
	if err != nil {
		return err
	}

	query := c.Session.Query(stmt, condValues...).WithContext(ctx)
	if execErr := query.Exec(); execErr != nil {
		sendCounters(c.executeFailScope, e.Name, del, execErr)
		return execErr
	}

	sendLatency(c.scope, e.Name, del, time.Duration(query.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, del, nil)
	return nil
}
// Update issues an UPDATE against the table described by e. It sets the
// columns in row and is conditioned on the names in keyCols.
//
// The row values are bound first, followed by the key values. Counters are
// tagged operation "update". Latency is recorded only on success.
// Statement-construction errors are returned without emitting metrics.
func (c *cassandraConnector) Update(
	ctx context.Context,
	e *base.Definition,
	row []base.Column,
	keyCols []base.Column,
) error {
	condNames, condValues := splitColumnNameValue(keyCols)
	setNames, setValues := splitColumnNameValue(row)

	stmt, err := UpdateStmt(
		Table(e.Name),
		Updates(setNames),
		Conditions(condNames),
	)
	if err != nil {
		return err
	}

	// Row values come first, then key values. NOTE(review): presumably this
	// matches the order of placeholders UpdateStmt emits (SET before WHERE).
	bindVals := append(setValues, condValues...)

	query := c.Session.Query(stmt, bindVals...).WithContext(ctx)
	if execErr := query.Exec(); execErr != nil {
		sendCounters(c.executeFailScope, e.Name, update, execErr)
		return execErr
	}

	sendLatency(c.scope, e.Name, update, time.Duration(query.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, update, nil)
	return nil
}
// cassandraIterator implements orm.Iterator on top of a gocql iterator. It
// returns one row per Next call as a []base.Column, and emits a success or
// failure counter (operation "get_iter") once iteration ends.
type cassandraIterator struct {
	// cqlIter is the underlying gocql iterator being consumed.
	cqlIter *gocql.Iter
	// tableDef describes the table. It is used to allocate scan destinations
	// and to tag metrics with the table name.
	tableDef *base.Definition
	// colNamesToRead lists the selected columns, in the order they are
	// scanned.
	colNamesToRead []string
	// successScope receives the counter emitted on normal end of iteration.
	successScope tally.Scope
	// failScope receives the counter emitted when iteration ends with an
	// error.
	failScope tally.Scope
}

// ensure that implementation (cassandraIterator) satisfies the interface
var _ orm.Iterator = (*cassandraIterator)(nil)

// newIterator wraps cqlIter, which reads the columns cols of table e, in a
// cassandraIterator that reports end-of-iteration metrics to successScope or
// failScope.
func newIterator(
	e *base.Definition,
	cols []string,
	successScope tally.Scope,
	failScope tally.Scope,
	cqlIter *gocql.Iter,
) *cassandraIterator {
	return &cassandraIterator{
		cqlIter:        cqlIter,
		tableDef:       e,
		successScope:   successScope,
		failScope:      failScope,
		colNamesToRead: cols,
	}
}

// Close closes the underlying gocql iterator. Its error is discarded because
// this Close has no error result; Next surfaces iteration errors instead.
func (iter *cassandraIterator) Close() {
	_ = iter.cqlIter.Close()
}

// Next returns the next row. When no row is left, it closes the underlying
// iterator and emits a counter. It then returns (nil, nil) if iteration
// finished cleanly, or (nil, err) if the query or iteration failed.
func (iter *cassandraIterator) Next() ([]base.Column, error) {
	result := buildResultRow(iter.tableDef, iter.colNamesToRead)
	if iter.cqlIter.Scan(result...) {
		row := getRowFromResult(iter.tableDef, iter.colNamesToRead, result)
		return row, nil
	}

	// Either end-of-results or error; Close reports which. The variable is
	// named err so it does not shadow the imported errors package.
	if err := iter.cqlIter.Close(); err != nil {
		sendCounters(iter.failScope, iter.tableDef.Name, getIter, err)
		return nil, err
	}
	sendCounters(iter.successScope, iter.tableDef.Name, getIter, nil)
	return nil, nil
}
// sendLatency records d on the "execute_latency" timer of scope. The timer
// is tagged with the table name and the operation.
func sendLatency(
	scope tally.Scope,
	table, operation string,
	d time.Duration,
) {
	tags := map[string]string{
		"table":     table,
		"operation": operation,
	}
	timer := scope.Tagged(tags).Timer("execute_latency")
	timer.Record(d)
}
// sendCounters increments the "execute" counter of scope by one. The counter
// is tagged with the table name, the operation, and an error tag: "none"
// when err is nil, otherwise the value produced by getGocqlErrorTag.
func sendCounters(
	scope tally.Scope,
	table, operation string,
	err error,
) {
	var errTag string
	if err == nil {
		errTag = "none"
	} else {
		errTag = getGocqlErrorTag(err)
	}

	tagged := scope.Tagged(map[string]string{
		"table":     table,
		"operation": operation,
		"error":     errTag,
	})
	tagged.Counter("execute").Inc(1)
}