forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gateway.go
94 lines (78 loc) · 2.21 KB
/
gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package indexgateway
import (
"github.com/grafana/dskit/services"
"github.com/frelon/loki/v2/pkg/storage/chunk"
"github.com/frelon/loki/v2/pkg/storage/stores/shipper"
"github.com/frelon/loki/v2/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/frelon/loki/v2/pkg/storage/stores/shipper/util"
)
const maxIndexEntriesPerResponse = 1000
type gateway struct {
services.Service
shipper chunk.IndexClient
}
func NewIndexGateway(shipperIndexClient *shipper.Shipper) *gateway {
g := &gateway{
shipper: shipperIndexClient,
}
g.Service = services.NewIdleService(nil, func(failureCase error) error {
g.shipper.Stop()
return nil
})
return g
}
func (g gateway) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server indexgatewaypb.IndexGateway_QueryIndexServer) error {
var outerErr error
var innerErr error
queries := make([]chunk.IndexQuery, 0, len(request.Queries))
for _, query := range request.Queries {
queries = append(queries, chunk.IndexQuery{
TableName: query.TableName,
HashValue: query.HashValue,
RangeValuePrefix: query.RangeValuePrefix,
RangeValueStart: query.RangeValueStart,
ValueEqual: query.ValueEqual,
})
}
outerErr = g.shipper.QueryPages(server.Context(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
innerErr = g.sendBatch(server, query, batch)
if innerErr != nil {
return false
}
return true
})
if innerErr != nil {
return innerErr
}
return outerErr
}
func (g *gateway) sendBatch(server indexgatewaypb.IndexGateway_QueryIndexServer, query chunk.IndexQuery, batch chunk.ReadBatch) error {
itr := batch.Iterator()
var resp []*indexgatewaypb.Row
for itr.Next() {
if len(resp) == maxIndexEntriesPerResponse {
err := server.Send(&indexgatewaypb.QueryIndexResponse{
QueryKey: util.QueryKey(query),
Rows: resp,
})
if err != nil {
return err
}
resp = []*indexgatewaypb.Row{}
}
resp = append(resp, &indexgatewaypb.Row{
RangeValue: itr.RangeValue(),
Value: itr.Value(),
})
}
if len(resp) != 0 {
err := server.Send(&indexgatewaypb.QueryIndexResponse{
QueryKey: util.QueryKey(query),
Rows: resp,
})
if err != nil {
return err
}
}
return nil
}