Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
improve error handling, logging and reporting
Browse files Browse the repository at this point in the history
- if writes to cassandra fail, just continue on to the next def
- keep count of defs successfully archived.
  • Loading branch information
woodsaj committed Mar 8, 2019
1 parent 9ef83eb commit d2868c8
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
13 changes: 11 additions & 2 deletions cmd/mt-index-prune/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ type counters struct {
total int
active int
deprecated int
archived int
}

func (c *counters) PrintCounters() {
fmt.Println(fmt.Sprintf("Total analyzed defs: %d", c.total))
fmt.Println(fmt.Sprintf("Active defs: %d", c.active))
fmt.Println(fmt.Sprintf("Deprecated defs: %d", c.deprecated))
fmt.Println(fmt.Sprintf("Archived defs: %d", c.archived))
}

func main() {
Expand Down Expand Up @@ -68,7 +70,7 @@ func main() {
fmt.Println()
fmt.Println()
fmt.Println("EXAMPLES:")
fmt.Println("mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042")
fmt.Println("mt-index-prune --verbose --partition-from 0 --partition-to 8 cass -hosts cassandra:9042")
}

if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
Expand Down Expand Up @@ -123,7 +125,9 @@ func main() {
defCounters := counters{}
defs := make([]schema.MetricDefinition, 0)
deprecatedDefs := make([]schema.MetricDefinition, 0)

for partition := partitionFrom; (partitionTo == -1 && partition == partitionFrom) || (partitionTo > 0 && partition < partitionTo); partition++ {
log.Infof("starting to process partition %d", partition)
defsByNameWithTags := make(map[string][]schema.MetricDefinition)
defs = cassIdx.LoadPartitions([]int32{int32(partition)}, defs, now)
defCounters.total += len(defs)
Expand Down Expand Up @@ -161,10 +165,15 @@ func main() {
}

if noDryRun {
err = cassIdx.ArchiveDefs(deprecatedDefs)
count, err := cassIdx.ArchiveDefs(deprecatedDefs)
log.Infof("archiving request complete. successful=%d", count)
if count != len(deprecatedDefs) {
log.Warnf("some defs failed to be archived. failed=%d", len(deprecatedDefs)-count)
}
if err != nil {
log.Warnf("Failed to archive defs: %s", err.Error())
}
defCounters.archived += count
}

defs = defs[:0]
Expand Down
2 changes: 1 addition & 1 deletion docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ cass config flags:
EXAMPLES:
mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042
mt-index-prune --verbose --partition-from 0 --partition-to 8 cass -hosts cassandra:9042
```


Expand Down
43 changes: 34 additions & 9 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,17 @@ NAMES:
return defs
}

func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error {
// ArchiveDefs writes each of the provided defs to the archive table and
// then deletes the defs from the metric_idx table.
func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) (int, error) {
defChan := make(chan *schema.MetricDefinition, c.cfg.numConns)
g, ctx := errgroup.WithContext(context.Background())

// keep track of how many defs were successfully archived.
success := make([]int, c.cfg.numConns)

for i := 0; i < c.cfg.numConns; i++ {
i := i
g.Go(func() error {
for {
select {
Expand All @@ -405,13 +412,25 @@ func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error {
}
err := c.addDefToArchive(*def)
if err != nil {
return err
// If we failed to add the def to the archive table then just continue on to the next def.
// As we havnet yet removed the this def from the metric_idx table yet, the next time archiving
// is performed the this def will be processed again. As no action is needed by an operator, we
// just log this as a warning.
log.Warnf("cassandra-idx: Failed add def to archive table. error=%s. def=%+v", err, *def)
continue
}

err = c.deleteDef(def.Id, def.Partition)
if err != nil {
return err
// The next time archiving is performed this def will be processed again. Re-adding the def to the archive
// table will just be treated like an update with only the archived_at field changing. As no action is needed
// by an operator, we just log this as a warning.
log.Warnf("cassandra-idx: Failed to remove archived def from metric_idx table. error=%s. def=%+v", err, *def)
continue
}

// increment counter of defs successfully archived
success[i] = success[i] + 1
case <-ctx.Done():
return ctx.Err()
}
Expand All @@ -422,11 +441,17 @@ func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error {
defChan <- &defs[i]
}
close(defChan)
if err := g.Wait(); err != nil {
return err

// wait for all goroutines to complete.
err := g.Wait()

// get the count of defs successfully archived.
total := 0
for _, count := range success {
total = total + count
}

return nil
return total, err
}

func (c *CasIdx) processWriteQueue() {
Expand Down Expand Up @@ -513,9 +538,9 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error {
return nil
}

// log first failure and every 20th after that.
if (attempts % 20) == 0 {
log.Warnf("cassandra-idx: Failed to write def to cassandra. it will be retried. %s. the value was: %+v", err, def)
// log first failure as a warning. If we reach max attempts, the error will bubble up to the caller.
if attempts == 0 {
log.Warnf("cassandra-idx: Failed to write def to cassandra. it will be retried. error=%s. def=%+v", err, def)
}
}

Expand Down

0 comments on commit d2868c8

Please sign in to comment.