Skip to content

Commit

Permalink
Adding more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Sep 16, 2022
1 parent 6011238 commit b233577
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 161 deletions.
9 changes: 7 additions & 2 deletions pkg/firedb/firedb.go
Expand Up @@ -379,8 +379,13 @@ func (f *FireDB) MergeProfilesStacktraces(ctx context.Context, stream *connect.B
return nil
}

type BidiServerMergeProfilesStacktraces interface {
Send(*ingestv1.MergeProfilesStacktracesResponse) error
Receive() (*ingestv1.MergeProfilesStacktracesRequest, error)
}

// filterProfiles sends profiles to the client and filters them via the bidi stream.
func filterProfiles(ctx context.Context, profiles iter.Iterator[Profile], batchProfileSize int, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) ([]Profile, error) {
func filterProfiles(ctx context.Context, profiles iter.Iterator[Profile], batchProfileSize int, stream BidiServerMergeProfilesStacktraces) ([]Profile, error) {
type labelWithIndex struct {
firemodel.Labels
index int
Expand All @@ -390,7 +395,7 @@ func filterProfiles(ctx context.Context, profiles iter.Iterator[Profile], batchP
Profiles: make([]*ingestv1.SeriesProfile, 0, batchProfileSize),
LabelsSets: make([]*commonv1.Labels, 0, batchProfileSize),
}
if err := iter.ReadBatchIterator(ctx, profiles, batchProfileSize, func(ctx context.Context, batch []Profile) error {
if err := iter.ReadBatch(ctx, profiles, batchProfileSize, func(ctx context.Context, batch []Profile) error {
sp, _ := opentracing.StartSpanFromContext(ctx, "Filtering batch")
sp.LogFields(
otlog.Int("batch_len", len(batch)),
Expand Down
83 changes: 83 additions & 0 deletions pkg/firedb/firedb_test.go
Expand Up @@ -12,12 +12,17 @@ import (
"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/prometheus/common/model"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

schemav1 "github.com/grafana/fire/pkg/firedb/schemas/v1"
commonv1 "github.com/grafana/fire/pkg/gen/common/v1"
googlev1 "github.com/grafana/fire/pkg/gen/google/v1"
ingestv1 "github.com/grafana/fire/pkg/gen/ingester/v1"
"github.com/grafana/fire/pkg/iter"
firemodel "github.com/grafana/fire/pkg/model"
"github.com/grafana/fire/pkg/testhelper"
)

func TestCreateLocalDir(t *testing.T) {
Expand Down Expand Up @@ -146,3 +151,81 @@ func TestCloseFile(t *testing.T) {
require.NoError(t, db.AwaitTerminated(context.Background()))
require.NoError(t, os.RemoveAll(dataPath))
}

type fakeBidiServerMergeProfilesStacktraces struct {
profilesSent []*ingestv1.ProfileSets
keep [][]bool
t *testing.T
}

func (f *fakeBidiServerMergeProfilesStacktraces) Send(resp *ingestv1.MergeProfilesStacktracesResponse) error {
f.profilesSent = append(f.profilesSent, testhelper.CloneProto(f.t, resp.SelectedProfiles))
return nil
}

func (f *fakeBidiServerMergeProfilesStacktraces) Receive() (*ingestv1.MergeProfilesStacktracesRequest, error) {
res := &ingestv1.MergeProfilesStacktracesRequest{
Profiles: f.keep[0],
}
f.keep = f.keep[1:]
return res, nil
}

func TestFilterProfiles(t *testing.T) {
ctx := context.Background()
profiles := lo.Times(11, func(i int) Profile {
return ProfileWithLabels{
Profile: &schemav1.Profile{TimeNanos: int64(i * int(time.Minute))},
lbs: firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i)),
fp: model.Fingerprint(firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i)).Hash()),
}
})
in := iter.NewSliceIterator(profiles)
bidi := &fakeBidiServerMergeProfilesStacktraces{
keep: [][]bool{{}, {true}, {true}},
t: t,
}
filtered, err := filterProfiles(ctx, in, 5, bidi)
require.NoError(t, err)
require.Equal(t, 2, len(filtered))
require.Equal(t, 3, len(bidi.profilesSent))
testhelper.EqualProto(t, []*ingestv1.ProfileSets{
{
LabelsSets: lo.Times(5, func(i int) *commonv1.Labels {
return &commonv1.Labels{Labels: firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i))}
}),
Profiles: lo.Times(5, func(i int) *ingestv1.SeriesProfile {
return &ingestv1.SeriesProfile{Timestamp: int64(model.TimeFromUnixNano(int64(i * int(time.Minute)))), LabelIndex: int32(i)}
}),
},
{
LabelsSets: lo.Times(5, func(i int) *commonv1.Labels {
return &commonv1.Labels{Labels: firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i+5))}
}),
Profiles: lo.Times(5, func(i int) *ingestv1.SeriesProfile {
return &ingestv1.SeriesProfile{Timestamp: int64(model.TimeFromUnixNano(int64((i + 5) * int(time.Minute)))), LabelIndex: int32(i)}
}),
},
{
LabelsSets: lo.Times(1, func(i int) *commonv1.Labels {
return &commonv1.Labels{Labels: firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", i+10))}
}),
Profiles: lo.Times(1, func(i int) *ingestv1.SeriesProfile {
return &ingestv1.SeriesProfile{Timestamp: int64(model.TimeFromUnixNano(int64((i + 10) * int(time.Minute)))), LabelIndex: int32(i)}
}),
},
}, bidi.profilesSent)

require.Equal(t, []Profile{
ProfileWithLabels{
Profile: &schemav1.Profile{TimeNanos: int64(5 * int(time.Minute))},
lbs: firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", 5)),
fp: model.Fingerprint(firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", 5)).Hash()),
},
ProfileWithLabels{
Profile: &schemav1.Profile{TimeNanos: int64(10 * int(time.Minute))},
lbs: firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", 10)),
fp: model.Fingerprint(firemodel.LabelsFromStrings("foo", "bar", "i", fmt.Sprintf("%d", 10)).Hash()),
},
}, filtered)
}
5 changes: 3 additions & 2 deletions pkg/iter/batch.go
Expand Up @@ -2,9 +2,10 @@ package iter

import "context"

// ReadBatchIterator reads profiles from the iterator in batches and call fn.
// ReadBatch reads profiles from the iterator in batches and call fn.
// If fn returns an error, the iteration is stopped and the error is returned.
func ReadBatchIterator[T any](ctx context.Context, iterator Iterator[T], batchSize int, fn func(context.Context, []T) error) error {
// The array passed in fn is reused between calls, so it should be copied if needed.
func ReadBatch[T any](ctx context.Context, iterator Iterator[T], batchSize int, fn func(context.Context, []T) error) error {
defer iterator.Close()
batch := make([]T, 0, batchSize)
for {
Expand Down
39 changes: 39 additions & 0 deletions pkg/iter/batch_test.go
@@ -0,0 +1,39 @@
package iter

import (
"context"
"errors"
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/require"
)

func TestReadBatch(t *testing.T) {
ctx := context.Background()

require.Error(t, ReadBatch(ctx, NewSliceIterator(lo.Times(20, func(i int) int { return i })), 10,
func(context.Context, []int) error {
return errors.New("foo")
}))

var batches [][]int
require.NoError(t, ReadBatch(ctx, NewSliceIterator(lo.Times(20, func(i int) int { return i })), 10,
func(_ context.Context, batch []int) error {
c := make([]int, len(batch))
copy(c, batch)
batches = append(batches, c)
return nil
}))
require.Equal(t, [][]int{lo.Times(10, func(i int) int { return i }), lo.Times(10, func(i int) int { return 10 + i })}, batches)

batches = nil
require.NoError(t, ReadBatch(ctx, NewSliceIterator(lo.Times(20, func(i int) int { return i })), 11,
func(_ context.Context, batch []int) error {
c := make([]int, len(batch))
copy(c, batch)
batches = append(batches, c)
return nil
}))
require.Equal(t, [][]int{lo.Times(11, func(i int) int { return i }), lo.Times(9, func(i int) int { return 11 + i })}, batches)
}
2 changes: 1 addition & 1 deletion pkg/querier/ingester_querier.go
Expand Up @@ -17,7 +17,7 @@ type IngesterQueryClient interface {
ProfileTypes(context.Context, *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)
SelectProfiles(context.Context, *connect.Request[ingestv1.SelectProfilesRequest]) (*connect.Response[ingestv1.SelectProfilesResponse], error)
Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error)
MergeProfilesStacktraces(context.Context) *connect.BidiStreamForClient[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]
MergeProfilesStacktraces(context.Context) BidiClientMergeProfilesStacktraces
}

type responseFromIngesters[T interface{}] struct {
Expand Down
60 changes: 0 additions & 60 deletions pkg/querier/iterator_test.go
Expand Up @@ -12,66 +12,6 @@ import (
"github.com/grafana/fire/pkg/testhelper"
)

type testProfile struct {
Ts int64
Labels *commonv1.Labels
}

type fakeBidiClient struct {
profiles chan *ingestv1.ProfileSets
batches []*ingestv1.ProfileSets
kept []testProfile
cur *ingestv1.ProfileSets
}

func newFakeBidiClient(batches []*ingestv1.ProfileSets) *fakeBidiClient {
res := &fakeBidiClient{
profiles: make(chan *ingestv1.ProfileSets, 1),
}
res.profiles <- batches[0]
batches = batches[1:]
res.batches = batches
return res
}

func (f *fakeBidiClient) Send(in *ingestv1.MergeProfilesStacktracesRequest) error {
for i, b := range in.Profiles {
if b {
f.kept = append(f.kept, testProfile{
Ts: f.cur.Profiles[i].Timestamp,
Labels: f.cur.LabelsSets[f.cur.Profiles[i].LabelIndex],
})
}
}
if len(f.batches) == 0 {
close(f.profiles)
return nil
}
f.profiles <- f.batches[0]
f.batches = f.batches[1:]
return nil
}

func (f *fakeBidiClient) Receive() (*ingestv1.MergeProfilesStacktracesResponse, error) {
profiles := <-f.profiles
if profiles == nil {
return &ingestv1.MergeProfilesStacktracesResponse{
Result: &ingestv1.MergeProfilesStacktracesResult{
Stacktraces: []*ingestv1.StacktraceSample{
{FunctionIds: []int32{0, 1, 2}, Value: 1},
},
FunctionNames: []string{"foo", "bar", "buzz"},
},
}, nil
}
f.cur = profiles
return &ingestv1.MergeProfilesStacktracesResponse{
SelectedProfiles: profiles,
}, nil
}
func (f *fakeBidiClient) CloseSend() error { return nil }
func (f *fakeBidiClient) CloseReceive() error { return nil }

func TestDedupeBidi(t *testing.T) {
resp1 := newFakeBidiClient([]*ingestv1.ProfileSets{
{
Expand Down

0 comments on commit b233577

Please sign in to comment.