Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: pass shard count to injected pipelines and extractors #12213

Merged
merged 4 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,12 +437,13 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E
}

if i.pipelineWrapper != nil {
shards := logql.ParseShardCount(req.GetShards())
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

pipeline = i.pipelineWrapper.Wrap(ctx, pipeline, expr.String(), userID)
pipeline = i.pipelineWrapper.Wrap(ctx, pipeline, expr.String(), userID, shards)
}

stats := stats.FromContext(ctx)
Expand Down Expand Up @@ -491,12 +492,13 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
}

if i.extractorWrapper != nil {
shards := logql.ParseShardCount(req.GetShards())
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

extractor = i.extractorWrapper.Wrap(ctx, extractor, expr.String(), userID)
extractor = i.extractorWrapper.Wrap(ctx, extractor, expr.String(), userID, shards)
}

stats := stats.FromContext(ctx)
Expand Down
12 changes: 10 additions & 2 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ func Test_PipelineWrapper(t *testing.T) {
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Direction: logproto.BACKWARD,
Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()},
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{job="3"}`),
},
Expand All @@ -713,19 +714,22 @@ func Test_PipelineWrapper(t *testing.T) {

require.Equal(t, "test-user", wrapper.tenant)
require.Equal(t, `{job="3"}`, wrapper.query)
require.Equal(t, 1, wrapper.shards)
require.Equal(t, 10, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
}

// testPipelineWrapper is a PipelineWrapper test double: it records the
// arguments passed to Wrap so the test can assert on them afterwards.
type testPipelineWrapper struct {
	query    string        // query string received by Wrap
	tenant   string        // tenant ID received by Wrap
	shards   int           // shard count received by Wrap
	pipeline *mockPipeline // mock pipeline handed back by Wrap
}

func (t *testPipelineWrapper) Wrap(_ context.Context, pipeline log.Pipeline, query, tenant string) log.Pipeline {
func (t *testPipelineWrapper) Wrap(_ context.Context, pipeline log.Pipeline, query, tenant string, shard int) log.Pipeline {
t.tenant = tenant
t.query = query
t.pipeline.wrappedExtractor = pipeline
t.shards = shard
return t.pipeline
}

Expand Down Expand Up @@ -787,6 +791,7 @@ func Test_ExtractorWrapper(t *testing.T) {
Selector: `sum(count_over_time({job="3"}[1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()},
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`),
},
Expand All @@ -802,19 +807,22 @@ func Test_ExtractorWrapper(t *testing.T) {
}

require.Equal(t, `sum(count_over_time({job="3"}[1m]))`, wrapper.query)
require.Equal(t, 1, wrapper.shards)
require.Equal(t, 10, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
}

// testExtractorWrapper is a SampleExtractorWrapper test double: it records
// the arguments passed to Wrap so the test can assert on them afterwards.
type testExtractorWrapper struct {
	query     string         // query string received by Wrap
	tenant    string         // tenant ID received by Wrap
	shards    int            // shard count received by Wrap
	extractor *mockExtractor // mock extractor handed back by Wrap
}

func (t *testExtractorWrapper) Wrap(_ context.Context, extractor log.SampleExtractor, query, tenant string) log.SampleExtractor {
func (t *testExtractorWrapper) Wrap(_ context.Context, extractor log.SampleExtractor, query, tenant string, shard int) log.SampleExtractor {
t.tenant = tenant
t.query = query
t.extractor.wrappedExtractor = extractor
t.shards = shard
return t.extractor
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,23 @@ func ParseShards(strs []string) (Shards, error) {
return shards, nil
}

// ParseShardCount returns the total shard count ("Of") taken from the first
// entry of strs that parses as a shard annotation. It returns 0 when strs is
// empty or when none of the entries can be parsed.
func ParseShardCount(strs []string) int {
	for _, s := range strs {
		if shard, err := astmapper.ParseShard(s); err == nil {
			return shard.Of
		}
	}
	return 0
}

// Downstreamable is implemented by types that can produce a Downstreamer
// bound to the given context.
type Downstreamable interface {
	Downstreamer(context.Context) Downstreamer
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,3 +733,42 @@ and
10`
assert.Equal(t, expected, got)
}

// TestParseShardCount checks that ParseShardCount yields the shard count of
// the first parseable shard string, skips unparseable entries, and falls
// back to 0 when nothing parses.
func TestParseShardCount(t *testing.T) {
	cases := []struct {
		name     string
		shards   []string
		expected int
	}{
		{name: "empty shards", shards: []string{}, expected: 0},
		{name: "single shard", shards: []string{"0_of_3"}, expected: 3},
		{name: "single shard with error", shards: []string{"0_of_"}, expected: 0},
		{name: "multiple shards", shards: []string{"0_of_3", "0_of_4"}, expected: 3},
		{name: "multiple shards with errors", shards: []string{"_of_3", "0_of_4"}, expected: 4},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.expected, ParseShardCount(tc.shards))
		})
	}
}
2 changes: 1 addition & 1 deletion pkg/logql/log/metrics_extraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type StreamSampleExtractor interface {
// SampleExtractorWrapper takes an extractor, wraps it in some desired
// functionality and returns a new SampleExtractor.
// (The diff view showed both the old and new Wrap signature; only the
// post-change signature with the shards parameter is kept. Comment typos
// "wraps it is" and "returns a new pipeline" fixed.)
type SampleExtractorWrapper interface {
	Wrap(ctx context.Context, extractor SampleExtractor, query, tenant string, shards int) SampleExtractor
}

type lineSampleExtractor struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/log/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Stage interface {
// PipelineWrapper takes a pipeline, wraps it in some desired functionality
// and returns a new pipeline.
// (The diff view showed both the old and new Wrap signature; only the
// post-change signature with the shards parameter is kept. Comment typo
// "wraps it is" fixed.)
type PipelineWrapper interface {
	Wrap(ctx context.Context, pipeline Pipeline, query, tenant string, shards int) Pipeline
}

// NewNoopPipeline creates a pipelines that does not process anything and returns log streams as is.
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,13 @@ func (s *LokiStore) SelectLogs(ctx context.Context, req logql.SelectLogParams) (
}

if s.pipelineWrapper != nil {
shards := logql.ParseShardCount(req.GetShards())
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

pipeline = s.pipelineWrapper.Wrap(ctx, pipeline, expr.String(), userID)
pipeline = s.pipelineWrapper.Wrap(ctx, pipeline, expr.String(), userID, shards)
}

var chunkFilterer chunk.Filterer
Expand Down Expand Up @@ -555,12 +556,13 @@ func (s *LokiStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
}

if s.extractorWrapper != nil {
shards := logql.ParseShardCount(req.GetShards())
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

extractor = s.extractorWrapper.Wrap(ctx, extractor, expr.String(), userID)
extractor = s.extractorWrapper.Wrap(ctx, extractor, expr.String(), userID, shards)
}

var chunkFilterer chunk.Filterer
Expand Down
31 changes: 20 additions & 11 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func Benchmark_store_SelectSample(b *testing.B) {
sampleCount := 0
for i := 0; i < b.N; i++ {
iter, err := chunkStore.SelectSamples(ctx, logql.SelectSampleParams{
SampleQueryRequest: newSampleQuery(test, time.Unix(0, start.UnixNano()), time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), nil),
SampleQueryRequest: newSampleQuery(test, time.Unix(0, start.UnixNano()), time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()), nil, nil),
})
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -530,7 +530,7 @@ func Test_store_SelectSample(t *testing.T) {
}{
{
"all",
newSampleQuery("count_over_time({foo=~\"ba.*\"}[5m])", from, from.Add(6*time.Millisecond), nil),
newSampleQuery("count_over_time({foo=~\"ba.*\"}[5m])", from, from.Add(6*time.Millisecond), nil, nil),
[]logproto.Series{
{
Labels: "{foo=\"bar\"}",
Expand Down Expand Up @@ -606,7 +606,7 @@ func Test_store_SelectSample(t *testing.T) {
},
{
"filter regex",
newSampleQuery("rate({foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"[1m])", from, from.Add(6*time.Millisecond), nil),
newSampleQuery("rate({foo=~\"ba.*\"} |~ \"1|2|3\" !~ \"2|3\"[1m])", from, from.Add(6*time.Millisecond), nil, nil),
[]logproto.Series{
{
Labels: "{foo=\"bar\"}",
Expand All @@ -632,7 +632,7 @@ func Test_store_SelectSample(t *testing.T) {
},
{
"filter matcher",
newSampleQuery("count_over_time({foo=\"bar\"}[10m])", from, from.Add(6*time.Millisecond), nil),
newSampleQuery("count_over_time({foo=\"bar\"}[10m])", from, from.Add(6*time.Millisecond), nil, nil),
[]logproto.Series{
{
Labels: "{foo=\"bar\"}",
Expand Down Expand Up @@ -674,7 +674,7 @@ func Test_store_SelectSample(t *testing.T) {
},
{
"filter time",
newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(time.Millisecond), nil),
newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(time.Millisecond), nil, nil),
[]logproto.Series{
{
Labels: "{foo=\"bar\"}",
Expand Down Expand Up @@ -704,6 +704,7 @@ func Test_store_SelectSample(t *testing.T) {
"count_over_time({foo=~\"ba.*\"}[5m])",
from,
from.Add(6*time.Millisecond),
nil,
[]*logproto.Delete{
{
Selector: `{foo="bar"}`,
Expand Down Expand Up @@ -757,6 +758,7 @@ func Test_store_SelectSample(t *testing.T) {
"count_over_time({foo=~\"ba.*\"}[5m])",
from,
from.Add(6*time.Millisecond),
nil,
[]*logproto.Delete{
{
Selector: `{foo="bar"}`,
Expand Down Expand Up @@ -870,7 +872,7 @@ func Test_ChunkFilterer(t *testing.T) {
}
s.SetChunkFilterer(&fakeChunkFilterer{})
ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), nil)})
it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), nil, nil)})
if err != nil {
t.Errorf("store.SelectSamples() error = %v", err)
return
Expand Down Expand Up @@ -914,7 +916,8 @@ func Test_PipelineWrapper(t *testing.T) {

s.SetPipelineWrapper(wrapper)
ctx = user.InjectOrgID(context.Background(), "test-user")
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), nil, nil)})
logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)})

if err != nil {
t.Errorf("store.SelectLogs() error = %v", err)
return
Expand All @@ -926,18 +929,21 @@ func Test_PipelineWrapper(t *testing.T) {

require.Equal(t, "test-user", wrapper.tenant)
require.Equal(t, "{foo=~\"ba.*\"}", wrapper.query)
require.Equal(t, 5, wrapper.shards)
require.Equal(t, 28, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper
}

// testPipelineWrapper is a PipelineWrapper test double: it records the
// arguments passed to Wrap so the test can assert on them afterwards.
type testPipelineWrapper struct {
	query    string        // query string received by Wrap
	pipeline *mockPipeline // mock pipeline handed back by Wrap
	tenant   string        // tenant ID received by Wrap
	shards   int           // shard count received by Wrap
}

func (t *testPipelineWrapper) Wrap(_ context.Context, pipeline lokilog.Pipeline, query, tenant string) lokilog.Pipeline {
func (t *testPipelineWrapper) Wrap(_ context.Context, pipeline lokilog.Pipeline, query, tenant string, shards int) lokilog.Pipeline {
t.tenant = tenant
t.query = query
t.shards = shards
t.pipeline.wrappedExtractor = pipeline
return t.pipeline
}
Expand Down Expand Up @@ -999,7 +1005,7 @@ func Test_SampleWrapper(t *testing.T) {
s.SetExtractorWrapper(wrapper)

ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), nil)})
it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 3}}, nil)})
if err != nil {
t.Errorf("store.SelectSamples() error = %v", err)
return
Expand All @@ -1011,19 +1017,22 @@ func Test_SampleWrapper(t *testing.T) {

require.Equal(t, "test-user", wrapper.tenant)
require.Equal(t, "count_over_time({foo=~\"ba.*\"}[1s])", wrapper.query)
require.Equal(t, 3, wrapper.shards)
require.Equal(t, 28, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
}

// testExtractorWrapper is a SampleExtractorWrapper test double: it records
// the arguments passed to Wrap so the test can assert on them afterwards.
type testExtractorWrapper struct {
	query     string         // query string received by Wrap
	tenant    string         // tenant ID received by Wrap
	shards    int            // shard count received by Wrap
	extractor *mockExtractor // mock extractor handed back by Wrap
}

func (t *testExtractorWrapper) Wrap(_ context.Context, extractor lokilog.SampleExtractor, query, tenant string) lokilog.SampleExtractor {
func (t *testExtractorWrapper) Wrap(_ context.Context, extractor lokilog.SampleExtractor, query, tenant string, shards int) lokilog.SampleExtractor {
t.tenant = tenant
t.query = query
t.extractor.wrappedExtractor = extractor
t.shards = shards
return t.extractor
}

Expand Down Expand Up @@ -2069,7 +2078,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")
_, ctx = stats.NewContext(ctx)
it, err := store.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery(tc.query, chkFrom, chkThrough.Add(time.Minute), nil)})
it, err := store.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery(tc.query, chkFrom, chkThrough.Add(time.Minute), nil, nil)})
require.NoError(t, err)
numSamples := int64(0)
for it.Next() {
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func newQuery(query string, start, end time.Time, shards []astmapper.ShardAnnota
return req
}

func newSampleQuery(query string, start, end time.Time, deletes []*logproto.Delete) *logproto.SampleQueryRequest {
func newSampleQuery(query string, start, end time.Time, shards []astmapper.ShardAnnotation, deletes []*logproto.Delete) *logproto.SampleQueryRequest {
req := &logproto.SampleQueryRequest{
Selector: query,
Start: start,
Expand All @@ -156,6 +156,9 @@ func newSampleQuery(query string, start, end time.Time, deletes []*logproto.Dele
AST: syntax.MustParseExpr(query),
},
}
for _, shard := range shards {
req.Shards = append(req.Shards, shard.String())
}
return req
}

Expand Down
Loading