Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

add the mt-index-prune utility #1231

Merged
merged 14 commits into from
Mar 8, 2019
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/cmd/mt-explain/mt-explain
/cmd/mt-index-cat/mt-index-cat
/cmd/mt-index-migrate/mt-index-migrate
/cmd/mt-index-prune/mt-index-prune
/cmd/mt-kafka-mdm-sniff-out-of-order/mt-kafka-mdm-sniff-out-of-order
/cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff
Expand Down
174 changes: 174 additions & 0 deletions cmd/mt-index-prune/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package main

import (
"flag"
"fmt"
"os"
"regexp"
"time"

"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/idx/cassandra"
"github.com/grafana/metrictank/idx/memory"
"github.com/grafana/metrictank/logger"
"github.com/raintank/schema"
log "github.com/sirupsen/logrus"
)

// init sets up the process-wide logger: millisecond-precision
// timestamps and info-level output.
func init() {
	formatter := &logger.TextFormatter{
		TimestampFormat: "2006-01-02 15:04:05.000",
	}
	log.SetFormatter(formatter)
	log.SetLevel(log.InfoLevel)
}

// perror logs err at fatal level and terminates the process.
// It is a no-op when err is nil.
func perror(err error) {
	if err == nil {
		return
	}
	log.Fatal(err.Error())
}

type counters struct {
total int
active int
deprecated int
}

func (c *counters) PrintCounters() {
fmt.Println(fmt.Sprintf("Total analyzed defs: %d", c.total))
fmt.Println(fmt.Sprintf("Active defs: %d", c.active))
fmt.Println(fmt.Sprintf("Deprecated defs: %d", c.deprecated))
}

func main() {
var noDryRun, verbose bool
var partitionCount int
var indexRulesFile string
globalFlags := flag.NewFlagSet("global config flags", flag.ExitOnError)
globalFlags.BoolVar(&noDryRun, "no-dry-run", false, "do not only plan and print what to do, but also execute it")
globalFlags.BoolVar(&verbose, "verbose", false, "print every metric name that gets archived")
globalFlags.IntVar(&partitionCount, "partition-count", 8, "the number of partitions in existence")
globalFlags.StringVar(&indexRulesFile, "index-rules-file", "/etc/metrictank/index-rules.conf", "name of file which defines the max-stale times")
cassFlags := cassandra.ConfigSetup()

flag.Usage = func() {
fmt.Println("mt-index-prune")
fmt.Println()
fmt.Println("Retrieves a metrictank index and moves all deprecated entries into an archive table")
fmt.Println()
fmt.Printf("Usage:\n\n")
fmt.Printf(" mt-index-prune [global config flags] <idxtype> [idx config flags]\n\n")
fmt.Printf("global config flags:\n\n")
globalFlags.PrintDefaults()
fmt.Println()
fmt.Printf("idxtype: only 'cass' supported for now\n\n")
fmt.Printf("cass config flags:\n\n")
cassFlags.PrintDefaults()
fmt.Println()
fmt.Println()
fmt.Println("EXAMPLES:")
fmt.Println("mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042")
}

if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
flag.Usage()
os.Exit(0)
}

if len(os.Args) < 2 {
flag.Usage()
os.Exit(-1)
}

var cassI int
for i, v := range os.Args {
if v == "cass" {
cassI = i
}
}

if cassI == 0 {
log.Println("only indextype 'cass' supported")
flag.Usage()
os.Exit(1)
}
globalFlags.Parse(os.Args[1:cassI])

indexRules, err := conf.ReadIndexRules(indexRulesFile)
if os.IsNotExist(err) {
log.Fatalf("Index-rules.conf file %s does not exist; exiting", indexRulesFile)
os.Exit(1)
}
now := time.Now()
cutoffs := indexRules.Cutoffs(now)

cassFlags.Parse(os.Args[cassI+1:])
cassandra.CliConfig.Enabled = true
cassIdx := cassandra.New(cassandra.CliConfig)
err = cassIdx.InitBare()
perror(err)

// we don't want to filter any metric definitions during the loading
// so MaxStale is set to 0
memory.IndexRules = conf.IndexRules{
Rules: nil,
Default: conf.IndexRule{
Name: "default",
Pattern: regexp.MustCompile(""),
MaxStale: 0,
},
}

