Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated cortex to latest master. #1088

Merged
merged 4 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ require (
github.com/Microsoft/go-winio v0.4.12 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/bmatcuk/doublestar v1.1.1
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/cortexproject/cortex v0.2.0
github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v0.0.0-20190607191414-238f8eaa31aa
Expand Down Expand Up @@ -40,11 +39,11 @@ require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f
github.com/prometheus/common v0.7.0
github.com/prometheus/prometheus v1.8.2-0.20190918104050-8744afdd1ea0
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
github.com/stretchr/testify v1.3.0
github.com/stretchr/testify v1.4.0
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4
Expand All @@ -60,7 +59,6 @@ require (
google.golang.org/genproto v0.0.0-20190916214212-f660b8655731 // indirect
google.golang.org/grpc v1.23.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.2.2
gotest.tools v2.2.0+incompatible // indirect
Expand Down
184 changes: 19 additions & 165 deletions go.sum

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,17 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d) exceeded while adding %d lines", int(limiter.Limit()), lineCount)
}

replicationSets, err := d.ring.BatchGet(keys, ring.Write)
if err != nil {
return nil, err
}
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
var descs [maxExpectedReplicationSet]ring.IngesterDesc

samplesByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.IngesterDesc{}
for i, replicationSet := range replicationSets {
for i, key := range keys {
replicationSet, err := d.ring.Get(key, ring.Write, descs[:0])
if err != nil {
return nil, err
}

streams[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
streams[i].maxFailures = replicationSet.MaxErrors
for _, ingester := range replicationSet.Ingesters {
Expand Down
19 changes: 6 additions & 13 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ type mockRing struct {
replicationFactor uint32
}

func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
result := ring.ReplicationSet{
MaxErrors: 1,
Ingesters: buf[:0],
}
for i := uint32(0); i < r.replicationFactor; i++ {
n := (key + i) % uint32(len(r.ingesters))
Expand All @@ -144,18 +145,6 @@ func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error
return result, nil
}

func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
result := []ring.ReplicationSet{}
for i := 0; i < len(keys); i++ {
rs, err := r.Get(keys[i], op)
if err != nil {
return nil, err
}
result = append(result, rs)
}
return result, nil
}

func (r mockRing) GetAll() (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
Expand All @@ -166,3 +155,7 @@ func (r mockRing) GetAll() (ring.ReplicationSet, error) {
func (r mockRing) ReplicationFactor() int {
return int(r.replicationFactor)
}

func (r mockRing) IngesterCount() int {
return len(r.ingesters)
}
9 changes: 7 additions & 2 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,20 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
return nil, err
}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
var storeValues []string
if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, from, through, "logs", req.Name)
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
if err != nil {
return nil, err
}
} else {
storeValues, err = q.store.LabelNamesForMetricName(ctx, from, through, "logs")
storeValues, err = q.store.LabelNamesForMetricName(ctx, userID, from, through, "logs")
if err != nil {
return nil, err
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ func (s *storeMock) LazyQuery(ctx context.Context, req logql.SelectParams) (iter
return args.Get(0).(iter.EntryIterator), args.Error(1)
}

func (s *storeMock) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
args := s.Called(ctx, from, through, matchers)
func (s *storeMock) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
args := s.Called(ctx, userID, from, through, matchers)
return args.Get(0).([]chunk.Chunk), args.Error(1)
}

func (s *storeMock) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
args := s.Called(ctx, from, through, matchers)
func (s *storeMock) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
args := s.Called(ctx, userID, from, through, matchers)
return args.Get(0).([][]chunk.Chunk), args.Get(0).([]*chunk.Fetcher), args.Error(2)
}

Expand All @@ -194,13 +194,13 @@ func (s *storeMock) PutOne(ctx context.Context, from, through model.Time, chunk
return errors.New("storeMock.PutOne() has not been mocked")
}

func (s *storeMock) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) {
args := s.Called(ctx, from, through, metricName, labelName)
func (s *storeMock) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error) {
args := s.Called(ctx, userID, from, through, metricName, labelName)
return args.Get(0).([]string), args.Error(1)
}

func (s *storeMock) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
args := s.Called(ctx, from, through, metricName)
func (s *storeMock) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
args := s.Called(ctx, userID, from, through, metricName)
return args.Get(0).([]string), args.Error(1)
}

Expand Down Expand Up @@ -229,7 +229,7 @@ func (r *readRingMock) Describe(ch chan<- *prometheus.Desc) {
func (r *readRingMock) Collect(ch chan<- prometheus.Metric) {
}

func (r *readRingMock) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
func (r *readRingMock) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
return r.replicationSet, nil
}

Expand All @@ -245,6 +245,10 @@ func (r *readRingMock) ReplicationFactor() int {
return 1
}

func (r *readRingMock) IngesterCount() int {
return len(r.replicationSet.Ingesters)
}

func mockReadRingWithOneActiveIngester() *readRingMock {
return newReadRingMock([]ring.IngesterDesc{
{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestQuerier_Label_QueryTimeoutConfigFlag(t *testing.T) {
ingesterClient.On("Label", mock.Anything, &request, mock.Anything).Return(mockLabelResponse([]string{}), nil)

store := newStoreMock()
store.On("LabelValuesForMetricName", mock.Anything, model.TimeFromUnixNano(startTime.UnixNano()), model.TimeFromUnixNano(endTime.UnixNano()), "logs", "test").Return([]string{"foo", "bar"}, nil)
store.On("LabelValuesForMetricName", mock.Anything, "test", model.TimeFromUnixNano(startTime.UnixNano()), model.TimeFromUnixNano(endTime.UnixNano()), "logs", "test").Return([]string{"foo", "bar"}, nil)

limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)
Expand Down
8 changes: 7 additions & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
Expand Down Expand Up @@ -70,9 +71,14 @@ func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.Ent
return nil, err
}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

matchers = append(matchers, nameLabelMatcher)
from, through := util.RoundToMilliseconds(req.Start, req.End)
chks, fetchers, err := s.GetChunkRefs(ctx, from, through, matchers...)
chks, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, matchers...)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ func Test_store_LazyQuery(t *testing.T) {
MaxChunkBatchSize: 10,
},
}
it, err := s.LazyQuery(context.Background(), logql.SelectParams{QueryRequest: tt.req})
ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := s.LazyQuery(ctx, logql.SelectParams{QueryRequest: tt.req})
if err != nil {
t.Errorf("store.LazyQuery() error = %v", err)
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ func (m *mockChunkStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
func (m *mockChunkStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
return nil
}
func (m *mockChunkStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) {
func (m *mockChunkStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error) {
return nil, nil
}
func (m *mockChunkStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
func (m *mockChunkStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
return nil, nil
}
func (m *mockChunkStore) Stop() {}
func (m *mockChunkStore) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
func (m *mockChunkStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
return nil, nil
}

Expand All @@ -143,7 +143,7 @@ func (m *mockChunkStore) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([
return res, nil
}

func (m *mockChunkStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
func (m *mockChunkStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
refs := make([]chunk.Chunk, 0, len(m.chunks))
// transform real chunks into ref chunks.
for _, c := range m.chunks {
Expand Down
35 changes: 11 additions & 24 deletions vendor/github.com/aws/aws-sdk-go/aws/client/default_retryer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/github.com/aws/aws-sdk-go/aws/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading