Skip to content

Commit

Permalink
Merge pull request #131 from weaveworks/dynamo-limits
Browse files Browse the repository at this point in the history
Fix up DynamoDB Watcher
  • Loading branch information
tomwilkie committed Nov 11, 2016
2 parents 3dac944 + 3e9db14 commit 96d884c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 30 deletions.
27 changes: 2 additions & 25 deletions chunk/dynamo_client.go
Expand Up @@ -18,8 +18,7 @@ const (
)

type dynamoWatcher struct {
accountMaxCapacity *prometheus.GaugeVec
tableCapacity *prometheus.GaugeVec
tableCapacity *prometheus.GaugeVec

dynamoDB *dynamodb.DynamoDB
tableName string
Expand Down Expand Up @@ -49,11 +48,6 @@ func WatchDynamo(dynamoDBURL string, interval time.Duration) (Watcher, error) {

tableName := strings.TrimPrefix(url.Path, "/")
w := &dynamoWatcher{
accountMaxCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "dynamo_account_max_capacity_units",
Help: "Account-wide DynamoDB capacity, measured in DynamoDB capacity units.",
}, []string{"op"}),
tableCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "dynamo_table_capacity_units",
Expand Down Expand Up @@ -81,12 +75,7 @@ func (w *dynamoWatcher) updateLoop() {
select {
case <-ticker.C:
log.Debugf("Updating limits from dynamo")
err := w.updateAccountLimits()
if err != nil {
// TODO: Back off if err is throttling related.
log.Warnf("Could not fetch account limits from dynamo: %v", err)
}
err = w.updateTableLimits()
err := w.updateTableLimits()
if err != nil {
log.Warnf("Could not fetch table limits from dynamo: %v", err)
}
Expand All @@ -96,16 +85,6 @@ func (w *dynamoWatcher) updateLoop() {
}
}

// updateAccountLimits calls DescribeLimits on the watcher's DynamoDB client
// and records the returned account-wide maximum read and write capacity
// units in the accountMaxCapacity gauge, under readLabel and writeLabel
// respectively. An error from DescribeLimits is returned unchanged and the
// gauges are left untouched in that case.
func (w *dynamoWatcher) updateAccountLimits() error {
limits, err := w.dynamoDB.DescribeLimits(&dynamodb.DescribeLimitsInput{})
if err != nil {
return err
}
// NOTE(review): both fields are pointers and are dereferenced without a nil
// check; presumably the API always populates them on success — confirm.
w.accountMaxCapacity.WithLabelValues(readLabel).Set(float64(*limits.AccountMaxReadCapacityUnits))
w.accountMaxCapacity.WithLabelValues(writeLabel).Set(float64(*limits.AccountMaxWriteCapacityUnits))
return nil
}

func (w *dynamoWatcher) updateTableLimits() error {
output, err := w.dynamoDB.DescribeTable(&dynamodb.DescribeTableInput{
TableName: &w.tableName,
Expand All @@ -121,12 +100,10 @@ func (w *dynamoWatcher) updateTableLimits() error {

// Describe implements prometheus.Collector.
//
// It delegates to the Describe method of each gauge vector held by the
// watcher, handing every one the same channel ch.
func (w *dynamoWatcher) Describe(ch chan<- *prometheus.Desc) {
w.accountMaxCapacity.Describe(ch)
w.tableCapacity.Describe(ch)
}

// Collect implements prometheus.Collector.
//
// It delegates to the Collect method of each gauge vector held by the
// watcher, handing every one the same channel ch.
func (w *dynamoWatcher) Collect(ch chan<- prometheus.Metric) {
w.accountMaxCapacity.Collect(ch)
w.tableCapacity.Collect(ch)
}
15 changes: 10 additions & 5 deletions cmd/cortex/main.go
Expand Up @@ -70,6 +70,7 @@ type cfg struct {
remoteTimeout time.Duration
numTokens int
logSuccess bool
watchDynamo bool

ingesterConfig ingester.Config
distributorConfig distributor.Config
Expand Down Expand Up @@ -99,6 +100,7 @@ func main() {
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests")
flag.BoolVar(&cfg.watchDynamo, "watch-dynamo", false, "Periodically collect DynamoDB provisioned throughput.")
flag.Parse()

chunkStore, err := setupChunkStore(cfg)
Expand All @@ -108,12 +110,15 @@ func main() {
if cfg.dynamodbPollInterval < 1*time.Minute {
log.Warnf("Polling DynamoDB more than once a minute. Likely to get throttled: %v", cfg.dynamodbPollInterval)
}
resourceWatcher, err := chunk.WatchDynamo(cfg.dynamodbURL, cfg.dynamodbPollInterval)
if err != nil {
log.Fatalf("Error initializing DynamoDB watcher: %v", err)

if cfg.watchDynamo {
resourceWatcher, err := chunk.WatchDynamo(cfg.dynamodbURL, cfg.dynamodbPollInterval)
if err != nil {
log.Fatalf("Error initializing DynamoDB watcher: %v", err)
}
defer resourceWatcher.Stop()
prometheus.MustRegister(resourceWatcher)
}
defer resourceWatcher.Stop()
prometheus.MustRegister(resourceWatcher)

consul, err := ring.NewConsulClient(cfg.consulHost)
if err != nil {
Expand Down

0 comments on commit 96d884c

Please sign in to comment.