1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -84,6 +84,7 @@
* [BUGFIX] Configs: prevent validation of templates from failing when using template functions. #3157
* [BUGFIX] Configuring the S3 URL with an `@` but without username and password doesn't enable the AWS static credentials anymore. #3170
* [BUGFIX] Limit errors on range queries (`api/v1/query_range`) now return status code `422` instead of `500`. #3167
* [BUGFIX] Handle hash-collisions in the query path. #3192

## 1.3.0 / 2020-08-21

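The bug being fixed: `Fingerprint` and `FastFingerprint` reduce a label set to a 64-bit hash, and the query path used that hash as the key of the maps in which it merges per-ingester results. Two distinct series whose labels collide on the hash were therefore silently merged into one. The change below keys those maps by the serialized label set instead, so key equality is equivalent to label-set equality.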
101 changes: 101 additions & 0 deletions integration/querier_test.go
@@ -619,3 +619,104 @@ func TestQuerierWithChunksStorage(t *testing.T) {
assertServiceMetricsPrefixes(t, Querier, querier)
assertServiceMetricsPrefixes(t, TableManager, tableManager)
}

func TestHashCollisionHandling(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
flags := mergeFlags(ChunksStorageFlags, map[string]string{})

// Start dependencies.
dynamo := e2edb.NewDynamoDB()

consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, dynamo))

tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
require.NoError(t, s.StartAndWaitReady(tableManager))

// Wait until the first table-manager sync has completed, so that we're
// sure the tables have been created.
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))

// Start Cortex components for the write path.
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester))

// Wait until the distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Push two series with colliding fingerprints for a single user to Cortex.
now := time.Now()

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-0")
require.NoError(t, err)

var series []prompb.TimeSeries
var expectedVector model.Vector
// Generate two series which collide on fingerprints and fast fingerprints.
tsMillis := e2e.TimeToMilliseconds(now)
metric1 := []prompb.Label{
{Name: "A", Value: "K6sjsNNczPl"},
{Name: labels.MetricName, Value: "fingerprint_collision"},
}
metric2 := []prompb.Label{
{Name: "A", Value: "cswpLMIZpwt"},
{Name: labels.MetricName, Value: "fingerprint_collision"},
}

series = append(series, prompb.TimeSeries{
Labels: metric1,
Samples: []prompb.Sample{
{Value: float64(0), Timestamp: tsMillis},
},
})
expectedVector = append(expectedVector, &model.Sample{
Metric: prompbLabelsToModelMetric(metric1),
Value: model.SampleValue(float64(0)),
Timestamp: model.Time(tsMillis),
})
series = append(series, prompb.TimeSeries{
Labels: metric2,
Samples: []prompb.Sample{
{Value: float64(1), Timestamp: tsMillis},
},
})
expectedVector = append(expectedVector, &model.Sample{
Metric: prompbLabelsToModelMetric(metric2),
Value: model.SampleValue(float64(1)),
Timestamp: model.Time(tsMillis),
})

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(querier))

// Wait until the querier has updated the ring.
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Query the series.
c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-0")
require.NoError(t, err)

result, err := c.Query("fingerprint_collision", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
require.Equal(t, expectedVector, result.(model.Vector))
}

func prompbLabelsToModelMetric(pbLabels []prompb.Label) model.Metric {
metric := model.Metric{}

for _, l := range pbLabels {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}

return metric
}
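The two label values above carry a strong claim — identical fingerprints and fast fingerprints. A minimal standalone sketch (not part of this PR; imports and test name are assumptions, and the collision property itself is taken from the test's comment above) that checks it directly against the exported helpers:

```go
package client_test

import (
	"testing"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/pkg/ingester/client"
)

// Sketch: the colliding pair from TestHashCollisionHandling hashes to the
// same fingerprint, while LabelsToKeyString still distinguishes the two.
func TestCollisionPairSketch(t *testing.T) {
	ls1 := labels.FromStrings(labels.MetricName, "fingerprint_collision", "A", "K6sjsNNczPl")
	ls2 := labels.FromStrings(labels.MetricName, "fingerprint_collision", "A", "cswpLMIZpwt")

	// Same 64-bit hash: keying a map by this value conflates the two series.
	require.Equal(t, client.Fingerprint(ls1), client.Fingerprint(ls2))
	// Distinct serialized label sets: key equality implies label equality.
	require.NotEqual(t, client.LabelsToKeyString(ls1), client.LabelsToKeyString(ls2))
}
```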
16 changes: 8 additions & 8 deletions pkg/distributor/query.go
@@ -170,32 +170,32 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, err
}

-	hashToChunkseries := map[model.Fingerprint]ingester_client.TimeSeriesChunk{}
-	hashToTimeSeries := map[model.Fingerprint]ingester_client.TimeSeries{}
+	hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{}
+	hashToTimeSeries := map[string]ingester_client.TimeSeries{}

for _, result := range results {
response := result.(*ingester_client.QueryStreamResponse)

// Parse any chunk series
for _, series := range response.Chunkseries {
-			hash := client.FastFingerprint(series.Labels)
-			existing := hashToChunkseries[hash]
+			key := client.LabelsToKeyString(client.FromLabelAdaptersToLabels(series.Labels))
+			existing := hashToChunkseries[key]
existing.Labels = series.Labels
existing.Chunks = append(existing.Chunks, series.Chunks...)
-			hashToChunkseries[hash] = existing
+			hashToChunkseries[key] = existing
}

// Parse any time series
for _, series := range response.Timeseries {
-			hash := client.FastFingerprint(series.Labels)
-			existing := hashToTimeSeries[hash]
+			key := client.LabelsToKeyString(client.FromLabelAdaptersToLabels(series.Labels))
+			existing := hashToTimeSeries[key]
existing.Labels = series.Labels
if existing.Samples == nil {
existing.Samples = series.Samples
} else {
existing.Samples = mergeSamples(existing.Samples, series.Samples)
}
-			hashToTimeSeries[hash] = existing
+			hashToTimeSeries[key] = existing
}
}

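To see what the old keying got wrong, here is a hedged, self-contained demonstration (a hypothetical program, not PR code; the label values are the colliding pair from the integration test): grouping by fingerprint folds the two series into one bucket, while grouping by the serialized labels keeps them apart.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/cortexproject/cortex/pkg/ingester/client"
)

func main() {
	// The colliding pair from TestHashCollisionHandling above.
	series := []labels.Labels{
		labels.FromStrings(labels.MetricName, "fingerprint_collision", "A", "K6sjsNNczPl"),
		labels.FromStrings(labels.MetricName, "fingerprint_collision", "A", "cswpLMIZpwt"),
	}

	byHash := map[model.Fingerprint][]string{}
	byKey := map[string][]string{}
	for _, ls := range series {
		byHash[client.Fingerprint(ls)] = append(byHash[client.Fingerprint(ls)], ls.String())
		byKey[client.LabelsToKeyString(ls)] = append(byKey[client.LabelsToKeyString(ls)], ls.String())
	}

	fmt.Println("buckets by fingerprint:", len(byHash)) // 1 — two series merged (the bug)
	fmt.Println("buckets by label key:", len(byKey))    // 2 — series kept distinct (the fix)
}
```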
10 changes: 10 additions & 0 deletions pkg/ingester/client/compat.go
@@ -322,6 +322,16 @@ func Fingerprint(labels labels.Labels) model.Fingerprint {
return model.Fingerprint(sum)
}

// LabelsToKeyString forms a string to be used as the hashKey of a map.
// Don't print it; use l.String() for a printable representation.
func LabelsToKeyString(l labels.Labels) string {
// We allocate 1024 bytes even though most series are shorter than 600 bytes.
// This is not an issue, because this function is inlined when called in a loop
// and the buffer is allocated on the stack rather than dynamically on the heap.
b := make([]byte, 0, 1024)
return string(l.Bytes(b))
}

// MarshalJSON implements json.Marshaler.
func (s Sample) MarshalJSON() ([]byte, error) {
t, err := json.Marshal(model.Time(s.TimestampMs))
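A short usage note on the doc comment's warning (a fragment only; assumes the same imports as the sketches above): the returned key embeds non-printable separator bytes, so it is only suitable as a map key.

```go
ls := labels.FromStrings(labels.MetricName, "up", "job", "api")

key := client.LabelsToKeyString(ls) // opaque byte string — use only as a map key
str := ls.String()                  // human-readable, e.g. {__name__="up", job="api"}

seen := map[string]bool{key: true} // good
fmt.Println(str)                   // print this one, never key
```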
51 changes: 51 additions & 0 deletions pkg/ingester/client/compat_test.go
@@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"sort"
"strconv"
"testing"
"unsafe"

@@ -216,3 +217,53 @@ func verifyCollision(t *testing.T, collision bool, ls1 labels.Labels, ls2 labels
t.Errorf("expected different fingerprints for %v (%016x) and %v (%016x)", ls1.String(), Fingerprint(ls1), ls2.String(), Fingerprint(ls2))
}
}

// The main use case for `LabelsToKeyString` is to generate hash keys
// for maps. That is what we benchmark here.
func BenchmarkSeriesMap(b *testing.B) {
benchmarkSeriesMap(100000, b)
}

func benchmarkSeriesMap(numSeries int, b *testing.B) {
series := makeSeries(numSeries)
sm := make(map[string]int, numSeries)

b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for i, s := range series {
sm[LabelsToKeyString(s)] = i
}

for _, s := range series {
_, ok := sm[LabelsToKeyString(s)]
if !ok {
b.Fatal("element missing")
}
}

if len(sm) != numSeries {
b.Fatal("the number of series expected:", numSeries, "got:", len(sm))
}
}
}

func makeSeries(n int) []labels.Labels {
series := make([]labels.Labels, 0, n)
for i := 0; i < n; i++ {
series = append(series, labels.FromMap(map[string]string{
"label0": "value0",
"label1": "value1",
"label2": "value2",
"label3": "value3",
"label4": "value4",
"label5": "value5",
"label6": "value6",
"label7": "value7",
"label8": "value8",
"label9": strconv.Itoa(i),
}))
}

return series
}
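To reproduce the benchmark locally, the standard Go tooling suffices (illustrative command, not from this PR): `go test -bench=BenchmarkSeriesMap -benchmem ./pkg/ingester/client` — `-benchmem` surfaces the per-iteration allocations that the comment on `LabelsToKeyString` is concerned with.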
6 changes: 3 additions & 3 deletions pkg/querier/chunk_store_queryable.go
@@ -53,10 +53,10 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...

// Series in the returned set are sorted alphabetically by labels.
func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
-	chunksBySeries := map[model.Fingerprint][]chunk.Chunk{}
+	chunksBySeries := map[string][]chunk.Chunk{}
for _, c := range chunks {
-		fp := client.Fingerprint(c.Metric)
-		chunksBySeries[fp] = append(chunksBySeries[fp], c)
+		key := client.LabelsToKeyString(c.Metric)
+		chunksBySeries[key] = append(chunksBySeries[key], c)
}

series := make([]storage.Series, 0, len(chunksBySeries))