forked from cortexproject/cortex
/
index_client.go
107 lines (90 loc) · 2.46 KB
/
index_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package grpc
import (
"context"
"io"
"github.com/pkg/errors"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/util"
)
func (w *WriteBatch) Add(tableName, hashValue string, rangeValue []byte, value []byte) {
w.Writes = append(w.Writes, &IndexEntry{
TableName: tableName,
HashValue: hashValue,
RangeValue: rangeValue,
Value: value,
})
}
func (w *WriteBatch) Delete(tableName, hashValue string, rangeValue []byte) {
w.Deletes = append(w.Deletes, &IndexEntry{
TableName: tableName,
HashValue: hashValue,
RangeValue: rangeValue,
})
}
func (s *StorageClient) NewWriteBatch() chunk.WriteBatch {
return &WriteBatch{}
}
func (s *StorageClient) BatchWrite(c context.Context, batch chunk.WriteBatch) error {
writeBatch := batch.(*WriteBatch)
batchWrites := &WriteIndexRequest{Writes: writeBatch.Writes}
_, err := s.client.WriteIndex(context.Background(), batchWrites)
if err != nil {
return errors.WithStack(err)
}
batchDeletes := &DeleteIndexRequest{Deletes: writeBatch.Deletes}
_, err = s.client.DeleteIndex(context.Background(), batchDeletes)
if err != nil {
return errors.WithStack(err)
}
return nil
}
func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
return util.DoParallelQueries(ctx, s.query, queries, callback)
}
func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error {
indexQuery := &QueryIndexRequest{
TableName: query.TableName,
HashValue: query.HashValue,
RangeValuePrefix: query.RangeValuePrefix,
RangeValueStart: query.RangeValueStart,
ValueEqual: query.ValueEqual,
Immutable: query.Immutable,
}
streamer, err := s.client.QueryIndex(ctx, indexQuery)
if err != nil {
return errors.WithStack(err)
}
for {
readBatch, err := streamer.Recv()
if err == io.EOF {
break
}
if err != nil {
return errors.WithStack(err)
}
if !callback(query, readBatch) {
return nil
}
}
return nil
}
func (r *QueryIndexResponse) Iterator() chunk.ReadBatchIterator {
return &grpcIter{
i: -1,
QueryIndexResponse: r,
}
}
type grpcIter struct {
i int
*QueryIndexResponse
}
func (b *grpcIter) Next() bool {
b.i++
return b.i < len(b.Rows)
}
func (b *grpcIter) RangeValue() []byte {
return b.Rows[b.i].RangeValue
}
func (b *grpcIter) Value() []byte {
return b.Rows[b.i].Value
}