Skip to content
This repository was archived by the owner on Jul 19, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions pkg/phlaredb/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (d *deltaProfiles) computeDelta(ps *schemav1.Profile, lbs phlaremodel.Label
if !ok {
// if we don't have the last profile, we can't compute the delta.
// so we remove the delta from the list of labels and profiles.
d.highestSamples[ps.SeriesFingerprint] = ps.Samples
d.highestSamples[ps.SeriesFingerprint] = copySampleSlice(ps.Samples)

return nil
}
Expand All @@ -54,7 +54,13 @@ func (d *deltaProfiles) computeDelta(ps *schemav1.Profile, lbs phlaremodel.Label
return ps
}

highestSamples := deltaSamples(lastSamples, ps.Samples)
highestSamples, reset := deltaSamples(lastSamples, ps.Samples)
if reset {
// if we reset the delta, we can't compute the delta anymore.
// so we remove the delta from the list of labels and profiles.
d.highestSamples[ps.SeriesFingerprint] = copySampleSlice(ps.Samples)
return nil
}

// remove samples that are all zero
i := 0
Expand All @@ -64,11 +70,32 @@ func (d *deltaProfiles) computeDelta(ps *schemav1.Profile, lbs phlaremodel.Label
i++
}
}
ps.Samples = ps.Samples[:i]
ps.Samples = copySlice(ps.Samples[:i])
d.highestSamples[ps.SeriesFingerprint] = highestSamples
return ps
}

func copySampleSlice(s []*schemav1.Sample) []*schemav1.Sample {
if s == nil {
return nil
}
r := make([]*schemav1.Sample, len(s))
for i := range s {
r[i] = copySample(s[i])
}
return r
}

func copySample(s *schemav1.Sample) *schemav1.Sample {
if s == nil {
return nil
}
return &schemav1.Sample{
StacktraceID: s.StacktraceID,
Value: s.Value,
}
}

