Skip to content

Commit

Permalink
feat(distributor): Relabel profiles at ingest (#3369)
Browse files Browse the repository at this point in the history
* Relabel profiles at ingest

This is an important operational tool: It allows us to deal with
problems in the ingested profile labels.

The relabeling rules cover both series label and also sample labes.

By default we supply relabeling rules, which do remove the godeltaprof_
prefix from profiles generated by pyroscope-go.

* Merge samples with identical stacktraces

I have also added the most common test cases I could think of.
  • Loading branch information
simonswine committed Jun 26, 2024
1 parent 359ecf1 commit b6816e3
Show file tree
Hide file tree
Showing 13 changed files with 1,614 additions and 77 deletions.
227 changes: 194 additions & 33 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sort"
"sync"
"time"
"unsafe"

"connectrpc.com/connect"
"github.com/dustin/go-humanize"
Expand All @@ -31,14 +32,15 @@ import (
"github.com/prometheus/common/model"
"go.uber.org/atomic"

googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
connectapi "github.com/grafana/pyroscope/pkg/api/connect"
"github.com/grafana/pyroscope/pkg/clientpool"
"github.com/grafana/pyroscope/pkg/distributor/aggregator"
distributormodel "github.com/grafana/pyroscope/pkg/distributor/model"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/model/relabel"
"github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/slices"
"github.com/grafana/pyroscope/pkg/tenant"
Expand Down Expand Up @@ -123,6 +125,7 @@ type Limits interface {
MaxProfileSymbolValueLength(tenantID string) int
MaxSessionsPerSeries(tenantID string) int
EnforceLabelsOrder(tenantID string) bool
IngestionRelabelingRules(tenantID string) []*relabel.Config
validation.ProfileValidationLimits
aggregator.Limits
}
Expand Down Expand Up @@ -399,8 +402,11 @@ func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.Pu
series.Labels = d.limitMaxSessionsPerSeries(maxSessionsPerSeries, series.Labels)
}

// Next we split profiles by labels.
profileSeries := extractSampleSeries(req)
// Next we split profiles by labels and apply relabel rules.
profileSeries, bytesRelabelDropped, profilesRelabelDropped := extractSampleSeries(req, d.limits.IngestionRelabelingRules(tenantID))
validation.DiscardedBytes.WithLabelValues(string(validation.RelabelRules), tenantID).Add(bytesRelabelDropped)
validation.DiscardedProfiles.WithLabelValues(string(validation.RelabelRules), tenantID).Add(profilesRelabelDropped)

// Filter our series and profiles without samples.
for _, series := range profileSeries {
series.Samples = slices.RemoveInPlace(series.Samples, func(sample *distributormodel.ProfileSample, _ int) bool {
Expand Down Expand Up @@ -491,8 +497,20 @@ func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.Pu
}
}

// sampleSize returns the size of a samples in bytes.
func sampleSize(stringTable []string, samplesSlice []*profilev1.Sample) int64 {
var size int64
for _, s := range samplesSlice {
size += int64(s.SizeVT())
for _, l := range s.Label {
size += int64(len(stringTable[l.Key]) + len(stringTable[l.Str]) + len(stringTable[l.NumUnit]))
}
}
return size
}

// profileSizeBytes returns the size of symbols and samples in bytes.
func profileSizeBytes(p *googlev1.Profile) (symbols, samples int64) {
func profileSizeBytes(p *profilev1.Profile) (symbols, samples int64) {
fullSize := p.SizeVT()
// remove samples
samplesSlice := p.Sample
Expand All @@ -516,7 +534,7 @@ func profileSizeBytes(p *googlev1.Profile) (symbols, samples int64) {
return
}

func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels, profile *googlev1.Profile) (func() (*pprof.ProfileMerge, error), bool, error) {
func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels, profile *profilev1.Profile) (func() (*pprof.ProfileMerge, error), bool, error) {
a, ok := d.aggregator.AggregatorForTenant(tenantID)
if !ok {
return nil, false, nil
Expand All @@ -534,7 +552,7 @@ func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels,
return r.Handler(), true, nil
}

func mergeProfile(profile *googlev1.Profile) aggregator.AggregateFn[*pprof.ProfileMerge] {
func mergeProfile(profile *profilev1.Profile) aggregator.AggregateFn[*pprof.ProfileMerge] {
return func(m *pprof.ProfileMerge) (*pprof.ProfileMerge, error) {
if m == nil {
m = new(pprof.ProfileMerge)
Expand Down Expand Up @@ -633,7 +651,106 @@ func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}

func extractSampleSeries(req *distributormodel.PushRequest) []*distributormodel.ProfileSeries {
type sampleKey struct {
stacktrace string
// note this is an index into the string table, rather than span ID
spanIDIdx int64
}

func sampleKeyFromSample(stringTable []string, s *profilev1.Sample) sampleKey {
var k sampleKey

// populate spanID if present
for _, l := range s.Label {
if stringTable[int(l.Key)] == pprof.SpanIDLabelName {
k.spanIDIdx = l.Str
}
}
if len(s.LocationId) > 0 {
k.stacktrace = unsafe.String(
(*byte)(unsafe.Pointer(&s.LocationId[0])),
len(s.LocationId)*8,
)
}
return k
}

type lazyGroup struct {
sampleGroup pprof.SampleGroup
// The map is only initialized when the group is being modified. Key is the
// string representation (unsafe) of the sample stack trace and its potential
// span ID.
sampleMap map[sampleKey]*profilev1.Sample
labels phlaremodel.Labels
}

func (g *lazyGroup) addSampleGroup(stringTable []string, sg pprof.SampleGroup) {
if len(g.sampleGroup.Samples) == 0 {
g.sampleGroup = sg
return
}

// If the group is already initialized, we need to merge the samples.
if g.sampleMap == nil {
g.sampleMap = make(map[sampleKey]*profilev1.Sample)
for _, s := range g.sampleGroup.Samples {
g.sampleMap[sampleKeyFromSample(stringTable, s)] = s
}
}

for _, s := range sg.Samples {
k := sampleKeyFromSample(stringTable, s)
if _, ok := g.sampleMap[k]; !ok {
g.sampleGroup.Samples = append(g.sampleGroup.Samples, s)
g.sampleMap[k] = s
} else {
// merge the samples
for idx := range s.Value {
g.sampleMap[k].Value[idx] += s.Value[idx]
}
}
}
}

type groupsWithFingerprints struct {
m map[uint64][]lazyGroup
order []uint64
}

func newGroupsWithFingerprints() *groupsWithFingerprints {
return &groupsWithFingerprints{
m: make(map[uint64][]lazyGroup),
}
}

func (g *groupsWithFingerprints) add(stringTable []string, lbls phlaremodel.Labels, group pprof.SampleGroup) {
fp := lbls.Hash()
idxs, ok := g.m[fp]
if ok {
// fingerprint matches, check if the labels are the same
for _, idx := range idxs {
if phlaremodel.CompareLabelPairs(idx.labels, lbls) == 0 {
// append samples to the group
idx.addSampleGroup(stringTable, group)
return
}
}
} else {
g.order = append(g.order, fp)
}

// add the labels to the list
g.m[fp] = append(g.m[fp], lazyGroup{
sampleGroup: group,
labels: lbls,
})
}

func extractSampleSeries(req *distributormodel.PushRequest, relabelRules []*relabel.Config) (result []*distributormodel.ProfileSeries, bytesRelabelDropped, profilesRelabelDropped float64) {
var (
lblbuilder = phlaremodel.NewLabelsBuilder(phlaremodel.EmptyLabels())
)

profileSeries := make([]*distributormodel.ProfileSeries, 0, len(req.Series))
for _, series := range req.Series {
s := &distributormodel.ProfileSeries{
Expand All @@ -643,33 +760,74 @@ func extractSampleSeries(req *distributormodel.PushRequest) []*distributormodel.
for _, raw := range series.Samples {
pprof.RenameLabel(raw.Profile.Profile, pprof.ProfileIDLabelName, pprof.SpanIDLabelName)
groups := pprof.GroupSamplesWithoutLabels(raw.Profile.Profile, pprof.SpanIDLabelName)

if len(groups) == 0 || (len(groups) == 1 && len(groups[0].Labels) == 0) {
// No sample labels in the profile.

// relabel the labels of the series
lblbuilder.Reset(series.Labels)
if len(relabelRules) > 0 {
keep := relabel.ProcessBuilder(lblbuilder, relabelRules...)
if !keep {
bytesRelabelDropped += float64(raw.Profile.SizeVT())
profilesRelabelDropped++ // in this case we dropped a whole profile
continue
}
}

// Copy over the labels from the builder
s.Labels = lblbuilder.Labels()

// We do not modify the request.
s.Samples = append(s.Samples, raw)

continue
}
e := pprof.NewSampleExporter(raw.Profile.Profile)

// iterate through groups relabel them and find relevant overlapping labelsets
groupsKept := newGroupsWithFingerprints()
for _, group := range groups {
// exportSamples creates a new profile with the samples provided.
// The samples are obtained via GroupSamples call, which means
// the underlying capacity is referenced by the source profile.
// Therefore, the slice has to be copied and samples zeroed to
// avoid ownership issues.
profile := exportSamples(e, group.Samples)
// Note that group.Labels reference strings from the source profile.
labels := mergeSeriesAndSampleLabels(raw.Profile.Profile, series.Labels, group.Labels)
profileSeries = append(profileSeries, &distributormodel.ProfileSeries{
Labels: labels,
Samples: []*distributormodel.ProfileSample{{Profile: profile}},
})
lblbuilder.Reset(series.Labels)
addSampleLabelsToLabelsBuilder(lblbuilder, raw.Profile.Profile, group.Labels)
if len(relabelRules) > 0 {
keep := relabel.ProcessBuilder(lblbuilder, relabelRules...)
if !keep {
bytesRelabelDropped += float64(sampleSize(raw.Profile.Profile.StringTable, group.Samples))
continue
}
}

// add the group to the list
groupsKept.add(raw.Profile.StringTable, lblbuilder.Labels(), group)
}

if len(groupsKept.m) == 0 {
// no groups kept, count the whole profile as dropped
profilesRelabelDropped++
continue
}

e := pprof.NewSampleExporter(raw.Profile.Profile)
for _, idx := range groupsKept.order {
for _, group := range groupsKept.m[idx] {
// exportSamples creates a new profile with the samples provided.
// The samples are obtained via GroupSamples call, which means
// the underlying capacity is referenced by the source profile.
// Therefore, the slice has to be copied and samples zeroed to
// avoid ownership issues.
profile := exportSamples(e, group.sampleGroup.Samples)
profileSeries = append(profileSeries, &distributormodel.ProfileSeries{
Labels: group.labels,
Samples: []*distributormodel.ProfileSample{{Profile: profile}},
})
}
}
}
if len(s.Samples) > 0 {
profileSeries = append(profileSeries, s)
}
}
return profileSeries
return profileSeries, bytesRelabelDropped, profilesRelabelDropped
}

func (d *Distributor) limitMaxSessionsPerSeries(maxSessionsPerSeries int, labels phlaremodel.Labels) phlaremodel.Labels {
Expand Down Expand Up @@ -712,22 +870,25 @@ func (d *Distributor) rateLimit(tenantID string, req *distributormodel.PushReque
return nil
}

// mergeSeriesAndSampleLabels merges sample labels with
// series labels. Series labels take precedence.
func mergeSeriesAndSampleLabels(p *googlev1.Profile, sl []*typesv1.LabelPair, pl []*googlev1.Label) []*typesv1.LabelPair {
m := phlaremodel.Labels(sl).Clone()
// addSampleLabelsToLabelsBuilder: adds sample label that don't exists yet on the profile builder. So the existing labels take precedence.
func addSampleLabelsToLabelsBuilder(b *phlaremodel.LabelsBuilder, p *profilev1.Profile, pl []*profilev1.Label) {
var name string
for _, l := range pl {
m = append(m, &typesv1.LabelPair{
Name: p.StringTable[l.Key],
Value: p.StringTable[l.Str],
})
name = p.StringTable[l.Key]
if l.Str <= 0 {
// skip if label value is not a string
continue
}
if b.Get(name) != "" {
// do nothing if label name already exists
continue
}
b.Set(name, p.StringTable[l.Str])
}
sort.Stable(m)
return m.Unique()
}

func exportSamples(e *pprof.SampleExporter, samples []*googlev1.Sample) *pprof.Profile {
samplesCopy := make([]*googlev1.Sample, len(samples))
func exportSamples(e *pprof.SampleExporter, samples []*profilev1.Sample) *pprof.Profile {
samplesCopy := make([]*profilev1.Sample, len(samples))
copy(samplesCopy, samples)
slices.Clear(samples)
n := pprof.NewProfile()
Expand Down
Loading

0 comments on commit b6816e3

Please sign in to comment.