Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

cassandra-idx: load partitions in parallel #1270

Merged
merged 6 commits into from
Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/docker-chaos/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ num-conns = 10
write-queue-size = 100000
#Interval at which the index should be checked for stale series. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'
prune-interval = 3h
# Number of partitions to load concurrently on startup.
init-load-concurrency = 1
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Setting to '0s' will cause instant updates.
Expand Down
2 changes: 2 additions & 0 deletions docker/docker-cluster-query/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ num-conns = 10
write-queue-size = 100000
#Interval at which the index should be checked for stale series. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'
prune-interval = 3h
# Number of partitions to load concurrently on startup.
init-load-concurrency = 1
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Setting to '0s' will cause instant updates.
Expand Down
2 changes: 2 additions & 0 deletions docker/docker-cluster/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ num-conns = 10
write-queue-size = 100000
#Interval at which the index should be checked for stale series. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'
prune-interval = 3h
# Number of partitions to load concurrently on startup.
init-load-concurrency = 1
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Setting to '0s' will cause instant updates.
Expand Down
2 changes: 2 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ num-conns = 10
write-queue-size = 100000
#Interval at which the index should be checked for stale series. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'
prune-interval = 3h
# Number of partitions to load concurrently on startup.
init-load-concurrency = 1
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Setting to '0s' will cause instant updates.
Expand Down
2 changes: 2 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ num-conns = 10
write-queue-size = 100000
#Interval at which the index should be checked for stale series. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'
prune-interval = 3h
# Number of partitions to load concurrently on startup.
init-load-concurrency = 1
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Setting to '0s' will cause instant updates.
Expand Down
6 changes: 6 additions & 0 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ cass config flags:
host (hostname and server cert) verification when using SSL (default true)
-hosts string
comma separated list of cassandra addresses in host:port form (default "localhost:9042")
-init-load-concurrency int
Number of partitions to load concurrently on startup. (default 1)
-keyspace string
Cassandra keyspace to store metricDefinitions in. (default "metrictank")
-num-conns int
Expand Down Expand Up @@ -241,6 +243,8 @@ cass config flags:
host (hostname and server cert) verification when using SSL (default true)
-hosts string
comma separated list of cassandra addresses in host:port form (default "localhost:9042")
-init-load-concurrency int
Number of partitions to load concurrently on startup. (default 1)
-keyspace string
Cassandra keyspace to store metricDefinitions in. (default "metrictank")
-num-conns int
Expand Down Expand Up @@ -787,6 +791,8 @@ cass config flags:
host (hostname and server cert) verification when using SSL (default true)
-hosts string
comma separated list of cassandra addresses in host:port form (default "localhost:9042")
-init-load-concurrency int
Number of partitions to load concurrently on startup. (default 1)
-keyspace string
Cassandra keyspace to store metricDefinitions in. (default "metrictank")
-num-conns int
Expand Down
30 changes: 25 additions & 5 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gocql/gocql"
Expand Down Expand Up @@ -343,12 +344,29 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
// rebuildIndex repopulates the memory index from Cassandra: for every
// partition returned by cluster.Manager.GetPartitions() it fetches the
// MetricDefinitions via c.LoadPartitions and hands them to
// c.MemoryIndex.LoadPartition, then logs the total imported and the time taken.
//
// NOTE(review): this hunk is rendered without +/- markers, so the statements of
// the earlier sequential loop (the shared defs slice, the int counter, and the
// two loop-body lines followed by a closing brace) appear next to the
// goroutine-based replacement. Presumably only the concurrent lines are live
// in the merged file — confirm against the repository.
func (c *CasIdx) rebuildIndex() {
log.Info("cassandra-idx: Rebuilding Memory Index from metricDefinitions in Cassandra")
// pre is both the reference time passed to LoadPartitions and the start of
// the elapsed-time measurement logged at the end.
pre := time.Now()
var defs []schema.MetricDefinition
var num int
// gate is a buffered channel used as a counting semaphore: each goroutine
// sends before it starts loading and receives when it is done, so at most
// initLoadConcurrency goroutines are between those two points at once.
// NOTE(review): no validation of initLoadConcurrency is visible here. A value
// of 0 gives an unbuffered channel, on which the send below never finds a
// receiver; a negative value makes make() panic. Confirm the setting is
// checked elsewhere.
gate := make(chan struct{}, c.cfg.initLoadConcurrency)
var wg sync.WaitGroup
// defPool recycles the []schema.MetricDefinition scratch slices between
// goroutines; New supplies an empty slice when the pool has none.
defPool := sync.Pool{
New: func() interface{} {
return []schema.MetricDefinition{}
},
}
// num is shared by all goroutines, hence the atomic add below.
var num uint32
for _, partition := range cluster.Manager.GetPartitions() {
defs = c.LoadPartitions([]int32{partition}, defs[:0], pre)
num += c.MemoryIndex.LoadPartition(partition, defs)
}
wg.Add(1)
// The partition is passed as an argument so each goroutine works on its own copy.
go func(p int32) {
// Acquire a slot; blocks while initLoadConcurrency loads are already running.
gate <- struct{}{}
defs := defPool.Get().([]schema.MetricDefinition)
// On exit: return the slice to the pool truncated to length 0 (the
// backing array is kept), mark this goroutine done, then free the slot.
defer func() {
defPool.Put(defs[:0])
wg.Done()
<-gate
}()
defs = c.LoadPartitions([]int32{p}, defs, pre)
atomic.AddUint32(&num, uint32(c.MemoryIndex.LoadPartition(p, defs)))
}(partition)
}
// Block until every goroutine started above has called wg.Done.
wg.Wait()
log.Infof("cassandra-idx: Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
}