func isDelta(lbs phlaremodel.Labels) bool {
if lbs.Get(model.MetricNameLabel) == memoryProfileName {
ty := lbs.Get(phlaremodel.LabelNameType)
Expand All @@ -79,7 +106,7 @@ func isDelta(lbs phlaremodel.Labels) bool {
return false
}

func deltaSamples(highest, new []*schemav1.Sample) []*schemav1.Sample {
func deltaSamples(highest, new []*schemav1.Sample) ([]*schemav1.Sample, bool) {
stacktraces := make(map[uint64]*schemav1.Sample)
for _, h := range highest {
stacktraces[h.StacktraceID] = h
Expand All @@ -91,11 +118,12 @@ func deltaSamples(highest, new []*schemav1.Sample) []*schemav1.Sample {
n.Value -= s.Value
s.Value = newMax
} else {
s.Value = n.Value
// this is a reset, we can't compute the delta anymore.
return nil, true
}
continue
}
highest = append(highest, n)
highest = append(highest, copySample(n))
}
return highest
return highest, false
}
27 changes: 10 additions & 17 deletions pkg/phlaredb/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestDeltaSample(t *testing.T) {
{StacktraceID: 2, Value: 1},
{StacktraceID: 3, Value: 1},
}
highest := deltaSamples([]*schemav1.Sample{}, new)
highest, _ := deltaSamples([]*schemav1.Sample{}, new)
require.Equal(t, 2, len(highest))
require.Equal(t, []*schemav1.Sample{
{StacktraceID: 2, Value: 1},
Expand All @@ -106,7 +106,7 @@ func TestDeltaSample(t *testing.T) {
{StacktraceID: 2, Value: 1},
{StacktraceID: 3, Value: 1},
}
highest = deltaSamples(highest, new)
highest, _ = deltaSamples(highest, new)
require.Equal(t, 2, len(highest))
require.Equal(t, []*schemav1.Sample{
{StacktraceID: 2, Value: 1},
Expand All @@ -123,7 +123,7 @@ func TestDeltaSample(t *testing.T) {
{StacktraceID: 2, Value: 1},
{StacktraceID: 3, Value: 1},
}
highest = deltaSamples(highest, new)
highest, _ = deltaSamples(highest, new)
require.Equal(t, 2, len(highest))
require.Equal(t, []*schemav1.Sample{
{StacktraceID: 2, Value: 1},
Expand All @@ -140,29 +140,22 @@ func TestDeltaSample(t *testing.T) {
{StacktraceID: 3, Value: 6},
{StacktraceID: 5, Value: 1},
}
highest = deltaSamples(highest, new)
highest, _ = deltaSamples(highest, new)
require.Equal(t, []*schemav1.Sample{
{StacktraceID: 2, Value: 1},
{StacktraceID: 3, Value: 6},
{StacktraceID: 5, Value: 1},
}, highest)
require.Equal(t, []*schemav1.Sample{
{StacktraceID: 3, Value: 5},
{StacktraceID: 5, Value: 1},
}, new)
})

t.Run("same stacktraces, counter samples resetting", func(t *testing.T) {
new = []*schemav1.Sample{
{StacktraceID: 3, Value: 1},
{StacktraceID: 5, Value: 0},
}
highest = deltaSamples(highest, new)
require.Equal(t, []*schemav1.Sample{
{StacktraceID: 2, Value: 1},
{StacktraceID: 3, Value: 1},
{StacktraceID: 5, Value: 0},
}, highest)
highest, reset := deltaSamples(highest, new)
require.Nil(t, highest)
require.True(t, reset)
require.Equal(t, []*schemav1.Sample{
{StacktraceID: 3, Value: 1},
{StacktraceID: 5, Value: 0},
Expand All @@ -175,16 +168,16 @@ func TestDeltaSample(t *testing.T) {
{StacktraceID: 1, Value: 2},
{StacktraceID: 7, Value: 1},
}
highest = deltaSamples(highest, new)
highest, _ = deltaSamples(highest, new)
sort.Slice(highest, func(i, j int) bool {
return highest[i].StacktraceID < highest[j].StacktraceID
})
require.Equal(t, []*schemav1.Sample{
{StacktraceID: 0, Value: 10},
{StacktraceID: 1, Value: 2},
{StacktraceID: 2, Value: 1},
{StacktraceID: 3, Value: 1},
{StacktraceID: 5, Value: 0},
{StacktraceID: 3, Value: 6},
{StacktraceID: 5, Value: 1},
{StacktraceID: 7, Value: 1},
}, highest)
require.Equal(t, []*schemav1.Sample{
Expand Down
34 changes: 28 additions & 6 deletions pkg/phlaredb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
phlarecontext "github.com/grafana/phlare/pkg/phlare/context"
"github.com/grafana/phlare/pkg/phlaredb/block"
schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1"
"github.com/grafana/phlare/pkg/slices"
)

func copySlice[T any](in []T) []T {
Expand Down Expand Up @@ -348,10 +349,32 @@ func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, e

var profileIngested bool
for idxType := range samplesPerType {
samples := samplesPerType[idxType]
// Sort samples per stacktraceID and aggregate duplicate stacktraceIDs into
// a single value to make sure we won't have any duplicates, as this is not recognized as part of the delta calculation.
sort.Slice(samples, func(i, j int) bool {
return samples[i].StacktraceID > samples[j].StacktraceID
})
total := len(samples)
samples = slices.RemoveInPlace(samples, func(s *schemav1.Sample, i int) bool {
if s.Value == 0 {
return true
}
if i < len(p.Sample)-1 && s.StacktraceID == samples[i+1].StacktraceID {
samples[i+1].Value += s.Value
// TODO: Currently we're not aggregating labels, and we should probably decide what to do with them in this case.
return true
}
return false
})
if total != len(samples) {
// copy samples if there are less than received to avoid retaining memory.
samples = copySlice(samples)
}
profile := &schemav1.Profile{
ID: id,
SeriesFingerprint: seriesFingerprints[idxType],
Samples: samplesPerType[idxType],
Samples: samples,
DropFrames: p.DropFrames,
KeepFrames: p.KeepFrames,
TimeNanos: p.TimeNanos,
Expand All @@ -363,6 +386,7 @@ func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, e
profile = h.delta.computeDelta(profile, labels[idxType])

if profile == nil {
level.Debug(h.logger).Log("msg", "profile is empty after delta computation", "metricName", metricName)
continue
}

Expand All @@ -371,6 +395,9 @@ func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, e
}

profileIngested = true
h.totalSamples.Add(uint64(len(profile.Samples)))
h.metrics.sampleValuesIngested.WithLabelValues(metricName).Add(float64(len(profile.Samples)))
h.metrics.sampleValuesReceived.WithLabelValues(metricName).Add(float64(len(p.Sample)))
}

if !profileIngested {
Expand All @@ -387,11 +414,6 @@ func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, e
}
h.metaLock.Unlock()

samplesInProfile := len(samplesPerType[0]) * len(labels)
h.totalSamples.Add(sampleSize)
h.metrics.sampleValuesIngested.WithLabelValues(metricName).Add(float64(samplesInProfile))
h.metrics.sampleValuesReceived.WithLabelValues(metricName).Add(float64(len(p.Sample) * len(labels)))

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/phlaredb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestHeadIngestStacktraces(t *testing.T) {
}
}
// expect 4 samples, 3 of which distinct
require.Equal(t, []uint64{0, 1, 2, 2}, samples)
require.Equal(t, []uint64{1, 0, 2, 2}, samples)
}

func TestHeadLabelValues(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/phlaredb/profile_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ func (s *profileStore) cutRowGroup() (err error) {

level.Debug(s.logger).Log("msg", "cut row group segment", "path", path, "numProfiles", n)

for i := range s.slice {
// don't retain profiles and samples in memory as re-slice.
s.slice[i] = nil
}
// reset slice and metrics
s.slice = s.slice[:0]
s.size.Store(0)
Expand Down
1 change: 1 addition & 0 deletions pkg/phlaredb/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ func SplitFiltersAndMatchers(allMatchers []*labels.Matcher) (filters, matchers [
return
}

// nolint unused
const (
profileSize = uint64(unsafe.Sizeof(schemav1.Profile{}))
sampleSize = uint64(unsafe.Sizeof(schemav1.Sample{}))
Expand Down