defCounters := counters{}
defs := make([]schema.MetricDefinition, 0)
deprecatedDefs := make([]schema.MetricDefinition, 0)
for partition := int32(0); partition < int32(partitionCount); partition++ {
defsByNameWithTags := make(map[string][]schema.MetricDefinition)
defs = cassIdx.LoadPartitions([]int32{partition}, defs, now)
defCounters.total += len(defs)
for _, def := range defs {
name := def.NameWithTags()
defsByNameWithTags[name] = append(defsByNameWithTags[name], def)
}

for name, defs := range defsByNameWithTags {
// find the latest LastUpdate ts
latest := int64(0)
for _, def := range defs {
if def.LastUpdate > latest {
latest = def.LastUpdate
}
}

irId, _ := indexRules.Match(name)
if latest < cutoffs[irId] {
for _, def := range defs {
deprecatedDefs = append(deprecatedDefs, def)
}
defCounters.deprecated += len(defs)

if verbose {
fmt.Println(fmt.Sprintf("Metric is deprecated: %s", name))
}
} else {
defCounters.active += len(defs)

if verbose {
fmt.Println(fmt.Sprintf("Metric is active: %s", name))
}
}
}

if noDryRun {
err = cassIdx.ArchiveDefs(deprecatedDefs)
if err != nil {
perror(fmt.Errorf("Failed to archive defs: %s", err.Error()))
replay marked this conversation as resolved.
Show resolved Hide resolved
}
}

defs = defs[:0]
deprecatedDefs = deprecatedDefs[:0]
}

defCounters.PrintCounters()
}
73 changes: 73 additions & 0 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,79 @@ Flags:
```


## mt-index-prune

```
mt-index-prune

Retrieves a metrictank index and moves all deprecated entries into an archive table

Usage:

mt-index-prune [global config flags] <idxtype> [idx config flags]

global config flags:

-index-rules-file string
name of file which defines the max-stale times (default "/etc/metrictank/index-rules.conf")
-no-dry-run
do not only plan and print what to do, but also execute it
-partition-count int
the number of partitions in existence (default 8)
-verbose
print every metric name that gets archived

idxtype: only 'cass' supported for now

cass config flags:

-auth
enable cassandra user authentication
-ca-path string
    cassandra CA certificate path when using SSL (default "/etc/metrictank/ca.pem")
-consistency string
    write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one) (default "one")
-create-keyspace
enable the creation of the index keyspace and tables, only one node needs this (default true)
-disable-initial-host-lookup
instruct the driver to not attempt to get host info from the system.peers table
-enabled
(default true)
-host-verification
host (hostname and server cert) verification when using SSL (default true)
-hosts string
comma separated list of cassandra addresses in host:port form (default "localhost:9042")
-keyspace string
Cassandra keyspace to store metricDefinitions in. (default "metrictank")
-num-conns int
number of concurrent connections to cassandra (default 10)
-password string
password for authentication (default "cassandra")
-protocol-version int
cql protocol version to use (default 4)
-prune-interval duration
Interval at which the index should be checked for stale series. (default 3h0m0s)
-schema-file string
File containing the needed schemas in case database needs initializing (default "/etc/metrictank/schema-idx-cassandra.toml")
-ssl
enable SSL connection to cassandra
-timeout duration
cassandra request timeout (default 1s)
-update-cassandra-index
synchronize index changes to cassandra. not all your nodes need to do this. (default true)
-update-interval duration
frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates (default 3h0m0s)
-username string
username for authentication (default "cassandra")
-write-queue-size int
Max number of metricDefs allowed to be unwritten to cassandra (default 100000)


EXAMPLES:
mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042
```


## mt-kafka-mdm-sniff

```
Expand Down
67 changes: 63 additions & 4 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,25 @@ func (c *CasIdx) InitBare() error {
// read templates
schemaKeyspace := util.ReadEntry(c.cfg.schemaFile, "schema_keyspace").(string)
schemaTable := util.ReadEntry(c.cfg.schemaFile, "schema_table").(string)
schemaArchiveTable := util.ReadEntry(c.cfg.schemaFile, "schema_archive_table").(string)

// create the keyspace or ensure it exists
if c.cfg.createKeyspace {
log.Infof("cassandra-idx: ensuring that keyspace %s exist.", c.cfg.keyspace)
log.Infof("cassandra-idx: ensuring that keyspace %s exists.", c.cfg.keyspace)
err = tmpSession.Query(fmt.Sprintf(schemaKeyspace, c.cfg.keyspace)).Exec()
if err != nil {
return fmt.Errorf("failed to initialize cassandra keyspace: %s", err)
}
log.Info("cassandra-idx: ensuring that table metric_idx exist.")
log.Info("cassandra-idx: ensuring that table metric_idx exists.")
err = tmpSession.Query(fmt.Sprintf(schemaTable, c.cfg.keyspace)).Exec()
if err != nil {
return fmt.Errorf("failed to initialize cassandra table: %s", err)
}
log.Info("cassandra-idx: ensuring that table metric_idx_archive exists.")
err = tmpSession.Query(fmt.Sprintf(schemaArchiveTable, c.cfg.keyspace)).Exec()
if err != nil {
return fmt.Errorf("failed to initialize cassandra table: %s", err)
}
} else {
var keyspaceMetadata *gocql.KeyspaceMetadata
for attempt := 1; attempt > 0; attempt++ {
Expand All @@ -141,7 +147,9 @@ func (c *CasIdx) InitBare() error {
log.Warnf("cassandra-idx: cassandra keyspace not found. retrying in 5s. attempt: %d", attempt)
time.Sleep(5 * time.Second)
} else {
if _, ok := keyspaceMetadata.Tables["metric_idx"]; ok {
_, okIdx := keyspaceMetadata.Tables["metric_idx"]
_, okArchive := keyspaceMetadata.Tables["metric_idx_archive"]
if okIdx && okArchive {
break
} else {
if attempt >= 5 {
Expand All @@ -152,7 +160,6 @@ func (c *CasIdx) InitBare() error {
}
}
}

}

tmpSession.Close()
Expand Down Expand Up @@ -383,6 +390,22 @@ NAMES:
return defs
}

func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error {
for _, def := range defs {
woodsaj marked this conversation as resolved.
Show resolved Hide resolved
err := c.addDefToArchive(def)
if err != nil {
return err
}

err = c.deleteDef(def.Id, def.Partition)
if err != nil {
return err
}
}

return nil
}

func (c *CasIdx) processWriteQueue() {
var success bool
var attempts int
Expand Down Expand Up @@ -435,6 +458,42 @@ func (c *CasIdx) processWriteQueue() {
c.wg.Done()
}

func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error {
insertQry := `INSERT INTO metric_idx_archive (id, orgid, partition, name, interval, unit, mtype, tags, lastupdate, archived_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
maxAttempts := 5
now := time.Now().UTC().Unix()
var err error

for attempts := 0; attempts < maxAttempts; attempts++ {
if attempts > 0 {
sleepTime := 100 * attempts
if sleepTime > 2000 {
sleepTime = 2000
}
time.Sleep(time.Duration(sleepTime) * time.Millisecond)
}

err := c.session.Query(
insertQry,
def.Id.String(),
def.OrgId,
def.Partition,
def.Name,
def.Interval,
def.Unit,
def.Mtype,
def.Tags,
def.LastUpdate,
now).Exec()

if err == nil {
return nil
}
woodsaj marked this conversation as resolved.
Show resolved Hide resolved
}

return err
}

func (c *CasIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
pre := time.Now()
defs, err := c.MemoryIdx.Delete(orgId, pattern)
Expand Down
17 changes: 17 additions & 0 deletions scripts/config/schema-idx-cassandra.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,20 @@ CREATE TABLE IF NOT EXISTS %s.metric_idx (
) WITH compaction = {'class': 'SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
"""

schema_archive_table = """
CREATE TABLE IF NOT EXISTS %s.metric_idx_archive (
id text,
orgid int,
partition int,
name text,
interval int,
unit text,
mtype text,
tags set<text>,
lastupdate int,
archived_at int,
PRIMARY KEY (partition, id)
) WITH compaction = {'class': 'SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
"""