fix panic when using cassandra in multiple periodic configs or as a store for both index and delete requests (grafana#2774)

* add purpose label to metric being used for tracing cassandra session and relevant tests

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* update changelog

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* passing registerer wrapped with purpose to index client factory functions

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* add registerer for metrics in dynamodb clients

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* minor nit

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* changes suggested from PR review

Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

Co-authored-by: Marco Pracucci <marco@pracucci.com>
sandeepsukhani and pracucci authored Jul 23, 2020
1 parent b9064e4 commit 32b5a9b
Showing 12 changed files with 194 additions and 117 deletions.
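
The panic addressed here comes from registering the same Prometheus collector more than once against a single registry, which happens as soon as one storage backend is instantiated twice — for example via multiple periodic configs, or when the same backend serves both the index store and the delete store. Below is a minimal, standalone Go sketch of the approach the commit takes: build metrics per client from a caller-supplied registerer, and keep instances apart by wrapping the shared registerer with a distinguishing "purpose" label via prometheus.WrapRegistererWith. The metric name, label names, and purpose values are illustrative, not the exact Cortex/Loki wiring.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// newClientMetrics mimics the per-client pattern introduced in this commit:
// collectors are built from a caller-supplied registerer instead of being
// registered once at package level in init().
func newClientMetrics(r prometheus.Registerer) *prometheus.CounterVec {
	return promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_failures_total",
		Help:      "The total number of errors while storing chunks to the chunk store.",
	}, []string{"table", "error", "operation"})
}

func main() {
	reg := prometheus.NewRegistry()

	// Registering identical collectors twice against the same registerer
	// panics with a duplicate-registration error. Wrapping the registerer
	// with a distinguishing constant label makes the descriptors unique,
	// so two clients can share one registry.
	indexReg := prometheus.WrapRegistererWith(prometheus.Labels{"purpose": "index-store"}, reg)
	deleteReg := prometheus.WrapRegistererWith(prometheus.Labels{"purpose": "delete-store"}, reg)

	_ = newClientMetrics(indexReg)
	_ = newClientMetrics(deleteReg) // no panic: the purpose label differs

	fmt.Println("both clients registered without a collision")
}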
60 changes: 60 additions & 0 deletions aws/dynamodb_metrics.go
@@ -0,0 +1,60 @@
package aws

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/weaveworks/common/instrument"
)

type dynamoDBMetrics struct {
	dynamoRequestDuration  *instrument.HistogramCollector
	dynamoConsumedCapacity *prometheus.CounterVec
	dynamoThrottled        *prometheus.CounterVec
	dynamoFailures         *prometheus.CounterVec
	dynamoDroppedRequests  *prometheus.CounterVec
	dynamoQueryPagesCount  prometheus.Histogram
}

func newMetrics(r prometheus.Registerer) *dynamoDBMetrics {
	m := dynamoDBMetrics{}

	m.dynamoRequestDuration = instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "cortex",
		Name:      "dynamo_request_duration_seconds",
		Help:      "Time spent doing DynamoDB requests.",

		// DynamoDB latency seems to range from a few ms to a several seconds and is
		// important. So use 9 buckets from 1ms to just over 1 minute (65s).
		Buckets: prometheus.ExponentialBuckets(0.001, 4, 9),
	}, []string{"operation", "status_code"}))
	m.dynamoConsumedCapacity = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_consumed_capacity_total",
		Help:      "The capacity units consumed by operation.",
	}, []string{"operation", tableNameLabel})
	m.dynamoThrottled = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_throttled_total",
		Help:      "The total number of throttled events.",
	}, []string{"operation", tableNameLabel})
	m.dynamoFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_failures_total",
		Help:      "The total number of errors while storing chunks to the chunk store.",
	}, []string{tableNameLabel, errorReasonLabel, "operation"})
	m.dynamoDroppedRequests = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "dynamo_dropped_requests_total",
		Help:      "The total number of requests which were dropped due to errors encountered from dynamo.",
	}, []string{tableNameLabel, errorReasonLabel, "operation"})
	m.dynamoQueryPagesCount = promauto.With(r).NewHistogram(prometheus.HistogramOpts{
		Namespace: "cortex",
		Name:      "dynamo_query_pages_count",
		Help:      "Number of pages per query.",
		// Most queries will have one page, however this may increase with fuzzy
		// metric names.
		Buckets: prometheus.ExponentialBuckets(1, 4, 6),
	})

	return &m
}
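
Since newMetrics now takes a registerer rather than touching the global registry, a process that needs several DynamoDB-backed clients can hand each one a differently wrapped registerer. The following is a hedged sketch of an in-package test (a hypothetical aws/dynamodb_metrics_test.go; the "purpose" values are illustrative and the real factory wiring lives elsewhere in this PR) showing that two metrics instances coexist on one registry without a duplicate-registration panic:

package aws

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
)

func TestNewMetricsWithWrappedRegisterer(t *testing.T) {
	reg := prometheus.NewRegistry()

	// First client's metrics, stamped with purpose="index-store".
	m := newMetrics(prometheus.WrapRegistererWith(prometheus.Labels{"purpose": "index-store"}, reg))
	m.dynamoQueryPagesCount.Observe(1)

	// A second metrics instance with a different purpose value registers the
	// same metric names on the same registry without panicking.
	_ = newMetrics(prometheus.WrapRegistererWith(prometheus.Labels{"purpose": "delete-store"}, reg))

	families, err := reg.Gather()
	if err != nil {
		t.Fatal(err)
	}
	if len(families) == 0 {
		t.Fatal("expected gathered metric families")
	}
}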
104 changes: 29 additions & 75 deletions aws/dynamodb_storage_client.go
@@ -50,55 +50,6 @@ const (
validationException = "ValidationException"
)

var (
dynamoRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "dynamo_request_duration_seconds",
Help: "Time spent doing DynamoDB requests.",

// DynamoDB latency seems to range from a few ms to a several seconds and is
// important. So use 9 buckets from 1ms to just over 1 minute (65s).
Buckets: prometheus.ExponentialBuckets(0.001, 4, 9),
}, []string{"operation", "status_code"}))
dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_consumed_capacity_total",
Help: "The capacity units consumed by operation.",
}, []string{"operation", tableNameLabel})
dynamoThrottled = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_throttled_total",
Help: "The total number of throttled events.",
}, []string{"operation", tableNameLabel})
dynamoFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_failures_total",
Help: "The total number of errors while storing chunks to the chunk store.",
}, []string{tableNameLabel, errorReasonLabel, "operation"})
dynamoDroppedRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_dropped_requests_total",
Help: "The total number of requests which were dropped due to errors encountered from dynamo.",
}, []string{tableNameLabel, errorReasonLabel, "operation"})
dynamoQueryPagesCount = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "dynamo_query_pages_count",
Help: "Number of pages per query.",
// Most queries will have one page, however this may increase with fuzzy
// metric names.
Buckets: prometheus.ExponentialBuckets(1, 4, 6),
})
)

func init() {
dynamoRequestDuration.Register()
prometheus.MustRegister(dynamoConsumedCapacity)
prometheus.MustRegister(dynamoThrottled)
prometheus.MustRegister(dynamoFailures)
prometheus.MustRegister(dynamoQueryPagesCount)
prometheus.MustRegister(dynamoDroppedRequests)
}

// DynamoDBConfig specifies config for a DynamoDB database.
type DynamoDBConfig struct {
DynamoDB flagext.URLValue `yaml:"dynamodb_url"`
@@ -148,20 +99,22 @@ type dynamoDBStorageClient struct {
// of boilerplate.
batchGetItemRequestFn func(ctx context.Context, input *dynamodb.BatchGetItemInput) dynamoDBRequest
batchWriteItemRequestFn func(ctx context.Context, input *dynamodb.BatchWriteItemInput) dynamoDBRequest

metrics *dynamoDBMetrics
}

// NewDynamoDBIndexClient makes a new DynamoDB-backed IndexClient.
func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) {
return newDynamoDBStorageClient(cfg, schemaCfg)
func NewDynamoDBIndexClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.IndexClient, error) {
return newDynamoDBStorageClient(cfg, schemaCfg, reg)
}

// NewDynamoDBChunkClient makes a new DynamoDB-backed chunk.Client.
func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (chunk.Client, error) {
return newDynamoDBStorageClient(cfg, schemaCfg)
func NewDynamoDBChunkClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (chunk.Client, error) {
return newDynamoDBStorageClient(cfg, schemaCfg, reg)
}

// newDynamoDBStorageClient makes a new DynamoDB-backed IndexClient and chunk.Client.
func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig) (*dynamoDBStorageClient, error) {
func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer) (*dynamoDBStorageClient, error) {
dynamoDB, err := dynamoClientFromURL(cfg.DynamoDB.URL)
if err != nil {
return nil, err
@@ -172,6 +125,7 @@ func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig)
schemaCfg: schemaCfg,
DynamoDB: dynamoDB,
writeThrottle: rate.NewLimiter(rate.Limit(cfg.ThrottleLimit), dynamoDBMaxWriteBatchSize),
metrics: newMetrics(reg),
}
client.batchGetItemRequestFn = client.batchGetItemRequest
client.batchWriteItemRequestFn = client.batchWriteItemRequest
@@ -187,9 +141,9 @@ func (a dynamoDBStorageClient) NewWriteBatch() chunk.WriteBatch {
return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
}

func logWriteRetry(ctx context.Context, unprocessed dynamoDBWriteBatch) {
func logWriteRetry(unprocessed dynamoDBWriteBatch, metrics *dynamoDBMetrics) {
for table, reqs := range unprocessed {
dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
metrics.dynamoThrottled.WithLabelValues("DynamoDB.BatchWriteItem", table).Add(float64(len(reqs)))
for _, req := range reqs {
item := req.PutRequest.Item
var hash, rnge string
@@ -225,25 +179,25 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
})

err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(ctx, "DynamoDB.BatchWriteItem", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return request.Send()
})
resp := request.Data().(*dynamodb.BatchWriteItemOutput)

for _, cc := range resp.ConsumedCapacity {
dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName).
a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchWriteItem", *cc.TableName).
Add(float64(*cc.CapacityUnits))
}

if err != nil {
for tableName := range requests {
recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem")
recordDynamoError(tableName, err, "DynamoDB.BatchWriteItem", a.metrics)
}

// If we get provisionedThroughputExceededException, then no items were processed,
// so back off and retry all.
if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
logWriteRetry(ctx, requests)
logWriteRetry(requests, a.metrics)
unprocessed.TakeReqs(requests, -1)
_ = a.writeThrottle.WaitN(ctx, len(requests))
backoff.Wait()
@@ -256,7 +210,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
// recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context
// to determine if a request was dropped (or not)
for tableName := range requests {
dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc()
a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchWriteItem").Inc()
}
continue
}
@@ -268,7 +222,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
// If there are unprocessed items, retry those items.
unprocessedItems := dynamoDBWriteBatch(resp.UnprocessedItems)
if len(unprocessedItems) > 0 {
logWriteRetry(ctx, unprocessedItems)
logWriteRetry(unprocessedItems, a.metrics)
_ = a.writeThrottle.WaitN(ctx, unprocessedItems.Len())
unprocessed.TakeReqs(unprocessedItems, -1)
}
@@ -329,11 +283,11 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery

pageCount := 0
defer func() {
dynamoQueryPagesCount.Observe(float64(pageCount))
a.metrics.dynamoQueryPagesCount.Observe(float64(pageCount))
}()

retryer := newRetryer(ctx, a.cfg.backoffConfig)
err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error {
err := instrument.CollectedRequest(ctx, "DynamoDB.QueryPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(innerCtx context.Context) error {
if sp := ot.SpanFromContext(innerCtx); sp != nil {
sp.SetTag("tableName", query.TableName)
sp.SetTag("hashValue", query.HashValue)
@@ -345,12 +299,12 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery
}

if cc := output.ConsumedCapacity; cc != nil {
dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName).
a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName).
Add(float64(*cc.CapacityUnits))
}

return callback(query, &dynamoDBReadResponse{items: output.Items})
}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages"))
}, retryer.withRetries, withErrorHandler(query.TableName, "DynamoDB.QueryPages", a.metrics))
})
if err != nil {
return errors.Wrapf(err, "QueryPages error: table=%v", query.TableName)
@@ -481,19 +435,19 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
})

err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
err := instrument.CollectedRequest(ctx, "DynamoDB.BatchGetItemPages", a.metrics.dynamoRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
return request.Send()
})
response := request.Data().(*dynamodb.BatchGetItemOutput)

for _, cc := range response.ConsumedCapacity {
dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName).
a.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.BatchGetItemPages", *cc.TableName).
Add(float64(*cc.CapacityUnits))
}

if err != nil {
for tableName := range requests {
recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages")
recordDynamoError(tableName, err, "DynamoDB.BatchGetItemPages", a.metrics)
}

// If we get provisionedThroughputExceededException, then no items were processed,
@@ -509,7 +463,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c
// recording the drop counter separately from recordDynamoError(), as the error code alone may not provide enough context
// to determine if a request was dropped (or not)
for tableName := range requests {
dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc()
a.metrics.dynamoDroppedRequests.WithLabelValues(tableName, validationException, "DynamoDB.BatchGetItemPages").Inc()
}
continue
}
@@ -792,21 +746,21 @@ func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) {
}
}

func withErrorHandler(tableName, operation string) func(req *request.Request) {
func withErrorHandler(tableName, operation string, metrics *dynamoDBMetrics) func(req *request.Request) {
return func(req *request.Request) {
req.Handlers.CompleteAttempt.PushBack(func(req *request.Request) {
if req.Error != nil {
recordDynamoError(tableName, req.Error, operation)
recordDynamoError(tableName, req.Error, operation, metrics)
}
})
}
}

func recordDynamoError(tableName string, err error, operation string) {
func recordDynamoError(tableName string, err error, operation string, metrics *dynamoDBMetrics) {
if awsErr, ok := err.(awserr.Error); ok {
dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
metrics.dynamoFailures.WithLabelValues(tableName, awsErr.Code(), operation).Add(float64(1))
} else {
dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
metrics.dynamoFailures.WithLabelValues(tableName, otherError, operation).Add(float64(1))
}
}
