forked from yuhanfang/riot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bigquery_aggregator.go
301 lines (274 loc) · 6.9 KB
/
bigquery_aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
// Package bigquery_aggregator uploads data to BigQuery. State is stored in
// Datastore to prevent duplicate uploads. The aggregator is threadsafe, using
// Datastore transactions to manage ownership of each uploaded item.
//
// This API is unstable.
package bigquery_aggregator
import (
	"context"
	"fmt"
	"sync"
	"time"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/datastore"
	"github.com/ChoiKyubum/riot/analytics/data_aggregation"
	"github.com/ChoiKyubum/riot/constants/region"
	"github.com/nu7hatch/gouuid"
)
// Aggregator uploads rows into a single BigQuery table, using Datastore
// entities as locks so that the same match is not written twice. The zero
// value is not usable (its sync.Once pointer is nil); always obtain instances
// from New.
type Aggregator struct {
	// Destination and lock-namespace configuration, fixed at construction.
	ns      string // Datastore namespace for lock entities; "" means global
	dataset string // BigQuery dataset ID
	table   string // BigQuery table ID inside dataset

	// Backend clients.
	ds *datastore.Client // lock service
	bq *bigquery.Client  // upload destination

	// Lazily populated on the first upload; init guards the assignment of tab.
	init *sync.Once
	tab  *bigquery.Table
}
// New builds an Aggregator that writes to the BigQuery table dataset.table via
// bq. It uses ds as a lock service so that duplicate rows are not inserted.
// Lock entities live in namespace; an empty namespace selects the global
// Datastore namespace. The dataset and table are created lazily on the first
// call to SaveMatches, not here.
func New(namespace, dataset, table string, ds *datastore.Client, bq *bigquery.Client) *Aggregator {
	agg := new(Aggregator)
	agg.ns = namespace
	agg.dataset = dataset
	agg.table = table
	agg.ds = ds
	agg.bq = bq
	agg.init = new(sync.Once)
	return agg
}
// lockvalue is the Datastore entity stored under each lock key.
//
// While a caller holds the lock, Token contains that caller's random UUID.
// Ownership is proven by presenting the same UUID. Once an upload has
// completed, Token is overwritten with the literal "done", so no caller's
// token matches it any more.
type lockvalue struct {
	Token string
}
// acquireLock returns a string token if the key is acquired by this call, or
// empty if it is already owned by somebody else.
func (a *Aggregator) acquireLock(ctx context.Context, key *datastore.Key) (string, error) {
uid, err := uuid.NewV4()
tok := uid.String()
if err != nil {
return "", err
}
tx, err := a.ds.NewTransaction(ctx)
if err != nil {
return "", err
}
var current lockvalue
// Continue until commit succeeds or we have an error.
for {
err = tx.Get(key, ¤t)
// Already owned.
if err == nil {
tx.Rollback()
return "", nil
}
// Actual error.
if err != datastore.ErrNoSuchEntity {
tx.Rollback()
return "", err
}
current.Token = tok
_, err = tx.Put(key, ¤t)
// Actual error.
if err != nil {
tx.Rollback()
return "", err
}
_, err = tx.Commit()
// Owned by us.
if err == nil {
return tok, nil
}
// Actual error.
if err != datastore.ErrConcurrentTransaction {
return "", err
}
}
}
// releaseLock returns true if the key was released by this call, or false if
// the key is already released.
func (a *Aggregator) releaseLock(ctx context.Context, key *datastore.Key, tok string) (bool, error) {
tx, err := a.ds.NewTransaction(ctx)
if err != nil {
return false, err
}
// Continue until commit succeeds or we have an error.
var current lockvalue
for {
err = tx.Get(key, ¤t)
// Not owned.
if err == datastore.ErrNoSuchEntity {
tx.Rollback()
return false, nil
}
// Actual error.
if err != nil {
tx.Rollback()
return false, err
}
// Not owned by us.
if current.Token != tok {
tx.Rollback()
return false, nil
}
err = tx.Delete(key)
// Actual error.
if err != nil {
tx.Rollback()
return false, err
}
_, err = tx.Commit()
// Released by us.
if err == nil {
return true, nil
}
// Actual error.
if err != datastore.ErrConcurrentTransaction {
return false, err
}
}
}
// markDone returns true if the key was marked done by this call, or false if
// the key is not owned.
func (a *Aggregator) markDone(ctx context.Context, key *datastore.Key, tok string) (bool, error) {
tx, err := a.ds.NewTransaction(ctx)
if err != nil {
return false, err
}
// Continue until commit succeeds or we have an error.
var current lockvalue
for {
err = tx.Get(key, ¤t)
// Not owned.
if err == datastore.ErrNoSuchEntity {
tx.Rollback()
return false, nil
}
// Actual error.
if err != nil {
tx.Rollback()
return false, err
}
// Not owned by us.
if current.Token != tok {
tx.Rollback()
return false, nil
}
current.Token = "done"
_, err = tx.Put(key, ¤t)
// Actual error.
if err != nil {
tx.Rollback()
return false, err
}
_, err = tx.Commit()
// Released by us.
if err == nil {
return true, nil
}
// Actual error.
if err != datastore.ErrConcurrentTransaction {
return false, err
}
}
}
// MatchExists returns true if the match ID for the given region is already
// stored.
func (a *Aggregator) MatchExists(ctx context.Context, r region.Region, id int64) (bool, error) {
var current lockvalue
key := a.gameKey(r, id)
err := a.ds.Get(ctx, key, ¤t)
if err == nil {
return true, nil
}
if err == datastore.ErrNoSuchEntity {
return false, nil
}
return false, err
}
// gameKey returns the Datastore lock key for the match with the given ID in
// region r. The kind embeds the region, dataset, and table, so locks for
// different destinations are kept apart. The match ID becomes the numeric key
// ID, and the key lives in the aggregator's namespace.
func (a *Aggregator) gameKey(r region.Region, id int64) *datastore.Key {
	kind := fmt.Sprintf("aggregator-save-match-%s:%s:%s", r, a.dataset, a.table)
	key := &datastore.Key{Namespace: a.ns}
	key.Kind = kind
	key.ID = id
	return key
}
// uploadKeyValue pairs a match with the Datastore lock key that guards it.
//
// NOTE(review): this type is not referenced anywhere else in this file. It may
// be dead code; confirm before relying on it.
type uploadKeyValue struct {
	Key   *datastore.Key         // lock key for Value
	Value data_aggregation.Match // the match being uploaded
}
// SaveMatches uploads matches to the aggregator's BigQuery table. Any match
// whose Datastore lock already exists is skipped, because some caller is
// uploading it or already has.
//
// Table setup:
//   - On the first call, the dataset is created.
//   - On every call, the table is created with a schema inferred from
//     data_aggregation.Match.
//   - Both creations are best-effort; their errors are ignored, so existing
//     resources are fine.
//
// Upload:
//   - Each match whose lock this call acquires is inserted. Its InsertID is
//     derived from the lock key.
//   - If the insert succeeds, every acquired lock is marked done.
//   - If the insert fails, every acquired lock is released so the matches can
//     be retried later.
//   - Datastore failures while marking or releasing are retried with backoff.
//     Retrying stops when the operation succeeds or ctx is done; in the latter
//     case the last Datastore error is returned and the remaining locks are
//     left as they are.
//
// Return value: the insert error if the insert failed. Otherwise, the first
// error encountered while acquiring locks (those matches were not uploaded),
// or nil.
func (a *Aggregator) SaveMatches(ctx context.Context, matches []data_aggregation.Match) error {
	a.init.Do(func() {
		ds := a.bq.Dataset(a.dataset)
		// Best-effort: Create fails harmlessly if the dataset already exists.
		// NOTE(review): this uses the first caller's ctx; if that ctx is
		// already done, dataset creation is never retried.
		_ = ds.Create(ctx, nil)
		a.tab = ds.Table(a.table)
	})
	schema, err := bigquery.InferSchema(data_aggregation.Match{})
	if err != nil {
		return err
	}
	// Best-effort, like the dataset: an "already exists" error is expected.
	_ = a.tab.Create(ctx, &bigquery.TableMetadata{Schema: schema})

	var (
		acquired   = make(map[*datastore.Key]string)
		toUpload   []*bigquery.StructSaver
		acquireErr error
	)
	for _, match := range matches {
		key := a.gameKey(match.Region, match.ID)
		tok, err := a.acquireLock(ctx, key)
		if err != nil {
			if acquireErr == nil {
				acquireErr = err
			}
			continue
		}
		if tok == "" {
			// Held by someone else or already done: not ours to upload.
			continue
		}
		toUpload = append(toUpload, &bigquery.StructSaver{
			Struct:   match,
			Schema:   schema,
			InsertID: fmt.Sprintf("%s:%d:%s:%s", key.Kind, key.ID, key.Name, key.Namespace),
		})
		acquired[key] = tok
	}

	// Nothing acquired means nothing to insert and no locks to settle.
	if len(toUpload) == 0 {
		return acquireErr
	}

	if putErr := a.tab.Uploader().Put(ctx, toUpload); putErr != nil {
		// The rows could not be saved, so release the locks and let others
		// retry. If ctx ends first, some locks stay held; this is a known gap.
		for key, tok := range acquired {
			err := retryWithBackoff(ctx, func() error {
				_, err := a.releaseLock(ctx, key, tok)
				return err
			})
			if err != nil {
				return err
			}
		}
		return putErr
	}

	// Rows are in BigQuery; mark each lock done so it is never re-uploaded.
	// A false result (lock no longer ours) is ignored, as before.
	for key, tok := range acquired {
		err := retryWithBackoff(ctx, func() error {
			_, err := a.markDone(ctx, key, tok)
			return err
		})
		if err != nil {
			return err
		}
	}
	return acquireErr
}

// retryWithBackoff calls op until it returns nil. Between failed attempts it
// waits with exponential backoff (50ms doubling up to 5s). If ctx is done while
// waiting, it stops and returns op's most recent error.
func retryWithBackoff(ctx context.Context, op func() error) error {
	const (
		initialDelay = 50 * time.Millisecond
		maxDelay     = 5 * time.Second
	)
	delay := initialDelay
	for {
		err := op()
		if err == nil {
			return nil
		}
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return err
		case <-timer.C:
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}