lib/streamaggr: removed unused resetState arg
AndrewChubatiuk committed May 7, 2024
1 parent 301bd38 commit 2a84e90
Showing 19 changed files with 100 additions and 163 deletions.
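The change is mechanical across the aggregation outputs: every *AggrState implementation drops the resetState parameter from flushState and now always resets its state on flush. The sketch below shows the shape of that signature change; the aggrState interface itself is not part of this diff, so its definition here is an assumption, and flushCtx/pushSample are stand-in types rather than the library's.

package streamaggrsketch

// Stand-ins for lib/streamaggr types that are not shown in this diff.
type flushCtx struct{}
type pushSample struct{}

// Assumed shape before this commit: callers could choose whether a flush
// also reset the accumulated per-series state.
type aggrStateBefore interface {
	pushSamples(samples []pushSample)
	flushState(ctx *flushCtx, resetState bool)
}

// Shape after this commit: a flush always resets state, so the flag is gone.
type aggrStateAfter interface {
	pushSamples(samples []pushSample)
	flushState(ctx *flushCtx)
}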
14 changes: 5 additions & 9 deletions lib/streamaggr/avg.go
@@ -59,22 +59,18 @@ func (as *avgAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *avgAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *avgAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*avgStateValue)
 		sv.mu.Lock()
 		avg := sv.sum / float64(sv.count)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 
 		key := k.(string)
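The avg, count_samples, count_series, last, max, min, quantiles, stddev and stdvar states changed below all repeat the flush pattern visible in the avg.go hunk above: delete the sync.Map entry so the next pushSamples call creates a fresh one, then tombstone the old value under its mutex so in-flight pushers retry instead of writing into already-flushed state. Here is a standalone sketch of that pattern with illustrative names rather than the library's own:

package main

import (
	"fmt"
	"sync"
)

// stateValue plays the role of avgStateValue: a per-series accumulator plus a
// deleted tombstone that concurrent pushers check.
type stateValue struct {
	mu      sync.Mutex
	sum     float64
	count   int
	deleted bool
}

type aggr struct {
	m sync.Map // series key -> *stateValue
}

func (a *aggr) push(key string, v float64) {
again:
	actual, _ := a.m.LoadOrStore(key, &stateValue{})
	sv := actual.(*stateValue)
	sv.mu.Lock()
	if sv.deleted {
		// The entry was flushed concurrently; retry so a fresh entry is created.
		sv.mu.Unlock()
		goto again
	}
	sv.sum += v
	sv.count++
	sv.mu.Unlock()
}

func (a *aggr) flush() {
	a.m.Range(func(k, v interface{}) bool {
		// Delete first, so new pushes start a fresh entry for the next interval.
		a.m.Delete(k)
		sv := v.(*stateValue)
		sv.mu.Lock()
		avg := sv.sum / float64(sv.count)
		// Tombstone the old value so late pushers don't update flushed state.
		sv.deleted = true
		sv.mu.Unlock()
		fmt.Printf("%v avg=%v\n", k, avg)
		return true
	})
}

func main() {
	var a aggr
	a.push(`series{job="x"}`, 1)
	a.push(`series{job="x"}`, 3)
	a.flush() // prints: series{job="x"} avg=2
}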
14 changes: 5 additions & 9 deletions lib/streamaggr/count_samples.go
@@ -56,22 +56,18 @@ func (as *countSamplesAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *countSamplesAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *countSamplesAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*countSamplesStateValue)
 		sv.mu.Lock()
 		n := sv.n
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 
 		key := k.(string)
14 changes: 5 additions & 9 deletions lib/streamaggr/count_series.go
@@ -66,22 +66,18 @@ func (as *countSeriesAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *countSeriesAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *countSeriesAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*countSeriesStateValue)
 		sv.mu.Lock()
 		n := len(sv.m)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 
 		key := k.(string)
8 changes: 4 additions & 4 deletions lib/streamaggr/dedup.go
@@ -113,7 +113,7 @@ func (ctx *dedupFlushCtx) reset() {
 	ctx.samples = ctx.samples[:0]
 }
 
-func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) {
+func (da *dedupAggr) flush(f func(samples []pushSample)) {
 	var wg sync.WaitGroup
 	for i := range da.shards {
 		flushConcurrencyCh <- struct{}{}
@@ -125,7 +125,7 @@ func (da *dedupAggr) flush(f func(samples []pushSample), resetState bool) {
 			}()
 
 			ctx := getDedupFlushCtx()
-			shard.flush(ctx, f, resetState)
+			shard.flush(ctx, f)
 			putDedupFlushCtx(ctx)
 		}(&da.shards[i])
 	}
@@ -191,11 +191,11 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample) {
 	}
 }
 
-func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample), resetState bool) {
+func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f func(samples []pushSample)) {
 	das.mu.Lock()
 
 	m := das.m
-	if resetState && len(m) > 0 {
+	if len(m) > 0 {
 		das.m = make(map[string]dedupAggrSample, len(m))
 	}
 
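With the resetState flag gone, dedupAggrShard.flush always swaps the shard map out under its lock whenever it has entries and processes the old map outside the lock, so concurrent pushSamples calls immediately start filling a fresh map for the next dedup interval. A self-contained sketch of that swap-then-process pattern, using illustrative names rather than the library's types:

package main

import (
	"fmt"
	"sync"
)

type sample struct {
	value     float64
	timestamp int64
}

type shard struct {
	mu sync.Mutex
	m  map[string]sample
}

func (s *shard) push(key string, v sample) {
	s.mu.Lock()
	if s.m == nil {
		s.m = make(map[string]sample)
	}
	s.m[key] = v
	s.mu.Unlock()
}

func (s *shard) flush(f func(map[string]sample)) {
	s.mu.Lock()
	m := s.m
	if len(m) > 0 {
		// Always start the next interval with a fresh map of similar capacity.
		s.m = make(map[string]sample, len(m))
	}
	s.mu.Unlock()

	if len(m) == 0 {
		return
	}
	f(m) // hand the flushed entries to the caller outside the lock
}

func main() {
	var s shard
	s.push("a", sample{value: 1, timestamp: 100})
	s.flush(func(m map[string]sample) { fmt.Println("flushed", len(m), "entries") })
}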
2 changes: 1 addition & 1 deletion lib/streamaggr/dedup_test.go
@@ -39,7 +39,7 @@ func TestDedupAggrSerial(t *testing.T) {
 		}
 		mu.Unlock()
 	}
-	da.flush(flushSamples, true)
+	da.flush(flushSamples)
 
 	if !reflect.DeepEqual(expectedSamplesMap, flushedSamplesMap) {
 		t.Fatalf("unexpected samples;\ngot\n%v\nwant\n%v", flushedSamplesMap, expectedSamplesMap)
6 changes: 2 additions & 4 deletions lib/streamaggr/dedup_timing_test.go
@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 )
 
 func BenchmarkDedupAggr(b *testing.B) {
@@ -28,7 +27,7 @@ func BenchmarkDedupAggrFlushSerial(b *testing.B) {
 	b.ReportAllocs()
 	b.SetBytes(int64(len(benchSamples)))
 	for i := 0; i < b.N; i++ {
-		da.flush(as.pushSamples, false)
+		da.flush(as.pushSamples)
 	}
 }
 
@@ -50,7 +49,6 @@ func benchmarkDedupAggr(b *testing.B, samplesPerPush int) {
 }
 
 func newBenchSamples(count int) []pushSample {
-	var lc promutils.LabelsCompressor
 	labels := []prompbmarshal.Label{
 		{
 			Name: "app",
@@ -82,7 +80,7 @@ func newBenchSamples(count int) []pushSample {
 			Name: "app",
 			Value: fmt.Sprintf("instance-%d", i),
 		})
-		keyBuf = compressLabels(keyBuf[:0], &lc, labels[:labelsLen], labels[labelsLen:])
+		keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
 		sample.key = string(keyBuf)
 		sample.value = float64(i)
 	}
17 changes: 4 additions & 13 deletions lib/streamaggr/deduplicator.go
@@ -15,7 +15,6 @@ import (
 // Deduplicator deduplicates samples per each time series.
 type Deduplicator struct {
 	da *dedupAggr
-	lc promutils.LabelsCompressor
 
 	dropLabels []string
 
@@ -38,8 +37,7 @@ type Deduplicator struct {
 // MustStop must be called on the returned deduplicator in order to free up occupied resources.
 func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator {
 	d := &Deduplicator{
-		da: newDedupAggr(),
-
+		da:         newDedupAggr(),
 		dropLabels: dropLabels,
 
 		stopCh: make(chan struct{}),
@@ -54,13 +52,6 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
 		return float64(d.da.itemsCount())
 	})
 
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_size_bytes`, func() float64 {
-		return float64(d.lc.SizeBytes())
-	})
-	_ = ms.NewGauge(`vm_streamaggr_labels_compressor_items_count`, func() float64 {
-		return float64(d.lc.ItemsCount())
-	})
-
 	d.dedupFlushDuration = ms.GetOrCreateHistogram(`vm_streamaggr_dedup_flush_duration_seconds`)
 	d.dedupFlushTimeouts = ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`)
 
@@ -103,7 +94,7 @@ func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries) {
 		}
 		labels.Sort()
 
-		buf = d.lc.Compress(buf[:0], labels.Labels)
+		buf = lc.Compress(buf[:0], labels.Labels)
 		key := bytesutil.InternBytes(buf)
 		for _, s := range ts.Samples {
 			pss = append(pss, pushSample{
@@ -155,7 +146,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 		samples := ctx.samples
 		for _, ps := range pss {
 			labelsLen := len(labels)
-			labels = decompressLabels(labels, &d.lc, ps.key)
+			labels = decompressLabels(labels, ps.key)
 
 			samplesLen := len(samples)
 			samples = append(samples, prompbmarshal.Sample{
@@ -174,7 +165,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc, dedupInterval time.Duration) {
 		ctx.labels = labels
 		ctx.samples = samples
 		putDeduplicatorFlushCtx(ctx)
-	}, true)
+	})
 
 	duration := time.Since(startTime)
 	d.dedupFlushDuration.Update(duration.Seconds())
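deduplicator.go also loses its per-instance labels compressor: the lc field and the vm_streamaggr_labels_compressor_* gauges are removed, and Push/flush now call lc.Compress, compressLabels and decompressLabels without passing a compressor, which suggests a compressor shared at package level (its definition is not part of this diff, so that is an inference). The toy sketch below is not the promutils.LabelsCompressor API; it only illustrates what a shared compressor provides: every caller maps an identical, pre-sorted label set to the same compact key.

package main

import (
	"fmt"
	"strconv"
	"sync"
)

type label struct{ Name, Value string }

// labelsCompressor is a toy stand-in: it interns a canonical rendering of a
// label set and returns a small stable id usable as a map key. It assumes the
// caller sorts labels first, as Push does via labels.Sort().
type labelsCompressor struct {
	mu   sync.Mutex
	ids  map[string]uint64
	next uint64
}

func (lc *labelsCompressor) compress(labels []label) string {
	canonical := ""
	for _, l := range labels {
		canonical += l.Name + "=" + l.Value + ","
	}
	lc.mu.Lock()
	defer lc.mu.Unlock()
	if lc.ids == nil {
		lc.ids = make(map[string]uint64)
	}
	id, ok := lc.ids[canonical]
	if !ok {
		id = lc.next
		lc.next++
		lc.ids[canonical] = id
	}
	return strconv.FormatUint(id, 10)
}

// One compressor shared by the whole package, mirroring the direction of this commit.
var sharedLC labelsCompressor

func main() {
	a := sharedLC.compress([]label{{"instance", "host-1"}, {"job", "vmagent"}})
	b := sharedLC.compress([]label{{"instance", "host-1"}, {"job", "vmagent"}})
	fmt.Println(a == b) // true: identical label sets share one key across callers
}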
3 changes: 1 addition & 2 deletions lib/streamaggr/histogram_bucket.go
@@ -84,8 +84,7 @@ func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
 	})
 }
 
-func (as *histogramBucketAggrState) flushState(ctx *flushCtx, resetState bool) {
-	_ = resetState // it isn't used here
+func (as *histogramBucketAggrState) flushState(ctx *flushCtx) {
 	currentTime := fasttime.UnixTimestamp()
 	currentTimeMsec := int64(currentTime) * 1000
 
14 changes: 5 additions & 9 deletions lib/streamaggr/last.go
@@ -61,22 +61,18 @@ func (as *lastAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *lastAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *lastAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*lastStateValue)
 		sv.mu.Lock()
 		last := sv.last
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 
 		key := k.(string)
14 changes: 5 additions & 9 deletions lib/streamaggr/max.go
@@ -58,22 +58,18 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *maxAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *maxAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*maxStateValue)
 		sv.mu.Lock()
 		max := sv.max
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 
 		key := k.(string)
14 changes: 5 additions & 9 deletions lib/streamaggr/min.go
@@ -58,22 +58,18 @@ func (as *minAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *minAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *minAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*minStateValue)
 		sv.mu.Lock()
 		min := sv.min
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 		key := k.(string)
 		ctx.appendSeries(key, "min", currentTimeMsec, min)
14 changes: 5 additions & 9 deletions lib/streamaggr/quantiles.go
@@ -63,26 +63,22 @@ func (as *quantilesAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *quantilesAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *quantilesAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	phis := as.phis
 	var quantiles []float64
 	var b []byte
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*quantilesStateValue)
 		sv.mu.Lock()
 		quantiles = sv.h.Quantiles(quantiles[:0], phis)
 		histogram.PutFast(sv.h)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 
 		key := k.(string)
14 changes: 5 additions & 9 deletions lib/streamaggr/stddev.go
@@ -59,22 +59,18 @@ func (as *stddevAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *stddevAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *stddevAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*stddevStateValue)
 		sv.mu.Lock()
 		stddev := math.Sqrt(sv.q / sv.count)
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 
 		key := k.(string)
14 changes: 5 additions & 9 deletions lib/streamaggr/stdvar.go
@@ -58,22 +58,18 @@ func (as *stdvarAggrState) pushSamples(samples []pushSample) {
 	}
 }
 
-func (as *stdvarAggrState) flushState(ctx *flushCtx, resetState bool) {
+func (as *stdvarAggrState) flushState(ctx *flushCtx) {
 	currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
 	m := &as.m
 	m.Range(func(k, v interface{}) bool {
-		if resetState {
-			// Atomically delete the entry from the map, so new entry is created for the next flush.
-			m.Delete(k)
-		}
+		// Atomically delete the entry from the map, so new entry is created for the next flush.
+		m.Delete(k)
 
 		sv := v.(*stdvarStateValue)
 		sv.mu.Lock()
 		stdvar := sv.q / sv.count
-		if resetState {
-			// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
-			sv.deleted = true
-		}
+		// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
+		sv.deleted = true
 		sv.mu.Unlock()
 
 		key := k.(string)
