Skip to content

Commit

Permalink
feat: optimized pprof symbolication (#2679)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Nov 23, 2023
1 parent e7f202e commit 104465c
Show file tree
Hide file tree
Showing 19 changed files with 638 additions and 434 deletions.
16 changes: 0 additions & 16 deletions pkg/model/profile.go
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/cespare/xxhash/v2"
"github.com/gogo/status"
"github.com/google/pprof/profile"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -54,21 +53,6 @@ func SelectorFromProfileType(profileType *typesv1.ProfileType) *labels.Matcher {
}
}

// SetProfileMetadata sets the metadata on the profile.
func SetProfileMetadata(p *profile.Profile, ty *typesv1.ProfileType) {
p.SampleType = []*profile.ValueType{{Type: ty.SampleType, Unit: ty.SampleUnit}}
p.DefaultSampleType = ty.SampleType
p.PeriodType = &profile.ValueType{Type: ty.PeriodType, Unit: ty.PeriodUnit}
switch ty.Name {
case "process_cpu": // todo: this should support other types of cpu profiles
p.Period = 1000000000
case "memory":
p.Period = 512 * 1024
default:
p.Period = 1
}
}

type SpanSelector map[uint64]struct{}

func NewSpanSelector(spans []string) (SpanSelector, error) {
Expand Down
70 changes: 52 additions & 18 deletions pkg/model/stacktraces.go
Expand Up @@ -143,33 +143,33 @@ func NewStacktraceTree(size int) *StacktraceTree {

const sentinel = -1

func (t *StacktraceTree) Insert(locations []int32, value int64) {
func (t *StacktraceTree) Insert(locations []int32, value int64) int32 {
var (
n = &t.Nodes[0]
i = n.FirstChild
x int32
n = &t.Nodes[0]
next = n.FirstChild
cur int32
)

for j := len(locations) - 1; j >= 0; {
r := locations[j]
if i == sentinel {
if next == sentinel {
ni := int32(len(t.Nodes))
n.FirstChild = ni
t.Nodes = append(t.Nodes, StacktraceNode{
Parent: x,
Parent: cur,
FirstChild: sentinel,
NextSibling: sentinel,
Location: r,
})
x = ni
cur = ni
n = &t.Nodes[ni]
} else {
x = i
n = &t.Nodes[i]
cur = next
n = &t.Nodes[next]
}
if n.Location == r {
n.Total += value
i = n.FirstChild
next = n.FirstChild
j--
continue
}
Expand All @@ -182,10 +182,46 @@ func (t *StacktraceTree) Insert(locations []int32, value int64) {
Location: r,
})
}
i = n.NextSibling
next = n.NextSibling
}

t.Nodes[x].Value += value
t.Nodes[cur].Value += value
return cur
}

// Truncate turns every node whose cumulative (Total) value is below min
// into a leaf node: the subtree below it is detached, the node's location
// is replaced with the sentinel stub marker, and its self value absorbs
// the subtree total so no samples are lost. It returns the number of
// nodes truncated. A min below 1 is a no-op.
func (t *StacktraceTree) Truncate(min int64) int {
	if min < 1 {
		return 0
	}
	truncated := 0
	for i := range t.Nodes {
		n := &t.Nodes[i]
		if n.Total >= min {
			continue
		}
		// Detach children and fold the subtree total into the node
		// itself; the sentinel location marks it as a truncation stub.
		n.Location = sentinel
		n.FirstChild = sentinel
		n.Value = n.Total
		truncated++
	}
	return truncated
}

// Resolve appends to dst (reset to length zero first) the location IDs on
// the path from node id up to, but not including, the root, in leaf-to-root
// order, and returns the resulting slice. An out-of-range id yields an
// empty slice. Truncated (sentinel) frames are emitted at most once, and
// only at the leaf position.
func (t *StacktraceTree) Resolve(dst []int32, id int32) []int32 {
	dst = dst[:0]
	if id >= int32(len(t.Nodes)) {
		return dst
	}
	i := id
	for i > 0 {
		n := t.Nodes[i]
		// A truncated stack trace keeps a single stub frame: skip any
		// sentinel location once at least one frame has been collected.
		if n.Location != sentinel || len(dst) == 0 {
			dst = append(dst, n.Location)
		}
		i = n.Parent
	}
	return dst
}

// MinValue returns the minimum "total" value a node in a tree has to have.
Expand Down Expand Up @@ -226,13 +262,13 @@ func (t *StacktraceTree) Traverse(maxNodes int64, fn StacktraceTreeTraverseFn) e
current, nodes, children = nodes[len(nodes)-1], nodes[:len(nodes)-1], children[:0]
var truncated int64
n := &t.Nodes[current]
if n.Location == stacktraceTreeNodeTruncated {
if n.Location == sentinel {
goto call
}

for x := n.FirstChild; x > 0; {
child := &t.Nodes[x]
if child.Total >= min && child.Location != stacktraceTreeNodeTruncated {
if child.Total >= min && child.Location != sentinel {
children = append(children, x)
} else {
truncated += child.Total
Expand All @@ -244,7 +280,7 @@ func (t *StacktraceTree) Traverse(maxNodes int64, fn StacktraceTreeTraverseFn) e
// Create a stub for removed nodes.
i := len(t.Nodes)
t.Nodes = append(t.Nodes, StacktraceNode{
Location: stacktraceTreeNodeTruncated,
Location: sentinel,
Value: truncated,
})
children = append(children, int32(i))
Expand All @@ -263,8 +299,6 @@ func (t *StacktraceTree) Traverse(maxNodes int64, fn StacktraceTreeTraverseFn) e
return nil
}

const stacktraceTreeNodeTruncated = -1

var lostDuringSerializationNameBytes = []byte(truncatedNodeName)

func (t *StacktraceTree) Bytes(dst io.Writer, maxNodes int64, funcs []string) {
Expand All @@ -280,7 +314,7 @@ func (t *StacktraceTree) Bytes(dst io.Writer, maxNodes int64, funcs []string) {
// It is guaranteed that funcs slice and its contents are immutable,
// and the byte slice backing capacity is managed by GC.
name = unsafeStringBytes(funcs[n.Location])
case stacktraceTreeNodeTruncated:
case sentinel:
name = lostDuringSerializationNameBytes
}

Expand Down
41 changes: 16 additions & 25 deletions pkg/phlaredb/block_querier.go
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/google/pprof/profile"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
Expand All @@ -31,7 +30,9 @@ import (
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"

profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/iter"
Expand All @@ -44,6 +45,7 @@ import (
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
"github.com/grafana/pyroscope/pkg/phlaredb/tsdb/index"
"github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/util"
)

Expand Down Expand Up @@ -547,7 +549,7 @@ type Querier interface {
MergeBySpans(ctx context.Context, rows iter.Iterator[Profile], spans phlaremodel.SpanSelector) (*phlaremodel.Tree, error)
MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error)
SelectMergeByLabels(ctx context.Context, params *ingestv1.SelectProfilesRequest, by ...string) ([]*typesv1.Series, error)
MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error)
MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profilev1.Profile, error)
Series(ctx context.Context, params *ingestv1.SeriesRequest) ([]*typesv1.Labels, error)
ProfileTypes(context.Context, *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)
LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)
Expand Down Expand Up @@ -1158,7 +1160,7 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
}

var lock sync.Mutex
result := make([]*profile.Profile, 0, len(queriers))
var result pprof.ProfileMerge
g, ctx := errgroup.WithContext(ctx)

// depending on if new need deduplication or not there are two different code paths.
Expand Down Expand Up @@ -1190,15 +1192,15 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
return nil
}

merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(profiles)))
p, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(profiles)))
if err != nil {
return err
}

