Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable native histograms fully on the read path #4513

Merged
merged 6 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* `-query-scheduler.grpc-client-config.*`
* `-query-scheduler.ring.etcd.*`
* `-overrides-exporter.ring.etcd.*`
* [FEATURE] Distributor, ingester, querier, query-frontend, store-gateway: add experimental support for native histograms. Requires that the experimental protobuf query result response format is enabled by `-query-frontend.query-result-response-format=protobuf` on the query frontend. #4286 #4352 #4354 #4376 #4377 #4387 #4396 #4425 #4442 #4494 #4512 #4513
* [ENHANCEMENT] Allow to define service name used for tracing via `JAEGER_SERVICE_NAME` environment variable. #4394
* [ENHANCEMENT] Querier and query-frontend: add experimental, more performant protobuf query result response format enabled with `-query-frontend.query-result-response-format=protobuf`. #4304 #4318 #4375
* [ENHANCEMENT] Compactor: added experimental configuration parameter `-compactor.first-level-compaction-wait-period`, to configure how long the compactor should wait before compacting 1st level blocks (uploaded by ingesters). This configuration option reduces the chances that the compactor begins compacting blocks before all ingesters have uploaded their blocks to the storage. #4401
Expand Down
68 changes: 39 additions & 29 deletions integration/querier_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"fmt"
"strconv"
"sync"
"testing"
Expand All @@ -16,7 +17,6 @@ import (
e2ecache "github.com/grafana/e2e/cache"
e2edb "github.com/grafana/e2e/db"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -26,34 +26,33 @@ import (
// querierShardingTestConfig holds the knobs for one run of the querier
// sharding integration test (consumed by runQuerierShardingTest).
type querierShardingTestConfig struct {
	shuffleShardingEnabled bool   // enable shuffle sharding on the read path
	querySchedulerEnabled  bool   // run a dedicated query-scheduler component
	sendHistograms         bool   // push native histogram series instead of float series
	querierResponseFormat  string // value for -query-frontend.query-result-response-format ("json" or "protobuf")
}

func TestQuerierShuffleShardingWithoutQueryScheduler(t *testing.T) {
runQuerierShardingTest(t, querierShardingTestConfig{
shuffleShardingEnabled: true,
querySchedulerEnabled: false,
})
}

func TestQuerierShuffleShardingWithQueryScheduler(t *testing.T) {
runQuerierShardingTest(t, querierShardingTestConfig{
shuffleShardingEnabled: true,
querySchedulerEnabled: true,
})
}

func TestQuerierNoShardingWithoutQueryScheduler(t *testing.T) {
runQuerierShardingTest(t, querierShardingTestConfig{
shuffleShardingEnabled: false,
querySchedulerEnabled: false,
})
}

func TestQuerierNoShardingWithQueryScheduler(t *testing.T) {
runQuerierShardingTest(t, querierShardingTestConfig{
shuffleShardingEnabled: false,
querySchedulerEnabled: true,
})
// TestQuerySharding exercises the querier sharding integration test across the
// full configuration matrix: shuffle sharding on/off, query-scheduler on/off,
// native-histogram ingestion on/off, and the query result response format.
// The histograms+json combination is skipped because native histograms are
// only supported over the protobuf response format.
func TestQuerySharding(t *testing.T) {
	toggles := []bool{false, true}
	formats := []string{"json", "protobuf"}

	for _, shuffleSharding := range toggles {
		for _, scheduler := range toggles {
			for _, histograms := range toggles {
				for _, format := range formats {
					if histograms && format == "json" {
						// histograms over json are not supported
						continue
					}

					cfg := querierShardingTestConfig{
						shuffleShardingEnabled: shuffleSharding,
						querySchedulerEnabled:  scheduler,
						sendHistograms:         histograms,
						querierResponseFormat:  format,
					}

					name := fmt.Sprintf("shuffle shard=%v/query scheduler=%v/histograms=%v/format=%v",
						shuffleSharding, scheduler, histograms, format,
					)
					t.Run(name, func(t *testing.T) { runQuerierShardingTest(t, cfg) })
				}
			}
		}
	}
}

func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
Expand All @@ -76,6 +75,12 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
"-querier.max-outstanding-requests-per-tenant": strconv.Itoa(numQueries), // To avoid getting errors.
})

if cfg.sendHistograms {
flags = mergeFlags(flags, map[string]string{
"-query-frontend.query-result-response-format": cfg.querierResponseFormat,
})
}

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

Expand Down Expand Up @@ -122,8 +127,13 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
distClient, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

var series []prompb.TimeSeries
series, expectedVector, _ := generateSeries("series_1", now)
var genSeries generateSeriesFunc
if !cfg.sendHistograms {
genSeries = generateFloatSeries
} else {
genSeries = generateHistogramSeries
}
series, expectedVector, _ := genSeries("series_1", now)

res, err := distClient.Push(series)
require.NoError(t, err)
Expand Down
22 changes: 18 additions & 4 deletions integration/querier_tenant_federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon
"-query-frontend.results-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-tenant-federation.enabled": "true",
"-ingester.max-global-exemplars-per-user": "10000",

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
})

// Start the query-scheduler if enabled.
Expand Down Expand Up @@ -131,7 +134,13 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon
require.NoError(t, err)

var series []prompb.TimeSeries
series, expectedVectors[u], _ = generateSeries("series_1", now)
var genSeries generateSeriesFunc
if u%2 == 0 {
genSeries = generateFloatSeries
} else {
genSeries = generateHistogramSeries
}
series, expectedVectors[u], _ = genSeries("series_1", now)

res, err := c.Push(series)
require.NoError(t, err)
Expand All @@ -145,19 +154,24 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon
result, err := c.Query("series_1", now)
require.NoError(t, err)

assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))
assert.ElementsMatch(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))

// query exemplars for all tenants
exemplars, err := c.QueryExemplars("series_1", now.Add(-1*time.Hour), now.Add(1*time.Hour))
require.NoError(t, err)
assert.Len(t, exemplars, numUsers)

// ensure a push to multiple tenants is failing
series, _, _ := generateSeries("series_1", now)
series, _, _ := generateFloatSeries("series_1", now)
res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 500, res.StatusCode)

series, _, _ = generateHistogramSeries("series_1", now)
res, err = c.Push(series)
require.NoError(t, err)
require.Equal(t, 500, res.StatusCode)

// check metric label values for total queries in the query frontend
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|")),
Expand All @@ -181,7 +195,7 @@ func mergeResults(tenantIDs []string, resultsPerTenant []model.Vector) model.Vec
var v model.Vector
for pos, tenantID := range tenantIDs {
for _, r := range resultsPerTenant[pos] {
var s model.Sample = *r
s := *r
s.Metric = r.Metric.Clone()
s.Metric[model.LabelName("__tenant_id__")] = model.LabelValue(tenantID)
v = append(v, &s)
Expand Down
Loading