Skip to content

Commit

Permalink
Use implicit histogram format within query-frontend (#4318)
Browse files Browse the repository at this point in the history
* Use implicit histogram format within query-frontend.

This unblocks query sharding for queries involving native histograms,
and also unblocks using the new internal query result payload format
between query-frontends and rulers.

* Update changelog entry.

* Use Sample and FloatHistogramPair messages when encoding and decoding matrix values.

This removes the need to convert them when decoding in the
query-frontend.

This does not change the wire format as the shapes of the messages
(Sample/MatrixSample and FloatHistogramPair/MatrixHistogram) are the
same.

* Rename protobuf message fields to match existing naming convention.

This does not affect the wire format.

* Enable protobuf format for query frontend integration tests that use native histograms.

* Fix "getting started" integration tests that use native histograms.

* Fix querier and query-frontend caching integration tests that use native histograms.

* Fix trailing whitespace.

* Fix read-write deployment mode integration tests that use native histograms.

* Fix integration tests that relied on series being sorted in a particular order.

* Address PR feedback: simplify assertion.

* Use different tenants for each test case to avoid `err-mimir-sample-out-of-order` when trying to create test series.
  • Loading branch information
charleskorn committed Mar 2, 2023
1 parent f3698f5 commit 43e5600
Show file tree
Hide file tree
Showing 25 changed files with 1,000 additions and 1,188 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Querying using the `{__mimir_storage__="ephemeral"}` selector no longer works.
* [ENHANCEMENT] Store-gateway: Reduce memory allocation rate when loading TSDB chunks from Memcached. #4074
* [ENHANCEMENT] Query-frontend: track `cortex_frontend_query_response_codec_duration_seconds` and `cortex_frontend_query_response_codec_payload_bytes` metrics to measure the time taken and bytes read / written while encoding and decoding query result payloads. #4110
* [ENHANCEMENT] Alertmanager: expose additional upstream metrics `cortex_alertmanager_dispatcher_aggregation_groups`, `cortex_alertmanager_dispatcher_alert_processing_duration_seconds`. #4151
* [ENHANCEMENT] Querier and query-frontend: add experimental, more performant protobuf query result response format enabled with `-query-frontend.query-result-response-format=protobuf`. #4153 #4304
* [ENHANCEMENT] Querier and query-frontend: add experimental, more performant protobuf query result response format enabled with `-query-frontend.query-result-response-format=protobuf`. #4153 #4304 #4318
* [ENHANCEMENT] Store-gateway: use more efficient chunks fetching and caching. This should reduce CPU, memory utilization, and receive bandwidth of a store-gateway. Enable with `-blocks-storage.bucket-store.chunks-cache.fine-grained-chunks-caching-enabled=true`. #4163 #4174 #4227
* [ENHANCEMENT] Query-frontend: Wait for in-flight queries to finish before shutting down. #4073 #4170
* [ENHANCEMENT] Store-gateway: added `encode` and `other` stage to `cortex_bucket_store_series_request_stage_duration_seconds` metric. #4179
Expand Down
3 changes: 3 additions & 0 deletions integration/getting_started_single_process_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) {
"-blocks-storage.s3.bucket-name": blocksBucketName,
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-blocks-storage.s3.insecure": "true",

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
}

mimir := e2emimir.NewSingleBinary("mimir-1", flags, e2emimir.WithPorts(9009, 9095), e2emimir.WithConfigFile(mimirConfigFile))
Expand Down
3 changes: 3 additions & 0 deletions integration/getting_started_with_gossiped_ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func runTestGettingStartedWithGossipedRing(t *testing.T, seriesName string, genS
"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-blocks-storage.s3.insecure": "true",

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
}

