diff --git a/CHANGELOG.md b/CHANGELOG.md index 553c6733e9d..642976d2434 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,6 +97,7 @@ * [BUGFIX] Store Gateway: Avoid race condition by deduplicating entries in bucket stores user scan. #6863 * [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880 * [BUGFIX] Frontend: Fix remote read snappy input due to request string logging when query stats enabled. #7025 +* [BUGFIX] Distributor: Fix the `/distributor/all_user_stats` api to work during rolling updates on ingesters. #7026 ## 1.19.0 2025-02-27 diff --git a/integration/api_endpoints_test.go b/integration/api_endpoints_test.go index 6da0971b49e..5e5d683e060 100644 --- a/integration/api_endpoints_test.go +++ b/integration/api_endpoints_test.go @@ -9,12 +9,15 @@ import ( "net/http" "path/filepath" "testing" + "time" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/runutil" "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" ) @@ -85,3 +88,50 @@ func TestConfigAPIEndpoint(t *testing.T) { cortex2 := e2ecortex.NewSingleBinaryWithConfigFile("cortex-2", cortexConfigFile, configOverrides, "", 9009, 9095) require.NoError(t, s.StartAndWaitReady(cortex2)) } + +func Test_AllUserStats_WhenIngesterRollingUpdate(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + flags := BlocksStorageFlags() + flags["-distributor.replication-factor"] = "3" + flags["-distributor.sharding-strategy"] = "shuffle-sharding" + flags["-distributor.ingestion-tenant-shard-size"] = "3" + flags["-distributor.shard-by-all-labels"] = "true" + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components. + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3)) + + // Wait until distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + // stop ingester1 to emulate rolling update + require.NoError(t, s.Stop(ingester1)) + + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + series, _ := generateSeries("series_1", now) + res, err := client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // QueriedIngesters is 2 since ingester1 has been stopped. + userStats, err := client.AllUserStats() + require.NoError(t, err) + require.Len(t, userStats, 1) + require.Equal(t, uint64(2), userStats[0].QueriedIngesters) +} diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 2b46d2262ec..fa707e7f49d 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -33,6 +33,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/util/backoff" ) @@ -115,6 +116,40 @@ func NewPromQueryClient(address string) (*Client, error) { return c, nil } +func (c *Client) AllUserStats() ([]ingester.UserIDStats, error) { + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/distributor/all_user_stats", c.distributorAddress), nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/json") + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return nil, err + } + + bodyBytes, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + userStats := make([]ingester.UserIDStats, 0) + err = json.Unmarshal(bodyBytes, &userStats) + if err != nil { + return nil, err + } + + return userStats, nil +} + // Push the input timeseries to the remote endpoint func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricMetadata) (*http.Response, error) { // Create write request diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 196fdbbfe4a..5b32804daca 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1603,26 +1603,31 @@ func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error // AllUserStats returns statistics about all users. // Note it does not divide by the ReplicationFactor like UserStats() -func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, error) { +func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, int, error) { // Add up by user, across all responses from ingesters perUserTotals := make(map[string]ingester.UserStats) + queriedIngesterNum := 0 req := &ingester_client.UserStatsRequest{} ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID - // Not using d.ForReplicationSet(), so we can fail after first error. replicationSet, err := d.ingestersRing.GetAllHealthy(ring.Read) if err != nil { - return nil, err + return nil, 0, err } for _, ingester := range replicationSet.Instances { client, err := d.ingesterPool.GetClientFor(ingester.Addr) if err != nil { - return nil, err + return nil, 0, err } resp, err := client.(ingester_client.IngesterClient).AllUserStats(ctx, req) if err != nil { - return nil, err + // During an ingester rolling update, an ingester might be temporarily + // in stopping or starting state. Therefore, returning an error would + // cause the API to fail during the update. This is an expected error in + // that scenario, we continue the loop to work API. + continue } + queriedIngesterNum++ for _, u := range resp.Stats { s := perUserTotals[u.UserId] s.IngestionRate += u.Data.IngestionRate @@ -1631,6 +1636,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, s.NumSeries += u.Data.NumSeries s.ActiveSeries += u.Data.ActiveSeries s.LoadedBlocks += u.Data.LoadedBlocks + s.QueriedIngesters += 1 perUserTotals[u.UserId] = s } } @@ -1647,22 +1653,23 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, NumSeries: stats.NumSeries, ActiveSeries: stats.ActiveSeries, LoadedBlocks: stats.LoadedBlocks, + QueriedIngesters: stats.QueriedIngesters, }, }) } - return response, nil + return response, queriedIngesterNum, nil } // AllUserStatsHandler shows stats for all users. func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) { - stats, err := d.AllUserStats(r.Context()) + stats, queriedIngesterNum, err := d.AllUserStats(r.Context()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor()) + ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor(), queriedIngesterNum) } func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) { diff --git a/pkg/ingester/http_admin.go b/pkg/ingester/http_admin.go index 084e132db42..a5543c7d02e 100644 --- a/pkg/ingester/http_admin.go +++ b/pkg/ingester/http_admin.go @@ -25,6 +25,8 @@ const tpl = ` {{if (gt .ReplicationFactor 0)}}