lock.Lock()
result = append(result, merge)
err = result.Merge(p)
lock.Unlock()
return nil
return err
}))
}
} else {
Expand Down Expand Up @@ -1226,14 +1228,14 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
// Sort profiles for better read locality.
// Merge async the result so we can continue streaming profiles.
g.Go(util.RecoverPanic(func() error {
merge, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(selectedProfiles[i])))
p, err := querier.MergePprof(ctx, iter.NewSliceIterator(querier.Sort(selectedProfiles[i])))
if err != nil {
return err
}
lock.Lock()
result = append(result, merge)
err = result.Merge(p)
lock.Unlock()
return nil
return err
}))
}

Expand All @@ -1249,27 +1251,16 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
return err
}

if len(result) == 0 {
result = append(result, &profile.Profile{})
}
for _, p := range result {
phlaremodel.SetProfileMetadata(p, request.Type)
p.TimeNanos = model.Time(r.Request.End).UnixNano()
}
p, err := profile.Merge(result)
if err != nil {
return err
}
mergedProfile := result.Profile()
pprof.SetProfileMetadata(mergedProfile, request.Type, model.Time(r.Request.End).UnixNano(), 0)

// connect go already handles compression.
var buf bytes.Buffer
if err := p.WriteUncompressed(&buf); err != nil {
pprofBytes, err := proto.Marshal(mergedProfile)
if err != nil {
return err
}
// sends the final result to the client.
err = stream.Send(&ingestv1.MergeProfilesPprofResponse{
Result: buf.Bytes(),
})
err = stream.Send(&ingestv1.MergeProfilesPprofResponse{Result: pprofBytes})
if err != nil {
if errors.Is(err, io.EOF) {
return connect.NewError(connect.CodeCanceled, errors.New("client closed stream"))
Expand Down
10 changes: 5 additions & 5 deletions pkg/phlaredb/head_queriers.go
Expand Up @@ -6,12 +6,12 @@ import (

"github.com/bufbuild/connect-go"
"github.com/go-kit/log/level"
"github.com/google/pprof/profile"
"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/iter"
Expand Down Expand Up @@ -191,15 +191,15 @@ func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.It
return r.Tree()
}

func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profilev1.Profile, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergePprof")
defer sp.Finish()
r := symdb.NewResolver(ctx, q.head.symdb)
defer r.Release()
if err := mergeByStacktraces(ctx, q.rowGroup(), rows, r); err != nil {
return nil, err
}
return r.Profile()
return r.Pprof(0)
}

func (q *headOnDiskQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
Expand Down Expand Up @@ -420,7 +420,7 @@ func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.
return r.Tree()
}

func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profilev1.Profile, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "MergePprof - HeadInMemory")
defer sp.Finish()
r := symdb.NewResolver(ctx, q.head.symdb)
Expand All @@ -435,7 +435,7 @@ func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator
if err := rows.Err(); err != nil {
return nil, err
}
return r.Profile()
return r.Pprof(0)
}

