Skip to content

Implementing Parquet Queryable with fallback #6743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
* [FEATURE] Ruler: Add support for group labels. #6665
* [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
* [FEATURE] Experimental: Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet, and a Parquet Queryable to query the converted blocks. #6716 #6743
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
* [ENHANCEMENT] Query Frontend: Return 400 when tenant resolving fails. #6715
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/parquet-go/parquet-go v0.25.0
github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73
github.com/prometheus/procfs v0.15.1
github.com/sercand/kuberesolver/v5 v5.1.1
github.com/tjhop/slog-gokit v0.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1573,8 +1573,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d h1:j7d62fP5x6yUFNgNDth5JCLOoj6ZclXkBneSATbPZig=
github.com/prometheus-community/parquet-common v0.0.0-20250428074311-306c8486441d/go.mod h1:Eo3B53ZLcfCEV06clM4UIFTgwxRXm0BHdiaRslKe3Y8=
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 h1:AogORrmarkYfUOI7/lqOhz9atYmLZo69vPQ/SFkPSxE=
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg=
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI=
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=
Expand Down
177 changes: 177 additions & 0 deletions integration/parquet_querier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
//go:build integration_query_fuzz
// +build integration_query_fuzz

package integration

import (
"context"
"fmt"
"math/rand"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/cortexproject/promqlsmith"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/log"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)

// TestParquetFuzz is an end-to-end fuzz test for the parquet queryable path.
// It starts a single-binary Cortex (with the parquet-converter target enabled)
// plus a vanilla Prometheus, uploads one synthetic TSDB block, waits for the
// block to be converted to Parquet, then runs randomly generated PromQL
// queries (via promqlsmith) against both backends and compares the results.
// Finally it asserts that the parquet queryable was actually used, proving the
// queries were not served by the store-gateway (its address is intentionally
// set to a non-existent host).
func TestParquetFuzz(t *testing.T) {

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

// Aggressive 1s sync/cleanup intervals keep the test fast: shipped blocks,
// the bucket index and parquet conversions all become visible quickly.
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-target": "all,parquet-converter",
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.query-store-for-labels-enabled": "true",
// compactor
"-compactor.cleanup-interval": "1s",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-frontend.query-vertical-shard-size": "1",
"-frontend.max-cache-freshness": "1m",
// enable experimental promQL funcs
"-querier.enable-promql-experimental-functions": "true",
// parquet-converter
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-parquet-converter.conversion-interval": "1s",
"-parquet-converter.enabled": "true",
// Querier
"-querier.query-parquet-files": "true",
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

// Build a synthetic dataset: 2 metrics x numSeries label combinations, with
// minute-resolution samples spanning a window ending one hour in the past
// (so the data is served from storage rather than the ingesters).
ctx := context.Background()
rnd := rand.New(rand.NewSource(time.Now().Unix()))
dir := filepath.Join(s.SharedDir(), "data")
numSeries := 10
numSamples := 60
lbls := make([]labels.Labels, 0, numSeries*2)
scrapeInterval := time.Minute
statusCodes := []string{"200", "400", "404", "500", "502"}
now := time.Now()
start := now.Add(-time.Hour * 24)
end := now.Add(-time.Hour)

for i := 0; i < numSeries; i++ {
lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_a"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa(i % 3)},
{Name: "status_code", Value: statusCodes[i%5]},
})

lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_b"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
{Name: "status_code", Value: statusCodes[(i+1)%5]},
})
}
// Create one TSDB block locally, then upload it to the per-tenant bucket.
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
require.NoError(t, err)
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

// Wait until we convert the blocks
// The converter writes a per-block marker object once conversion finishes.
cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
found := false

err := bkt.Iter(context.Background(), "", func(name string) error {
fmt.Println(name)
if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
found = true
}
return nil
}, objstore.WithRecursiveIter())
require.NoError(t, err)
return found
})

