Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added -querier.query_timeout support #788

Merged
merged 1 commit into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,7 @@

[[override]]
name = "k8s.io/client-go"
revision = "1a26190bd76a9017e289958b9fba936430aa3704"
revision = "1a26190bd76a9017e289958b9fba936430aa3704"
[[constraint]]
name = "github.com/stretchr/testify"
version = "1.3.0"
32 changes: 27 additions & 5 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
// Config for a querier.
type Config struct {
TailMaxDuration time.Duration `yaml:"tail_max_duration"`
QueryTimeout time.Duration `yaml:"query_timeout"`
}

// RegisterFlags register flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Limit the duration for which live tailing request would be served")
f.DurationVar(&cfg.QueryTimeout, "querier.query_timeout", 1*time.Minute, "Timeout when querying backends (ingesters or storage) during the execution of a query request")
}

// Querier handlers queries.
Expand All @@ -44,10 +46,16 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.
return client.New(clientCfg, addr)
}

return newQuerier(cfg, clientCfg, factory, ring, store)
}

// newQuerier creates a new Querier and allows to pass a custom ingester client factory
// used for testing purposes
func newQuerier(cfg Config, clientCfg client.Config, clientFactory cortex_client.Factory, ring ring.ReadRing, store storage.Store) (*Querier, error) {
return &Querier{
cfg: cfg,
ring: ring,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, util.Logger),
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
store: store,
}, nil
}
Expand Down Expand Up @@ -109,6 +117,10 @@ func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(l

// Query does the heavy lifting for an actual query.
func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logproto.QueryResponse, error) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

iterators, err := q.getQueryIterators(ctx, req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -153,6 +165,10 @@ func (q *Querier) queryIngesters(ctx context.Context, req *logproto.QueryRequest

// Label does the heavy lifting for a Label query.
func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

resps, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
return client.Label(ctx, req)
})
Expand Down Expand Up @@ -256,8 +272,14 @@ func mergePair(s1, s2 []string) []string {

// Tail keeps getting matching logs from all ingesters for given query
func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {
// Enforce the query timeout except when tailing, otherwise the tailing
// will be terminated once the query timeout is reached
tailCtx := ctx
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch yeah !

queryCtx, cancelQuery := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancelQuery()

clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
return client.Tail(tailCtx, req)
})
if err != nil {
return nil, err
Expand All @@ -276,7 +298,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
Direction: logproto.FORWARD,
Regex: req.Regex,
}
histIterators, err := q.getQueryIterators(ctx, &histReq)
histIterators, err := q.getQueryIterators(queryCtx, &histReq)
if err != nil {
return nil, err
}
Expand All @@ -286,10 +308,10 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
tailClients,
histIterators,
func(from, to time.Time, labels string) (iterator iter.EntryIterator, e error) {
return q.queryDroppedStreams(ctx, req, from, to, labels)
return q.queryDroppedStreams(queryCtx, req, from, to, labels)
},
func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) {
return q.tailDisconnectedIngesters(ctx, req, connectedIngestersAddr)
return q.tailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr)
},
q.cfg.TailMaxDuration,
), nil
Expand Down
229 changes: 229 additions & 0 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package querier

import (
"context"
"errors"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
grpc_metadata "google.golang.org/grpc/metadata"
)

// querierClientMock is a mockable version of QuerierClient, used in querier
// unit tests to control the behaviour of a remote ingester
type querierClientMock struct {
util.ExtendedMock
grpc_health_v1.HealthClient
logproto.QuerierClient
}

func newQuerierClientMock() *querierClientMock {
return &querierClientMock{}
}

func (c *querierClientMock) Query(ctx context.Context, in *logproto.QueryRequest, opts ...grpc.CallOption) (logproto.Querier_QueryClient, error) {
args := c.Called(ctx, in, opts)
return args.Get(0).(logproto.Querier_QueryClient), args.Error(1)
}

func (c *querierClientMock) Label(ctx context.Context, in *logproto.LabelRequest, opts ...grpc.CallOption) (*logproto.LabelResponse, error) {
args := c.Called(ctx, in, opts)
return args.Get(0).(*logproto.LabelResponse), args.Error(1)
}

func (c *querierClientMock) Tail(ctx context.Context, in *logproto.TailRequest, opts ...grpc.CallOption) (logproto.Querier_TailClient, error) {
args := c.Called(ctx, in, opts)
return args.Get(0).(logproto.Querier_TailClient), args.Error(1)
}

