From d07c1bd1325c1a804d478b85cc1763828081901b Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Wed, 6 Mar 2019 13:18:19 -0300 Subject: [PATCH 01/14] add the mt-index-prune utility --- cmd/mt-index-prune/main.go | 173 +++++++++++++++++++++++ idx/cassandra/cassandra.go | 70 ++++++++- scripts/config/schema-idx-cassandra.toml | 17 +++ 3 files changed, 256 insertions(+), 4 deletions(-) create mode 100644 cmd/mt-index-prune/main.go diff --git a/cmd/mt-index-prune/main.go b/cmd/mt-index-prune/main.go new file mode 100644 index 0000000000..f1ef373495 --- /dev/null +++ b/cmd/mt-index-prune/main.go @@ -0,0 +1,173 @@ +package main + +import ( + "flag" + "fmt" + "os" + "regexp" + "time" + + "github.com/grafana/metrictank/conf" + "github.com/grafana/metrictank/idx/cassandra" + "github.com/grafana/metrictank/idx/memory" + "github.com/grafana/metrictank/logger" + "github.com/raintank/schema" + log "github.com/sirupsen/logrus" +) + +func init() { + formatter := &logger.TextFormatter{} + formatter.TimestampFormat = "2006-01-02 15:04:05.000" + log.SetFormatter(formatter) + log.SetLevel(log.InfoLevel) +} + +func perror(err error) { + if err != nil { + log.Fatal(err.Error()) + } +} + +type counters struct { + total int + active int + deprecated int +} + +func (c *counters) PrintCounters() { + fmt.Println(fmt.Sprintf("Total analyzed defs: %d", c.total)) + fmt.Println(fmt.Sprintf("Active defs: %d", c.active)) + fmt.Println(fmt.Sprintf("Deprecated defs: %d", c.deprecated)) +} + +func main() { + var noDryRun, verbose bool + var partitionCount int + var indexRulesFile string + globalFlags := flag.NewFlagSet("global config flags", flag.ExitOnError) + globalFlags.BoolVar(&noDryRun, "no-dry-run", false, "do not only plan and print what to do, but also execute it") + globalFlags.BoolVar(&verbose, "verbose", false, "print every metric name that gets archived") + globalFlags.IntVar(&partitionCount, "partition-count", 8, "the number of partitions in existence") + globalFlags.StringVar(&indexRulesFile, "index-rules-file", "/etc/metrictank/index-rules.conf", "name of file which defines the max-stale times") + cassFlags := cassandra.ConfigSetup() + + flag.Usage = func() { + fmt.Println("mt-index-prune") + fmt.Println() + fmt.Println("Retrieves a metrictank index and moves all deprecated entries into an archive table") + fmt.Println() + fmt.Printf("Usage:\n\n") + fmt.Printf(" mt-index-prune [global config flags] [idx config flags]\n\n") + fmt.Printf("global config flags:\n\n") + globalFlags.PrintDefaults() + fmt.Println() + fmt.Printf("idxtype: only 'cass' supported for now\n\n") + fmt.Printf("cass config flags:\n\n") + cassFlags.PrintDefaults() + fmt.Println() + fmt.Println() + fmt.Println("EXAMPLES:") + fmt.Println("mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042") + } + + if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") { + flag.Usage() + os.Exit(0) + } + + if len(os.Args) < 2 { + flag.Usage() + os.Exit(-1) + } + + var cassI int + for i, v := range os.Args { + if v == "cass" { + cassI = i + } + } + if cassI == 0 { + log.Println("only indextype 'cass' supported") + flag.Usage() + os.Exit(1) + } + globalFlags.Parse(os.Args[1:cassI]) + + indexRules, err := conf.ReadIndexRules(indexRulesFile) + if os.IsNotExist(err) { + log.Fatalf("Index-rules.conf file %s does not exist; using defaults", indexRulesFile) + } + now := time.Now() + cutoffs := indexRules.Cutoffs(now) + + cassFlags.Parse(os.Args[cassI+1:]) + cassandra.CliConfig.Enabled = true + + cassIdx := cassandra.New(cassandra.CliConfig) + err = cassIdx.InitBare() + perror(err) + + // we don't want to filter any metric definitions during the loading + // so MaxStale is set to 0 + memory.IndexRules = conf.IndexRules{ + Rules: nil, + Default: conf.IndexRule{ + Name: "default", + Pattern: regexp.MustCompile(""), + MaxStale: 0, + }, + } + + defCounters := counters{} + defs := make([]schema.MetricDefinition, 0) + deprecatedDefs := make([]schema.MetricDefinition, 0) + for partition := int32(0); partition < int32(partitionCount); partition++ { + defsByNameWithTags := make(map[string][]schema.MetricDefinition) + defs = cassIdx.LoadPartitions([]int32{partition}, defs, now) + defCounters.total += len(defs) + for _, def := range defs { + name := def.NameWithTags() + defsByNameWithTags[name] = append(defsByNameWithTags[name], def) + } + + for name, defs := range defsByNameWithTags { + // find the latest LastUpdate ts + latest := int64(0) + for _, def := range defs { + if def.LastUpdate > latest { + latest = def.LastUpdate + } + } + + irId, _ := indexRules.Match(name) + if latest < cutoffs[irId] { + for _, def := range defs { + deprecatedDefs = append(deprecatedDefs, def) + } + defCounters.deprecated += len(defs) + + if verbose { + fmt.Println(fmt.Sprintf("Metric is deprecated: %s", name)) + } + } else { + defCounters.active += len(defs) + + if verbose { + fmt.Println(fmt.Sprintf("Metric is active: %s", name)) + } + } + } + + if noDryRun { + err = cassIdx.ArchiveDefs(deprecatedDefs) + if err != nil { + perror(fmt.Errorf("Failed to archive defs: %s", err.Error())) + } + } + + defs = defs[:0] + deprecatedDefs = deprecatedDefs[:0] + } + + defCounters.PrintCounters() +} diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index cca1d068fb..8e96e163b0 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -117,19 +117,25 @@ func (c *CasIdx) InitBare() error { // read templates schemaKeyspace := util.ReadEntry(c.cfg.schemaFile, "schema_keyspace").(string) schemaTable := util.ReadEntry(c.cfg.schemaFile, "schema_table").(string) + schemaArchiveTable := util.ReadEntry(c.cfg.schemaFile, "schema_archive_table").(string) // create the keyspace or ensure it exists if c.cfg.createKeyspace { - log.Infof("cassandra-idx: ensuring that keyspace %s exist.", c.cfg.keyspace) + log.Infof("cassandra-idx: ensuring that keyspace %s exists.", c.cfg.keyspace) err = tmpSession.Query(fmt.Sprintf(schemaKeyspace, c.cfg.keyspace)).Exec() if err != nil { return fmt.Errorf("failed to initialize cassandra keyspace: %s", err) } - log.Info("cassandra-idx: ensuring that table metric_idx exist.") + log.Info("cassandra-idx: ensuring that table metric_idx exists.") err = tmpSession.Query(fmt.Sprintf(schemaTable, c.cfg.keyspace)).Exec() if err != nil { return fmt.Errorf("failed to initialize cassandra table: %s", err) } + log.Info("cassandra-idx: ensuring that table metric_idx_archive exists.") + err = tmpSession.Query(fmt.Sprintf(schemaArchiveTable, c.cfg.keyspace)).Exec() + if err != nil { + return fmt.Errorf("failed to initialize cassandra table: %s", err) + } } else { var keyspaceMetadata *gocql.KeyspaceMetadata for attempt := 1; attempt > 0; attempt++ { @@ -141,7 +147,9 @@ func (c *CasIdx) InitBare() error { log.Warnf("cassandra-idx: cassandra keyspace not found. retrying in 5s. attempt: %d", attempt) time.Sleep(5 * time.Second) } else { - if _, ok := keyspaceMetadata.Tables["metric_idx"]; ok { + _, okIdx := keyspaceMetadata.Tables["metric_idx"] + _, okArchive := keyspaceMetadata.Tables["metric_idx_archive"] + if okIdx && okArchive { break } else { if attempt >= 5 { @@ -152,7 +160,6 @@ func (c *CasIdx) InitBare() error { } } } - } tmpSession.Close() @@ -383,6 +390,22 @@ NAMES: return defs } +func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error { + for _, def := range defs { + err := c.addDefToArchive(def) + if err != nil { + return err + } + + err = c.deleteDef(def.Id, def.Partition) + if err != nil { + return err + } + } + + return nil +} + func (c *CasIdx) processWriteQueue() { var success bool var attempts int @@ -435,6 +458,45 @@ func (c *CasIdx) processWriteQueue() { c.wg.Done() } +func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error { + insertQry := `INSERT INTO metric_idx_archive (id, orgid, partition, name, interval, unit, mtype, tags, lastupdate, archived_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + attempts := 0 + + for attempts < 5 { + attempts++ + err := c.session.Query( + insertQry, + def.Id.String(), + def.OrgId, + def.Partition, + def.Name, + def.Interval, + def.Unit, + def.Mtype, + def.Tags, + def.LastUpdate, + time.Now().UTC().Unix()).Exec() + + if err == nil { + break + } + + if attempts >= 5 { + return fmt.Errorf("Failed writing to cassandra: %s", err.Error()) + } + + sleepTime := 100 * attempts + if sleepTime > 2000 { + sleepTime = 2000 + } + time.Sleep(time.Duration(sleepTime) * time.Millisecond) + attempts++ + + } + + return nil +} + func (c *CasIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) { pre := time.Now() defs, err := c.MemoryIdx.Delete(orgId, pattern) diff --git a/scripts/config/schema-idx-cassandra.toml b/scripts/config/schema-idx-cassandra.toml index 1ea97c6d43..2da49e4711 100644 --- a/scripts/config/schema-idx-cassandra.toml +++ b/scripts/config/schema-idx-cassandra.toml @@ -17,3 +17,20 @@ CREATE TABLE IF NOT EXISTS %s.metric_idx ( ) WITH compaction = {'class': 'SizeTieredCompactionStrategy'} AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} """ + +schema_archive_table = """ +CREATE TABLE IF NOT EXISTS %s.metric_idx_archive ( + id text, + orgid int, + partition int, + name text, + interval int, + unit text, + mtype text, + tags set, + lastupdate int, + archived_at int, + PRIMARY KEY (partition, id) +) WITH compaction = {'class': 'SizeTieredCompactionStrategy'} + AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} +""" From dce29cfbeb74abce6a88e6757e526e9bbe5c3d78 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 06:25:15 -0300 Subject: [PATCH 02/14] update docs --- docs/tools.md | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/docs/tools.md b/docs/tools.md index 205add0262..c3190b41bd 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -197,6 +197,79 @@ Flags: ``` +## mt-index-prune + +``` +mt-index-prune + +Retrieves a metrictank index and moves all deprecated entries into an archive table + +Usage: + + mt-index-prune [global config flags] [idx config flags] + +global config flags: + + -index-rules-file string + name of file which defines the max-stale times (default "/etc/metrictank/index-rules.conf") + -no-dry-run + do not only plan and print what to do, but also execute it + -partition-count int + the number of partitions in existence (default 8) + -verbose + print every metric name that gets archived + +idxtype: only 'cass' supported for now + +cass config flags: + + -auth + enable cassandra user authentication + -ca-path string + cassandra CA certficate path when using SSL (default "/etc/metrictank/ca.pem") + -consistency string + write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one") + -create-keyspace + enable the creation of the index keyspace and tables, only one node needs this (default true) + -disable-initial-host-lookup + instruct the driver to not attempt to get host info from the system.peers table + -enabled + (default true) + -host-verification + host (hostname and server cert) verification when using SSL (default true) + -hosts string + comma separated list of cassandra addresses in host:port form (default "localhost:9042") + -keyspace string + Cassandra keyspace to store metricDefinitions in. (default "metrictank") + -num-conns int + number of concurrent connections to cassandra (default 10) + -password string + password for authentication (default "cassandra") + -protocol-version int + cql protocol version to use (default 4) + -prune-interval duration + Interval at which the index should be checked for stale series. (default 3h0m0s) + -schema-file string + File containing the needed schemas in case database needs initializing (default "/etc/metrictank/schema-idx-cassandra.toml") + -ssl + enable SSL connection to cassandra + -timeout duration + cassandra request timeout (default 1s) + -update-cassandra-index + synchronize index changes to cassandra. not all your nodes need to do this. (default true) + -update-interval duration + frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates (default 3h0m0s) + -username string + username for authentication (default "cassandra") + -write-queue-size int + Max number of metricDefs allowed to be unwritten to cassandra (default 100000) + + +EXAMPLES: +mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042 +``` + + ## mt-kafka-mdm-sniff ``` From ef2aa81e5624bfb87d78a9f4cec659d253f10b8e Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 06:37:14 -0300 Subject: [PATCH 03/14] update gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 85bef64176..a7f1c7b990 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ /cmd/mt-explain/mt-explain /cmd/mt-index-cat/mt-index-cat /cmd/mt-index-migrate/mt-index-migrate +/cmd/mt-index-prune/mt-index-prune /cmd/mt-kafka-mdm-sniff-out-of-order/mt-kafka-mdm-sniff-out-of-order /cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff /cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff From c5dc6d3c80ddfa00a9a8623e1ec418518a0f8866 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 06:37:25 -0300 Subject: [PATCH 04/14] exit if index-rules couldn't be found --- cmd/mt-index-prune/main.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/mt-index-prune/main.go b/cmd/mt-index-prune/main.go index f1ef373495..99e61c3007 100644 --- a/cmd/mt-index-prune/main.go +++ b/cmd/mt-index-prune/main.go @@ -86,6 +86,7 @@ func main() { cassI = i } } + if cassI == 0 { log.Println("only indextype 'cass' supported") flag.Usage() @@ -95,14 +96,14 @@ func main() { indexRules, err := conf.ReadIndexRules(indexRulesFile) if os.IsNotExist(err) { - log.Fatalf("Index-rules.conf file %s does not exist; using defaults", indexRulesFile) + log.Fatalf("Index-rules.conf file %s does not exist; exiting", indexRulesFile) + os.Exit(1) } now := time.Now() cutoffs := indexRules.Cutoffs(now) cassFlags.Parse(os.Args[cassI+1:]) cassandra.CliConfig.Enabled = true - cassIdx := cassandra.New(cassandra.CliConfig) err = cassIdx.InitBare() perror(err) From fb59a2c61b3a6097b35d664d438aad0a0cf660fa Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 06:53:23 -0300 Subject: [PATCH 05/14] make for loop easier to understand --- idx/cassandra/cassandra.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index 8e96e163b0..0e0e842dd4 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -460,10 +460,18 @@ func (c *CasIdx) processWriteQueue() { func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error { insertQry := `INSERT INTO metric_idx_archive (id, orgid, partition, name, interval, unit, mtype, tags, lastupdate, archived_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` - attempts := 0 + maxAttempts := 5 + var err error + + for attempts := 0; attempts < maxAttempts; attempts++ { + if attempts > 0 { + sleepTime := 100 * attempts + if sleepTime > 2000 { + sleepTime = 2000 + } + time.Sleep(time.Duration(sleepTime) * time.Millisecond) + } - for attempts < 5 { - attempts++ err := c.session.Query( insertQry, def.Id.String(), @@ -478,23 +486,11 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error { time.Now().UTC().Unix()).Exec() if err == nil { - break - } - - if attempts >= 5 { - return fmt.Errorf("Failed writing to cassandra: %s", err.Error()) - } - - sleepTime := 100 * attempts - if sleepTime > 2000 { - sleepTime = 2000 + return nil } - time.Sleep(time.Duration(sleepTime) * time.Millisecond) - attempts++ - } - return nil + return err } func (c *CasIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) { From 13cc134edb0c40863cdb2151058fc9a09bb4ae89 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 07:03:19 -0300 Subject: [PATCH 06/14] save some calls to time.Now() --- idx/cassandra/cassandra.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index 0e0e842dd4..6f8eb959cb 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -461,6 +461,7 @@ func (c *CasIdx) processWriteQueue() { func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error { insertQry := `INSERT INTO metric_idx_archive (id, orgid, partition, name, interval, unit, mtype, tags, lastupdate, archived_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` maxAttempts := 5 + now := time.Now().UTC().Unix() var err error for attempts := 0; attempts < maxAttempts; attempts++ { @@ -483,7 +484,7 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error { def.Mtype, def.Tags, def.LastUpdate, - time.Now().UTC().Unix()).Exec() + now).Exec() if err == nil { return nil From 7f8bb745f4f078cbcb9a468bc1dd84c18df0a0f6 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 10:14:22 -0300 Subject: [PATCH 07/14] different way to specify partitions makes it possible to iterate over any single partition or an arbitrary range of partitions --- cmd/mt-index-prune/main.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/mt-index-prune/main.go b/cmd/mt-index-prune/main.go index 99e61c3007..8692bfc9b8 100644 --- a/cmd/mt-index-prune/main.go +++ b/cmd/mt-index-prune/main.go @@ -42,12 +42,13 @@ func (c *counters) PrintCounters() { func main() { var noDryRun, verbose bool - var partitionCount int + var partitionFrom, partitionTo int var indexRulesFile string globalFlags := flag.NewFlagSet("global config flags", flag.ExitOnError) globalFlags.BoolVar(&noDryRun, "no-dry-run", false, "do not only plan and print what to do, but also execute it") globalFlags.BoolVar(&verbose, "verbose", false, "print every metric name that gets archived") - globalFlags.IntVar(&partitionCount, "partition-count", 8, "the number of partitions in existence") + globalFlags.IntVar(&partitionFrom, "partition-from", 0, "the partition to start at") + globalFlags.IntVar(&partitionTo, "partition-to", -1, "prune all partitions up to this one (exclusive). If unset, only the partition defined with \"--partition-from\" gets pruned") globalFlags.StringVar(&indexRulesFile, "index-rules-file", "/etc/metrictank/index-rules.conf", "name of file which defines the max-stale times") cassFlags := cassandra.ConfigSetup() @@ -122,9 +123,9 @@ func main() { defCounters := counters{} defs := make([]schema.MetricDefinition, 0) deprecatedDefs := make([]schema.MetricDefinition, 0) - for partition := int32(0); partition < int32(partitionCount); partition++ { + for partition := partitionFrom; (partitionTo == -1 && partition == partitionFrom) || (partitionTo > 0 && partition < partitionTo); partition++ { defsByNameWithTags := make(map[string][]schema.MetricDefinition) - defs = cassIdx.LoadPartitions([]int32{partition}, defs, now) + defs = cassIdx.LoadPartitions([]int32{int32(partition)}, defs, now) defCounters.total += len(defs) for _, def := range defs { name := def.NameWithTags() From 6821d257486be219ad2dcd12c38e2197191a2a26 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 10:15:01 -0300 Subject: [PATCH 08/14] add error when writing to cassandra --- idx/cassandra/cassandra.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index 6f8eb959cb..decb9d8bb5 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -489,6 +489,11 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error { if err == nil { return nil } + + // log first failure and every 20th after that. + if (attempts % 20) == 0 { + log.Warnf("cassandra-idx: Failed to write def to cassandra. it will be retried. %s. the value was: %+v", err, def) + } } return err From d5f72f48be35ad8f5b74e49f87340c1cfe77d525 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 10:16:34 -0300 Subject: [PATCH 09/14] change fatal error to warning --- cmd/mt-index-prune/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mt-index-prune/main.go b/cmd/mt-index-prune/main.go index 8692bfc9b8..0c415f4f4d 100644 --- a/cmd/mt-index-prune/main.go +++ b/cmd/mt-index-prune/main.go @@ -163,7 +163,7 @@ func main() { if noDryRun { err = cassIdx.ArchiveDefs(deprecatedDefs) if err != nil { - perror(fmt.Errorf("Failed to archive defs: %s", err.Error())) + log.Warnf("Failed to archive defs: %s", err.Error()) } } From 9d8aba41da104b287278dd27bd17e7050560ccfe Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 10:48:30 -0300 Subject: [PATCH 10/14] execute cassandra operations in a pool of routines --- idx/cassandra/cassandra.go | 41 +++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index decb9d8bb5..819ceb0d94 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -1,6 +1,7 @@ package cassandra import ( + "context" "fmt" "strconv" "strings" @@ -16,6 +17,7 @@ import ( "github.com/grafana/metrictank/util" "github.com/raintank/schema" log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) var ( @@ -391,16 +393,37 @@ NAMES: } func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error { - for _, def := range defs { - err := c.addDefToArchive(def) - if err != nil { - return err - } + defChan := make(chan *schema.MetricDefinition, c.cfg.numConns) + g, ctx := errgroup.WithContext(context.Background()) + for i := 0; i < c.cfg.numConns; i++ { + g.Go(func() error { + for { + select { + case def, ok := <-defChan: + if !ok { + return nil + } + err := c.addDefToArchive(*def) + if err != nil { + return err + } - err = c.deleteDef(def.Id, def.Partition) - if err != nil { - return err - } + err = c.deleteDef(def.Id, def.Partition) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + for i := range defs { + defChan <- &defs[i] + } + close(defChan) + if err := g.Wait(); err != nil { + return err } return nil From 8684bea28563ffe84fef44b058cce954b8b7210c Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 10:48:54 -0300 Subject: [PATCH 11/14] update docs --- docs/tools.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/tools.md b/docs/tools.md index c3190b41bd..a666dbb6bf 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -214,8 +214,10 @@ global config flags: name of file which defines the max-stale times (default "/etc/metrictank/index-rules.conf") -no-dry-run do not only plan and print what to do, but also execute it - -partition-count int - the number of partitions in existence (default 8) + -partition-from int + the partition to start at + -partition-to int + prune all partitions up to this one (exclusive). If unset, only the partition defined with "--partition-from" gets pruned (default -1) -verbose print every metric name that gets archived From 434b7bcf78c04971ea2bb904d492871c50f635c7 Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Thu, 7 Mar 2019 11:13:39 -0300 Subject: [PATCH 12/14] fix dep --- Gopkg.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Gopkg.lock b/Gopkg.lock index dc03b57a8c..2e4ccf8c61 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1068,6 +1068,7 @@ "github.com/uber/jaeger-client-go", "github.com/uber/jaeger-client-go/config", "github.com/uber/jaeger-client-go/log", + "golang.org/x/sync/errgroup", "gopkg.in/macaron.v1", ] solver-name = "gps-cdcl" From 9ef83ebae697d4741b3bcbf12b6c225e0ac2e46e Mon Sep 17 00:00:00 2001 From: woodsaj Date: Fri, 8 Mar 2019 15:33:59 +0800 Subject: [PATCH 13/14] add schema_archive_table to scylladb template --- scripts/config/schema-idx-scylladb.toml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/scripts/config/schema-idx-scylladb.toml b/scripts/config/schema-idx-scylladb.toml index e63b7b32c1..293d196db6 100644 --- a/scripts/config/schema-idx-scylladb.toml +++ b/scripts/config/schema-idx-scylladb.toml @@ -18,3 +18,20 @@ CREATE TABLE IF NOT EXISTS %s.metric_idx ( ) WITH compaction = {'class': 'SizeTieredCompactionStrategy'} AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} """ + +schema_archive_table = """ +CREATE TABLE IF NOT EXISTS %s.metric_idx_archive ( + id text, + orgid int, + partition int, + name text, + interval int, + unit text, + mtype text, + tags set, + lastupdate int, + archived_at int, + PRIMARY KEY (partition, id) +) WITH compaction = {'class': 'SizeTieredCompactionStrategy'} + AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'} +""" \ No newline at end of file From d2868c8ad7040eff466fafc5a697750c1d7afea2 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Fri, 8 Mar 2019 15:35:35 +0800 Subject: [PATCH 14/14] improve error handling, logging and reporting - if writes to cassandra fail, just continue on to the next def - keep count of defs successfully archived. --- cmd/mt-index-prune/main.go | 13 ++++++++++-- docs/tools.md | 2 +- idx/cassandra/cassandra.go | 43 ++++++++++++++++++++++++++++++-------- 3 files changed, 46 insertions(+), 12 deletions(-) diff --git a/cmd/mt-index-prune/main.go b/cmd/mt-index-prune/main.go index 0c415f4f4d..32ea58f364 100644 --- a/cmd/mt-index-prune/main.go +++ b/cmd/mt-index-prune/main.go @@ -32,12 +32,14 @@ type counters struct { total int active int deprecated int + archived int } func (c *counters) PrintCounters() { fmt.Println(fmt.Sprintf("Total analyzed defs: %d", c.total)) fmt.Println(fmt.Sprintf("Active defs: %d", c.active)) fmt.Println(fmt.Sprintf("Deprecated defs: %d", c.deprecated)) + fmt.Println(fmt.Sprintf("Archived defs: %d", c.archived)) } func main() { @@ -68,7 +70,7 @@ func main() { fmt.Println() fmt.Println() fmt.Println("EXAMPLES:") - fmt.Println("mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042") + fmt.Println("mt-index-prune --verbose --partition-from 0 --partition-to 8 cass -hosts cassandra:9042") } if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") { @@ -123,7 +125,9 @@ func main() { defCounters := counters{} defs := make([]schema.MetricDefinition, 0) deprecatedDefs := make([]schema.MetricDefinition, 0) + for partition := partitionFrom; (partitionTo == -1 && partition == partitionFrom) || (partitionTo > 0 && partition < partitionTo); partition++ { + log.Infof("starting to process partition %d", partition) defsByNameWithTags := make(map[string][]schema.MetricDefinition) defs = cassIdx.LoadPartitions([]int32{int32(partition)}, defs, now) defCounters.total += len(defs) @@ -161,10 +165,15 @@ func main() { } if noDryRun { - err = cassIdx.ArchiveDefs(deprecatedDefs) + count, err := cassIdx.ArchiveDefs(deprecatedDefs) + log.Infof("archiving request complete. successful=%d", count) + if count != len(deprecatedDefs) { + log.Warnf("some defs failed to be archived. failed=%d", len(deprecatedDefs)-count) + } if err != nil { log.Warnf("Failed to archive defs: %s", err.Error()) } + defCounters.archived += count } defs = defs[:0] diff --git a/docs/tools.md b/docs/tools.md index a666dbb6bf..2a2719876d 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -268,7 +268,7 @@ cass config flags: EXAMPLES: -mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042 +mt-index-prune --verbose --partition-from 0 --partition-to 8 cass -hosts cassandra:9042 ``` diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index 819ceb0d94..51e873e2bc 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -392,10 +392,17 @@ NAMES: return defs } -func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error { +// ArchiveDefs writes each of the provided defs to the archive table and +// then deletes the defs from the metric_idx table. +func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) (int, error) { defChan := make(chan *schema.MetricDefinition, c.cfg.numConns) g, ctx := errgroup.WithContext(context.Background()) + + // keep track of how many defs were successfully archived. + success := make([]int, c.cfg.numConns) + for i := 0; i < c.cfg.numConns; i++ { + i := i g.Go(func() error { for { select { @@ -405,13 +412,25 @@ func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error { } err := c.addDefToArchive(*def) if err != nil { - return err + // If we failed to add the def to the archive table then just continue on to the next def. + // As we havnet yet removed the this def from the metric_idx table yet, the next time archiving + // is performed the this def will be processed again. As no action is needed by an operator, we + // just log this as a warning. + log.Warnf("cassandra-idx: Failed add def to archive table. error=%s. def=%+v", err, *def) + continue } err = c.deleteDef(def.Id, def.Partition) if err != nil { - return err + // The next time archiving is performed this def will be processed again. Re-adding the def to the archive + // table will just be treated like an update with only the archived_at field changing. As no action is needed + // by an operator, we just log this as a warning. + log.Warnf("cassandra-idx: Failed to remove archived def from metric_idx table. error=%s. def=%+v", err, *def) + continue } + + // increment counter of defs successfully archived + success[i] = success[i] + 1 case <-ctx.Done(): return ctx.Err() } @@ -422,11 +441,17 @@ func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error { defChan <- &defs[i] } close(defChan) - if err := g.Wait(); err != nil { - return err + + // wait for all goroutines to complete. + err := g.Wait() + + // get the count of defs successfully archived. + total := 0 + for _, count := range success { + total = total + count } - return nil + return total, err } func (c *CasIdx) processWriteQueue() { @@ -513,9 +538,9 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error { return nil } - // log first failure and every 20th after that. - if (attempts % 20) == 0 { - log.Warnf("cassandra-idx: Failed to write def to cassandra. it will be retried. %s. the value was: %+v", err, def) + // log first failure as a warning. If we reach max attempts, the error will bubble up to the caller. + if attempts == 0 { + log.Warnf("cassandra-idx: Failed to write def to cassandra. it will be retried. error=%s. def=%+v", err, def) } }