// Wait for the bucket index to be rewritten 5 times after conversion,
// presumably so the querier's cached index reflects the parquet-converted
// block before we start querying — TODO confirm the required update count.
att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")
require.NoError(t, err)
numberOfIndexesUpdate := 0
lastUpdate := att.LastModified

cortex_testutil.Poll(t, 30*time.Second, 5, func() interface{} {
att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")
require.NoError(t, err)
if lastUpdate != att.LastModified {
lastUpdate = att.LastModified
numberOfIndexesUpdate++
}
return numberOfIndexesUpdate
})

c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

// Start a reference Prometheus with the same experimental functions enabled,
// used as the oracle for comparing query results.
err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
require.NoError(t, err)
prom := e2edb.NewPrometheus("", map[string]string{
"--enable-feature": "promql-experimental-functions",
})
require.NoError(t, s.StartAndWaitReady(prom))

c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
require.NoError(t, err)
waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)

// Fuzz: generate random PromQL queries over the test series and compare
// Cortex (parquet path) against Prometheus.
opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
promqlsmith.WithEnabledFunctions(enabledFunctions),
}
ps := promqlsmith.New(rnd, lbls, opts...)

runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 500, false)

// Assert the parquet queryable actually served blocks; together with the
// bogus store-gateway address above, this proves the parquet path was used.
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}
5 changes: 5 additions & 0 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,11 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
// Generate an updated in-memory version of the bucket index.
begin = time.Now()
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)

if c.cfgProvider.ParquetConverterEnabled(userID) {
w.EnableParquet()
}

idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
if err != nil {
idxs.Status = bucketindex.GenericError
Expand Down
4 changes: 4 additions & 0 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,10 @@ type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
}

// ParquetConverterEnabled implements the ConfigProvider interface.
// The mock keeps the parquet converter disabled for every tenant.
func (m *mockConfigProvider) ParquetConverterEnabled(_ string) bool {
	return false
}

