Skip to content

Commit

Permalink
fix: load exemplar metadata from segment (#1185)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Jul 5, 2022
1 parent 7d2dffa commit e869730
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 55 deletions.
30 changes: 15 additions & 15 deletions pkg/storage/exemplars.go
Expand Up @@ -50,13 +50,13 @@ var (
)

type exemplarsBatch struct {
entries map[string]*exemplarsBatchEntry
entries map[string]*exemplarEntry
config *Config
metrics *metrics
dicts BadgerDBWithCache
}

type exemplarsBatchEntry struct {
type exemplarEntry struct {
// DB exemplar key and its parts.
Key []byte
AppName string
Expand All @@ -74,7 +74,7 @@ func (e *exemplars) newExemplarsBatch() *exemplarsBatch {
metrics: e.metrics,
config: e.config,
dicts: e.dicts,
entries: make(map[string]*exemplarsBatchEntry, exemplarsPerBatch),
entries: make(map[string]*exemplarEntry, exemplarsPerBatch),
}
}

Expand Down Expand Up @@ -261,7 +261,7 @@ func (e *exemplars) insert(ctx context.Context, input *PutInput) error {
return err
}

func (e *exemplars) fetch(ctx context.Context, appName string, profileIDs []string, fn func(*tree.Tree) error) error {
func (e *exemplars) fetch(ctx context.Context, appName string, profileIDs []string, fn func(exemplarEntry) error) error {
d, ok := e.dicts.Lookup(appName)
if !ok {
return nil
Expand All @@ -280,11 +280,11 @@ func (e *exemplars) fetch(ctx context.Context, appName string, profileIDs []stri
case err == nil:
err = item.Value(func(val []byte) error {
e.metrics.exemplarsReadBytes.Observe(float64(len(val)))
var x exemplarsBatchEntry
var x exemplarEntry
if err = x.Deserialize(dx, val); err != nil {
return err
}
return fn(x.Tree)
return fn(x)
})
if err != nil {
return err
Expand Down Expand Up @@ -378,7 +378,7 @@ func (b *exemplarsBatch) insert(_ context.Context, input *PutInput) error {
e.updateTime(input.StartTime.UnixNano(), input.EndTime.UnixNano())
return nil
}
b.entries[key] = &exemplarsBatchEntry{
b.entries[key] = &exemplarEntry{
Key: k,
AppName: appName,
ProfileID: profileID,
Expand All @@ -391,7 +391,7 @@ func (b *exemplarsBatch) insert(_ context.Context, input *PutInput) error {
return nil
}

func (b *exemplarsBatch) writeExemplarToDB(txn *badger.Txn, e *exemplarsBatchEntry) error {
func (b *exemplarsBatch) writeExemplarToDB(txn *badger.Txn, e *exemplarEntry) error {
k, ok := exemplarKeyToTimestampKey(e.Key, e.EndTime)
if !ok {
return fmt.Errorf("invalid exemplar key")
Expand Down Expand Up @@ -419,7 +419,7 @@ func (b *exemplarsBatch) writeExemplarToDB(txn *badger.Txn, e *exemplarsBatchEnt
// it's not happening: only the first EndTime is honored.
err = item.Value(func(val []byte) error {
b.metrics.exemplarsReadBytes.Observe(float64(len(val)))
var x exemplarsBatchEntry
var x exemplarEntry
if err = x.Deserialize(dx, val); err == nil {
e = x.Merge(e)
}
Expand All @@ -441,14 +441,14 @@ func (b *exemplarsBatch) writeExemplarToDB(txn *badger.Txn, e *exemplarsBatchEnt
return nil
}

func (e *exemplarsBatchEntry) Merge(src *exemplarsBatchEntry) *exemplarsBatchEntry {
// Merge folds src into e and returns e: the time range of e is updated with
// the start and end time of src, the tree of src is merged into the tree of
// e, and the key of e is replaced with the key of src.
func (e *exemplarEntry) Merge(src *exemplarEntry) *exemplarEntry {
	start, end := src.StartTime, src.EndTime
	e.updateTime(start, end)
	e.Tree.Merge(src.Tree)
	e.Key = src.Key
	return e
}

func (e *exemplarsBatchEntry) updateTime(st, et int64) {
func (e *exemplarEntry) updateTime(st, et int64) {
if st < e.StartTime {
e.StartTime = st
}
Expand All @@ -457,7 +457,7 @@ func (e *exemplarsBatchEntry) updateTime(st, et int64) {
}
}

func (e *exemplarsBatchEntry) Serialize(d *dict.Dict, maxNodes int) ([]byte, error) {
func (e *exemplarEntry) Serialize(d *dict.Dict, maxNodes int) ([]byte, error) {
b := bytes.NewBuffer(make([]byte, 0, 1<<10)) // 1 KB.
b.WriteByte(exemplarsCurrentFormat) // Version.
if err := e.Tree.SerializeTruncate(d, maxNodes, b); err != nil {
Expand Down Expand Up @@ -487,7 +487,7 @@ func (e *exemplarsBatchEntry) Serialize(d *dict.Dict, maxNodes int) ([]byte, err
return b.Bytes(), nil
}

func (e *exemplarsBatchEntry) Deserialize(d *dict.Dict, b []byte) error {
func (e *exemplarEntry) Deserialize(d *dict.Dict, b []byte) error {
buf := bytes.NewBuffer(b)
v, err := buf.ReadByte()
if err != nil {
Expand All @@ -503,7 +503,7 @@ func (e *exemplarsBatchEntry) Deserialize(d *dict.Dict, b []byte) error {
}
}

func (e *exemplarsBatchEntry) deserializeV1(d *dict.Dict, src *bytes.Buffer) error {
func (e *exemplarEntry) deserializeV1(d *dict.Dict, src *bytes.Buffer) error {
t, err := tree.Deserialize(d, src)
if err != nil {
return err
Expand All @@ -512,7 +512,7 @@ func (e *exemplarsBatchEntry) deserializeV1(d *dict.Dict, src *bytes.Buffer) err
return nil
}

func (e *exemplarsBatchEntry) deserializeV2(d *dict.Dict, src *bytes.Buffer) error {
func (e *exemplarEntry) deserializeV2(d *dict.Dict, src *bytes.Buffer) error {
t, err := tree.Deserialize(d, src)
if err != nil {
return err
Expand Down
82 changes: 76 additions & 6 deletions pkg/storage/exemplars_test.go
Expand Up @@ -21,12 +21,82 @@ import (
"github.com/pyroscope-io/pyroscope/pkg/flameql"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage/dict"
"github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
"github.com/pyroscope-io/pyroscope/pkg/storage/tree"
"github.com/pyroscope-io/pyroscope/pkg/testing"
)

var _ = Describe("MergeProfiles", func() {
// Exemplar query spec: a profile ingested with a profile ID label must be
// retrievable through a query with a profile ID matcher, and the output is
// expected to carry the metadata (aggregation type, units, spy name, sample
// rate) that was ingested for the same app name and labels without the
// profile ID.
var _ = Describe("Exemplar query", func() {
	testing.WithConfig(func(cfg **config.Config) {
		JustBeforeEach(func() {
			var err error
			s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
			Expect(err).ToNot(HaveOccurred())
		})
		Context("when profiles with ID ingested", func() {
			It("queries profiling data correctly", func() {
				defer s.Close()

				// Three samples in total: a;b = 1 and a;c = 2.
				profile := tree.New()
				profile.Insert([]byte("a;b"), uint64(1))
				profile.Insert([]byte("a;c"), uint64(2))
				startTime := testing.SimpleTime(10)
				endTime := testing.SimpleTime(19)

				// First, the profile labeled with a profile ID.
				withProfileID := &PutInput{
					StartTime: startTime,
					EndTime:   endTime,
					Val:       profile.Clone(big.NewRat(1, 1)),
					Key: segment.NewKey(map[string]string{
						"__name__":                 "app.cpu",
						"span_name":                "SomeSpanName",
						segment.ProfileIDLabelName: "my-profile-id",
					}),
				}
				Expect(s.Put(context.TODO(), withProfileID)).ToNot(HaveOccurred())

				// Then the same labels without the profile ID, this time with
				// non-default metadata that the query below is expected to return.
				withMetadata := &PutInput{
					StartTime: startTime,
					EndTime:   endTime,
					Key: segment.NewKey(map[string]string{
						"__name__":  "app.cpu",
						"span_name": "SomeSpanName",
					}),
					Val:             profile.Clone(big.NewRat(1, 1)),
					AggregationType: metadata.AverageAggregationType,
					Units:           metadata.BytesUnits,
					SpyName:         "debugspy",
					SampleRate:      42,
				}
				Expect(s.Put(context.TODO(), withMetadata)).ToNot(HaveOccurred())

				s.exemplars.Sync()

				query := &flameql.Query{
					AppName: "app.cpu",
					Matchers: []*flameql.TagMatcher{
						{
							Key:   segment.ProfileIDLabelName,
							Value: "my-profile-id",
							Op:    flameql.OpEqual,
						},
					},
				}
				out, err := s.Get(context.Background(), &GetInput{Query: query})

				Expect(err).ToNot(HaveOccurred())
				Expect(out.Tree).ToNot(BeNil())
				Expect(out.Tree.Samples()).To(Equal(uint64(3)))

				Expect(out.AggregationType).To(Equal(metadata.AverageAggregationType))
				Expect(out.Units).To(Equal(metadata.BytesUnits))
				Expect(out.SpyName).To(Equal("debugspy"))
				Expect(out.SampleRate).To(Equal(uint32(42)))
			})
		})
	})
})

var _ = Describe("Exemplar merge", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
Expand Down Expand Up @@ -95,7 +165,7 @@ var _ = Describe("MergeProfiles", func() {
})
})

var _ = Describe("Profiles retention policy", func() {
var _ = Describe("Exemplars retention policy", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
Expand Down Expand Up @@ -159,7 +229,7 @@ func randomBytesHex(n int) string {
return hex.EncodeToString(b)
}

var _ = Describe("ConcurrentExemplarsInsert", func() {
var _ = Describe("Concurrent exemplars insertion", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
Expand Down Expand Up @@ -228,7 +298,7 @@ var _ = Describe("Exemplar serialization", func() {
t.Insert([]byte("a;b"), uint64(1))
t.Insert([]byte("a;c"), uint64(2))

e := exemplarsBatchEntry{
e := exemplarEntry{
Key: exemplarKey(appName, profileID),
AppName: appName,
ProfileID: profileID,
Expand All @@ -248,7 +318,7 @@ var _ = Describe("Exemplar serialization", func() {
b, err := e.Serialize(d, 1<<10)
Expect(err).ToNot(HaveOccurred())

var n exemplarsBatchEntry
var n exemplarEntry
Expect(n.Deserialize(d, b)).ToNot(HaveOccurred())

Expect(e.StartTime).To(Equal(n.StartTime))
Expand All @@ -266,7 +336,7 @@ var _ = Describe("Exemplar serialization", func() {
b, err := os.ReadFile("./testdata/exemplar.v1.bin")
Expect(err).ToNot(HaveOccurred())

var n exemplarsBatchEntry
var n exemplarEntry
Expect(n.Deserialize(dict.New(), b)).ToNot(HaveOccurred())

Expect(n.Tree.Samples()).To(Equal(uint64(81255)))
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/metadata/metadata.go
Expand Up @@ -8,11 +8,11 @@ func (u Units) String() string {

const (
SamplesUnits Units = "samples"
ObjectsUnits = "objects"
GoroutinesUnits = "goroutines"
BytesUnits = "bytes"
LockNanosecondsUnits = "lock_nanoseconds"
LockSamplesUnits = "lock_samples"
ObjectsUnits Units = "objects"
GoroutinesUnits Units = "goroutines"
BytesUnits Units = "bytes"
LockNanosecondsUnits Units = "lock_nanoseconds"
LockSamplesUnits Units = "lock_samples"
)

type AggregationType string
Expand Down
85 changes: 58 additions & 27 deletions pkg/storage/storage_get.go
Expand Up @@ -68,36 +68,15 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) {
logger.Debug("storage.Get")
trace.Logf(ctx, traceCatGetKey, "%+v", gi)

// For backward compatibility, profiles can be fetched by ID using query.
// Profiles can be fetched by ID using query.
// If a query includes 'profile_id' matcher others are ignored.
if gi.Query != nil {
ids := make([]string, 0, len(gi.Query.Matchers))
for _, m := range gi.Query.Matchers {
if m.Key != segment.ProfileIDLabelName {
continue
}
if m.Op != flameql.OpEqual {
return nil, fmt.Errorf("only '=' operator is allowed for %q label", segment.ProfileIDLabelName)
}
ids = append(ids, m.Value)
out, ok, err := s.tryGetExemplar(ctx, gi)
if err != nil {
return nil, err
}
if len(ids) > 0 {
// TODO(kolesnikovae): load from segment.
o := GetOutput{
SpyName: "gospy",
Units: "samples",
SampleRate: 100,
Timeline: segment.GenerateTimeline(gi.StartTime, gi.EndTime),
Tree: tree.New(),
}
err := s.exemplars.fetch(ctx, gi.Query.AppName, ids, func(t *tree.Tree) error {
o.Tree.Merge(t)
return nil
})
if err != nil {
return nil, err
}
return &o, nil
if ok {
return out, nil
}
}

Expand Down Expand Up @@ -168,6 +147,58 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) {
}, nil
}

// tryGetExemplar serves a Get request from the exemplars storage when the
// query contains at least one profile ID matcher.
//
// The boolean result reports whether the request was handled here. It is
// false (with a nil output and a nil error) when the query has no profile ID
// matcher, so that the caller can fall back to the regular lookup. Only the
// '=' operator is accepted for the profile ID label; any other operator
// results in an error.
//
// The trees of all the fetched exemplars are merged into the output tree.
// Metadata (spy name, units, sample rate, aggregation type) is taken from the
// segment found by the exemplar labels plus the app name; if there is no such
// segment, the defaults are returned.
//
// gi.Query must not be nil.
func (s *Storage) tryGetExemplar(ctx context.Context, gi *GetInput) (*GetOutput, bool, error) {
	ids := make([]string, 0, len(gi.Query.Matchers))
	for _, m := range gi.Query.Matchers {
		if m.Key != segment.ProfileIDLabelName {
			continue
		}
		if m.Op != flameql.OpEqual {
			return nil, true, fmt.Errorf("only '=' operator is allowed for %q label", segment.ProfileIDLabelName)
		}
		ids = append(ids, m.Value)
	}
	if len(ids) == 0 {
		return nil, false, nil
	}

	var (
		t = tree.New()
		// Labels of the last fetched exemplar; nil if none was found.
		labels map[string]string
	)
	err := s.exemplars.fetch(ctx, gi.Query.AppName, ids, func(e exemplarEntry) error {
		t.Merge(e.Tree)
		labels = e.Labels
		return nil
	})
	if err != nil {
		return nil, true, err
	}

	o := GetOutput{
		Timeline: segment.GenerateTimeline(gi.StartTime, gi.EndTime),
		Tree:     t,
		// Defaults: actual values should be loaded from the segment metadata.
		SpyName:         "gospy",
		Units:           "samples",
		AggregationType: "sum",
		SampleRate:      100,
	}

	// Build the segment key from a private copy of the labels: the exemplar
	// labels map is not guaranteed to be non-nil (writing to a nil map
	// panics), and the map owned by the entry should not be modified.
	keyLabels := make(map[string]string, len(labels)+1)
	for k, v := range labels {
		keyLabels[k] = v
	}
	// Exemplar labels map does not contain the app name.
	keyLabels["__name__"] = gi.Query.AppName
	if r, found := s.segments.Lookup(segment.NewKey(keyLabels).Normalized()); found {
		// Keep the defaults if the cached value is not a segment.
		if seg, isSegment := r.(*segment.Segment); isSegment {
			o.SpyName = seg.SpyName()
			o.Units = seg.Units()
			o.SampleRate = seg.SampleRate()
			o.AggregationType = seg.AggregationType()
		}
	}

	return &o, true, nil
}

func (s *Storage) execQuery(_ context.Context, qry *flameql.Query) []dimension.Key {
app, found := s.lookupAppDimension(qry.AppName)
if !found {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/storage_merge.go
Expand Up @@ -17,8 +17,8 @@ type MergeProfilesOutput struct {

// MergeProfiles fetches the exemplars listed in mi.Profiles for the
// application mi.AppName and merges their trees into o.Tree, which starts
// out as a new empty tree. The returned error is the one reported by the
// exemplars fetch.
func (s *Storage) MergeProfiles(ctx context.Context, mi MergeProfilesInput) (o MergeProfilesOutput, err error) {
o.Tree = tree.New()
return o, s.exemplars.fetch(ctx, mi.AppName, mi.Profiles, func(t *tree.Tree) error {
o.Tree.Merge(t)
return o, s.exemplars.fetch(ctx, mi.AppName, mi.Profiles, func(e exemplarEntry) error {
o.Tree.Merge(e.Tree)
return nil
})
}

0 comments on commit e869730

Please sign in to comment.