NB stats do not account for replication factor, which is currently set to {{ .ReplicationFactor }}

{{end}} +

These stats were aggregated from {{ .QueriedIngesterNum }} ingesters.

+
@@ -37,6 +39,7 @@ const tpl = ` + @@ -49,6 +52,7 @@ const tpl = ` + {{ end }} @@ -87,10 +91,11 @@ type UserStats struct { RuleIngestionRate float64 `json:"RuleIngestionRate"` ActiveSeries uint64 `json:"activeSeries"` LoadedBlocks uint64 `json:"loadedBlocks"` + QueriedIngesters uint64 `json:"queriedIngesters"` } // AllUserStatsRender render data for all users or return in json format. -func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, rf int) { +func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, rf, queriedIngesterNum int) { sort.Sort(UserStatsByTimeseries(stats)) if encodings, found := r.Header["Accept"]; found && @@ -102,12 +107,14 @@ func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDSt } util.RenderHTTPResponse(w, struct { - Now time.Time `json:"now"` - Stats []UserIDStats `json:"stats"` - ReplicationFactor int `json:"replicationFactor"` + Now time.Time `json:"now"` + Stats []UserIDStats `json:"stats"` + ReplicationFactor int `json:"replicationFactor"` + QueriedIngesterNum int `json:"queriedIngesterNum"` }{ - Now: time.Now(), - Stats: stats, - ReplicationFactor: rf, + Now: time.Now(), + Stats: stats, + ReplicationFactor: rf, + QueriedIngesterNum: queriedIngesterNum, }, UserStatsTmpl, r) } diff --git a/pkg/ingester/http_admin_test.go b/pkg/ingester/http_admin_test.go index bb49b42cdc6..6cc619871a0 100644 --- a/pkg/ingester/http_admin_test.go +++ b/pkg/ingester/http_admin_test.go @@ -24,7 +24,7 @@ func TestUserStatsPageRendered(t *testing.T) { }, }, } - AllUserStatsRender(res, req, userStats, 3) + AllUserStatsRender(res, req, userStats, 3, 3) assert.Equal(t, http.StatusOK, res.Code) body := res.Body.String() assert.Regexp(t, "", body) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index faf7deb85b5..36df06f4308 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2133,7 +2133,7 @@ func (i *Ingester) userStats() []UserIDStats { func (i *Ingester) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) { stats := i.userStats() - AllUserStatsRender(w, r, stats, 0) + AllUserStatsRender(w, r, stats, 0, 0) } // AllUserStats returns ingestion statistics for all users known to this ingester.
Total Ingest Rate API Ingest Rate Rule Ingest Rate# Queried Ingesters
{{ printf "%.2f" .UserStats.IngestionRate }} {{ printf "%.2f" .UserStats.APIIngestionRate }} {{ printf "%.2f" .UserStats.RuleIngestionRate }}{{ .UserStats.QueriedIngesters }}