// newIngesterClientMockFactory creates a factory function always returning
// the input querierClientMock
func newIngesterClientMockFactory(c *querierClientMock) cortex_client.Factory {
return func(addr string) (grpc_health_v1.HealthClient, error) {
return c, nil
}
}

// mockIngesterClientConfig returns an ingester client config suitable for testing
func mockIngesterClientConfig() client.Config {
return client.Config{
PoolConfig: cortex_client.PoolConfig{
ClientCleanupPeriod: 1 * time.Minute,
HealthCheckIngesters: false,
RemoteTimeout: 1 * time.Second,
},
MaxRecvMsgSize: 1024,
RemoteTimeout: 1 * time.Second,
}
}

// queryClientMock is a mockable version of Querier_QueryClient
type queryClientMock struct {
util.ExtendedMock
logproto.Querier_QueryClient
}

func newQueryClientMock() *queryClientMock {
return &queryClientMock{}
}

func (c *queryClientMock) Recv() (*logproto.QueryResponse, error) {
args := c.Called()
return args.Get(0).(*logproto.QueryResponse), args.Error(1)
}

func (c *queryClientMock) Header() (grpc_metadata.MD, error) {
return nil, nil
}

func (c *queryClientMock) Trailer() grpc_metadata.MD {
return nil
}

func (c *queryClientMock) CloseSend() error {
return nil
}

func (c *queryClientMock) SendMsg(m interface{}) error {
return nil
}

func (c *queryClientMock) RecvMsg(m interface{}) error {
return nil
}

// tailClientMock is mockable version of Querier_TailClient
type tailClientMock struct {
util.ExtendedMock
logproto.Querier_TailClient
}

func newTailClientMock() *tailClientMock {
return &tailClientMock{}
}

func (c *tailClientMock) Recv() (*logproto.TailResponse, error) {
args := c.Called()
return args.Get(0).(*logproto.TailResponse), args.Error(1)
}

func (c *tailClientMock) Header() (grpc_metadata.MD, error) {
return nil, nil
}

func (c *tailClientMock) Trailer() grpc_metadata.MD {
return nil
}

func (c *tailClientMock) CloseSend() error {
return nil
}

func (c *tailClientMock) SendMsg(m interface{}) error {
return nil
}

func (c *tailClientMock) RecvMsg(m interface{}) error {
return nil
}

// storeMock is a mockable version of Loki's storage, used in querier unit tests
// to control the behaviour of the store without really hitting any storage backend
type storeMock struct {
util.ExtendedMock
}

func newStoreMock() *storeMock {
return &storeMock{}
}

func (s *storeMock) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) {
args := s.Called(ctx, req)
return args.Get(0).(iter.EntryIterator), args.Error(1)
}

func (s *storeMock) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
args := s.Called(ctx, from, through, matchers)
return args.Get(0).([]chunk.Chunk), args.Error(1)
}

func (s *storeMock) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
args := s.Called(ctx, from, through, matchers)
return args.Get(0).([][]chunk.Chunk), args.Get(0).([]*chunk.Fetcher), args.Error(2)
}

func (s *storeMock) Put(ctx context.Context, chunks []chunk.Chunk) error {
return errors.New("storeMock.Put() has not been mocked")
}

func (s *storeMock) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
return errors.New("storeMock.PutOne() has not been mocked")
}

func (s *storeMock) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) {
args := s.Called(ctx, from, through, metricName, labelName)
return args.Get(0).([]string), args.Error(1)
}

func (s *storeMock) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) {
args := s.Called(ctx, from, through, metricName)
return args.Get(0).([]string), args.Error(1)
}

func (s *storeMock) Stop() {

}

// readRingMock is a mocked version of a ReadRing, used in querier unit tests
// to control the pool of ingesters available
type readRingMock struct {
replicationSet ring.ReplicationSet
}

func newReadRingMock(ingesters []ring.IngesterDesc) *readRingMock {
return &readRingMock{
replicationSet: ring.ReplicationSet{
Ingesters: ingesters,
MaxErrors: 0,
},
}
}

func (r *readRingMock) Describe(ch chan<- *prometheus.Desc) {
}

func (r *readRingMock) Collect(ch chan<- prometheus.Metric) {
}

func (r *readRingMock) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
return r.replicationSet, nil
}

func (r *readRingMock) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
return []ring.ReplicationSet{r.replicationSet}, nil
}

func (r *readRingMock) GetAll() (ring.ReplicationSet, error) {
return r.replicationSet, nil
}

func (r *readRingMock) ReplicationFactor() int {
return 1
}

func mockReadRingWithOneActiveIngester() *readRingMock {
return newReadRingMock([]ring.IngesterDesc{
{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},
})
}
Loading