Skip to content

Commit

Permalink
Memory improvements first pass (open-telemetry#1293)
Browse files Browse the repository at this point in the history
* Memory improvements first pass

* Comments, store hash

* Fix linting and tests

* Update, more tests and benchmarks, notes

* linting
  • Loading branch information
jaronoff97 committed Dec 6, 2022
1 parent a66f2aa commit 1339b7f
Show file tree
Hide file tree
Showing 16 changed files with 431 additions and 230 deletions.
60 changes: 51 additions & 9 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,16 @@ type consistentHashingAllocator struct {
consistentHasher *consistent.Consistent

// collectors is a map from a Collector's name to a Collector instance
// collectorKey -> collector pointer
collectors map[string]*Collector

// targetItems is a map from a target item's hash to the target items allocated state
// targetItem hash -> target item pointer
targetItems map[string]*target.Item

// collectorKey -> job -> target item hash -> true
targetItemsPerJobPerCollector map[string]map[string]map[string]bool

log logr.Logger

filter Filter
Expand All @@ -62,10 +67,11 @@ func newConsistentHashingAllocator(log logr.Logger, opts ...AllocationOption) Al
}
consistentHasher := consistent.New(nil, config)
chAllocator := &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
log: log,
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*target.Item),
targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool),
log: log,
}
for _, opt := range opts {
opt(chAllocator)
Expand All @@ -79,19 +85,36 @@ func (c *consistentHashingAllocator) SetFilter(filter Filter) {
c.filter = filter
}

// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets
// this allows the allocator to respond without any extra allocations to http calls. The caller of this method
// has to acquire a lock.
func (c *consistentHashingAllocator) addCollectorTargetItemMapping(tg *target.Item) {
if c.targetItemsPerJobPerCollector[tg.CollectorName] == nil {
c.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool)
}
if c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil {
c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[string]bool)
}
c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap.
// INVARIANT: c.collectors must have at least 1 collector set.
// NOTE: by not creating a new target item, there is the potential for a race condition where we modify this target
// item while it's being encoded by the server JSON handler.
func (c *consistentHashingAllocator) addTargetToTargetItems(tg *target.Item) {
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets
if previousColName, ok := c.collectors[tg.CollectorName]; ok {
previousColName.NumTargets--
delete(c.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName], tg.Hash())
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets))
}
colOwner := c.consistentHasher.LocateKey([]byte(tg.Hash()))
targetItem := target.NewItem(tg.JobName, tg.TargetURL, tg.Label, colOwner.String())
c.targetItems[targetItem.Hash()] = targetItem
tg.CollectorName = colOwner.String()
c.targetItems[tg.Hash()] = tg
c.addCollectorTargetItemMapping(tg)
c.collectors[colOwner.String()].NumTargets++
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
}
Expand All @@ -107,6 +130,7 @@ func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*target.Ite
col := c.collectors[target.CollectorName]
col.NumTargets--
delete(c.targetItems, k)
delete(c.targetItemsPerJobPerCollector[target.CollectorName][target.JobName], target.Hash())
TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets))
}
}
Expand All @@ -130,6 +154,7 @@ func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collect
// Clear removed collectors
for _, k := range diff.Removals() {
delete(c.collectors, k.Name)
delete(c.targetItemsPerJobPerCollector, k.Name)
c.consistentHasher.Remove(k.Name)
TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0)
}
Expand All @@ -155,7 +180,7 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
if c.filter != nil {
targets = c.filter.Apply(targets)
}
RecordTargetsKeptPerJob(targets)
RecordTargetsKept(targets)

c.m.Lock()
defer c.m.Unlock()
Expand All @@ -175,13 +200,12 @@ func (c *consistentHashingAllocator) SetTargets(targets map[string]*target.Item)
// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) {
log := c.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName))
defer timer.ObserveDuration()

CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
c.log.Info("No collector instances present")
return
}

Expand All @@ -195,6 +219,24 @@ func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collec
}
}

func (c *consistentHashingAllocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item {
c.m.RLock()
defer c.m.RUnlock()
if _, ok := c.targetItemsPerJobPerCollector[collector]; !ok {
return []*target.Item{}
}
if _, ok := c.targetItemsPerJobPerCollector[collector][job]; !ok {
return []*target.Item{}
}
targetItemsCopy := make([]*target.Item, len(c.targetItemsPerJobPerCollector[collector][job]))
index := 0
for targetHash := range c.targetItemsPerJobPerCollector[collector][job] {
targetItemsCopy[index] = c.targetItems[targetHash]
index++
}
return targetItemsCopy
}

// TargetItems returns a shallow copy of the targetItems map.
func (c *consistentHashingAllocator) TargetItems() map[string]*target.Item {
c.m.RLock()
Expand Down
1 change: 0 additions & 1 deletion cmd/otel-allocator/allocation/consistent_hashing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func TestRelativelyEvenDistribution(t *testing.T) {
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numCols)
for _, col := range actualCollectors {
t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets)
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta)
}
}
Expand Down
55 changes: 9 additions & 46 deletions cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,24 @@ import (
"fmt"
"net/url"

"github.com/prometheus/common/model"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

type collectorJSON struct {
Link string `json:"_link"`
Jobs []targetGroupJSON `json:"targets"`
}

type targetGroupJSON struct {
Targets []string `json:"targets"`
Labels model.LabelSet `json:"labels"`
Link string `json:"_link"`
Jobs []*target.Item `json:"targets"`
}

func GetAllTargetsByJob(job string, cMap map[string][]target.Item, allocator Allocator) map[string]collectorJSON {
// GetAllTargetsByJob is a relatively expensive call that is usually only used for debugging purposes.
func GetAllTargetsByJob(allocator Allocator, job string) map[string]collectorJSON {
displayData := make(map[string]collectorJSON)
for _, j := range allocator.TargetItems() {
if j.JobName == job {
var targetList []target.Item
targetList = append(targetList, cMap[j.CollectorName+j.JobName]...)

var targetGroupList []targetGroupJSON

for _, t := range targetList {
targetGroupList = append(targetGroupList, targetGroupJSON{
Targets: []string{t.TargetURL},
Labels: t.Label,
})
}

displayData[j.CollectorName] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(j.JobName), j.CollectorName), Jobs: targetGroupList}

}
for _, col := range allocator.Collectors() {
items := allocator.GetTargetsForCollectorAndJob(col.Name, job)
displayData[col.Name] = collectorJSON{Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", url.QueryEscape(job), col.Name), Jobs: items}
}
return displayData
}

func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[string][]target.Item, allocator Allocator) []targetGroupJSON {
var tgs []targetGroupJSON
group := make(map[string]target.Item)
labelSet := make(map[string]model.LabelSet)
if _, ok := allocator.Collectors()[collector]; ok {
for _, targetItemArr := range cMap {
for _, targetItem := range targetItemArr {
if targetItem.CollectorName == collector && targetItem.JobName == job {
group[targetItem.Label.String()] = targetItem
labelSet[targetItem.Hash()] = targetItem.Label
}
}
}
}
for _, v := range group {
tgs = append(tgs, targetGroupJSON{Targets: []string{v.TargetURL}, Labels: labelSet[v.Hash()]})
}

return tgs
func GetAllTargetsByCollectorAndJob(allocator Allocator, collector string, job string) []*target.Item {
return allocator.GetTargetsForCollectorAndJob(collector, job)
}
Loading

0 comments on commit 1339b7f

Please sign in to comment.