use interface that both MemoryIdx and PartitionedMemoryIdx implement
- add a config flag to enable the partitioned index
- update the cassandra and bigtable indexes to use the MemoryIndex interface
- update all unit tests to run against both MemoryIdx and PartitionedMemoryIdx
woodsaj committed Mar 8, 2019
1 parent c4c3db9 commit f4222c0
Showing 8 changed files with 448 additions and 116 deletions.
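
The essence of the change, before the per-file hunks: the persistent index types stop embedding the concrete memory.MemoryIdx struct and embed the new memory.MemoryIndex interface instead, so memory.New() can hand back either implementation at runtime. A minimal, self-contained Go sketch of that embedding pattern (toy stand-in types, not the real metrictank code):

package main

import "fmt"

// Stand-ins for the real types in idx/memory.
type MemoryIndex interface {
	Init() error
}

type MemoryIdx struct{}

func (m *MemoryIdx) Init() error { fmt.Println("plain in-memory index"); return nil }

type PartitionedMemoryIdx struct{}

func (p *PartitionedMemoryIdx) Init() error { fmt.Println("partitioned in-memory index"); return nil }

var Partitioned bool // set by the new "partitioned" config flag

// New mirrors memory.New() from the diff: pick the implementation at runtime.
func New() MemoryIndex {
	if Partitioned {
		return &PartitionedMemoryIdx{}
	}
	return &MemoryIdx{}
}

// BigtableIdx embeds the interface rather than the struct, so either
// implementation can back it; method calls delegate through the interface.
type BigtableIdx struct {
	MemoryIndex
}

func main() {
	Partitioned = true
	idx := &BigtableIdx{MemoryIndex: New()}
	idx.Init() // prints "partitioned in-memory index"
}
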
26 changes: 13 additions & 13 deletions idx/bigtable/bigtable.go
@@ -54,7 +54,7 @@ type writeReq struct {
 }
 
 type BigtableIdx struct {
-	memory.MemoryIdx
+	memory.MemoryIndex
 	cfg    *IdxConfig
 	tbl    *bigtable.Table
 	client *bigtable.Client
@@ -70,9 +70,9 @@ func New(cfg *IdxConfig) *BigtableIdx {
 		log.Fatalf("bigtable-idx: %s", err)
 	}
 	idx := &BigtableIdx{
-		MemoryIdx: *memory.New(),
-		cfg:       cfg,
-		shutdown:  make(chan struct{}),
+		MemoryIndex: memory.New(),
+		cfg:         cfg,
+		shutdown:    make(chan struct{}),
 	}
 	if cfg.UpdateBigtableIdx {
 		idx.writeQueue = make(chan writeReq, cfg.WriteQueueSize-cfg.WriteMaxFlushSize)
@@ -165,7 +165,7 @@ func (b *BigtableIdx) InitBare() error {
 // rebuilds the in-memory index, sets up write queues, metrics and pruning routines
 func (b *BigtableIdx) Init() error {
 	log.Infof("bigtable-idx: Initializing. Project=%s, Instance=%s", b.cfg.GcpProject, b.cfg.BigtableInstance)
-	if err := b.MemoryIdx.Init(); err != nil {
+	if err := b.MemoryIndex.Init(); err != nil {
 		return err
 	}
 
@@ -190,7 +190,7 @@ func (b *BigtableIdx) Init() error {
 }
 
 func (b *BigtableIdx) Stop() {
-	b.MemoryIdx.Stop()
+	b.MemoryIndex.Stop()
 	close(b.shutdown)
 	if b.cfg.UpdateBigtableIdx {
 		close(b.writeQueue)
@@ -208,7 +208,7 @@ func (b *BigtableIdx) Stop() {
 func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
 	pre := time.Now()
 
-	archive, oldPartition, inMemory := b.MemoryIdx.Update(point, partition)
+	archive, oldPartition, inMemory := b.MemoryIndex.Update(point, partition)
 
 	if !b.cfg.UpdateBigtableIdx {
 		statUpdateDuration.Value(time.Since(pre))
@@ -241,7 +241,7 @@ func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
 func (b *BigtableIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
 	pre := time.Now()
 
-	archive, oldPartition, inMemory := b.MemoryIdx.AddOrUpdate(mkey, data, partition)
+	archive, oldPartition, inMemory := b.MemoryIndex.AddOrUpdate(mkey, data, partition)
 
 	stat := statUpdateDuration
 	if !inMemory {
@@ -286,7 +286,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Archive,
 		log.Debugf("bigtable-idx: updating def %s in index.", archive.MetricDefinition.Id)
 		b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
 		archive.LastSave = now
-		b.MemoryIdx.UpdateArchive(archive)
+		b.MemoryIndex.UpdateArchive(archive)
 	} else {
 		// perform a non-blocking write to the writeQueue. If the queue is full, then
 		// this will fail and we won't update the LastSave timestamp. The next time
@@ -297,7 +297,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Archive,
 		select {
 		case b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
 			archive.LastSave = now
-			b.MemoryIdx.UpdateArchive(archive)
+			b.MemoryIndex.UpdateArchive(archive)
 		default:
 			statSaveSkipped.Inc()
 			log.Debugf("bigtable-idx: writeQueue is full, update of %s not saved this time", archive.MetricDefinition.Id)
@@ -315,7 +315,7 @@ func (b *BigtableIdx) rebuildIndex() {
 	var defs []schema.MetricDefinition
 	for _, partition := range cluster.Manager.GetPartitions() {
 		defs = b.LoadPartition(partition, defs[:0], pre)
-		num += b.MemoryIdx.Load(defs)
+		num += b.MemoryIndex.LoadPartition(partition, defs)
 	}
 
 	log.Infof("bigtable-idx: Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
@@ -459,7 +459,7 @@ LOOP:
 
 func (b *BigtableIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
 	pre := time.Now()
-	defs, err := b.MemoryIdx.Delete(orgId, pattern)
+	defs, err := b.MemoryIndex.Delete(orgId, pattern)
 	if err != nil {
 		return defs, err
 	}
@@ -498,7 +498,7 @@ func (b *BigtableIdx) deleteRow(key string) error {
 
 func (b *BigtableIdx) Prune(now time.Time) ([]idx.Archive, error) {
 	log.Info("bigtable-idx: start pruning of series")
-	pruned, err := b.MemoryIdx.Prune(now)
+	pruned, err := b.MemoryIndex.Prune(now)
 	duration := time.Since(now)
 	if err != nil {
 		log.Errorf("bigtable-idx: prune error. %s", err)
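
The updateBigtable hunks above revolve around the non-blocking writeQueue send. A runnable sketch of that select/default idiom in isolation (toy writeReq; the real one carries a *schema.MetricDefinition):

package main

import (
	"fmt"
	"time"
)

type writeReq struct {
	recvTime time.Time
	id       string
}

func main() {
	writeQueue := make(chan writeReq, 1)
	writeQueue <- writeReq{recvTime: time.Now(), id: "a"} // fill the buffer

	select {
	case writeQueue <- writeReq{recvTime: time.Now(), id: "b"}:
		// queued: here the index would set archive.LastSave and call UpdateArchive
		fmt.Println("queued")
	default:
		// queue full: skip the save; a later update will retry
		fmt.Println("writeQueue full, save skipped this time")
	}
}
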
28 changes: 16 additions & 12 deletions idx/cassandra/cassandra.go
@@ -57,7 +57,7 @@ type writeReq struct {
 
 // CasIdx implements the "MetricIndex" interface
 type CasIdx struct {
-	memory.MemoryIdx
+	memory.MemoryIndex
 	cfg     *IdxConfig
 	cluster *gocql.ClusterConfig
 	session *gocql.Session
@@ -96,7 +96,7 @@ func New(cfg *IdxConfig) *CasIdx {
 	}
 
 	idx := &CasIdx{
-		MemoryIdx: *memory.New(),
+		MemoryIndex: memory.New(),
 		cfg:       cfg,
 		cluster:   cluster,
 		updateInterval32: uint32(cfg.updateInterval.Nanoseconds() / int64(time.Second)),
@@ -180,7 +180,7 @@ func (c *CasIdx) InitBare() error {
 // rebuilds the in-memory index, sets up write queues, metrics and pruning routines
 func (c *CasIdx) Init() error {
 	log.Infof("initializing cassandra-idx. Hosts=%s", c.cfg.hosts)
-	if err := c.MemoryIdx.Init(); err != nil {
+	if err := c.MemoryIndex.Init(); err != nil {
 		return err
 	}
 
@@ -207,7 +207,7 @@ func (c *CasIdx) Init() error {
 
 func (c *CasIdx) Stop() {
 	log.Info("cassandra-idx: stopping")
-	c.MemoryIdx.Stop()
+	c.MemoryIndex.Stop()
 
 	// if updateCassIdx is disabled then writeQueue should never have been initialized
 	if c.cfg.updateCassIdx {
@@ -222,7 +222,7 @@ func (c *CasIdx) Stop() {
 func (c *CasIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
 	pre := time.Now()
 
-	archive, oldPartition, inMemory := c.MemoryIdx.Update(point, partition)
+	archive, oldPartition, inMemory := c.MemoryIndex.Update(point, partition)
 
 	if !c.cfg.updateCassIdx {
 		statUpdateDuration.Value(time.Since(pre))
@@ -251,7 +251,7 @@ func (c *CasIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
 func (c *CasIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
 	pre := time.Now()
 
-	archive, oldPartition, inMemory := c.MemoryIdx.AddOrUpdate(mkey, data, partition)
+	archive, oldPartition, inMemory := c.MemoryIndex.AddOrUpdate(mkey, data, partition)
 
 	stat := statUpdateDuration
 	if !inMemory {
@@ -291,7 +291,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
 		log.Debugf("cassandra-idx: updating def %s in index.", archive.MetricDefinition.Id)
 		c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
 		archive.LastSave = now
-		c.MemoryIdx.UpdateArchive(archive)
+		c.MemoryIndex.UpdateArchive(archive)
 	} else {
 		// perform a non-blocking write to the writeQueue. If the queue is full, then
 		// this will fail and we won't update the LastSave timestamp. The next time
@@ -302,7 +302,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
 		select {
 		case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
 			archive.LastSave = now
-			c.MemoryIdx.UpdateArchive(archive)
+			c.MemoryIndex.UpdateArchive(archive)
 		default:
 			statSaveSkipped.Inc()
 			log.Debugf("cassandra-idx: writeQueue is full, update of %s not saved this time.", archive.MetricDefinition.Id)
@@ -315,8 +315,12 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
 func (c *CasIdx) rebuildIndex() {
 	log.Info("cassandra-idx: Rebuilding Memory Index from metricDefinitions in Cassandra")
 	pre := time.Now()
-	defs := c.LoadPartitions(cluster.Manager.GetPartitions(), nil, pre)
-	num := c.MemoryIdx.Load(defs)
+	var defs []schema.MetricDefinition
+	var num int
+	for _, partition := range cluster.Manager.GetPartitions() {
+		defs = c.LoadPartitions([]int32{partition}, defs[:0], pre)
+		num += c.MemoryIndex.LoadPartition(partition, defs)
+	}
 	log.Infof("cassandra-idx: Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
 }
 
@@ -549,7 +553,7 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error {
 
 func (c *CasIdx) Delete(orgId uint32, pattern string) ([]idx.Archive, error) {
 	pre := time.Now()
-	defs, err := c.MemoryIdx.Delete(orgId, pattern)
+	defs, err := c.MemoryIndex.Delete(orgId, pattern)
 	if err != nil {
 		return defs, err
 	}
@@ -596,7 +600,7 @@ func (c *CasIdx) deleteDefAsync(key schema.MKey, part int32) {
 
 func (c *CasIdx) Prune(now time.Time) ([]idx.Archive, error) {
 	log.Info("cassandra-idx: start pruning of series")
-	pruned, err := c.MemoryIdx.Prune(now)
+	pruned, err := c.MemoryIndex.Prune(now)
 	duration := time.Since(now)
 	if err != nil {
 		log.Errorf("cassandra-idx: pruning error: %s", err)
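
Both rebuildIndex implementations now load and import one partition at a time, reusing the defs slice via defs[:0] so the backing array is kept across iterations instead of reallocated. A small sketch of that buffer-reuse idiom (stand-in types and loader):

package main

import "fmt"

type MetricDefinition struct{ Id string }

// loadPartition stands in for LoadPartition(s) reading defs from the backing store.
func loadPartition(partition int32, defs []MetricDefinition) []MetricDefinition {
	return append(defs, MetricDefinition{Id: fmt.Sprintf("def-%d", partition)})
}

func main() {
	var defs []MetricDefinition
	num := 0
	for _, partition := range []int32{0, 1, 2} {
		defs = loadPartition(partition, defs[:0]) // truncate, keep capacity
		num += len(defs)                          // stands in for MemoryIndex.LoadPartition
	}
	fmt.Printf("imported %d defs\n", num)
}
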
4 changes: 2 additions & 2 deletions idx/cassandra/cassandra_test.go
@@ -79,7 +79,7 @@ func init() {
 }
 
 func initForTests(c *CasIdx) error {
-	return c.MemoryIdx.Init()
+	return c.MemoryIndex.Init()
 }
 
 func getSeriesNames(depth, count int, prefix string) []string {
@@ -293,7 +293,7 @@ func TestAddToWriteQueue(t *testing.T) {
 			So(time.Now(), ShouldHappenAfter, pre.Add(time.Second))
 		})
 	})
-	ix.MemoryIdx.Stop()
+	ix.MemoryIndex.Stop()
 	close(ix.writeQueue)
 }
 
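
The test hunks shown only switch the embedded field name; per the commit message, the suites now run against both implementations. One plausible shape for that, as a hedged sketch (the actual test harness lives in files not shown here):

package memory

import "testing"

func TestRunsAgainstBothIndexTypes(t *testing.T) {
	for _, partitioned := range []bool{false, true} {
		Partitioned = partitioned
		ix := New() // MemoryIdx or PartitionedMemoryIdx, depending on the flag
		if err := ix.Init(); err != nil {
			t.Fatalf("Init failed (partitioned=%v): %s", partitioned, err)
		}
		// ...exercise the index through the MemoryIndex interface...
		ix.Stop()
	}
}
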
25 changes: 25 additions & 0 deletions idx/memory/memory.go
@@ -55,12 +55,14 @@
 	TagQueryWorkers int // number of workers to spin up when evaluating tag expressions
 	indexRulesFile  string
 	IndexRules      conf.IndexRules
+	Partitioned     bool
 )
 
 func ConfigSetup() {
 	memoryIdx := flag.NewFlagSet("memory-idx", flag.ExitOnError)
 	memoryIdx.BoolVar(&Enabled, "enabled", false, "")
 	memoryIdx.BoolVar(&TagSupport, "tag-support", false, "enables/disables querying based on tags")
+	memoryIdx.BoolVar(&Partitioned, "partitioned", false, "use separate indexes per partition")
 	memoryIdx.IntVar(&TagQueryWorkers, "tag-query-workers", 50, "number of workers to spin up to evaluate tag queries")
 	memoryIdx.IntVar(&matchCacheSize, "match-cache-size", 1000, "size of regular expression cache in tag query evaluation")
 	memoryIdx.StringVar(&indexRulesFile, "rules-file", "/etc/metrictank/index-rules.conf", "path to index-rules.conf file")
@@ -87,6 +89,23 @@ func ConfigProcess() {
 	}
 }
 
+// MemoryIndex is the interface implemented by both MemoryIdx and PartitionedMemoryIdx;
+// it is needed so unit tests can run against either implementation.
+type MemoryIndex interface {
+	idx.MetricIndex
+	LoadPartition(int32, []schema.MetricDefinition) int
+	UpdateArchive(idx.Archive)
+	add(*schema.MetricDefinition) idx.Archive
+	idsByTagQuery(uint32, TagQuery) IdSet
+}
+
+func New() MemoryIndex {
+	if Partitioned {
+		return NewPartitionedMemoryIdx()
+	}
+	return NewMemoryIdx()
+}
+
 type Tree struct {
 	Items map[string]*Node // key is the full path of the node.
 }
@@ -374,6 +393,12 @@ func (m *MemoryIdx) deindexTags(tags TagIndex, def *schema.MetricDefinition) bool {
 	return true
 }
 
+// Used to rebuild the index from an existing set of metricDefinitions for a specific partition.
+func (m *MemoryIdx) LoadPartition(partition int32, defs []schema.MetricDefinition) int {
+	// MemoryIdx isn't partitioned, so just ignore the partition passed and call Load()
+	return m.Load(defs)
+}
+
 // Used to rebuild the index from an existing set of metricDefinitions.
 func (m *MemoryIdx) Load(defs []schema.MetricDefinition) int {
 	m.Lock()
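
PartitionedMemoryIdx itself is not part of this diff, but the LoadPartition signature hints at why the partition id is threaded through: a partitioned index can route each batch of defs to its own sub-index, while MemoryIdx (above) simply ignores the argument. A toy sketch of that routing, purely an assumption about the other implementation:

package main

import "fmt"

type MetricDefinition struct{ Id string }

// Toy partitioned index: one def slice per partition. The real
// PartitionedMemoryIdx presumably holds one sub-index per partition.
type PartitionedIdx struct {
	Partition map[int32][]MetricDefinition
}

func (p *PartitionedIdx) LoadPartition(partition int32, defs []MetricDefinition) int {
	p.Partition[partition] = append(p.Partition[partition], defs...)
	return len(defs)
}

func main() {
	p := &PartitionedIdx{Partition: make(map[int32][]MetricDefinition)}
	n := p.LoadPartition(3, []MetricDefinition{{Id: "a"}, {Id: "b"}})
	fmt.Printf("loaded %d defs into partition 3\n", n)
}
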
(diffs for the remaining 4 changed files were not loaded)