Expand All @@ -357,6 +375,7 @@ func (c *CasIdx) Load(defs []schema.MetricDefinition, now time.Time) []schema.Me
return c.load(defs, iter, now)
}

// LoadPartitions appends MetricDefinitions from the given partitions to defs and returns the modified defs, honoring pruning settings relative to now
func (c *CasIdx) LoadPartitions(partitions []int32, defs []schema.MetricDefinition, now time.Time) []schema.MetricDefinition {
placeholders := make([]string, len(partitions))
for i, p := range partitions {
Expand All @@ -367,6 +386,7 @@ func (c *CasIdx) LoadPartitions(partitions []int32, defs []schema.MetricDefiniti
return c.load(defs, iter, now)
}

// load appends MetricDefinitions from the iterator to defs and returns the modified defs, honoring pruning settings relative to now
func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator, now time.Time) []schema.MetricDefinition {
defsByNames := make(map[string][]*schema.MetricDefinition)
var id, name, unit, mtype string
Expand Down
3 changes: 3 additions & 0 deletions idx/cassandra/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type IdxConfig struct {
numConns int
protoVer int
disableInitialHostLookup bool
initLoadConcurrency int
}

// NewIdxConfig returns IdxConfig with default values set.
Expand All @@ -64,6 +65,7 @@ func NewIdxConfig() *IdxConfig {
auth: false,
username: "cassandra",
password: "cassandra",
initLoadConcurrency: 1,
}
}

Expand Down Expand Up @@ -92,6 +94,7 @@ func ConfigSetup() *flag.FlagSet {
casIdx.BoolVar(&CliConfig.updateCassIdx, "update-cassandra-index", CliConfig.updateCassIdx, "synchronize index changes to cassandra. not all your nodes need to do this.")
casIdx.DurationVar(&CliConfig.updateInterval, "update-interval", CliConfig.updateInterval, "frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates")
casIdx.DurationVar(&CliConfig.pruneInterval, "prune-interval", CliConfig.pruneInterval, "Interval at which the index should be checked for stale series.")
casIdx.IntVar(&CliConfig.initLoadConcurrency, "init-load-concurrency", CliConfig.initLoadConcurrency, "Number of partitions to load concurrently on startup.")
casIdx.IntVar(&CliConfig.protoVer, "protocol-version", CliConfig.protoVer, "cql protocol version to use")
casIdx.BoolVar(&CliConfig.createKeyspace, "create-keyspace", CliConfig.createKeyspace, "enable the creation of the index keyspace and tables, only one node needs this")
casIdx.StringVar(&CliConfig.schemaFile, "schema-file", CliConfig.schemaFile, "File containing the needed schemas in case database needs initializing")
Expand Down
2 changes: 2 additions & 0 deletions metrictank-sample.ini
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ num-conns = 10
write-queue-size = 100000
#Interval at which the index should be checked for stale series. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'
prune-interval = 3h
# Number of partitions to load concurrently on startup.
init-load-concurrency = 1
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Setting to '0s' will cause instant updates.
Expand Down
2 changes: 2 additions & 0 deletions scripts/config/metrictank-docker.ini
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ num-conns = 10
write-queue-size = 100000
#Interval at which the index should be checked for stale series. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'
prune-interval = 3h
# Number of partitions to load concurrently on startup.
init-load-concurrency = 1
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Setting to '0s' will cause instant updates.
Expand Down
2 changes: 2 additions & 0 deletions scripts/config/metrictank-package.ini
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ num-conns = 10
write-queue-size = 100000
#Interval at which the index should be checked for stale series. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'
prune-interval = 3h
# Number of partitions to load concurrently on startup.
init-load-concurrency = 1
# synchronize index changes to cassandra. not all your nodes need to do this.
update-cassandra-index = true
#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true. valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'. Setting to '0s' will cause instant updates.
Expand Down