Skip to content

Commit

Permalink
e2e(query): Reproduce dedup issue from thanos-io#6257
Browse files Browse the repository at this point in the history
Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com>
  • Loading branch information
douglascamata authored and HC Zhu committed Jun 27, 2023
1 parent d44dbd6 commit e088533
Showing 1 changed file with 147 additions and 0 deletions.
147 changes: 147 additions & 0 deletions test/e2e/query_test.go
Expand Up @@ -849,6 +849,153 @@ func TestSidecarStorePushdown(t *testing.T) {
})
}

func TestQueryStoreDedup(t *testing.T) {
t.Parallel()

e, err := e2e.New(e2e.WithName("storededup"))
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
t.Cleanup(cancel)

bucket := "store-gw-dedup-test"
minio := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS())
testutil.Ok(t, e2e.StartAndWaitReady(minio))

l := log.NewLogfmtLogger(os.Stdout)
bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, minio.Endpoint("http"), minio.Dir()), "test")
testutil.Ok(t, err)

storeGW := e2ethanos.NewStoreGW(
e,
"s1",
client.BucketConfig{
Type: client.S3,
Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()),
},
"",
nil,
)
testutil.Ok(t, e2e.StartAndWaitReady(storeGW))

tests := []struct {
replicas []string
extLabel string
intLabel string
desc string
blockFinder string
}{
{
desc: "Deduplication works with external label",
replicas: []string{"a", "b"},
extLabel: "replica",
blockFinder: "dedupext",
},
{
desc: "Deduplication works with internal label",
replicas: []string{"a", "b"},
intLabel: "replica",
blockFinder: "dedupint",
},
{
desc: "Deduplication works with both internal and external label",
replicas: []string{"a", "b"},
intLabel: "replica",
extLabel: "receive_replica",
blockFinder: "dedupintext",
},
}

var totalBlocks int
for _, tt := range tests {
createSimpleReplicatedBlocksInS3(ctx, t, e, l, bkt, tt.replicas, tt.extLabel, tt.intLabel, tt.blockFinder)
totalBlocks += len(tt.replicas)
}
testutil.Ok(t, storeGW.WaitSumMetrics(e2emon.Equals(float64(totalBlocks)), "thanos_blocks_meta_synced"))

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
querierBuilder := e2ethanos.NewQuerierBuilder(e, tt.blockFinder, storeGW.InternalEndpoint("grpc")).WithProxyStrategy("lazy")
var replicaLabels []string
if tt.intLabel != "" {
replicaLabels = append(replicaLabels, tt.intLabel)
}
if tt.extLabel != "" {
replicaLabels = append(replicaLabels, tt.extLabel)
}
if len(replicaLabels) > 0 {
sort.Strings(replicaLabels)
querierBuilder = querierBuilder.WithReplicaLabels(replicaLabels...)
}
querier := querierBuilder.Init()
testutil.Ok(t, e2e.StartAndWaitReady(querier))

instantQuery(t, ctx, querier.Endpoint("http"), func() string {
return fmt.Sprintf("max_over_time(simple_series{instance='foo_0', block_finder='%s'}[2h])", tt.blockFinder)
}, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, 1)
testutil.Ok(t, err)
testutil.Ok(t, querier.Stop())
})
}
}

func createSimpleReplicatedBlocksInS3(
ctx context.Context,
t *testing.T,
dockerEnv *e2e.DockerEnvironment,
logger log.Logger,
bucket *s3.Bucket,
replicas []string,
extReplicaLabel string,
intReplicaLabel string,
blockFinder string,
) {
blockSizes := []struct {
samples int
series int
name string
}{
{samples: 1, series: 1, name: "simple_series"},
}
now := time.Now()
dir := filepath.Join(dockerEnv.SharedDir(), "tmp")
testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))
for _, replica := range replicas {
for _, blockSize := range blockSizes {
series := make([]labels.Labels, blockSize.series)
for i := 0; i < blockSize.series; i++ {
bigSeriesLabels := labels.FromStrings("__name__", blockSize.name, "instance", fmt.Sprintf("foo_%d", i), "block_finder", blockFinder)
if intReplicaLabel != "" {
bigSeriesLabels = append(bigSeriesLabels, labels.Label{Name: intReplicaLabel, Value: replica})
}
sort.Sort(bigSeriesLabels)
series[i] = bigSeriesLabels
}
extLabels := labels.FromStrings("prometheus", "p1")
if extReplicaLabel != "" {
extLabels = append(extLabels, labels.Label{Name: extReplicaLabel, Value: replica})
}
blockID, err := e2eutil.CreateBlockWithBlockDelay(ctx,
dir,
series,
blockSize.samples,
timestamp.FromTime(now),
timestamp.FromTime(now.Add(2*time.Hour)),
30*time.Minute,
extLabels,
0,
metadata.NoneFunc,
)
testutil.Ok(t, err)
blockPath := path.Join(dir, blockID.String())
testutil.Ok(t, objstore.UploadDir(ctx, logger, bucket, blockPath, blockID.String()))
}
}
}

func TestSidecarQueryEvaluation(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit e088533

Please sign in to comment.