func (q *headInMemoryQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/phlaredb/sample_merge.go
Expand Up @@ -5,13 +5,13 @@ import (
"sort"
"strings"

"github.com/google/pprof/profile"
"github.com/grafana/dskit/runutil"
"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"
"github.com/prometheus/common/model"
"github.com/samber/lo"

profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/iter"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
Expand All @@ -33,7 +33,7 @@ func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.I
return r.Tree()
}

func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profilev1.Profile, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergePprof - Block")
defer sp.Finish()
sp.SetTag("block ULID", b.meta.ULID.String())
Expand All @@ -43,7 +43,7 @@ func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[
if err := mergeByStacktraces(ctx, b.profiles.file, rows, r); err != nil {
return nil, err
}
return r.Profile()
return r.Pprof(0)
}

func (b *singleBlockQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
Expand Down
12 changes: 10 additions & 2 deletions pkg/phlaredb/sample_merge_test.go
Expand Up @@ -517,7 +517,11 @@ func TestMergePprof(t *testing.T) {
for _, sample := range expected.Sample {
sample.Value = []int64{sample.Value[0] * 3}
}
compareProfile(t, expected.Compact(), result)
data, err = proto.Marshal(result)
require.NoError(t, err)
actual, err := profile.ParseUncompressed(data)
require.NoError(t, err)
compareProfile(t, expected.Compact(), actual.Compact())
}

func TestHeadMergePprof(t *testing.T) {
Expand Down Expand Up @@ -562,7 +566,11 @@ func TestHeadMergePprof(t *testing.T) {
for _, sample := range expected.Sample {
sample.Value = []int64{sample.Value[0] * 3}
}
compareProfile(t, expected.Compact(), result)
data, err = proto.Marshal(result)
require.NoError(t, err)
actual, err := profile.ParseUncompressed(data)
require.NoError(t, err)
compareProfile(t, expected.Compact(), actual.Compact())
}

func TestMergeSpans(t *testing.T) {
Expand Down

0 comments on commit 104465c

Please sign in to comment.