// This mimir will fail to join the cluster configured in yaml file. That's fine.
Expand Down
9 changes: 8 additions & 1 deletion integration/getting_started_with_grafana_mimir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ func TestGettingStartedWithGrafanaMimir(t *testing.T) {
defer s.Close()

require.NoError(t, copyFileToSharedDir(s, "docs/configurations/demo.yaml", "demo.yaml"))
flags := map[string]string{
// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
}

mimir := e2emimir.NewSingleBinary("mimir", nil, e2emimir.WithPorts(9009, 9095), e2emimir.WithConfigFile("demo.yaml"))
mimir := e2emimir.NewSingleBinary("mimir", flags, e2emimir.WithPorts(9009, 9095), e2emimir.WithConfigFile("demo.yaml"))
require.NoError(t, s.StartAndWaitReady(mimir))

runTestPushSeriesAndQueryBack(t, mimir, "series_1", generateFloatSeries)
Expand Down Expand Up @@ -55,6 +59,9 @@ func TestPlayWithGrafanaMimirTutorial(t *testing.T) {

// Override the list of members to join, setting the hostname we expect within the Docker network created by integration tests.
"-memberlist.join": networkName + "-mimir-1",

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
}

// Start Mimir (3 replicas).
Expand Down
6 changes: 6 additions & 0 deletions integration/querier_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
"-querier.max-outstanding-requests-per-tenant": strconv.Itoa(numQueries), // To avoid getting errors.
})

if cfg.sendHistogramsInstead {
flags = mergeFlags(flags, map[string]string{
"-query-frontend.query-result-response-format": "protobuf",
})
}

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

Expand Down
7 changes: 5 additions & 2 deletions integration/querier_tenant_federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon
"-query-frontend.results-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-tenant-federation.enabled": "true",
"-ingester.max-global-exemplars-per-user": "10000",

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
})

// Start the query-scheduler if enabled.
Expand Down Expand Up @@ -151,7 +154,7 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon
result, err := c.Query("series_1", now)
require.NoError(t, err)

assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))
assert.ElementsMatch(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))

// query exemplars for all tenants
exemplars, err := c.QueryExemplars("series_1", now.Add(-1*time.Hour), now.Add(1*time.Hour))
Expand Down Expand Up @@ -192,7 +195,7 @@ func mergeResults(tenantIDs []string, resultsPerTenant []model.Vector) model.Vec
var v model.Vector
for pos, tenantID := range tenantIDs {
for _, r := range resultsPerTenant[pos] {
var s model.Sample = *r
s := *r
s.Metric = r.Metric.Clone()
s.Metric[model.LabelName("__tenant_id__")] = model.LabelValue(tenantID)
v = append(v, &s)
Expand Down
6 changes: 6 additions & 0 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, series
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": "1ms", // Retention period counts from the moment the block was uploaded to storage so we're setting it deliberately small so the block gets deleted as soon as possible

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
})

// Start dependencies in common with all test cases.
Expand Down Expand Up @@ -384,6 +387,9 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
"-compactor.cleanup-interval": "2s", // Update bucket index often.
// Query-frontend.
"-query-frontend.parallelize-shardable-queries": strconv.FormatBool(testCfg.queryShardingEnabled),

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
})

// Start Mimir replicas.
Expand Down
3 changes: 3 additions & 0 deletions integration/query_frontend_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func TestQueryFrontendUnalignedQuery(t *testing.T) {
"-query-frontend.max-cache-freshness": "0", // Cache everything.
"-query-frontend.results-cache.backend": "memcached",
"-query-frontend.results-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
})

// Start the query-frontend.
Expand Down
11 changes: 7 additions & 4 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,14 @@ func TestQueryFrontendWithQueryResultPayloadFormats(t *testing.T) {
flags = mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
map[string]string{
"-query-frontend.query-result-response-format": format,
},
)

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

return "", flags
},
withHistograms: true,
withHistograms: format == "protobuf",
})
})
}
Expand All @@ -253,6 +250,12 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
"-query-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
})

if cfg.withHistograms {
flags = mergeFlags(flags, map[string]string{
"-query-frontend.query-result-response-format": "protobuf",
})
}