func newMockConfigProvider() *mockConfigProvider {
return &mockConfigProvider{
userRetentionPeriods: make(map[string]time.Duration),
Expand Down
1 change: 1 addition & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func (cfg *Config) Validate(limits validation.Limits) error {
// ConfigProvider defines the per-tenant config provider for the Compactor.
type ConfigProvider interface {
bucket.TenantConfigProvider
// ParquetConverterEnabled returns whether the parquet converter is enabled for the given tenant.
ParquetConverterEnabled(userID string) bool
// CompactorBlocksRetentionPeriod returns the retention period applied to the given tenant's blocks.
CompactorBlocksRetentionPeriod(user string) time.Duration
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,20 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
var servs []services.Service

//nolint:revive // I prefer this form over removing 'else', because it allows q to have smaller scope.
if q, err := initQueryableForEngine(t.Cfg, t.Overrides, prometheus.DefaultRegisterer); err != nil {
var queriable prom_storage.Queryable
if q, err := initBlockStoreQueryable(t.Cfg, t.Overrides, prometheus.DefaultRegisterer); err != nil {
return nil, fmt.Errorf("failed to initialize querier: %v", err)
} else {
t.StoreQueryables = append(t.StoreQueryables, querier.UseAlwaysQueryable(q))
if s, ok := q.(services.Service); ok {
queriable = q
if t.Cfg.Querier.QueryParquetFiles {
pq, err := querier.NewParquetQueryable(t.Cfg.Querier, t.Cfg.BlocksStorage, t.Overrides, q, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to initialize parquet querier: %v", err)
}
queriable = pq
}
t.StoreQueryables = append(t.StoreQueryables, querier.UseAlwaysQueryable(queriable))
if s, ok := queriable.(services.Service); ok {
servs = append(servs, s)
}
}
Expand All @@ -424,7 +433,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
}
}

func initQueryableForEngine(cfg Config, limits *validation.Overrides, reg prometheus.Registerer) (prom_storage.Queryable, error) {
func initBlockStoreQueryable(cfg Config, limits *validation.Overrides, reg prometheus.Registerer) (*querier.BlocksStoreQueryable, error) {
// When running in single binary, if the blocks sharding is disabled and no custom
// store-gateway address has been configured, we can set it to the running process.
if cfg.isModuleEnabled(All) && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" {
Expand Down
29 changes: 11 additions & 18 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/tenant"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand All @@ -46,10 +45,8 @@ const (
var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

type Config struct {
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`

DataDir string `yaml:"data_dir"`

Expand All @@ -64,8 +61,7 @@ type Converter struct {
cfg Config
storageCfg cortex_tsdb.BlocksStorageConfig

allowedTenants *util.AllowedTenants
limits *validation.Overrides
limits *validation.Overrides

// Ring used for sharding compactions.
ringLifecycler *ring.Lifecycler
Expand All @@ -87,8 +83,6 @@ type Converter struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Ring.RegisterFlags(f)

f.Var(&cfg.EnabledTenants, "parquet-converter.enabled-tenants", "Comma separated list of tenants that can be converted. If specified, only these tenants will be converted, otherwise all tenants can be converted.")
f.Var(&cfg.DisabledTenants, "parquet-converter.disabled-tenants", "Comma separated list of tenants that cannot converted.")
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
Expand All @@ -107,7 +101,6 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
reg: registerer,
storageCfg: storageCfg,
logger: logger,
allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants),
limits: limits,
pool: chunkenc.NewPool(),
blockRanges: blockRanges,
Expand Down Expand Up @@ -171,6 +164,10 @@ func (c *Converter) running(ctx context.Context) error {
}
ownedUsers := map[string]struct{}{}
for _, userID := range users {
if !c.limits.ParquetConverterEnabled(userID) {
continue
}

var ring ring.ReadRing
ring = c.ring
if c.limits.ParquetConverterTenantShardSize(userID) > 0 {
Expand Down Expand Up @@ -375,15 +372,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
return nil
}

func (c *Converter) ownUser(r ring.ReadRing, userID string) (bool, error) {
if !c.allowedTenants.IsAllowed(userID) {
func (c *Converter) ownUser(r ring.ReadRing, userId string) (bool, error) {
if userId == tenant.GlobalMarkersDir {
// __markers__ is reserved for global markers and no tenant should be allowed to have that name.
return false, nil
}

if c.limits.ParquetConverterTenantShardSize(userID) <= 0 {
return true, nil
}

rs, err := r.GetAllHealthy(RingOp)
if err != nil {
return false, err
Expand Down
5 changes: 4 additions & 1 deletion pkg/parquetconverter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ func TestConverter(t *testing.T) {
bucketClient, err := filesystem.NewBucket(t.TempDir())
require.NoError(t, err)
userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.ParquetConverterEnabled = true

c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), nil)
c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)

ctx := context.Background()

Expand Down
22 changes: 22 additions & 0 deletions pkg/querier/block.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,32 @@
package querier

import (
"context"

"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

// contextKey is an unexported key type for values this package stores in a
// context.Context, so its keys cannot collide with keys from other packages.
type contextKey int

// blockCtxKey is the context key under which InjectBlocksIntoContext stores
// the slice of bucket-index blocks.
const blockCtxKey contextKey = 0

// InjectBlocksIntoContext returns a child context carrying the given
// bucket-index blocks, for later retrieval via ExtractBlocksFromContext.
func InjectBlocksIntoContext(ctx context.Context, blocks ...*bucketindex.Block) context.Context {
	return context.WithValue(ctx, blockCtxKey, blocks)
}

// ExtractBlocksFromContext returns the bucket-index blocks previously attached
// by InjectBlocksIntoContext. The boolean reports whether any value was stored
// under the key.
func ExtractBlocksFromContext(ctx context.Context) ([]*bucketindex.Block, bool) {
	v := ctx.Value(blockCtxKey)
	if v == nil {
		return nil, false
	}
	return v.([]*bucketindex.Block), true
}

func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMatcher {
var converted []storepb.LabelMatcher
for _, m := range matchers {
Expand Down
Loading
Loading