diff --git a/pkg/storage/exemplars.go b/pkg/storage/exemplars.go
index f3b06efa0c..5bf7a24bf0 100644
--- a/pkg/storage/exemplars.go
+++ b/pkg/storage/exemplars.go
@@ -50,13 +50,13 @@ var (
 )
 
 type exemplarsBatch struct {
-    entries map[string]*exemplarsBatchEntry
+    entries map[string]*exemplarEntry
     config *Config
     metrics *metrics
     dicts BadgerDBWithCache
 }
 
-type exemplarsBatchEntry struct {
+type exemplarEntry struct {
     // DB exemplar key and its parts.
     Key []byte
     AppName string
@@ -74,7 +74,7 @@ func (e *exemplars) newExemplarsBatch() *exemplarsBatch {
         metrics: e.metrics,
         config: e.config,
         dicts: e.dicts,
-        entries: make(map[string]*exemplarsBatchEntry, exemplarsPerBatch),
+        entries: make(map[string]*exemplarEntry, exemplarsPerBatch),
     }
 }
 
@@ -261,7 +261,7 @@ func (e *exemplars) insert(ctx context.Context, input *PutInput) error {
     return err
 }
 
-func (e *exemplars) fetch(ctx context.Context, appName string, profileIDs []string, fn func(*tree.Tree) error) error {
+func (e *exemplars) fetch(ctx context.Context, appName string, profileIDs []string, fn func(exemplarEntry) error) error {
     d, ok := e.dicts.Lookup(appName)
     if !ok {
         return nil
@@ -280,11 +280,11 @@ func (e *exemplars) fetch(ctx context.Context, appName string, profileIDs []stri
         case err == nil:
             err = item.Value(func(val []byte) error {
                 e.metrics.exemplarsReadBytes.Observe(float64(len(val)))
-                var x exemplarsBatchEntry
+                var x exemplarEntry
                 if err = x.Deserialize(dx, val); err != nil {
                     return err
                 }
-                return fn(x.Tree)
+                return fn(x)
             })
             if err != nil {
                 return err
@@ -378,7 +378,7 @@ func (b *exemplarsBatch) insert(_ context.Context, input *PutInput) error {
         e.updateTime(input.StartTime.UnixNano(), input.EndTime.UnixNano())
         return nil
     }
-    b.entries[key] = &exemplarsBatchEntry{
+    b.entries[key] = &exemplarEntry{
         Key: k,
         AppName: appName,
         ProfileID: profileID,
@@ -391,7 +391,7 @@ func (b *exemplarsBatch) insert(_ context.Context, input *PutInput) error {
     return nil
 }
 
-func (b *exemplarsBatch) writeExemplarToDB(txn *badger.Txn, e *exemplarsBatchEntry) error {
+func (b *exemplarsBatch) writeExemplarToDB(txn *badger.Txn, e *exemplarEntry) error {
     k, ok := exemplarKeyToTimestampKey(e.Key, e.EndTime)
     if !ok {
         return fmt.Errorf("invalid exemplar key")
@@ -419,7 +419,7 @@ func (b *exemplarsBatch) writeExemplarToDB(txn *badger.Txn, e *exemplarsBatchEnt
     // it's not happening: only the first EndTime is honored.
     err = item.Value(func(val []byte) error {
         b.metrics.exemplarsReadBytes.Observe(float64(len(val)))
-        var x exemplarsBatchEntry
+        var x exemplarEntry
         if err = x.Deserialize(dx, val); err == nil {
             e = x.Merge(e)
         }
@@ -441,14 +441,14 @@ func (b *exemplarsBatch) writeExemplarToDB(txn *badger.Txn, e *exemplarsBatchEnt
     return nil
 }
 
-func (e *exemplarsBatchEntry) Merge(src *exemplarsBatchEntry) *exemplarsBatchEntry {
+func (e *exemplarEntry) Merge(src *exemplarEntry) *exemplarEntry {
     e.updateTime(src.StartTime, src.EndTime)
     e.Tree.Merge(src.Tree)
     e.Key = src.Key
     return e
 }
 
-func (e *exemplarsBatchEntry) updateTime(st, et int64) {
+func (e *exemplarEntry) updateTime(st, et int64) {
     if st < e.StartTime {
         e.StartTime = st
     }
@@ -457,7 +457,7 @@ func (e *exemplarsBatchEntry) updateTime(st, et int64) {
     }
 }
 
-func (e *exemplarsBatchEntry) Serialize(d *dict.Dict, maxNodes int) ([]byte, error) {
+func (e *exemplarEntry) Serialize(d *dict.Dict, maxNodes int) ([]byte, error) {
     b := bytes.NewBuffer(make([]byte, 0, 1<<10)) // 1 KB.
     b.WriteByte(exemplarsCurrentFormat) // Version.
     if err := e.Tree.SerializeTruncate(d, maxNodes, b); err != nil {
@@ -487,7 +487,7 @@ func (e *exemplarsBatchEntry) Serialize(d *dict.Dict, maxNodes int) ([]byte, err
     return b.Bytes(), nil
 }
 
-func (e *exemplarsBatchEntry) Deserialize(d *dict.Dict, b []byte) error {
+func (e *exemplarEntry) Deserialize(d *dict.Dict, b []byte) error {
     buf := bytes.NewBuffer(b)
     v, err := buf.ReadByte()
     if err != nil {
@@ -503,7 +503,7 @@ func (e *exemplarsBatchEntry) Deserialize(d *dict.Dict, b []byte) error {
     }
 }
 
-func (e *exemplarsBatchEntry) deserializeV1(d *dict.Dict, src *bytes.Buffer) error {
+func (e *exemplarEntry) deserializeV1(d *dict.Dict, src *bytes.Buffer) error {
     t, err := tree.Deserialize(d, src)
     if err != nil {
         return err
@@ -512,7 +512,7 @@ func (e *exemplarsBatchEntry) deserializeV1(d *dict.Dict, src *bytes.Buffer) err
     return nil
 }
 
-func (e *exemplarsBatchEntry) deserializeV2(d *dict.Dict, src *bytes.Buffer) error {
+func (e *exemplarEntry) deserializeV2(d *dict.Dict, src *bytes.Buffer) error {
     t, err := tree.Deserialize(d, src)
     if err != nil {
         return err
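// NOTE (illustrative sketch, not part of the patch): a hypothetical caller of the
// new fetch signature, which now hands the callback the whole exemplarEntry
// (tree, labels, time range) instead of a bare *tree.Tree. collectExemplars is an
// assumed helper living in package storage; the Labels field is inferred from its
// use in tryGetExemplar further down in this patch.
func collectExemplars(ctx context.Context, e *exemplars, appName string, ids []string) (*tree.Tree, map[string]string, error) {
    merged := tree.New()
    labels := make(map[string]string)
    err := e.fetch(ctx, appName, ids, func(x exemplarEntry) error {
        merged.Merge(x.Tree) // merge the exemplar's tree, as MergeProfiles does
        for k, v := range x.Labels {
            labels[k] = v // keep the exemplar's labels for a later segment lookup
        }
        return nil
    })
    return merged, labels, err
}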
    if err := e.Tree.SerializeTruncate(d, maxNodes, b); err != nil {
diff --git a/pkg/storage/exemplars_test.go b/pkg/storage/exemplars_test.go
index 9f862765fe..6b131bc586 100644
--- a/pkg/storage/exemplars_test.go
+++ b/pkg/storage/exemplars_test.go
@@ -21,12 +21,82 @@ import (
     "github.com/pyroscope-io/pyroscope/pkg/flameql"
     "github.com/pyroscope-io/pyroscope/pkg/health"
     "github.com/pyroscope-io/pyroscope/pkg/storage/dict"
+    "github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
     "github.com/pyroscope-io/pyroscope/pkg/storage/segment"
     "github.com/pyroscope-io/pyroscope/pkg/storage/tree"
     "github.com/pyroscope-io/pyroscope/pkg/testing"
 )
 
-var _ = Describe("MergeProfiles", func() {
+var _ = Describe("Exemplar query", func() {
+    testing.WithConfig(func(cfg **config.Config) {
+        JustBeforeEach(func() {
+            var err error
+            s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
+            Expect(err).ToNot(HaveOccurred())
+        })
+        Context("when profiles with ID ingested", func() {
+            It("queries profiling data correctly", func() {
+                defer s.Close()
+
+                tree := tree.New()
+                tree.Insert([]byte("a;b"), uint64(1))
+                tree.Insert([]byte("a;c"), uint64(2))
+                st := testing.SimpleTime(10)
+                et := testing.SimpleTime(19)
+
+                Expect(s.Put(context.TODO(), &PutInput{
+                    StartTime: st,
+                    EndTime: et,
+                    Val: tree.Clone(big.NewRat(1, 1)),
+                    Key: segment.NewKey(map[string]string{
+                        "__name__": "app.cpu",
+                        "span_name": "SomeSpanName",
+                        segment.ProfileIDLabelName: "my-profile-id",
+                    }),
+                })).ToNot(HaveOccurred())
+
+                Expect(s.Put(context.TODO(), &PutInput{
+                    StartTime: st,
+                    EndTime: et,
+                    Key: segment.NewKey(map[string]string{
+                        "__name__": "app.cpu",
+                        "span_name": "SomeSpanName",
+                    }),
+                    Val: tree.Clone(big.NewRat(1, 1)),
+                    AggregationType: metadata.AverageAggregationType,
+                    Units: metadata.BytesUnits,
+                    SpyName: "debugspy",
+                    SampleRate: 42,
+                })).ToNot(HaveOccurred())
+
+                s.exemplars.Sync()
+                o, err := s.Get(context.Background(), &GetInput{
+                    Query: &flameql.Query{
+                        AppName: "app.cpu",
+                        Matchers: []*flameql.TagMatcher{
+                            {
+                                Key: segment.ProfileIDLabelName,
+                                Value: "my-profile-id",
+                                Op: flameql.OpEqual,
+                            },
+                        },
+                    },
+                })
+
+                Expect(err).ToNot(HaveOccurred())
+                Expect(o.Tree).ToNot(BeNil())
+                Expect(o.Tree.Samples()).To(Equal(uint64(3)))
+
+                Expect(o.AggregationType).To(Equal(metadata.AverageAggregationType))
+                Expect(o.Units).To(Equal(metadata.BytesUnits))
+                Expect(o.SpyName).To(Equal("debugspy"))
+                Expect(o.SampleRate).To(Equal(uint32(42)))
+            })
+        })
+    })
+})
+
+var _ = Describe("Exemplar merge", func() {
     testing.WithConfig(func(cfg **config.Config) {
         JustBeforeEach(func() {
             var err error
@@ -95,7 +165,7 @@ var _ = Describe("MergeProfiles", func() {
     })
 })
 
-var _ = Describe("Profiles retention policy", func() {
+var _ = Describe("Exemplars retention policy", func() {
     testing.WithConfig(func(cfg **config.Config) {
         JustBeforeEach(func() {
             var err error
@@ -159,7 +229,7 @@ func randomBytesHex(n int) string {
     return hex.EncodeToString(b)
 }
 
-var _ = Describe("ConcurrentExemplarsInsert", func() {
+var _ = Describe("Concurrent exemplars insertion", func() {
    testing.WithConfig(func(cfg **config.Config) {
         JustBeforeEach(func() {
             var err error
@@ -228,7 +298,7 @@ var _ = Describe("Exemplar serialization", func() {
         t.Insert([]byte("a;b"), uint64(1))
         t.Insert([]byte("a;c"), uint64(2))
 
-        e := exemplarsBatchEntry{
+        e := exemplarEntry{
             Key: exemplarKey(appName, profileID),
             AppName: appName,
             ProfileID: profileID,
@@ -248,7 +318,7 @@ var _ = Describe("Exemplar serialization", func() {
         b, err := e.Serialize(d, 1<<10)
         Expect(err).ToNot(HaveOccurred())
 
-        var n exemplarsBatchEntry
+        var n exemplarEntry
         Expect(n.Deserialize(d, b)).ToNot(HaveOccurred())
 
         Expect(e.StartTime).To(Equal(n.StartTime))
@@ -266,7 +336,7 @@ var _ = Describe("Exemplar serialization", func() {
         b, err := os.ReadFile("./testdata/exemplar.v1.bin")
         Expect(err).ToNot(HaveOccurred())
 
-        var n exemplarsBatchEntry
+        var n exemplarEntry
         Expect(n.Deserialize(dict.New(), b)).ToNot(HaveOccurred())
 
         Expect(n.Tree.Samples()).To(Equal(uint64(81255)))
diff --git a/pkg/storage/metadata/metadata.go b/pkg/storage/metadata/metadata.go
index 0a680a3b8c..2d353d309b 100644
--- a/pkg/storage/metadata/metadata.go
+++ b/pkg/storage/metadata/metadata.go
@@ -8,11 +8,11 @@ func (u Units) String() string {
 
 const (
     SamplesUnits Units = "samples"
-    ObjectsUnits = "objects"
-    GoroutinesUnits = "goroutines"
-    BytesUnits = "bytes"
-    LockNanosecondsUnits = "lock_nanoseconds"
-    LockSamplesUnits = "lock_samples"
+    ObjectsUnits Units = "objects"
+    GoroutinesUnits Units = "goroutines"
+    BytesUnits Units = "bytes"
+    LockNanosecondsUnits Units = "lock_nanoseconds"
+    LockSamplesUnits Units = "lock_samples"
 )
 
 type AggregationType string
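// NOTE (illustrative sketch, not part of the patch): why the explicit Units type
// on the constants above matters. Once typed, a constant keeps metadata.Units
// through := inference and interface{} conversion, so reflect-based matchers such
// as Equal(metadata.BytesUnits) in the test above compare Units against Units
// rather than against a plain string, and String() is callable on the constant.
// Standalone sketch under those assumptions:
package main

import (
    "fmt"

    "github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
)

func main() {
    u := metadata.BytesUnits                // inferred as metadata.Units, not string
    fmt.Println(u.String())                 // "bytes"
    fmt.Printf("%T\n", metadata.BytesUnits) // metadata.Units
}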
Describe("Exemplar merge", func() { testing.WithConfig(func(cfg **config.Config) { JustBeforeEach(func() { var err error @@ -95,7 +165,7 @@ var _ = Describe("MergeProfiles", func() { }) }) -var _ = Describe("Profiles retention policy", func() { +var _ = Describe("Exemplars retention policy", func() { testing.WithConfig(func(cfg **config.Config) { JustBeforeEach(func() { var err error @@ -159,7 +229,7 @@ func randomBytesHex(n int) string { return hex.EncodeToString(b) } -var _ = Describe("ConcurrentExemplarsInsert", func() { +var _ = Describe("Concurrent exemplars insertion", func() { testing.WithConfig(func(cfg **config.Config) { JustBeforeEach(func() { var err error @@ -228,7 +298,7 @@ var _ = Describe("Exemplar serialization", func() { t.Insert([]byte("a;b"), uint64(1)) t.Insert([]byte("a;c"), uint64(2)) - e := exemplarsBatchEntry{ + e := exemplarEntry{ Key: exemplarKey(appName, profileID), AppName: appName, ProfileID: profileID, @@ -248,7 +318,7 @@ var _ = Describe("Exemplar serialization", func() { b, err := e.Serialize(d, 1<<10) Expect(err).ToNot(HaveOccurred()) - var n exemplarsBatchEntry + var n exemplarEntry Expect(n.Deserialize(d, b)).ToNot(HaveOccurred()) Expect(e.StartTime).To(Equal(n.StartTime)) @@ -266,7 +336,7 @@ var _ = Describe("Exemplar serialization", func() { b, err := os.ReadFile("./testdata/exemplar.v1.bin") Expect(err).ToNot(HaveOccurred()) - var n exemplarsBatchEntry + var n exemplarEntry Expect(n.Deserialize(dict.New(), b)).ToNot(HaveOccurred()) Expect(n.Tree.Samples()).To(Equal(uint64(81255))) diff --git a/pkg/storage/metadata/metadata.go b/pkg/storage/metadata/metadata.go index 0a680a3b8c..2d353d309b 100644 --- a/pkg/storage/metadata/metadata.go +++ b/pkg/storage/metadata/metadata.go @@ -8,11 +8,11 @@ func (u Units) String() string { const ( SamplesUnits Units = "samples" - ObjectsUnits = "objects" - GoroutinesUnits = "goroutines" - BytesUnits = "bytes" - LockNanosecondsUnits = "lock_nanoseconds" - LockSamplesUnits = "lock_samples" + ObjectsUnits Units = "objects" + GoroutinesUnits Units = "goroutines" + BytesUnits Units = "bytes" + LockNanosecondsUnits Units = "lock_nanoseconds" + LockSamplesUnits Units = "lock_samples" ) type AggregationType string diff --git a/pkg/storage/storage_get.go b/pkg/storage/storage_get.go index 0b8c2d5ebe..f826abfc37 100644 --- a/pkg/storage/storage_get.go +++ b/pkg/storage/storage_get.go @@ -68,36 +68,15 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) { logger.Debug("storage.Get") trace.Logf(ctx, traceCatGetKey, "%+v", gi) - // For backward compatibility, profiles can be fetched by ID using query. + // Profiles can be fetched by ID using query. // If a query includes 'profile_id' matcher others are ignored. if gi.Query != nil { - ids := make([]string, 0, len(gi.Query.Matchers)) - for _, m := range gi.Query.Matchers { - if m.Key != segment.ProfileIDLabelName { - continue - } - if m.Op != flameql.OpEqual { - return nil, fmt.Errorf("only '=' operator is allowed for %q label", segment.ProfileIDLabelName) - } - ids = append(ids, m.Value) + out, ok, err := s.tryGetExemplar(ctx, gi) + if err != nil { + return nil, err } - if len(ids) > 0 { - // TODO(kolesnikovae): load from segment. 
diff --git a/pkg/storage/storage_merge.go b/pkg/storage/storage_merge.go
index 0e23e142eb..35b26375fa 100644
--- a/pkg/storage/storage_merge.go
+++ b/pkg/storage/storage_merge.go
@@ -17,8 +17,8 @@ type MergeProfilesOutput struct {
 
 func (s *Storage) MergeProfiles(ctx context.Context, mi MergeProfilesInput) (o MergeProfilesOutput, err error) {
     o.Tree = tree.New()
-    return o, s.exemplars.fetch(ctx, mi.AppName, mi.Profiles, func(t *tree.Tree) error {
-        o.Tree.Merge(t)
+    return o, s.exemplars.fetch(ctx, mi.AppName, mi.Profiles, func(e exemplarEntry) error {
+        o.Tree.Merge(e.Tree)
         return nil
     })
 }
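// NOTE (illustrative sketch, not part of the patch): callers of MergeProfiles are
// unaffected by the callback change, since the merge still consumes only the tree
// carried by each exemplarEntry. Hypothetical usage; field names follow their use
// above, error handling kept minimal.
func mergeByProfileIDs(ctx context.Context, s *Storage, appName string, ids []string) (*tree.Tree, error) {
    out, err := s.MergeProfiles(ctx, MergeProfilesInput{
        AppName:  appName,
        Profiles: ids,
    })
    if err != nil {
        return nil, err
    }
    return out.Tree, nil
}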