Skip to content

Commit

Permalink
app/vmselect: reduce lock contention for heavy aggregation requests
Browse files Browse the repository at this point in the history
previously lock contetion may happen on machine with big number of CPU due to enabled string interning. sync.Map was a choke point for all aggregation requests.
Now instead of interning, new string is created. It may increase CPU and memory usage for some cases.
#5087
  • Loading branch information
f41gh7 committed Oct 3, 2023
1 parent dc71db9 commit 6d6939c
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 14 deletions.
3 changes: 2 additions & 1 deletion app/vmselect/promql/aggr.go
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
Expand Down Expand Up @@ -130,7 +131,7 @@ func aggrPrepareSeries(argOrig []*timeseries, modifier *metricsql.ModifierExpr,
for i, ts := range arg {
removeGroupTags(&ts.MetricName, modifier)
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
if keepOriginal {
ts = argOrig[i]
}
Expand Down
2 changes: 1 addition & 1 deletion app/vmselect/promql/aggr_incremental.go
Expand Up @@ -105,7 +105,7 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor
removeGroupTags(&ts.MetricName, &iafc.ae.Modifier)
bb := bbPool.Get()
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
iac := m[k]
if iac == nil {
if iafc.ae.Limit > 0 && len(m) >= iafc.ae.Limit {
Expand Down
4 changes: 2 additions & 2 deletions app/vmselect/promql/binary_op.go
Expand Up @@ -261,7 +261,7 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName)
pair, ok := m[string(bb.B)]
if !ok {
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
m[k] = &tsPair{
left: &tsCopy,
right: tsRight,
Expand Down Expand Up @@ -524,7 +524,7 @@ func createTimeseriesMapByTagSet(be *metricsql.BinaryOpExpr, left, right []*time
logger.Panicf("BUG: unexpected binary op modifier %q", groupOp)
}
bb.B = marshalMetricTagsSorted(bb.B[:0], mn)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
m[k] = append(m[k], ts)
}
storage.PutMetricName(mn)
Expand Down
6 changes: 3 additions & 3 deletions app/vmselect/promql/eval.go
Expand Up @@ -563,8 +563,8 @@ func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
for _, tag := range ts.MetricName.Tags {
vc, ok := m[string(tag.Key)]
if !ok {
k := bytesutil.InternBytes(tag.Key)
v := bytesutil.InternBytes(tag.Value)
k := bytesutil.ToStringOwned(tag.Key)
v := bytesutil.ToStringOwned(tag.Value)
m[k] = &valuesCounter{
values: map[string]struct{}{
v: {},
Expand All @@ -581,7 +581,7 @@ func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
}
vc.count++
if _, ok := vc.values[string(tag.Value)]; !ok {
v := bytesutil.InternBytes(tag.Value)
v := bytesutil.ToStringOwned(tag.Value)
vc.values[v] = struct{}{}
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/vmselect/promql/exec.go
Expand Up @@ -129,7 +129,7 @@ func timeseriesToResult(tss []*timeseries, maySort bool) ([]netstorage.Result, e
bb := bbPool.Get()
for i, ts := range tss {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
if _, ok := m[k]; ok {
return nil, fmt.Errorf(`duplicate output timeseries: %s`, stringMetricName(&ts.MetricName))
}
Expand Down
4 changes: 2 additions & 2 deletions app/vmselect/promql/rollup_result_cache.go
Expand Up @@ -442,7 +442,7 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
defer bbPool.Put(bb)
for _, ts := range a {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
m[k] = ts
}

Expand All @@ -455,7 +455,7 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
tmp.MetricName.MoveFrom(&tsB.MetricName)

bb.B = marshalMetricNameSorted(bb.B[:0], &tmp.MetricName)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
tsA := m[k]
if tsA == nil {
tStart := ec.Start
Expand Down
8 changes: 4 additions & 4 deletions app/vmselect/promql/transform.go
Expand Up @@ -420,7 +420,7 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
mn.CopyFrom(&ts.MetricName)
mn.RemoveTag("le")
b = marshalMetricNameSorted(b[:0], &mn)
k := bytesutil.InternBytes(b)
k := bytesutil.ToUnsafeString(b)
m[k] = append(m[k], x{
le: le,
ts: ts,
Expand Down Expand Up @@ -523,7 +523,7 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
ts.MetricName.RemoveTag("le")
ts.MetricName.RemoveTag("vmrange")
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
m[k] = append(m[k], x{
startStr: startStr,
endStr: endStr,
Expand Down Expand Up @@ -1023,7 +1023,7 @@ func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries {
ts.MetricName.ResetMetricGroup()
ts.MetricName.RemoveTag("le")
bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
m[k] = append(m[k], leTimeseries{
le: le,
ts: ts,
Expand Down Expand Up @@ -1657,7 +1657,7 @@ func transformUnion(tfa *transformFuncArg) ([]*timeseries, error) {
for _, arg := range args {
for _, ts := range arg {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := bytesutil.ToStringOwned(bb.B)
if m[k] {
continue
}
Expand Down
2 changes: 2 additions & 0 deletions docs/CHANGELOG.md
Expand Up @@ -27,6 +27,8 @@ The sandbox cluster installation is running under the constant load generated by
[prometheus-benchmark](https://github.com/VictoriaMetrics/prometheus-benchmark) and used for testing latest releases.

## tip
*
* BUGFIX: [vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): Improves query performance on machines with a big number of cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087) for details.

## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0)

Expand Down
10 changes: 10 additions & 0 deletions lib/bytesutil/bytesutil.go
Expand Up @@ -90,3 +90,13 @@ func LimitStringLen(s string, maxLen int) string {
n := maxLen/2 - 1
return s[:n] + ".." + s[len(s)-n:]
}

// ToStringOwned converts bytes into string and returns its copy
func ToStringOwned(b []byte) string {
if len(b) == 0 {
return ""
}
s := make([]byte, len(b))
copy(s, b)
return unsafe.String(&s[0], len(s))

Check warning on line 101 in lib/bytesutil/bytesutil.go

View check run for this annotation

Codecov / codecov/patch

lib/bytesutil/bytesutil.go#L95-L101

Added lines #L95 - L101 were not covered by tests
}

0 comments on commit 6d6939c

Please sign in to comment.