Skip to content

Commit

Permalink
app/vmselect: reduce lock contention for heavy aggregation requests (#…
Browse files Browse the repository at this point in the history
…5119)

reduce lock contention for heavy aggregation requests
previously lock contetion may happen on machine with big number of CPU due to enabled string interning. sync.Map was a choke point for all aggregation requests.
Now instead of interning, new string is created. It may increase CPU and memory usage for some cases.
#5087
  • Loading branch information
f41gh7 committed Oct 10, 2023
1 parent b52f1d1 commit 4a50e94
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 21 deletions.
3 changes: 1 addition & 2 deletions app/vmselect/promql/aggr.go
Expand Up @@ -8,7 +8,6 @@ import (
"strconv"
"strings"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
Expand Down Expand Up @@ -130,7 +129,7 @@ func aggrPrepareSeries(argOrig []*timeseries, modifier *metricsql.ModifierExpr,
for i, ts := range arg {
removeGroupTags(&ts.MetricName, modifier)
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := string(bb.B)
if keepOriginal {
ts = argOrig[i]
}
Expand Down
3 changes: 1 addition & 2 deletions app/vmselect/promql/aggr_incremental.go
Expand Up @@ -6,7 +6,6 @@ import (
"unsafe"

"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/metricsql"
)

Expand Down Expand Up @@ -105,7 +104,7 @@ func (iafc *incrementalAggrFuncContext) updateTimeseries(tsOrig *timeseries, wor
removeGroupTags(&ts.MetricName, &iafc.ae.Modifier)
bb := bbPool.Get()
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := string(bb.B)
iac := m[k]
if iac == nil {
if iafc.ae.Limit > 0 && len(m) >= iafc.ae.Limit {
Expand Down
6 changes: 2 additions & 4 deletions app/vmselect/promql/binary_op.go
Expand Up @@ -5,7 +5,6 @@ import (
"math"
"strings"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metricsql"
Expand Down Expand Up @@ -261,8 +260,7 @@ func groupJoin(singleTimeseriesSide string, be *metricsql.BinaryOpExpr, rvsLeft,
bb.B = marshalMetricTagsSorted(bb.B[:0], &tsCopy.MetricName)
pair, ok := m[string(bb.B)]
if !ok {
k := bytesutil.InternBytes(bb.B)
m[k] = &tsPair{
m[string(bb.B)] = &tsPair{
left: &tsCopy,
right: tsRight,
}
Expand Down Expand Up @@ -524,7 +522,7 @@ func createTimeseriesMapByTagSet(be *metricsql.BinaryOpExpr, left, right []*time
logger.Panicf("BUG: unexpected binary op modifier %q", groupOp)
}
bb.B = marshalMetricTagsSorted(bb.B[:0], mn)
k := bytesutil.InternBytes(bb.B)
k := string(bb.B)
m[k] = append(m[k], ts)
}
storage.PutMetricName(mn)
Expand Down
7 changes: 3 additions & 4 deletions app/vmselect/promql/eval.go
Expand Up @@ -563,8 +563,8 @@ func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
for _, tag := range ts.MetricName.Tags {
vc, ok := m[string(tag.Key)]
if !ok {
k := bytesutil.InternBytes(tag.Key)
v := bytesutil.InternBytes(tag.Value)
k := string(tag.Key)
v := string(tag.Value)
m[k] = &valuesCounter{
values: map[string]struct{}{
v: {},
Expand All @@ -581,8 +581,7 @@ func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
}
vc.count++
if _, ok := vc.values[string(tag.Value)]; !ok {
v := bytesutil.InternBytes(tag.Value)
vc.values[v] = struct{}{}
vc.values[string(tag.Value)] = struct{}{}
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions app/vmselect/promql/exec.go
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
Expand Down Expand Up @@ -129,7 +128,7 @@ func timeseriesToResult(tss []*timeseries, maySort bool) ([]netstorage.Result, e
bb := bbPool.Get()
for i, ts := range tss {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := string(bb.B)
if _, ok := m[k]; ok {
return nil, fmt.Errorf(`duplicate output timeseries: %s`, stringMetricName(&ts.MetricName))
}
Expand Down
5 changes: 2 additions & 3 deletions app/vmselect/promql/rollup_result_cache.go
Expand Up @@ -442,8 +442,7 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
defer bbPool.Put(bb)
for _, ts := range a {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
m[k] = ts
m[string(bb.B)] = ts
}

rvs := make([]*timeseries, 0, len(a))
Expand All @@ -455,7 +454,7 @@ func mergeTimeseries(a, b []*timeseries, bStart int64, ec *EvalConfig) []*timese
tmp.MetricName.MoveFrom(&tsB.MetricName)

bb.B = marshalMetricNameSorted(bb.B[:0], &tmp.MetricName)
k := bytesutil.InternBytes(bb.B)
k := string(bb.B)
tsA := m[k]
if tsA == nil {
tStart := ec.Start
Expand Down
8 changes: 4 additions & 4 deletions app/vmselect/promql/transform.go
Expand Up @@ -420,7 +420,7 @@ func transformBucketsLimit(tfa *transformFuncArg) ([]*timeseries, error) {
mn.CopyFrom(&ts.MetricName)
mn.RemoveTag("le")
b = marshalMetricNameSorted(b[:0], &mn)
k := bytesutil.InternBytes(b)
k := bytesutil.ToUnsafeString(b)
m[k] = append(m[k], x{
le: le,
ts: ts,
Expand Down Expand Up @@ -523,7 +523,7 @@ func vmrangeBucketsToLE(tss []*timeseries) []*timeseries {
ts.MetricName.RemoveTag("le")
ts.MetricName.RemoveTag("vmrange")
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := string(bb.B)
m[k] = append(m[k], x{
startStr: startStr,
endStr: endStr,
Expand Down Expand Up @@ -1023,7 +1023,7 @@ func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries {
ts.MetricName.ResetMetricGroup()
ts.MetricName.RemoveTag("le")
bb.B = marshalMetricTagsSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := string(bb.B)
m[k] = append(m[k], leTimeseries{
le: le,
ts: ts,
Expand Down Expand Up @@ -1657,7 +1657,7 @@ func transformUnion(tfa *transformFuncArg) ([]*timeseries, error) {
for _, arg := range args {
for _, ts := range arg {
bb.B = marshalMetricNameSorted(bb.B[:0], &ts.MetricName)
k := bytesutil.InternBytes(bb.B)
k := string(bb.B)
if m[k] {
continue
}
Expand Down
2 changes: 2 additions & 0 deletions docs/CHANGELOG.md
Expand Up @@ -40,6 +40,8 @@ The sandbox cluster installation is running under the constant load generated by

* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): strip sensitive information such as auth headers or passwords from datasource, remote-read, remote-write or notifier URLs in log messages or UI. This behavior is by default and is controlled via `-datasource.showURL`, `-remoteRead.showURL`, `remoteWrite.showURL` or `-notifier.showURL` cmd-line flags. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5044).

* BUGFIX: [vmselect](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve performance and memory usage during query processing on machines with big number of CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5087) for details.

## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0)

Released at 2023-10-02
Expand Down

0 comments on commit 4a50e94

Please sign in to comment.