Skip to content

Commit

Permalink
Fix test cases with Eventually semantics (#183)
Browse files Browse the repository at this point in the history
* fix test cases with Eventually semantics
  • Loading branch information
lujiajing1126 authored and hanahmily committed Oct 7, 2022
1 parent ae7fe81 commit b1d1c0e
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 128 deletions.
2 changes: 1 addition & 1 deletion pkg/query/logical/measure/measure_plan_indexscan_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
}
}
if len(seriesList) == 0 {
return nil, nil
return executor.EmptyMIterator, nil
}
var builders []logical.SeekerBuilder
if i.Index != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/signal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
)

// ErrSignal is returned when a termination signal is received.
var ErrSignal = errors.New("signal received")
var _ run.Service = (*Handler)(nil)
var (
ErrSignal = errors.New("signal received")
_ run.Service = (*Handler)(nil)
)

// Handler implements a unix signal handler as run.GroupService.
type Handler struct {
Expand Down
34 changes: 17 additions & 17 deletions test/cases/measure/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"sigs.k8s.io/yaml"

common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measure_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
)

Expand All @@ -46,13 +46,13 @@ var inputFS embed.FS
var wantFS embed.FS

// VerifyFn verify whether the query response matches the wanted result
var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args) {
i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
gm.Expect(err).NotTo(gm.HaveOccurred())
query := &measure_v1.QueryRequest{}
innerGm.Expect(err).NotTo(gm.HaveOccurred())
query := &measurev1.QueryRequest{}
helpers.UnmarshalYAML(i, query)
query.TimeRange = helpers.TimeRange(args, sharedContext)
c := measure_v1.NewMeasureServiceClient(sharedContext.Connection)
c := measurev1.NewMeasureServiceClient(sharedContext.Connection)
ctx := context.Background()
resp, err := c.Query(ctx, query)
if args.WantErr {
Expand All @@ -61,21 +61,21 @@ var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
}
return
}
gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
if args.WantEmpty {
gm.Expect(resp.DataPoints).To(gm.BeEmpty())
innerGm.Expect(resp.DataPoints).To(gm.BeEmpty())
return
}
if args.Want == "" {
args.Want = args.Input
}
ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
gm.Expect(err).NotTo(gm.HaveOccurred())
want := &measure_v1.QueryResponse{}
innerGm.Expect(err).NotTo(gm.HaveOccurred())
want := &measurev1.QueryResponse{}
helpers.UnmarshalYAML(ww, want)
gm.Expect(cmp.Equal(resp, want,
innerGm.Expect(cmp.Equal(resp, want,
protocmp.IgnoreUnknown(),
protocmp.IgnoreFields(&measure_v1.DataPoint{}, "timestamp"),
protocmp.IgnoreFields(&measurev1.DataPoint{}, "timestamp"),
protocmp.Transform())).
To(gm.BeTrue(), func() string {
j, err := protojson.Marshal(resp)
Expand All @@ -93,18 +93,18 @@ var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
//go:embed testdata/*.json
var dataFS embed.FS

func loadData(md *common_v1.Metadata, measure measure_v1.MeasureService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
var templates []interface{}
content, err := dataFS.ReadFile("testdata/" + dataFile)
gm.Expect(err).ShouldNot(gm.HaveOccurred())
gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred())
for i, template := range templates {
rawDataPointValue, errMarshal := json.Marshal(template)
gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
dataPointValue := &measure_v1.DataPointValue{}
dataPointValue := &measurev1.DataPointValue{}
gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred())
dataPointValue.Timestamp = timestamppb.New(baseTime.Add(time.Duration(i) * time.Minute))
gm.Expect(measure.Send(&measure_v1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
Should(gm.Succeed())
}
}
Expand All @@ -113,11 +113,11 @@ func loadData(md *common_v1.Metadata, measure measure_v1.MeasureService_WriteCli
func Write(conn *grpclib.ClientConn, name, group, dataFile string,
baseTime time.Time, interval time.Duration,
) {
c := measure_v1.NewMeasureServiceClient(conn)
c := measurev1.NewMeasureServiceClient(conn)
ctx := context.Background()
writeClient, err := c.Write(ctx)
gm.Expect(err).NotTo(gm.HaveOccurred())
loadData(&common_v1.Metadata{
loadData(&commonv1.Metadata{
Name: name,
Group: group,
}, writeClient, dataFile, baseTime, interval)
Expand Down
7 changes: 5 additions & 2 deletions test/cases/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@ import (
"time"

g "github.com/onsi/ginkgo/v2"
gm "github.com/onsi/gomega"

"github.com/apache/skywalking-banyandb/pkg/test/helpers"
measure_test_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
measureTestData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
)

var (
// SharedContext is the parallel execution context
SharedContext helpers.SharedContext
verify = func(args helpers.Args) {
measure_test_data.VerifyFn(SharedContext, args)
gm.Eventually(func(innerGm gm.Gomega) {
measureTestData.VerifyFn(innerGm, SharedContext, args)
})
}
)

Expand Down
57 changes: 31 additions & 26 deletions test/cases/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,16 @@ import (
"github.com/google/go-cmp/cmp"
g "github.com/onsi/ginkgo/v2"
gm "github.com/onsi/gomega"
"sigs.k8s.io/yaml"

common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
model_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
stream_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"

grpclib "google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
"sigs.k8s.io/yaml"

commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
)

// SharedContext is the parallel execution context
Expand All @@ -53,13 +52,13 @@ var inputFS embed.FS
//go:embed want/*.yaml
var wantFS embed.FS

var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
gm.Expect(err).NotTo(gm.HaveOccurred())
query := &stream_v1.QueryRequest{}
innerGm.Expect(err).NotTo(gm.HaveOccurred())
query := &streamv1.QueryRequest{}
helpers.UnmarshalYAML(i, query)
query.TimeRange = helpers.TimeRange(args, SharedContext)
c := stream_v1.NewStreamServiceClient(SharedContext.Connection)
c := streamv1.NewStreamServiceClient(SharedContext.Connection)
ctx := context.Background()
resp, err := c.Query(ctx, query)
if args.WantErr {
Expand All @@ -68,21 +67,21 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
}
return
}
gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
if args.WantEmpty {
gm.Expect(resp.Elements).To(gm.BeEmpty())
innerGm.Expect(resp.Elements).To(gm.BeEmpty())
return
}
if args.Want == "" {
args.Want = args.Input
}
ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
gm.Expect(err).NotTo(gm.HaveOccurred())
want := &stream_v1.QueryResponse{}
innerGm.Expect(err).NotTo(gm.HaveOccurred())
want := &streamv1.QueryResponse{}
helpers.UnmarshalYAML(ww, want)
gm.Expect(cmp.Equal(resp, want,
innerGm.Expect(cmp.Equal(resp, want,
protocmp.IgnoreUnknown(),
protocmp.IgnoreFields(&stream_v1.Element{}, "timestamp"),
protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
protocmp.Transform())).
To(gm.BeTrue(), func() string {
j, err := protojson.Marshal(resp)
Expand All @@ -95,6 +94,12 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
}
return string(y)
})
}

var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
gm.Eventually(func(innerGm gm.Gomega) {
verifyFn(innerGm, args)
})
},
g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
Expand All @@ -120,7 +125,7 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
//go:embed testdata/*.json
var dataFS embed.FS

func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
func loadData(stream streamv1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
var templates []interface{}
content, err := dataFS.ReadFile("testdata/" + dataFile)
gm.Expect(err).ShouldNot(gm.HaveOccurred())
Expand All @@ -129,16 +134,16 @@ func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseT
for i, template := range templates {
rawSearchTagFamily, errMarshal := json.Marshal(template)
gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
searchTagFamily := &model_v1.TagFamilyForWrite{}
searchTagFamily := &modelv1.TagFamilyForWrite{}
gm.Expect(protojson.Unmarshal(rawSearchTagFamily, searchTagFamily)).ShouldNot(gm.HaveOccurred())
e := &stream_v1.ElementValue{
e := &streamv1.ElementValue{
ElementId: strconv.Itoa(i),
Timestamp: timestamppb.New(baseTime.Add(interval * time.Duration(i))),
TagFamilies: []*model_v1.TagFamilyForWrite{
TagFamilies: []*modelv1.TagFamilyForWrite{
{
Tags: []*model_v1.TagValue{
Tags: []*modelv1.TagValue{
{
Value: &model_v1.TagValue_BinaryData{
Value: &modelv1.TagValue_BinaryData{
BinaryData: bb,
},
},
Expand All @@ -147,8 +152,8 @@ func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseT
},
}
e.TagFamilies = append(e.TagFamilies, searchTagFamily)
errInner := stream.Send(&stream_v1.WriteRequest{
Metadata: &common_v1.Metadata{
errInner := stream.Send(&streamv1.WriteRequest{
Metadata: &commonv1.Metadata{
Name: "sw",
Group: "default",
},
Expand All @@ -160,7 +165,7 @@ func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseT

// Write data into the server
func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time, interval time.Duration) {
c := stream_v1.NewStreamServiceClient(conn)
c := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
writeClient, err := c.Write(ctx)
gm.Expect(err).NotTo(gm.HaveOccurred())
Expand Down
47 changes: 24 additions & 23 deletions test/integration/cold_query/query_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,23 @@ import (
"testing"
"time"

g "github.com/onsi/ginkgo/v2"
gm "github.com/onsi/gomega"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
casesMeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
casesStream "github.com/apache/skywalking-banyandb/test/cases/stream"
)

func TestIntegrationColdQuery(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Integration Query Cold Data Suite")
gm.RegisterFailHandler(g.Fail)
g.RunSpecs(t, "Integration Query Cold Data Suite")
}

var (
Expand All @@ -45,25 +46,25 @@ var (
deferFunc func()
)

var _ = SynchronizedBeforeSuite(func() []byte {
Expect(logger.Init(logger.Logging{
var _ = g.SynchronizedBeforeSuite(func() []byte {
gm.Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
})).To(Succeed())
})).To(gm.Succeed())
var addr string
addr, deferFunc = setup.SetUp()
conn, err := grpclib.Dial(
addr,
grpclib.WithTransportCredentials(insecure.NewCredentials()),
)
Expect(err).NotTo(HaveOccurred())
gm.Expect(err).NotTo(gm.HaveOccurred())
now = timestamp.NowMilli().Add(-time.Hour * 24)
interval := 500 * time.Millisecond
cases_stream.Write(conn, "data.json", now, interval)
cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
Expect(conn.Close()).To(Succeed())
casesStream.Write(conn, "data.json", now, interval)
casesMeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
gm.Expect(conn.Close()).To(gm.Succeed())
return []byte(addr)
}, func(address []byte) {
var err error
Expand All @@ -72,20 +73,20 @@ var _ = SynchronizedBeforeSuite(func() []byte {
grpclib.WithTransportCredentials(insecure.NewCredentials()),
grpclib.WithBlock(),
)
cases_stream.SharedContext = helpers.SharedContext{
casesStream.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
cases_measure.SharedContext = helpers.SharedContext{
casesMeasure.SharedContext = helpers.SharedContext{
Connection: connection,
BaseTime: now,
}
Expect(err).NotTo(HaveOccurred())
gm.Expect(err).NotTo(gm.HaveOccurred())
})

var _ = SynchronizedAfterSuite(func() {
var _ = g.SynchronizedAfterSuite(func() {
if connection != nil {
Expect(connection.Close()).To(Succeed())
gm.Expect(connection.Close()).To(gm.Succeed())
}
}, func() {
deferFunc()
Expand Down
Loading

0 comments on commit b1d1c0e

Please sign in to comment.