|
3 | 3 | package main |
4 | 4 |
|
5 | 5 | import ( |
| 6 | + "fmt" |
| 7 | + "sync" |
6 | 8 | "testing" |
7 | 9 | "time" |
8 | 10 |
|
9 | 11 | "github.com/prometheus/common/model" |
| 12 | + "github.com/prometheus/prometheus/prompb" |
10 | 13 | "github.com/stretchr/testify/assert" |
11 | 14 | "github.com/stretchr/testify/require" |
12 | 15 |
|
@@ -251,3 +254,89 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) { |
251 | 254 | require.Error(t, err) |
252 | 255 | assert.Contains(t, err.Error(), "500") |
253 | 256 | } |
| 257 | + |
| 258 | +func TestQuerierWithChunksStorage(t *testing.T) { |
| 259 | + const numUsers = 10 |
| 260 | + const numQueriesPerUser = 10 |
| 261 | + |
| 262 | + s, err := e2e.NewScenario(networkName) |
| 263 | + require.NoError(t, err) |
| 264 | + defer s.Close() |
| 265 | + |
| 266 | + require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) |
| 267 | + flags := mergeFlags(ChunksStorageFlags, map[string]string{}) |
| 268 | + |
| 269 | + // Start dependencies. |
| 270 | + dynamo := e2edb.NewDynamoDB() |
| 271 | + |
| 272 | + consul := e2edb.NewConsul() |
| 273 | + require.NoError(t, s.StartAndWaitReady(consul, dynamo)) |
| 274 | + |
| 275 | + tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "") |
| 276 | + require.NoError(t, s.StartAndWaitReady(tableManager)) |
| 277 | + |
| 278 | + // Wait until the first table-manager sync has completed, so that we're |
| 279 | + // sure the tables have been created. |
| 280 | + require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds")) |
| 281 | + |
| 282 | + // Start Cortex components for the write path. |
| 283 | + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") |
| 284 | + ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "") |
| 285 | + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) |
| 286 | + |
| 287 | + // Wait until the distributor has updated the ring. |
| 288 | + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) |
| 289 | + |
| 290 | + // Push a series for each user to Cortex. |
| 291 | + now := time.Now() |
| 292 | + expectedVectors := make([]model.Vector, numUsers) |
| 293 | + |
| 294 | + for u := 0; u < numUsers; u++ { |
| 295 | + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", fmt.Sprintf("user-%d", u)) |
| 296 | + require.NoError(t, err) |
| 297 | + |
| 298 | + var series []prompb.TimeSeries |
| 299 | + series, expectedVectors[u] = generateSeries("series_1", now) |
| 300 | + |
| 301 | + res, err := c.Push(series) |
| 302 | + require.NoError(t, err) |
| 303 | + require.Equal(t, 200, res.StatusCode) |
| 304 | + } |
| 305 | + |
| 306 | + // Add 2 memcache instances to test for: https://github.com/cortexproject/cortex/issues/2302 |
| 307 | + // Note these are not running but added to trigger the behaviour. |
| 308 | + querierFlags := mergeFlags(flags, map[string]string{ |
| 309 | + "-store.index-cache-read.memcached.addresses": "dns+memcached0:11211", |
| 310 | + "-store.index-cache-write.memcached.addresses": "dns+memcached1:11211", |
| 311 | + }) |
| 312 | + |
| 313 | + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), querierFlags, "") |
| 314 | + require.NoError(t, s.StartAndWaitReady(querier)) |
| 315 | + |
| 316 | + // Wait until the querier has updated the ring. |
| 317 | + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) |
| 318 | + |
| 319 | + // Query the series for each user in parallel. |
| 320 | + wg := sync.WaitGroup{} |
| 321 | + wg.Add(numUsers * numQueriesPerUser) |
| 322 | + |
| 323 | + for u := 0; u < numUsers; u++ { |
| 324 | + userID := u |
| 325 | + |
| 326 | + c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", fmt.Sprintf("user-%d", userID)) |
| 327 | + require.NoError(t, err) |
| 328 | + |
| 329 | + for q := 0; q < numQueriesPerUser; q++ { |
| 330 | + go func() { |
| 331 | + defer wg.Done() |
| 332 | + |
| 333 | + result, err := c.Query("series_1", now) |
| 334 | + require.NoError(t, err) |
| 335 | + require.Equal(t, model.ValVector, result.Type()) |
| 336 | + assert.Equal(t, expectedVectors[userID], result.(model.Vector)) |
| 337 | + }() |
| 338 | + } |
| 339 | + } |
| 340 | + |
| 341 | + wg.Wait() |
| 342 | +} |
0 commit comments