From dd777d452a2370c5f985bd5c86fb269ef75b12f8 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 27 Apr 2026 19:30:25 +0900 Subject: [PATCH] expose ring status of parquet converter Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/api/_index.md | 11 +++++++++++ pkg/api/api.go | 7 +++++++ pkg/cortex/modules.go | 9 ++++++++- pkg/parquetconverter/converter.go | 12 ++++++++++++ 5 files changed, 39 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95d688766ce..f75f85180ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 +* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 diff --git a/docs/api/_index.md b/docs/api/_index.md index 9bc388f2328..8e0e96b55d8 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -72,6 +72,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi | [Delete user overrides](#delete-user-overrides) | Overrides || `DELETE /api/v1/user-overrides` | | [Store-gateway ring status](#store-gateway-ring-status) | Store-gateway || `GET /store-gateway/ring` | | [Compactor ring status](#compactor-ring-status) | Compactor || `GET /compactor/ring` | +| [Parquet Converter ring status](#parquet-converter-ring-status) | Parquet Converter || `GET /parquet-converter/ring` | | [Get rule files](#get-rule-files) | Configs API (deprecated) || `GET /api/prom/configs/rules` | | [Set rule files](#set-rule-files) | Configs API (deprecated) || `POST /api/prom/configs/rules` | | [Get template files](#get-template-files) | Configs API (deprecated) || `GET /api/prom/configs/templates` | @@ -968,6 +969,16 @@ GET /compactor/ring Displays a web page with the compactor hash ring status, including the state, healthy and last heartbeat time of each compactor. +## Parquet Converter + +### Parquet Converter ring status + +``` +GET /parquet-converter/ring +``` + +Displays a web page with the parquet-converter hash ring status, including the state, healthy and last heartbeat time of each parquet-converter instance. + ## Configs API _This service has been **deprecated** in favour of [Ruler](#ruler) and [Alertmanager](#alertmanager) API._ diff --git a/pkg/api/api.go b/pkg/api/api.go index 2971fe87762..08da9f1a11f 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -33,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/overrides" + "github.com/cortexproject/cortex/pkg/parquetconverter" "github.com/cortexproject/cortex/pkg/purger" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/ring" @@ -425,6 +426,12 @@ func (a *API) RegisterCompactor(c *compactor.Compactor) { a.RegisterRoute("/compactor/ring", http.HandlerFunc(c.RingHandler), false, "GET", "POST") } +// RegisterParquetConverter registers the ring UI page associated with the parquet-converter. +func (a *API) RegisterParquetConverter(c *parquetconverter.Converter) { + a.indexPage.AddLink(SectionAdminEndpoints, "/parquet-converter/ring", "Parquet Converter Ring Status") + a.RegisterRoute("/parquet-converter/ring", http.HandlerFunc(c.RingHandler), false, "GET", "POST") +} + type Distributor interface { querier.Distributor UserStatsHandler(w http.ResponseWriter, r *http.Request) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 805c2095429..3ccd5b2f975 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -785,7 +785,14 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { func (t *Cortex) initParquetConverter() (serv services.Service, err error) { t.Cfg.ParquetConverter.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - return parquetconverter.NewConverter(t.Cfg.ParquetConverter, t.Cfg.BlocksStorage, t.Cfg.Compactor.BlockRanges.ToMilliseconds(), util_log.Logger, prometheus.DefaultRegisterer, t.OverridesConfig) + t.Parquetconverter, err = parquetconverter.NewConverter(t.Cfg.ParquetConverter, t.Cfg.BlocksStorage, t.Cfg.Compactor.BlockRanges.ToMilliseconds(), util_log.Logger, prometheus.DefaultRegisterer, t.OverridesConfig) + if err != nil { + return + } + + // Expose HTTP endpoints. + t.API.RegisterParquetConverter(t.Parquetconverter) + return t.Parquetconverter, nil } func (t *Cortex) initCompactor() (serv services.Service, err error) { diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index ef8c5251b70..a5ed6ce0c05 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -6,6 +6,7 @@ import ( "fmt" "hash/fnv" "math/rand" + "net/http" "os" "path" "path/filepath" @@ -545,6 +546,17 @@ func (c *Converter) ownBlock(ring ring.ReadRing, blockId string) (bool, error) { return rs.Instances[0].Addr == c.ringLifecycler.Addr, nil } +// RingHandler is an HTTP handler that returns the ring status page. +func (c *Converter) RingHandler(w http.ResponseWriter, req *http.Request) { + if c.State() != services.Running { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("Parquet Converter is not running yet.")) + return + } + + c.ring.ServeHTTP(w, req) +} + func (c *Converter) cleanupMetricsForNotOwnedUser(userID string) { if _, ok := c.lastOwnedUsers[userID]; ok { c.metrics.deleteMetricsForTenant(userID)