This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Add partitioned index #1232

Merged
merged 10 commits on Mar 8, 2019
1 change: 0 additions & 1 deletion cmd/metrictank/metrictank.go
@@ -182,7 +182,6 @@ func main() {
     notifierKafka.ConfigProcess(*instance)
     statsConfig.ConfigProcess(*instance)
     mdata.ConfigProcess()
-    memory.ConfigProcess()
     cassandra.ConfigProcess()
     bigtable.ConfigProcess()
     bigtableStore.ConfigProcess(mdata.MaxChunkSpan())
2 changes: 2 additions & 0 deletions docker/docker-chaos/metrictank.ini
@@ -394,6 +394,8 @@ match-cache-size = 1000
 rules-file = /etc/metrictank/index-rules.conf
 # maximum duration each second a prune job can lock the index.
 max-prune-lock-time = 100ms
+# use separate indexes per partition
+partitioned = false
 
 ### Bigtable index
 [bigtable-idx]
2 changes: 2 additions & 0 deletions docker/docker-cluster/metrictank.ini
@@ -394,6 +394,8 @@ match-cache-size = 1000
 rules-file = /etc/metrictank/index-rules.conf
 # maximum duration each second a prune job can lock the index.
 max-prune-lock-time = 100ms
+# use separate indexes per partition
+partitioned = false
 
 ### Bigtable index
 [bigtable-idx]
2 changes: 2 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
@@ -394,6 +394,8 @@ match-cache-size = 1000
 rules-file = /etc/metrictank/index-rules.conf
 # maximum duration each second a prune job can lock the index.
 max-prune-lock-time = 100ms
+# use separate indexes per partition
+partitioned = false
 
 ### Bigtable index
 [bigtable-idx]
2 changes: 2 additions & 0 deletions docs/config.md
@@ -461,6 +461,8 @@ match-cache-size = 1000
 rules-file = /etc/metrictank/index-rules.conf
 # maximum duration each second a prune job can lock the index.
 max-prune-lock-time = 100ms
+# use separate indexes per partition
+partitioned = false
 ```
 
 ### Bigtable index
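The `partitioned = false` setting documented above is the user-facing switch for this PR. The code changes below replace the embedded `memory.MemoryIdx` struct with a `memory.MemoryIndex` interface and `*memory.New()` with `memory.New()`, which implies the constructor now picks an implementation based on this flag. A minimal, self-contained sketch of that factory shape — `UnpartitionedMemoryIdx` and `PartitionedMemoryIdx` are illustrative names assumed here, not taken from this diff:

```go
package main

import "fmt"

// MemoryIndex stands in for the interface memory.New() presumably returns
// after this PR; the real one has many more methods.
type MemoryIndex interface {
	Init() error
	Stop()
}

// UnpartitionedMemoryIdx is a stub for the original single in-memory index.
type UnpartitionedMemoryIdx struct{}

func (m *UnpartitionedMemoryIdx) Init() error { return nil }
func (m *UnpartitionedMemoryIdx) Stop()       {}

// PartitionedMemoryIdx is a stub for an index that keeps one sub-index
// per kafka partition.
type PartitionedMemoryIdx struct{}

func (m *PartitionedMemoryIdx) Init() error { return nil }
func (m *PartitionedMemoryIdx) Stop()       {}

// partitioned mirrors the new ini setting shown above.
var partitioned = false

// New returns an interface value rather than a concrete struct, which is
// consistent with the diff changing `MemoryIdx: *memory.New()` to
// `MemoryIndex: memory.New()`.
func New() MemoryIndex {
	if partitioned {
		return &PartitionedMemoryIdx{}
	}
	return &UnpartitionedMemoryIdx{}
}

func main() {
	idx := New()
	if err := idx.Init(); err != nil {
		panic(err)
	}
	fmt.Printf("using %T\n", idx)
	idx.Stop()
}
```

Returning an interface value is what lets the Cassandra and Bigtable index wrappers below switch implementations with little more than a field rename.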
26 changes: 13 additions & 13 deletions idx/bigtable/bigtable.go
@@ -54,7 +54,7 @@ type writeReq struct {
 }
 
 type BigtableIdx struct {
-    memory.MemoryIdx
+    memory.MemoryIndex
     cfg        *IdxConfig
     tbl        *bigtable.Table
     client     *bigtable.Client
@@ -70,9 +70,9 @@ func New(cfg *IdxConfig) *BigtableIdx {
         log.Fatalf("bigtable-idx: %s", err)
     }
     idx := &BigtableIdx{
-        MemoryIdx: *memory.New(),
-        cfg:       cfg,
-        shutdown:  make(chan struct{}),
+        MemoryIndex: memory.New(),
+        cfg:         cfg,
+        shutdown:    make(chan struct{}),
     }
     if cfg.UpdateBigtableIdx {
         idx.writeQueue = make(chan writeReq, cfg.WriteQueueSize-cfg.WriteMaxFlushSize)
@@ -165,7 +165,7 @@ func (b *BigtableIdx) InitBare() error {
 // rebuilds the in-memory index, sets up write queues, metrics and pruning routines
 func (b *BigtableIdx) Init() error {
     log.Infof("bigtable-idx: Initializing. Project=%s, Instance=%s", b.cfg.GcpProject, b.cfg.BigtableInstance)
-    if err := b.MemoryIdx.Init(); err != nil {
+    if err := b.MemoryIndex.Init(); err != nil {
         return err
     }
 
@@ -190,7 +190,7 @@ func (b *BigtableIdx) Init() error {
 }
 
 func (b *BigtableIdx) Stop() {
-    b.MemoryIdx.Stop()
+    b.MemoryIndex.Stop()
     close(b.shutdown)
     if b.cfg.UpdateBigtableIdx {
         close(b.writeQueue)
@@ -208,7 +208,7 @@ func (b *BigtableIdx) Stop() {
 func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
     pre := time.Now()
 
-    archive, oldPartition, inMemory := b.MemoryIdx.Update(point, partition)
+    archive, oldPartition, inMemory := b.MemoryIndex.Update(point, partition)
 
     if !b.cfg.UpdateBigtableIdx {
         statUpdateDuration.Value(time.Since(pre))
@@ -241,7 +241,7 @@ func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Arc
 func (b *BigtableIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
     pre := time.Now()
 
-    archive, oldPartition, inMemory := b.MemoryIdx.AddOrUpdate(mkey, data, partition)
+    archive, oldPartition, inMemory := b.MemoryIndex.AddOrUpdate(mkey, data, partition)
 
     stat := statUpdateDuration
     if !inMemory {
@@ -286,7 +286,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
         log.Debugf("bigtable-idx: updating def %s in index.", archive.MetricDefinition.Id)
         b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
         archive.LastSave = now
-        b.MemoryIdx.UpdateArchive(archive)
+        b.MemoryIndex.UpdateArchive(archive)
     } else {
         // perform a non-blocking write to the writeQueue. If the queue is full, then
         // this will fail and we won't update the LastSave timestamp. The next time
@@ -297,7 +297,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
         select {
         case b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
             archive.LastSave = now
-            b.MemoryIdx.UpdateArchive(archive)
+            b.MemoryIndex.UpdateArchive(archive)
         default:
             statSaveSkipped.Inc()
             log.Debugf("bigtable-idx: writeQueue is full, update of %s not saved this time", archive.MetricDefinition.Id)
@@ -315,7 +315,7 @@ func (b *BigtableIdx) rebuildIndex() {
     var defs []schema.MetricDefinition
     for _, partition := range cluster.Manager.GetPartitions() {
         defs = b.LoadPartition(partition, defs[:0], pre)
-        num += b.MemoryIdx.Load(defs)
+        num += b.MemoryIndex.LoadPartition(partition, defs)
     }
 
     log.Infof("bigtable-idx: Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
@@ -459,7 +459,7 @@ LOOP:
 
 func (b *BigtableIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
     pre := time.Now()
-    defs, err := b.MemoryIdx.Delete(orgId, pattern)
+    defs, err := b.MemoryIndex.Delete(orgId, pattern)
     if err != nil {
         return defs, err
     }
@@ -498,7 +498,7 @@ func (b *BigtableIdx) deleteRow(key string) error {
 
 func (b *BigtableIdx) Prune(now time.Time) ([]idx.Archive, error) {
     log.Info("bigtable-idx: start pruning of series")
-    pruned, err := b.MemoryIdx.Prune(now)
+    pruned, err := b.MemoryIndex.Prune(now)
     duration := time.Since(now)
     if err != nil {
         log.Errorf("bigtable-idx: prune error. %s", err)
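All of the `MemoryIdx` → `MemoryIndex` edits above are mechanical because Go promotes the methods of an embedded interface exactly as it promotes those of an embedded struct. A small stand-alone illustration of the delegation pattern (types and method names are simplified stand-ins, not metrictank's real signatures):

```go
package main

import "fmt"

// Indexer stands in for memory.MemoryIndex (shape assumed for illustration).
type Indexer interface {
	Update(name string)
}

// plainIdx is a trivial implementation.
type plainIdx struct{}

func (plainIdx) Update(name string) { fmt.Println("memory index update:", name) }

// BigtableIdx embeds the interface. Methods the outer type does not define
// are promoted from whatever concrete value the field holds at runtime.
type BigtableIdx struct {
	Indexer
}

// BigtableIdx defines its own Update, so the embedded method must be called
// explicitly, just as the diff does with b.MemoryIndex.Update(...).
func (b *BigtableIdx) Update(name string) {
	b.Indexer.Update(name) // delegate to the in-memory index first
	fmt.Println("then queue a write to bigtable for:", name)
}

func main() {
	b := &BigtableIdx{Indexer: plainIdx{}}
	b.Update("some.metric.name")
}
```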
28 changes: 16 additions & 12 deletions idx/cassandra/cassandra.go
@@ -57,7 +57,7 @@ type writeReq struct {
 
 // CasIdx implements the the "MetricIndex" interface
 type CasIdx struct {
-    memory.MemoryIdx
+    memory.MemoryIndex
     cfg      *IdxConfig
     cluster  *gocql.ClusterConfig
     session  *gocql.Session
@@ -96,7 +96,7 @@ func New(cfg *IdxConfig) *CasIdx {
     }
 
     idx := &CasIdx{
-        MemoryIdx:        *memory.New(),
+        MemoryIndex:      memory.New(),
         cfg:              cfg,
         cluster:          cluster,
         updateInterval32: uint32(cfg.updateInterval.Nanoseconds() / int64(time.Second)),
@@ -180,7 +180,7 @@ func (c *CasIdx) InitBare() error {
 // rebuilds the in-memory index, sets up write queues, metrics and pruning routines
 func (c *CasIdx) Init() error {
     log.Infof("initializing cassandra-idx. Hosts=%s", c.cfg.hosts)
-    if err := c.MemoryIdx.Init(); err != nil {
+    if err := c.MemoryIndex.Init(); err != nil {
         return err
     }
 
@@ -207,7 +207,7 @@ func (c *CasIdx) Init() error {
 
 func (c *CasIdx) Stop() {
     log.Info("cassandra-idx: stopping")
-    c.MemoryIdx.Stop()
+    c.MemoryIndex.Stop()
 
     // if updateCassIdx is disabled then writeQueue should never have been initialized
     if c.cfg.updateCassIdx {
@@ -222,7 +222,7 @@ func (c *CasIdx) Stop() {
 func (c *CasIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
     pre := time.Now()
 
-    archive, oldPartition, inMemory := c.MemoryIdx.Update(point, partition)
+    archive, oldPartition, inMemory := c.MemoryIndex.Update(point, partition)
 
     if !c.cfg.updateCassIdx {
         statUpdateDuration.Value(time.Since(pre))
@@ -251,7 +251,7 @@ func (c *CasIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive,
 func (c *CasIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
     pre := time.Now()
 
-    archive, oldPartition, inMemory := c.MemoryIdx.AddOrUpdate(mkey, data, partition)
+    archive, oldPartition, inMemory := c.MemoryIndex.AddOrUpdate(mkey, data, partition)
 
     stat := statUpdateDuration
     if !inMemory {
@@ -291,7 +291,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
         log.Debugf("cassandra-idx: updating def %s in index.", archive.MetricDefinition.Id)
         c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
         archive.LastSave = now
-        c.MemoryIdx.UpdateArchive(archive)
+        c.MemoryIndex.UpdateArchive(archive)
     } else {
         // perform a non-blocking write to the writeQueue. If the queue is full, then
         // this will fail and we won't update the LastSave timestamp. The next time
@@ -302,7 +302,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
         select {
         case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
             archive.LastSave = now
-            c.MemoryIdx.UpdateArchive(archive)
+            c.MemoryIndex.UpdateArchive(archive)
         default:
             statSaveSkipped.Inc()
             log.Debugf("cassandra-idx: writeQueue is full, update of %s not saved this time.", archive.MetricDefinition.Id)
@@ -315,8 +315,12 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
 func (c *CasIdx) rebuildIndex() {
     log.Info("cassandra-idx: Rebuilding Memory Index from metricDefinitions in Cassandra")
     pre := time.Now()
-    defs := c.LoadPartitions(cluster.Manager.GetPartitions(), nil, pre)
-    num := c.MemoryIdx.Load(defs)
+    var defs []schema.MetricDefinition
+    var num int
+    for _, partition := range cluster.Manager.GetPartitions() {
+        defs = c.LoadPartitions([]int32{partition}, defs[:0], pre)
+        num += c.MemoryIndex.LoadPartition(partition, defs)
+    }
     log.Infof("cassandra-idx: Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
 }
 

Review thread on the `defs = c.LoadPartitions([]int32{partition}, defs[:0], pre)` line:

Contributor: @woodsaj isn't this a bug? subsequent calls of this seem to overwrite previously loaded defs?

Member (author): no.
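The exchange above is terse, so it is worth spelling out why reusing `defs[:0]` is not a bug: each `LoadPartitions` call does overwrite the buffer's contents, but only after the previous iteration's `LoadPartition` has already consumed them into the memory index. A runnable sketch of that pattern, under the assumption (implied by the author's reply) that `LoadPartition` copies what it needs before returning; all types here are simplified stand-ins:

```go
package main

import "fmt"

// Def is a simplified stand-in for schema.MetricDefinition.
type Def struct {
	Partition int32
	Name      string
}

type index struct{ byName map[string]Def }

// loadPartition refills buf with this partition's defs, reusing its capacity.
func loadPartition(p int32, buf []Def) []Def {
	for i := 0; i < 3; i++ {
		buf = append(buf, Def{Partition: p, Name: fmt.Sprintf("p%d.metric.%d", p, i)})
	}
	return buf
}

// LoadPartition consumes defs synchronously: after it returns, nothing keeps
// a reference to the slice, so truncating it with defs[:0] is safe.
func (ix *index) LoadPartition(p int32, defs []Def) int {
	for _, d := range defs {
		ix.byName[d.Name] = d // copies the value into the map
	}
	return len(defs)
}

func main() {
	ix := &index{byName: map[string]Def{}}
	var defs []Def
	num := 0
	for _, p := range []int32{0, 1, 2} {
		defs = loadPartition(p, defs[:0]) // overwrite previous contents...
		num += ix.LoadPartition(p, defs)  // ...only after they were consumed
	}
	fmt.Println("imported", num) // imported 9
}
```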
@@ -549,7 +553,7 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error {
 
 func (c *CasIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
     pre := time.Now()
-    defs, err := c.MemoryIdx.Delete(orgId, pattern)
+    defs, err := c.MemoryIndex.Delete(orgId, pattern)
     if err != nil {
         return defs, err
     }
@@ -596,7 +600,7 @@ func (c *CasIdx) deleteDefAsync(key schema.MKey, part int32) {
 
 func (c *CasIdx) Prune(now time.Time) ([]idx.Archive, error) {
     log.Info("cassandra-idx: start pruning of series")
-    pruned, err := c.MemoryIdx.Prune(now)
+    pruned, err := c.MemoryIndex.Prune(now)
     duration := time.Since(now)
     if err != nil {
         log.Errorf("cassandra-idx: pruning error: %s", err)
4 changes: 2 additions & 2 deletions idx/cassandra/cassandra_test.go
@@ -79,7 +79,7 @@ func init() {
 }
 
 func initForTests(c *CasIdx) error {
-    return c.MemoryIdx.Init()
+    return c.MemoryIndex.Init()
 }
 
 func getSeriesNames(depth, count int, prefix string) []string {
@@ -293,7 +293,7 @@ func TestAddToWriteQueue(t *testing.T) {
             So(time.Now(), ShouldHappenAfter, pre.Add(time.Second))
         })
     })
-    ix.MemoryIdx.Stop()
+    ix.MemoryIndex.Stop()
     close(ix.writeQueue)
 }