Skip to content

Commit

Permalink
Shards Series API.
Browse files Browse the repository at this point in the history
This PR introduces Series Queries Sharding. It does not check the boundaries of ingesters' data, since I'm assuming
grafana#3852 will be merged first.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Jun 15, 2021
1 parent bdee4a3 commit c995a4b
Show file tree
Hide file tree
Showing 13 changed files with 505 additions and 180 deletions.
2 changes: 1 addition & 1 deletion pkg/loghttp/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func ParseSeriesQuery(r *http.Request) (*logproto.SeriesRequest, error) {
Start: start,
End: end,
Groups: deduped,
Shards: shards(r),
}, nil

}

func union(cols ...[]string) []string {
Expand Down
243 changes: 157 additions & 86 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ message SeriesRequest {
google.protobuf.Timestamp start = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp end = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
repeated string groups = 3;
repeated string shards = 4 [(gogoproto.jsontag) = "shards,omitempty"];
}

message SeriesResponse {
Expand Down Expand Up @@ -163,4 +164,4 @@ message GetChunkIDsRequest {

message GetChunkIDsResponse {
repeated string chunkIDs = 1;
}
}
14 changes: 6 additions & 8 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@ const (

// ShardingMetrics is the metrics wrapper used in shard mapping
type ShardingMetrics struct {
shards *prometheus.CounterVec // sharded queries total, partitioned by (streams/metric)
Shards *prometheus.CounterVec // sharded queries total, partitioned by (streams/metric)
ShardFactor prometheus.Histogram // per request shard factor
parsed *prometheus.CounterVec // parsed ASTs total, partitioned by (success/failure/noop)
shardFactor prometheus.Histogram // per request shard factor
}

func NewShardingMetrics(registerer prometheus.Registerer) *ShardingMetrics {

return &ShardingMetrics{
shards: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Shards: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "query_frontend_shards_total",
}, []string{"type"}),
parsed: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "query_frontend_sharding_parsed_queries_total",
}, []string{"type"}),
shardFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
ShardFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "query_frontend_shard_factor",
Help: "Number of shards per request",
Expand Down Expand Up @@ -67,14 +66,14 @@ type shardRecorder struct {
// Add increments both the shard count and tracks it for the eventual histogram entry.
func (r *shardRecorder) Add(x int, key string) {
r.total += x
r.shards.WithLabelValues(key).Add(float64(x))
r.Shards.WithLabelValues(key).Add(float64(x))
}

// Finish idempotently records a histogram entry with the total shard factor.
func (r *shardRecorder) Finish() {
if !r.done {
r.done = true
r.shardFactor.Observe(float64(r.total))
r.ShardFactor.Observe(float64(r.total))
}
}

Expand Down Expand Up @@ -203,7 +202,6 @@ func (m ShardMapper) mapSampleExpr(expr SampleExpr, r *shardRecorder) SampleExpr
// technically, std{dev,var} are also parallelizable if there is no cross-shard merging
// in descendent nodes in the AST. This optimization is currently avoided for simplicity.
func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *shardRecorder) (SampleExpr, error) {

// if this AST contains unshardable operations, don't shard this at this level,
// but attempt to shard a child node.
if !expr.Shardable() {
Expand Down
10 changes: 6 additions & 4 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest)
}

go func() {
storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups())
storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups(), req.Shards)
if err != nil {
errs <- err
return
Expand Down Expand Up @@ -441,20 +441,21 @@ func (q *Querier) seriesForMatchers(
ctx context.Context,
from, through time.Time,
groups []string,
shards []string,
) ([]logproto.SeriesIdentifier, error) {

var results []logproto.SeriesIdentifier
// If no matchers were specified for the series query,
// we send a query with an empty matcher which will match every series.
if len(groups) == 0 {
var err error
results, err = q.seriesForMatcher(ctx, from, through, "")
results, err = q.seriesForMatcher(ctx, from, through, "", shards)
if err != nil {
return nil, err
}
} else {
for _, group := range groups {
ids, err := q.seriesForMatcher(ctx, from, through, group)
ids, err := q.seriesForMatcher(ctx, from, through, group, shards)
if err != nil {
return nil, err
}
Expand All @@ -465,14 +466,15 @@ func (q *Querier) seriesForMatchers(
}

// seriesForMatcher fetches series from the store for a given matcher
func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string) ([]logproto.SeriesIdentifier, error) {
func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string, shards []string) ([]logproto.SeriesIdentifier, error) {
ids, err := q.store.GetSeries(ctx, logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: matcher,
Limit: 1,
Start: from,
End: through,
Direction: logproto.FORWARD,
Shards: shards,
},
})
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (r *LokiSeriesRequest) LogToSpan(sp opentracing.Span) {
otlog.String("matchers", strings.Join(r.GetMatch(), ",")),
otlog.String("start", timestamp.Time(r.GetStart()).String()),
otlog.String("end", timestamp.Time(r.GetEnd()).String()),
otlog.String("shards", strings.Join(r.GetShards(), ",")),
)
}

Expand Down Expand Up @@ -196,7 +197,6 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Reque
default:
return nil, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf("unknown request path: %s", r.URL.Path))
}

}