// Start the query-scheduler if enabled.
var queryScheduler *e2emimir.MimirService
if cfg.querySchedulerEnabled && cfg.querySchedulerDiscoveryMode == "dns" {
Expand Down
10 changes: 9 additions & 1 deletion integration/read_write_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ func TestReadWriteModeQueryingIngester(t *testing.T) {
require.NoError(t, err)
defer s.Close()

client, _ := startReadWriteModeCluster(t, s)
flags := map[string]string{
// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
}

client, _ := startReadWriteModeCluster(t, s, flags)

runQueryingIngester(t, client, "test_series_1", generateFloatSeries)
runQueryingIngester(t, client, "test_hseries_1", generateHistogramSeries)
Expand Down Expand Up @@ -73,6 +78,9 @@ func TestReadWriteModeQueryingStoreGateway(t *testing.T) {
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": "3s",
"-blocks-storage.tsdb.head-compaction-idle-timeout": "1s",

// Enable protobuf format so that we can use native histograms.
"-query-frontend.query-result-response-format": "protobuf",
})

runQueryingStoreGateway(t, client, cluster, "test_series_1", generateFloatSeries)
Expand Down
6 changes: 3 additions & 3 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,10 +857,10 @@ func TestRulerRemoteEvaluation(t *testing.T) {
},
},
"federated rule group": {
tenantsWithMetrics: []string{"tenant-1", "tenant-2"},
ruleGroupOwner: "tenant-2",
tenantsWithMetrics: []string{"tenant-2", "tenant-3"},
ruleGroupOwner: "tenant-3",
ruleExpression: "count(group by (__tenant_id__) (metric))",
groupSourceTenants: []string{"tenant-1", "tenant-2"},
groupSourceTenants: []string{"tenant-2", "tenant-3"},
assertEvalResult: func(evalResult model.Vector) {
require.Len(t, evalResult, 1)
require.Equal(t, evalResult[0].Value, model.SampleValue(2))
Expand Down
36 changes: 18 additions & 18 deletions pkg/api/protobuf_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ func (c protobufCodec) Encode(resp *v1.Response) ([]byte, error) {

func (c protobufCodec) encodeString(s promql.String) mimirpb.StringData {
return mimirpb.StringData{
TimestampMilliseconds: s.T,
Value: s.V,
TimestampMs: s.T,
Value: s.V,
}
}

func (c protobufCodec) encodeScalar(s promql.Scalar) mimirpb.ScalarData {
return mimirpb.ScalarData{
TimestampMilliseconds: s.T,
Value: s.V,
TimestampMs: s.T,
Value: s.V,
}
}

Expand All @@ -104,15 +104,15 @@ func (c protobufCodec) encodeVector(v promql.Vector) mimirpb.VectorData {

if s.H == nil {
samples = append(samples, mimirpb.VectorSample{
Metric: metric,
TimestampMilliseconds: s.T,
Value: s.V,
Metric: metric,
TimestampMs: s.T,
Value: s.V,
})
} else {
histograms = append(histograms, mimirpb.VectorHistogram{
Metric: metric,
TimestampMilliseconds: s.T,
Histogram: *mimirpb.FloatHistogramFromPrometheusModel(s.Point.H),
Metric: metric,
TimestampMs: s.T,
Histogram: *mimirpb.FloatHistogramFromPrometheusModel(s.Point.H),
})
}
}
Expand Down Expand Up @@ -144,19 +144,19 @@ func (c protobufCodec) encodeMatrixSeries(s promql.Series) mimirpb.MatrixSeries
}
}

samples := make([]mimirpb.MatrixSample, 0, len(s.Points)-histogramCount)
histograms := make([]mimirpb.MatrixHistogram, 0, histogramCount)
samples := make([]mimirpb.Sample, 0, len(s.Points)-histogramCount)
histograms := make([]mimirpb.FloatHistogramPair, 0, histogramCount)

for _, p := range s.Points {
if p.H == nil {
samples = append(samples, mimirpb.MatrixSample{
TimestampMilliseconds: p.T,
Value: p.V,
samples = append(samples, mimirpb.Sample{
TimestampMs: p.T,
Value: p.V,
})
} else {
histograms = append(histograms, mimirpb.MatrixHistogram{
TimestampMilliseconds: p.T,
Histogram: *mimirpb.FloatHistogramFromPrometheusModel(p.H),
histograms = append(histograms, mimirpb.FloatHistogramPair{
TimestampMs: p.T,
Histogram: *mimirpb.FloatHistogramFromPrometheusModel(p.H),
})
}
}
Expand Down
Loading

0 comments on commit 43e5600

Please sign in to comment.