-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathreader.go
158 lines (150 loc) · 4.31 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigtable
import (
"context"
"strings"
cbt "cloud.google.com/go/bigtable"
"github.com/datacommonsorg/mixer/internal/util"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// BtRow is a single decoded Bigtable row as handed back to callers of the
// read helpers in this package.
type BtRow struct {
	// Parts holds the tokens of the row key body: the row key with the read
	// prefix removed, split on "^". Callers use these tokens (place, dcid or
	// other properties) to group the results.
	Parts []string

	// Data is the value produced for this row by the caller-supplied
	// unmarshal function.
	Data interface{}
}
// readRowFn generates the worker function that reads one chunk of rows from a
// single Bigtable table. The arguments are captured by the returned closure so
// that it can be scheduled directly on an errgroup.
//
// For every row that has at least one cell in the BtFamily column family, the
// value of the first cell is decoded with util.UnzipAndDecode and passed to
// action. The result is sent on btRowChan together with the row key, stripped
// of prefix and split on "^". Rows without a cell in BtFamily are skipped.
//
// The returned function reports the error from ReadRows, or otherwise the
// first decode/action error. A failing row stops the read of this chunk.
func readRowFn(
	errCtx context.Context,
	btTable *cbt.Table,
	rowSetPart cbt.RowSet,
	action func([]byte) (interface{}, error),
	btRowChan chan BtRow,
	prefix string,
) func() error {
	return func() error {
		// The row callback can only signal "stop" by returning false; it has
		// no way to hand an error back through ReadRows. Record the failure
		// here so it is surfaced to the caller instead of being dropped.
		var rowErr error
		readErr := btTable.ReadRows(errCtx, rowSetPart, func(btRow cbt.Row) bool {
			cells := btRow[BtFamily]
			if len(cells) == 0 {
				return true
			}
			jsonRaw, err := util.UnzipAndDecode(string(cells[0].Value))
			if err != nil {
				rowErr = err
				return false
			}
			elem, err := action(jsonRaw)
			if err != nil {
				rowErr = err
				return false
			}
			parts := strings.Split(strings.TrimPrefix(btRow.Key(), prefix), "^")
			btRowChan <- BtRow{parts, elem}
			return true
		})
		if readErr != nil {
			return readErr
		}
		return rowErr
	}
}
// Read reads BigTable rows from multiple Bigtable in parallel.
// Note all Bigtable read use the same set of row keys, built from prefix and
// body.
//
// action converts the decoded cell payload of a row into the value stored in
// BtRow.Data. The rows and any error are returned exactly as produced by
// ReadWithGroupRowList.
func Read(
	ctx context.Context,
	btGroup *Group,
	prefix string,
	body [][]string,
	action func([]byte) (interface{}, error),
) ([][]BtRow, error) {
	// Query the table list once; it does not change while building accessors.
	numTables := len(btGroup.Tables())
	accs := make([]*Accessor, 0, numTables)
	for i := 0; i < numTables; i++ {
		accs = append(accs, &Accessor{ImportGroup: i, Body: body})
	}
	return ReadWithGroupRowList(ctx, btGroup, prefix, accs, action)
}
// ReadWithGroupRowList reads BigTable rows from multiple Bigtable in parallel.
// Reading is chunked as the size limit for RowSet is 500KB.
//
// Note the read could have different RowList for each import group Bigtable as
// needed by the pagination APIs: each Accessor names the import group (table
// index) it applies to and the key body used to build that table's row list.
//
// unmarshalFunc converts the decoded cell payload of a row into the value
// stored in BtRow.Data.
//
// On success the result has one entry per table in btGroup, in table order;
// entries for tables that are nil or have no rows to read are empty. The order
// of rows inside an entry is not defined when a table is read in several
// chunks. A NotFound error is returned when the group has no tables, and the
// first error from any read is returned otherwise.
func ReadWithGroupRowList(
	ctx context.Context,
	btGroup *Group,
	prefix string,
	accs []*Accessor,
	unmarshalFunc func([]byte) (interface{}, error),
) ([][]BtRow, error) {
	tables := btGroup.Tables()
	if len(tables) == 0 {
		return nil, status.Errorf(codes.NotFound, "Bigtable instance is not specified")
	}

	// Collect the row keys to read for each import group.
	rowListMap := map[int]cbt.RowList{}
	for _, acc := range accs {
		rowListMap[acc.ImportGroup] = append(
			rowListMap[acc.ImportGroup],
			BuildRowList(prefix, acc.Body)...,
		)
	}

	// One channel per import group, buffered with one slot per requested row
	// key so that results can be collected after all readers have finished.
	chans := make([]chan BtRow, len(tables))
	for i := range chans {
		chans[i] = make(chan BtRow, len(rowListMap[i]))
	}

	errs, errCtx := errgroup.WithContext(ctx)
	// Read from each import group tables. Note each table could have different
	// rowList in pagination APIs.
	for i := range chans {
		table := tables[i]
		if table == nil {
			continue
		}
		rowList := rowListMap[i]
		// Step by chunk start so that no empty trailing chunk is scheduled
		// when the number of keys is an exact multiple of the batch size.
		for left := 0; left < len(rowList); left += BtBatchQuerySize {
			right := left + BtBatchQuerySize
			if right > len(rowList) {
				right = len(rowList)
			}
			errs.Go(readRowFn(errCtx, table, rowList[left:right], unmarshalFunc, chans[i], prefix))
		}
	}
	if err := errs.Wait(); err != nil {
		return nil, err
	}

	// All readers are done; close each channel and drain what was buffered.
	result := make([][]BtRow, 0, len(chans))
	for _, ch := range chans {
		close(ch)
		items := make([]BtRow, 0, len(ch))
		for elem := range ch {
			items = append(items, elem)
		}
		result = append(result, items)
	}
	return result, nil
}