func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
Expand Down Expand Up @@ -235,7 +235,9 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
"end": []string{fmt.Sprintf("%d", request.EndTs.UnixNano())},
"match[]": request.Match,
}

if len(request.Shards) > 0 {
params["shards"] = request.Shards
}
u := &url.URL{
Path: "/loki/api/v1/series",
RawQuery: params.Encode(),
Expand Down Expand Up @@ -355,7 +357,6 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
return nil, httpgrpc.Errorf(http.StatusBadRequest, "unsupported response type")
}
}

}

func (Codec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) {
Expand Down Expand Up @@ -482,7 +483,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons
lokiSeriesData = append(lokiSeriesData, series)
uniqueSeries[series.String()] = struct{}{}
}

}
}

Expand All @@ -504,7 +504,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons
names = append(names, labelName)
uniqueNames[labelName] = struct{}{}
}

}
}

Expand All @@ -520,7 +519,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons

// mergeOrderedNonOverlappingStreams merges a set of ordered, nonoverlapping responses by concatenating matching streams then running them through a heap to pull out limit values
func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, direction logproto.Direction) []logproto.Stream {

var total int

// turn resps -> map[labels] []entries
Expand Down Expand Up @@ -612,7 +610,6 @@ func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, dire
}

return results

}

func toProto(m loghttp.Matrix) []queryrange.SampleStream {
Expand Down Expand Up @@ -642,7 +639,6 @@ func (res LokiResponse) Count() int64 {
result += int64(len(s.Entries))
}
return result

}

type paramsWrapper struct {
Expand All @@ -658,12 +654,15 @@ func paramsFromRequest(req queryrange.Request) *paramsWrapper {
// Query returns the LogQL query string from the wrapped LokiRequest.
// NOTE(review): presumably satisfies the logql.Params interface — confirm against its definition.
func (p paramsWrapper) Query() string {
return p.LokiRequest.Query
}

// Start returns the query range start time from the wrapped LokiRequest.
func (p paramsWrapper) Start() time.Time {
return p.StartTs
}

// End returns the query range end time from the wrapped LokiRequest.
func (p paramsWrapper) End() time.Time {
return p.EndTs
}

// Step returns the query resolution step as a time.Duration.
// The *1e6 factor converts the stored value to nanoseconds (time.Duration's
// unit), which implies LokiRequest.Step is expressed in milliseconds —
// TODO(review): confirm the unit at the point where Step is populated.
func (p paramsWrapper) Step() time.Duration {
return time.Duration(p.LokiRequest.Step * 1e6)
}
Expand Down
Loading

0 comments on commit c995a4b

Please sign in to comment.