
Merge pull request #1232 from grafana/partitionedIdx
Add partitioned index
woodsaj committed Mar 8, 2019
2 parents 2a90de3 + b83a9a0 commit 20f6fde
Showing 16 changed files with 1,018 additions and 131 deletions.
1 change: 0 additions & 1 deletion cmd/metrictank/metrictank.go
@@ -182,7 +182,6 @@ func main() {
notifierKafka.ConfigProcess(*instance)
statsConfig.ConfigProcess(*instance)
mdata.ConfigProcess()
memory.ConfigProcess()
cassandra.ConfigProcess()
bigtable.ConfigProcess()
bigtableStore.ConfigProcess(mdata.MaxChunkSpan())
2 changes: 2 additions & 0 deletions docker/docker-chaos/metrictank.ini
@@ -394,6 +394,8 @@ match-cache-size = 1000
rules-file = /etc/metrictank/index-rules.conf
# maximum duration each second a prune job can lock the index.
max-prune-lock-time = 100ms
# use separate indexes per partition
partitioned = false

### Bigtable index
[bigtable-idx]
2 changes: 2 additions & 0 deletions docker/docker-cluster/metrictank.ini
@@ -394,6 +394,8 @@ match-cache-size = 1000
rules-file = /etc/metrictank/index-rules.conf
# maximum duration each second a prune job can lock the index.
max-prune-lock-time = 100ms
# use separate indexes per partition
partitioned = false

### Bigtable index
[bigtable-idx]
2 changes: 2 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
@@ -394,6 +394,8 @@ match-cache-size = 1000
rules-file = /etc/metrictank/index-rules.conf
# maximum duration each second a prune job can lock the index.
max-prune-lock-time = 100ms
# use separate indexes per partition
partitioned = false

### Bigtable index
[bigtable-idx]
2 changes: 2 additions & 0 deletions docs/config.md
@@ -461,6 +461,8 @@ match-cache-size = 1000
rules-file = /etc/metrictank/index-rules.conf
# maximum duration each second a prune job can lock the index.
max-prune-lock-time = 100ms
# use separate indexes per partition
partitioned = false
```

### Bigtable index
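The `partitioned = false` option added to the index configuration above controls whether the in-memory index is kept as one shared structure or split into a separate index per partition owned by the instance. As a rough conceptual sketch of what a partition-split index looks like (the types and method names below are invented for illustration and are not the implementation added by this PR), a wrapper can hold one plain index per partition, route writes by partition, and fan reads out across all of them:

```go
package main

import (
	"fmt"
	"sync"
)

// entry stands in for an indexed metric definition; purely illustrative.
type entry struct {
	ID   string
	Name string
}

// simpleIdx is a stand-in for a single in-memory index.
type simpleIdx struct {
	mu   sync.RWMutex
	byID map[string]entry
}

func newSimpleIdx() *simpleIdx {
	return &simpleIdx{byID: make(map[string]entry)}
}

func (i *simpleIdx) add(e entry) {
	i.mu.Lock()
	i.byID[e.ID] = e
	i.mu.Unlock()
}

func (i *simpleIdx) find(name string) []entry {
	i.mu.RLock()
	defer i.mu.RUnlock()
	var out []entry
	for _, e := range i.byID {
		if e.Name == name {
			out = append(out, e)
		}
	}
	return out
}

// partitionedIdx keeps one index per partition, so writes for different
// partitions never contend on the same lock.
type partitionedIdx struct {
	partitions map[int32]*simpleIdx
}

func newPartitionedIdx(parts []int32) *partitionedIdx {
	p := &partitionedIdx{partitions: make(map[int32]*simpleIdx)}
	for _, part := range parts {
		p.partitions[part] = newSimpleIdx()
	}
	return p
}

// add routes the write to the index owning that partition.
func (p *partitionedIdx) add(partition int32, e entry) {
	p.partitions[partition].add(e)
}

// find fans the query out to every partition and merges the results.
func (p *partitionedIdx) find(name string) []entry {
	var out []entry
	for _, idx := range p.partitions {
		out = append(out, idx.find(name)...)
	}
	return out
}

func main() {
	p := newPartitionedIdx([]int32{0, 1})
	p.add(0, entry{ID: "a", Name: "cpu.usage"})
	p.add(1, entry{ID: "b", Name: "cpu.usage"})
	fmt.Println(len(p.find("cpu.usage"))) // 2
}
```

Left at its default of `false`, behaviour should be unchanged; the flag only opts an instance into the per-partition layout.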
26 changes: 13 additions & 13 deletions idx/bigtable/bigtable.go
@@ -54,7 +54,7 @@ type writeReq struct {
}

type BigtableIdx struct {
memory.MemoryIdx
memory.MemoryIndex
cfg *IdxConfig
tbl *bigtable.Table
client *bigtable.Client
@@ -70,9 +70,9 @@ func New(cfg *IdxConfig) *BigtableIdx {
log.Fatalf("bigtable-idx: %s", err)
}
idx := &BigtableIdx{
MemoryIdx: *memory.New(),
cfg: cfg,
shutdown: make(chan struct{}),
MemoryIndex: memory.New(),
cfg: cfg,
shutdown: make(chan struct{}),
}
if cfg.UpdateBigtableIdx {
idx.writeQueue = make(chan writeReq, cfg.WriteQueueSize-cfg.WriteMaxFlushSize)
@@ -165,7 +165,7 @@ func (b *BigtableIdx) InitBare() error {
// rebuilds the in-memory index, sets up write queues, metrics and pruning routines
func (b *BigtableIdx) Init() error {
log.Infof("bigtable-idx: Initializing. Project=%s, Instance=%s", b.cfg.GcpProject, b.cfg.BigtableInstance)
if err := b.MemoryIdx.Init(); err != nil {
if err := b.MemoryIndex.Init(); err != nil {
return err
}

@@ -190,7 +190,7 @@ func (b *BigtableIdx) Init() error {
}

func (b *BigtableIdx) Stop() {
b.MemoryIdx.Stop()
b.MemoryIndex.Stop()
close(b.shutdown)
if b.cfg.UpdateBigtableIdx {
close(b.writeQueue)
@@ -208,7 +208,7 @@ func (b *BigtableIdx) Stop() {
func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
pre := time.Now()

archive, oldPartition, inMemory := b.MemoryIdx.Update(point, partition)
archive, oldPartition, inMemory := b.MemoryIndex.Update(point, partition)

if !b.cfg.UpdateBigtableIdx {
statUpdateDuration.Value(time.Since(pre))
@@ -241,7 +241,7 @@ func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Arc
func (b *BigtableIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
pre := time.Now()

archive, oldPartition, inMemory := b.MemoryIdx.AddOrUpdate(mkey, data, partition)
archive, oldPartition, inMemory := b.MemoryIndex.AddOrUpdate(mkey, data, partition)

stat := statUpdateDuration
if !inMemory {
@@ -286,7 +286,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
log.Debugf("bigtable-idx: updating def %s in index.", archive.MetricDefinition.Id)
b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
archive.LastSave = now
b.MemoryIdx.UpdateArchive(archive)
b.MemoryIndex.UpdateArchive(archive)
} else {
// perform a non-blocking write to the writeQueue. If the queue is full, then
// this will fail and we won't update the LastSave timestamp. The next time
@@ -297,7 +297,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
select {
case b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
archive.LastSave = now
b.MemoryIdx.UpdateArchive(archive)
b.MemoryIndex.UpdateArchive(archive)
default:
statSaveSkipped.Inc()
log.Debugf("bigtable-idx: writeQueue is full, update of %s not saved this time", archive.MetricDefinition.Id)
Expand All @@ -315,7 +315,7 @@ func (b *BigtableIdx) rebuildIndex() {
var defs []schema.MetricDefinition
for _, partition := range cluster.Manager.GetPartitions() {
defs = b.LoadPartition(partition, defs[:0], pre)
num += b.MemoryIdx.Load(defs)
num += b.MemoryIndex.LoadPartition(partition, defs)
}

log.Infof("bigtable-idx: Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
@@ -459,7 +459,7 @@ LOOP:

func (b *BigtableIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
pre := time.Now()
defs, err := b.MemoryIdx.Delete(orgId, pattern)
defs, err := b.MemoryIndex.Delete(orgId, pattern)
if err != nil {
return defs, err
}
@@ -498,7 +498,7 @@ func (b *BigtableIdx) deleteRow(key string) error {

func (b *BigtableIdx) Prune(now time.Time) ([]idx.Archive, error) {
log.Info("bigtable-idx: start pruning of series")
pruned, err := b.MemoryIdx.Prune(now)
pruned, err := b.MemoryIndex.Prune(now)
duration := time.Since(now)
if err != nil {
log.Errorf("bigtable-idx: prune error. %s", err)
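The rebuild path changes the same way in both backends: instead of loading every definition and handing the whole slice to the memory index at once, definitions are now fetched one partition at a time and loaded via `LoadPartition`, so each batch lands in the index responsible for that partition. Below is a minimal, self-contained sketch of that loop; the types and the `fetchDefsForPartition` helper are stand-ins invented for the example, not metrictank's actual API.

```go
package main

import (
	"fmt"
	"time"
)

// metricDef is a stand-in for schema.MetricDefinition.
type metricDef struct{ ID string }

// memoryIndex is a stand-in for the partition-aware memory index; LoadPartition
// returns how many definitions were added, mirroring the interface used above.
type memoryIndex struct{ defs map[int32][]metricDef }

func (m *memoryIndex) LoadPartition(partition int32, defs []metricDef) int {
	m.defs[partition] = append(m.defs[partition], defs...)
	return len(defs)
}

// fetchDefsForPartition stands in for the Cassandra/Bigtable call that reads
// definitions for one partition from the backend store.
func fetchDefsForPartition(partition int32, defs []metricDef) []metricDef {
	return append(defs, metricDef{ID: fmt.Sprintf("def-%d", partition)})
}

func rebuild(idx *memoryIndex, partitions []int32) int {
	start := time.Now()
	total := 0
	var defs []metricDef
	for _, partition := range partitions {
		// reuse the backing slice between iterations, like defs[:0] in the diff
		defs = fetchDefsForPartition(partition, defs[:0])
		total += idx.LoadPartition(partition, defs)
	}
	fmt.Printf("rebuild complete: imported %d defs in %s\n", total, time.Since(start))
	return total
}

func main() {
	idx := &memoryIndex{defs: make(map[int32][]metricDef)}
	rebuild(idx, []int32{0, 1, 2})
}
```

Reusing `defs[:0]` between iterations, as the diff does, keeps the backing array around so each partition's fetch does not reallocate from scratch.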
28 changes: 16 additions & 12 deletions idx/cassandra/cassandra.go
@@ -57,7 +57,7 @@ type writeReq struct {

// CasIdx implements the "MetricIndex" interface
type CasIdx struct {
memory.MemoryIdx
memory.MemoryIndex
cfg *IdxConfig
cluster *gocql.ClusterConfig
session *gocql.Session
@@ -96,7 +96,7 @@ func New(cfg *IdxConfig) *CasIdx {
}

idx := &CasIdx{
MemoryIdx: *memory.New(),
MemoryIndex: memory.New(),
cfg: cfg,
cluster: cluster,
updateInterval32: uint32(cfg.updateInterval.Nanoseconds() / int64(time.Second)),
@@ -180,7 +180,7 @@ func (c *CasIdx) InitBare() error {
// rebuilds the in-memory index, sets up write queues, metrics and pruning routines
func (c *CasIdx) Init() error {
log.Infof("initializing cassandra-idx. Hosts=%s", c.cfg.hosts)
if err := c.MemoryIdx.Init(); err != nil {
if err := c.MemoryIndex.Init(); err != nil {
return err
}

@@ -207,7 +207,7 @@ func (c *CasIdx) Init() error {

func (c *CasIdx) Stop() {
log.Info("cassandra-idx: stopping")
c.MemoryIdx.Stop()
c.MemoryIndex.Stop()

// if updateCassIdx is disabled then writeQueue should never have been initialized
if c.cfg.updateCassIdx {
@@ -222,7 +222,7 @@ func (c *CasIdx) Stop() {
func (c *CasIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
pre := time.Now()

archive, oldPartition, inMemory := c.MemoryIdx.Update(point, partition)
archive, oldPartition, inMemory := c.MemoryIndex.Update(point, partition)

if !c.cfg.updateCassIdx {
statUpdateDuration.Value(time.Since(pre))
@@ -251,7 +251,7 @@ func (c *CasIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive,
func (c *CasIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
pre := time.Now()

archive, oldPartition, inMemory := c.MemoryIdx.AddOrUpdate(mkey, data, partition)
archive, oldPartition, inMemory := c.MemoryIndex.AddOrUpdate(mkey, data, partition)

stat := statUpdateDuration
if !inMemory {
@@ -291,7 +291,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
log.Debugf("cassandra-idx: updating def %s in index.", archive.MetricDefinition.Id)
c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
archive.LastSave = now
c.MemoryIdx.UpdateArchive(archive)
c.MemoryIndex.UpdateArchive(archive)
} else {
// perform a non-blocking write to the writeQueue. If the queue is full, then
// this will fail and we won't update the LastSave timestamp. The next time
@@ -302,7 +302,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
select {
case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
archive.LastSave = now
c.MemoryIdx.UpdateArchive(archive)
c.MemoryIndex.UpdateArchive(archive)
default:
statSaveSkipped.Inc()
log.Debugf("cassandra-idx: writeQueue is full, update of %s not saved this time.", archive.MetricDefinition.Id)
@@ -315,8 +315,12 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
func (c *CasIdx) rebuildIndex() {
log.Info("cassandra-idx: Rebuilding Memory Index from metricDefinitions in Cassandra")
pre := time.Now()
defs := c.LoadPartitions(cluster.Manager.GetPartitions(), nil, pre)
num := c.MemoryIdx.Load(defs)
var defs []schema.MetricDefinition
var num int
for _, partition := range cluster.Manager.GetPartitions() {
defs = c.LoadPartitions([]int32{partition}, defs[:0], pre)
num += c.MemoryIndex.LoadPartition(partition, defs)
}
log.Infof("cassandra-idx: Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
}

@@ -549,7 +553,7 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error {

func (c *CasIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
pre := time.Now()
defs, err := c.MemoryIdx.Delete(orgId, pattern)
defs, err := c.MemoryIndex.Delete(orgId, pattern)
if err != nil {
return defs, err
}
@@ -596,7 +600,7 @@ func (c *CasIdx) deleteDefAsync(key schema.MKey, part int32) {

func (c *CasIdx) Prune(now time.Time) ([]idx.Archive, error) {
log.Info("cassandra-idx: start pruning of series")
pruned, err := c.MemoryIdx.Prune(now)
pruned, err := c.MemoryIndex.Prune(now)
duration := time.Since(now)
if err != nil {
log.Errorf("cassandra-idx: pruning error: %s", err)
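Both `updateBigtable` and `updateCassandra` use the same non-blocking save strategy described in the comments above: the def is pushed onto the write queue with a select/default send; if the queue is full the save is skipped and `LastSave` is not bumped, so a later update will retry. Here is a standalone sketch of that pattern (the `writeReq` shape mirrors the diff, everything else is illustrative):

```go
package main

import (
	"fmt"
	"time"
)

type writeReq struct {
	recvTime time.Time
	defID    string
}

// trySave attempts a non-blocking send to the write queue. On success the
// caller records the new LastSave timestamp; when the queue is full the save
// is simply skipped and will be retried on a later update, as the comments
// in the diff describe.
func trySave(queue chan writeReq, defID string) bool {
	select {
	case queue <- writeReq{recvTime: time.Now(), defID: defID}:
		return true
	default:
		// queue full: drop this save attempt rather than block the ingest path
		return false
	}
}

func main() {
	queue := make(chan writeReq, 1)

	fmt.Println(trySave(queue, "a")) // true: queue had room
	fmt.Println(trySave(queue, "b")) // false: queue is full, save skipped
}
```

The design choice here protects the ingest path: a full write queue costs only a delayed index save, never a blocked update.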
4 changes: 2 additions & 2 deletions idx/cassandra/cassandra_test.go
@@ -79,7 +79,7 @@ func init() {
}

func initForTests(c *CasIdx) error {
return c.MemoryIdx.Init()
return c.MemoryIndex.Init()
}

func getSeriesNames(depth, count int, prefix string) []string {
@@ -293,7 +293,7 @@ func TestAddToWriteQueue(t *testing.T) {
So(time.Now(), ShouldHappenAfter, pre.Add(time.Second))
})
})
ix.MemoryIdx.Stop()
ix.MemoryIndex.Stop()
close(ix.writeQueue)
}
