Skip to content

Commit

Permalink
fix test cases with Eventually semantics
Browse files Browse the repository at this point in the history
Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>
  • Loading branch information
lujiajing1126 committed Oct 6, 2022
1 parent 35a8b9b commit 6498e3b
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 134 deletions.
2 changes: 1 addition & 1 deletion pkg/query/logical/measure/measure_plan_indexscan_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
}
}
if len(seriesList) == 0 {
return nil, nil
return executor.EmptyMIterator, nil
}
var builders []logical.SeekerBuilder
if i.Index != nil {
Expand Down
60 changes: 30 additions & 30 deletions test/cases/measure/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import (
"time"

"github.com/google/go-cmp/cmp"
g "github.com/onsi/ginkgo/v2"
gm "github.com/onsi/gomega"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
grpclib "google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
"sigs.k8s.io/yaml"

common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measure_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
)

Expand All @@ -46,38 +46,38 @@ var inputFS embed.FS
var wantFS embed.FS

// VerifyFn verify whether the query response matches the wanted result
var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
var VerifyFn = func(g Gomega, sharedContext helpers.SharedContext, args helpers.Args) {
i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
gm.Expect(err).NotTo(gm.HaveOccurred())
query := &measure_v1.QueryRequest{}
g.Expect(err).NotTo(HaveOccurred())
query := &measurev1.QueryRequest{}
helpers.UnmarshalYAML(i, query)
query.TimeRange = helpers.TimeRange(args, sharedContext)
c := measure_v1.NewMeasureServiceClient(sharedContext.Connection)
c := measurev1.NewMeasureServiceClient(sharedContext.Connection)
ctx := context.Background()
resp, err := c.Query(ctx, query)
if args.WantErr {
if err == nil {
g.Fail("expect error")
Fail("expect error")
}
return
}
gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
Expect(err).NotTo(HaveOccurred(), query.String())
if args.WantEmpty {
gm.Expect(resp.DataPoints).To(gm.BeEmpty())
Expect(resp.DataPoints).To(BeEmpty())
return
}
if args.Want == "" {
args.Want = args.Input
}
ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
gm.Expect(err).NotTo(gm.HaveOccurred())
want := &measure_v1.QueryResponse{}
Expect(err).NotTo(HaveOccurred())
want := &measurev1.QueryResponse{}
helpers.UnmarshalYAML(ww, want)
gm.Expect(cmp.Equal(resp, want,
Expect(cmp.Equal(resp, want,
protocmp.IgnoreUnknown(),
protocmp.IgnoreFields(&measure_v1.DataPoint{}, "timestamp"),
protocmp.IgnoreFields(&measurev1.DataPoint{}, "timestamp"),
protocmp.Transform())).
To(gm.BeTrue(), func() string {
To(BeTrue(), func() string {
j, err := protojson.Marshal(resp)
if err != nil {
return err.Error()
Expand All @@ -93,37 +93,37 @@ var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
//go:embed testdata/*.json
var dataFS embed.FS

func loadData(md *common_v1.Metadata, measure measure_v1.MeasureService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
var templates []interface{}
content, err := dataFS.ReadFile("testdata/" + dataFile)
gm.Expect(err).ShouldNot(gm.HaveOccurred())
gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred())
Expect(err).ShouldNot(HaveOccurred())
Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
for i, template := range templates {
rawDataPointValue, errMarshal := json.Marshal(template)
gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
dataPointValue := &measure_v1.DataPointValue{}
gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred())
Expect(errMarshal).ShouldNot(HaveOccurred())
dataPointValue := &measurev1.DataPointValue{}
Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(HaveOccurred())
dataPointValue.Timestamp = timestamppb.New(baseTime.Add(time.Duration(i) * time.Minute))
gm.Expect(measure.Send(&measure_v1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
Should(gm.Succeed())
Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
Should(Succeed())
}
}

// Write data into the server
func Write(conn *grpclib.ClientConn, name, group, dataFile string,
baseTime time.Time, interval time.Duration,
) {
c := measure_v1.NewMeasureServiceClient(conn)
c := measurev1.NewMeasureServiceClient(conn)
ctx := context.Background()
writeClient, err := c.Write(ctx)
gm.Expect(err).NotTo(gm.HaveOccurred())
loadData(&common_v1.Metadata{
Expect(err).NotTo(HaveOccurred())
loadData(&commonv1.Metadata{
Name: name,
Group: group,
}, writeClient, dataFile, baseTime, interval)
gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
gm.Eventually(func() error {
Expect(writeClient.CloseSend()).To(Succeed())
Eventually(func() error {
_, err := writeClient.Recv()
return err
}).Should(gm.Equal(io.EOF))
}).Should(Equal(io.EOF))
}
39 changes: 21 additions & 18 deletions test/cases/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,36 @@ package measure_test
import (
"time"

g "github.com/onsi/ginkgo/v2"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/apache/skywalking-banyandb/pkg/test/helpers"
measure_test_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
measureTestData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
)

var (
// SharedContext is the parallel execution context
SharedContext helpers.SharedContext
verify = func(args helpers.Args) {
measure_test_data.VerifyFn(SharedContext, args)
Eventually(func(g Gomega) {
measureTestData.VerifyFn(g, SharedContext, args)
})
}
)

var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("all", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 1 * time.Hour}),
g.Entry("filter by an unknown tag", helpers.Args{Input: "tag_filter_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 1 * time.Hour}),
g.Entry("group without field", helpers.Args{Input: "group_no_field", Duration: 1 * time.Hour}),
g.Entry("top 2", helpers.Args{Input: "top", Duration: 1 * time.Hour}),
g.Entry("bottom 2", helpers.Args{Input: "bottom", Duration: 1 * time.Hour}),
g.Entry("order by time asc", helpers.Args{Input: "order_asc", Duration: 1 * time.Hour}),
g.Entry("order by time desc", helpers.Args{Input: "order_desc", Duration: 1 * time.Hour}),
g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 1 * time.Hour}),
g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 1 * time.Hour}),
g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 1 * time.Hour}),
g.Entry("without field", helpers.Args{Input: "no_field", Duration: 1 * time.Hour}),
var _ = DescribeTable("Scanning Measures", verify,
Entry("all", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 1 * time.Hour}),
Entry("filter by an unknown tag", helpers.Args{Input: "tag_filter_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
Entry("group and max", helpers.Args{Input: "group_max", Duration: 1 * time.Hour}),
Entry("group without field", helpers.Args{Input: "group_no_field", Duration: 1 * time.Hour}),
Entry("top 2", helpers.Args{Input: "top", Duration: 1 * time.Hour}),
Entry("bottom 2", helpers.Args{Input: "bottom", Duration: 1 * time.Hour}),
Entry("order by time asc", helpers.Args{Input: "order_asc", Duration: 1 * time.Hour}),
Entry("order by time desc", helpers.Args{Input: "order_desc", Duration: 1 * time.Hour}),
Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
Entry("match a node", helpers.Args{Input: "match_node", Duration: 1 * time.Hour}),
Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 1 * time.Hour}),
Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 1 * time.Hour}),
Entry("without field", helpers.Args{Input: "no_field", Duration: 1 * time.Hour}),
)
111 changes: 58 additions & 53 deletions test/cases/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,18 @@ import (
"time"

"github.com/google/go-cmp/cmp"
g "github.com/onsi/ginkgo/v2"
gm "github.com/onsi/gomega"
"sigs.k8s.io/yaml"

common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
model_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
stream_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
grpclib "google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
"sigs.k8s.io/yaml"

commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
)

// SharedContext is the parallel execution context
Expand All @@ -53,38 +52,38 @@ var inputFS embed.FS
//go:embed want/*.yaml
var wantFS embed.FS

var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
var verifyFn = func(g Gomega, args helpers.Args) {
i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
gm.Expect(err).NotTo(gm.HaveOccurred())
query := &stream_v1.QueryRequest{}
Expect(err).NotTo(HaveOccurred())
query := &streamv1.QueryRequest{}
helpers.UnmarshalYAML(i, query)
query.TimeRange = helpers.TimeRange(args, SharedContext)
c := stream_v1.NewStreamServiceClient(SharedContext.Connection)
c := streamv1.NewStreamServiceClient(SharedContext.Connection)
ctx := context.Background()
resp, err := c.Query(ctx, query)
if args.WantErr {
if err == nil {
g.Fail("expect error")
Fail("expect error")
}
return
}
gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
Expect(err).NotTo(HaveOccurred(), query.String())
if args.WantEmpty {
gm.Expect(resp.Elements).To(gm.BeEmpty())
Expect(resp.Elements).To(BeEmpty())
return
}
if args.Want == "" {
args.Want = args.Input
}
ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
gm.Expect(err).NotTo(gm.HaveOccurred())
want := &stream_v1.QueryResponse{}
Expect(err).NotTo(HaveOccurred())
want := &streamv1.QueryResponse{}
helpers.UnmarshalYAML(ww, want)
gm.Expect(cmp.Equal(resp, want,
Expect(cmp.Equal(resp, want,
protocmp.IgnoreUnknown(),
protocmp.IgnoreFields(&stream_v1.Element{}, "timestamp"),
protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
protocmp.Transform())).
To(gm.BeTrue(), func() string {
To(BeTrue(), func() string {
j, err := protojson.Marshal(resp)
if err != nil {
return err.Error()
Expand All @@ -95,49 +94,55 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
}
return string(y)
})
}

var _ = DescribeTable("Scanning Streams", func(args helpers.Args) {
Eventually(func(g Gomega) {
verifyFn(g, args)
})
},
g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 * time.Hour}),
g.Entry("nothing", helpers.Args{Input: "all", WantEmpty: true}),
g.Entry("invalid time range", helpers.Args{
Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
Entry("offset", helpers.Args{Input: "offset", Duration: 1 * time.Hour}),
Entry("nothing", helpers.Args{Input: "all", WantEmpty: true}),
Entry("invalid time range", helpers.Args{
Input: "all",
Begin: timestamppb.New(time.Unix(0, int64(math.MinInt64+time.Millisecond)).Truncate(time.Millisecond)),
End: timestamppb.New(time.Unix(0, math.MaxInt64).Truncate(time.Millisecond)),
}),
g.Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * time.Hour}),
g.Entry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}),
g.Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", Duration: 1 * time.Hour}),
g.Entry("get empty result by non-indexed tag", helpers.Args{Input: "filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
g.Entry("numeric local index: less", helpers.Args{Input: "less", Duration: 1 * time.Hour}),
g.Entry("numeric local index: less and eq", helpers.Args{Input: "less_eq", Duration: 1 * time.Hour}),
g.Entry("logical expression", helpers.Args{Input: "logical", Duration: 1 * time.Hour}),
g.Entry("having", helpers.Args{Input: "having", Duration: 1 * time.Hour}),
g.Entry("full text searching", helpers.Args{Input: "search", Duration: 1 * time.Hour}),
Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * time.Hour}),
Entry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}),
Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", Duration: 1 * time.Hour}),
Entry("get empty result by non-indexed tag", helpers.Args{Input: "filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
Entry("numeric local index: less", helpers.Args{Input: "less", Duration: 1 * time.Hour}),
Entry("numeric local index: less and eq", helpers.Args{Input: "less_eq", Duration: 1 * time.Hour}),
Entry("logical expression", helpers.Args{Input: "logical", Duration: 1 * time.Hour}),
Entry("having", helpers.Args{Input: "having", Duration: 1 * time.Hour}),
Entry("full text searching", helpers.Args{Input: "search", Duration: 1 * time.Hour}),
)

//go:embed testdata/*.json
var dataFS embed.FS

func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
func loadData(stream streamv1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
var templates []interface{}
content, err := dataFS.ReadFile("testdata/" + dataFile)
gm.Expect(err).ShouldNot(gm.HaveOccurred())
gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred())
Expect(err).ShouldNot(HaveOccurred())
Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
for i, template := range templates {
rawSearchTagFamily, errMarshal := json.Marshal(template)
gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
searchTagFamily := &model_v1.TagFamilyForWrite{}
gm.Expect(protojson.Unmarshal(rawSearchTagFamily, searchTagFamily)).ShouldNot(gm.HaveOccurred())
e := &stream_v1.ElementValue{
Expect(errMarshal).ShouldNot(HaveOccurred())
searchTagFamily := &modelv1.TagFamilyForWrite{}
Expect(protojson.Unmarshal(rawSearchTagFamily, searchTagFamily)).ShouldNot(HaveOccurred())
e := &streamv1.ElementValue{
ElementId: strconv.Itoa(i),
Timestamp: timestamppb.New(baseTime.Add(interval * time.Duration(i))),
TagFamilies: []*model_v1.TagFamilyForWrite{
TagFamilies: []*modelv1.TagFamilyForWrite{
{
Tags: []*model_v1.TagValue{
Tags: []*modelv1.TagValue{
{
Value: &model_v1.TagValue_BinaryData{
Value: &modelv1.TagValue_BinaryData{
BinaryData: bb,
},
},
Expand All @@ -146,27 +151,27 @@ func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseT
},
}
e.TagFamilies = append(e.TagFamilies, searchTagFamily)
errInner := stream.Send(&stream_v1.WriteRequest{
Metadata: &common_v1.Metadata{
errInner := stream.Send(&streamv1.WriteRequest{
Metadata: &commonv1.Metadata{
Name: "sw",
Group: "default",
},
Element: e,
})
gm.Expect(errInner).ShouldNot(gm.HaveOccurred())
Expect(errInner).ShouldNot(HaveOccurred())
}
}

// Write data into the server
func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time, interval time.Duration) {
c := stream_v1.NewStreamServiceClient(conn)
c := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
writeClient, err := c.Write(ctx)
gm.Expect(err).NotTo(gm.HaveOccurred())
Expect(err).NotTo(HaveOccurred())
loadData(writeClient, dataFile, baseTime, interval)
gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
gm.Eventually(func() error {
Expect(writeClient.CloseSend()).To(Succeed())
Eventually(func() error {
_, err := writeClient.Recv()
return err
}).Should(gm.Equal(io.EOF))
}).Should(Equal(io.EOF))
}
Loading

0 comments on commit 6498e3b

Please sign in to comment.