Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update loki to cortex master #2030

Merged
merged 1 commit into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee
github.com/containerd/containerd v1.3.2 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/cortexproject/cortex v1.0.1-0.20200424135841-64fb9ad94a38
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.0.1-0.20200430170006-3462eb63f324
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v0.7.3-0.20190817195342-4760db040282
Expand All @@ -20,11 +20,11 @@ require (
github.com/fatih/color v1.7.0
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c
github.com/frankban/quicktest v1.7.2 // indirect
github.com/go-kit/kit v0.9.0
github.com/go-logfmt/logfmt v0.4.0
github.com/go-kit/kit v0.10.0
github.com/go-logfmt/logfmt v0.5.0
github.com/gogo/protobuf v1.3.1 // remember to update loki-build-image/Dockerfile too
github.com/golang/snappy v0.0.1
github.com/gorilla/mux v1.7.1
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
Expand All @@ -50,14 +50,14 @@ require (
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200310113808-2708ba4e60a4
github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875 // indirect
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b
google.golang.org/grpc v1.25.1
google.golang.org/grpc v1.26.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.2.7
gopkg.in/yaml.v2 v2.2.8
k8s.io/klog v1.0.0
)

Expand Down
128 changes: 122 additions & 6 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (r
return result, nil
}

func (r mockRing) GetAll() (ring.ReplicationSet, error) {
func (r mockRing) GetAll(op ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
MaxErrors: 1,
Expand Down
3 changes: 2 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/cortex"
cortex_querier "github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
Expand Down Expand Up @@ -156,7 +157,7 @@ func (t *Loki) initDistributor() (services.Service, error) {

func (t *Loki) initQuerier() (services.Service, error) {
level.Debug(util.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.cfg.Worker))
worker, err := frontend.NewWorker(t.cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
worker, err := frontend.NewWorker(t.cfg.Worker, cortex_querier.Config{MaxConcurrent: t.cfg.Querier.MaxConcurrent}, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Config struct {
ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
IngesterMaxQueryLookback time.Duration `yaml:"query_ingesters_within,omitempty"`
Engine logql.EngineOpts `yaml:"engine,omitempty"`
MaxConcurrent int `yaml:"max_concurrent"`
Copy link
Contributor Author

@adityacs adityacs May 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only configuration which is used in updated cortex code of frontend worker. I have modified the code accordingly here https://github.com/grafana/loki/blob/2f3632878fbf31e31f2d2a998288d1eb675cb479/pkg/loki/modules.go#L160

Hope this change is fine

}

// RegisterFlags register flags.
Expand All @@ -51,6 +52,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.QueryTimeout, "querier.query_timeout", 1*time.Minute, "Timeout when querying backends (ingesters or storage) during the execution of a query request")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.IngesterMaxQueryLookback, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
}

// Querier handlers queries.
Expand Down Expand Up @@ -99,7 +101,7 @@ type responseFromIngesters struct {
// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *Querier) forAllIngesters(ctx context.Context, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
replicationSet, err := q.ring.GetAll()
replicationSet, err := q.ring.GetAll(ring.Read)
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -334,7 +336,7 @@ func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.T
}

// Get the current replication set from the ring
replicationSet, err := q.ring.GetAll()
replicationSet, err := q.ring.GetAll(ring.Read)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -527,7 +529,7 @@ func (q *Querier) checkTailRequestLimit(ctx context.Context) error {
return err
}

replicationSet, err := q.ring.GetAll()
replicationSet, err := q.ring.GetAll(ring.Read)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (r *readRingMock) BatchGet(keys []uint32, op ring.Operation) ([]ring.Replic
return []ring.ReplicationSet{r.replicationSet}, nil
}

func (r *readRingMock) GetAll() (ring.ReplicationSet, error) {
func (r *readRingMock) GetAll(op ring.Operation) (ring.ReplicationSet, error) {
return r.replicationSet, nil
}

Expand Down
27 changes: 20 additions & 7 deletions pkg/querier/queryrange/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,29 @@ import (
"github.com/grafana/loki/pkg/logql/stats"
)

var jsonStd = jsoniter.ConfigCompatibleWithStandardLibrary
var (
jsonStd = jsoniter.ConfigCompatibleWithStandardLibrary
extractor = queryrange.PrometheusResponseExtractor{}
)

// PrometheusExtractor implements Extractor interface
type PrometheusExtractor struct{}

// prometheusResponseExtractor wraps the original prometheus cache extractor.
// Statistics are discarded when using cache entries.
var prometheusResponseExtractor = queryrange.ExtractorFunc(func(start, end int64, from queryrange.Response) queryrange.Response {
// Extract wraps the original prometheus cache extractor
func (PrometheusExtractor) Extract(start, end int64, from queryrange.Response) queryrange.Response {
response := extractor.Extract(start, end, from.(*LokiPromResponse).Response)
return &LokiPromResponse{
Response: queryrange.PrometheusResponseExtractor.
Extract(start, end, from.(*LokiPromResponse).Response).(*queryrange.PrometheusResponse),
Response: response.(*queryrange.PrometheusResponse),
}
})
}

// ResponseWithoutHeaders wraps the original prometheus caching without headers
func (PrometheusExtractor) ResponseWithoutHeaders(resp queryrange.Response) queryrange.Response {
response := extractor.ResponseWithoutHeaders(resp.(*LokiPromResponse).Response)
return &LokiPromResponse{
Response: response.(*queryrange.PrometheusResponse),
}
}

// encode encodes a Prometheus response and injects Loki stats.
func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet
instrumentMetrics := queryrange.NewInstrumentMiddlewareMetrics(registerer)
retryMetrics := queryrange.NewRetryMiddlewareMetrics(registerer)

metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, lokiCodec, prometheusResponseExtractor, instrumentMetrics, retryMetrics)
metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, lokiCodec, PrometheusExtractor{}, instrumentMetrics, retryMetrics)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -205,6 +205,7 @@ func NewMetricTripperware(
limits,
codec,
extractor,
nil,
)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type store struct {

// NewStore creates a new Loki Store using configuration supplied.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits, registerer prometheus.Registerer) (Store, error) {
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, registerer)
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, registerer, nil)
if err != nil {
return nil, err
}
Expand Down
12 changes: 0 additions & 12 deletions pkg/storage/stores/local/boltdb_shipper_table_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ func TestBoltDBShipperTableClient(t *testing.T) {
err = tableClient.DeleteTable(context.Background(), "table1")
require.NoError(t, err)

// cortex does not omit empty directories from the list
// ToDo: change the code in cortex to remove empty directories from the list
ensureEmptyAndRemoveDirectory(t, path.Join(tempDir, storageKeyPrefix, "table1"))
sandeepsukhani marked this conversation as resolved.
Show resolved Hide resolved

delete(foldersWithFiles, "table1")
checkExpectedTables(t, tableClient, foldersWithFiles)
}
Expand All @@ -73,11 +69,3 @@ func checkExpectedTables(t *testing.T, tableClient chunk.TableClient, expectedTa
require.True(t, ok)
}
}

func ensureEmptyAndRemoveDirectory(t *testing.T, directory string) {
filesInfo, err := ioutil.ReadDir(directory)
require.NoError(t, err)
require.Len(t, filesInfo, 0)

require.NoError(t, os.Remove(directory))
}
7 changes: 4 additions & 3 deletions vendor/github.com/aws/aws-sdk-go/aws/arn/arn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading