forked from cockroachdb/cockroach
/
bankdata.go
388 lines (346 loc) · 10.4 KB
/
bankdata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// Copyright 2017 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/LICENSE
package sampledataccl
import (
"bytes"
gosql "database/sql"
"fmt"
"io"
"io/ioutil"
"math/rand"
"path/filepath"
"strconv"
"strings"
"testing"
"github.com/pkg/errors"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlccl"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)
// TODO(dan): Once the interface for this has settled a bit, move it out of ccl.
// Data is a representation of a sql table, used for creating test data in
// various forms (sql rows, kvs, enterprise backup). All data tables live in the
// `data` database.
type Data interface {
// Name returns the fully-qualified name of the represented table.
Name() string
// Schema returns the schema for the represented table, such that it can be
// used in fmt.Sprintf(`CREATE TABLE %s %s`, data.Name(), data.Schema`())`.
Schema() string
// NextRow iterates through the rows in the represented table, returning one
// per call. The row is represented as a slice of strings, each string one
// of the columns in the row. When no more rows are available, the bool
// returned is false.
NextRow() ([]string, bool)
// NextRow iterates through the split points in the represented table,
// returning one per call. The split is represented as a slice of strings,
// each string one of the columns in the split. When no more splits are
// available, the bool returned is false.
NextSplit() ([]string, bool)
}
// InsertBatched inserts all rows represented by `data` into `db` in batches of
// `batchSize` rows. The table must exist.
func InsertBatched(db *gosql.DB, data Data, batchSize int) error {
var stmt bytes.Buffer
for {
stmt.Reset()
fmt.Fprintf(&stmt, `INSERT INTO %s VALUES `, data.Name())
i := 0
done := false
for ; i < batchSize; i++ {
row, ok := data.NextRow()
if !ok {
done = true
break
}
if i != 0 {
stmt.WriteRune(',')
}
fmt.Fprintf(&stmt, `(%s)`, strings.Join(row, `,`))
}
if i > 0 {
if _, err := db.Exec(stmt.String()); err != nil {
return err
}
}
if done {
return nil
}
continue
}
}
// Split creates the configured number of ranges in an already created created
// version of the table represented by `data`.
func Split(db *gosql.DB, data Data) error {
var stmt bytes.Buffer
fmt.Fprintf(&stmt, `ALTER TABLE %s SPLIT AT VALUES `, data.Name())
i := 0
for ; ; i++ {
split, ok := data.NextSplit()
if !ok {
break
}
if i != 0 {
stmt.WriteRune(',')
}
fmt.Fprintf(&stmt, `(%s)`, strings.Join(split, `,`))
}
if i > 0 {
if _, err := db.Exec(stmt.String()); err != nil {
return err
}
}
return nil
}
// Setup creates a table in `db` with all the rows and splits of `data`.
func Setup(db *gosql.DB, data Data) error {
if _, err := db.Exec(`CREATE DATABASE IF NOT EXISTS data`); err != nil {
return err
}
if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE %s %s`, data.Name(), data.Schema())); err != nil {
return err
}
const batchSize = 1000
if err := InsertBatched(db, data, batchSize); err != nil {
return err
}
// This occasionally flakes, so ignore errors.
_ = Split(db, data)
return nil
}
// ToBackup creates an enterprise backup in `dir`.
func ToBackup(t testing.TB, data Data, dir string) (*Backup, error) {
return toBackup(t, data, dir, 0)
}
func toBackup(t testing.TB, data Data, dir string, chunkBytes int64) (*Backup, error) {
tempDir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()
// TODO(dan): Get rid of the `t testing.TB` parameter and this `TestServer`.
ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
if _, err := db.Exec(`CREATE DATABASE data`); err != nil {
return nil, err
}
var stmts bytes.Buffer
fmt.Fprintf(&stmts, "CREATE TABLE %s %s;\n", data.Name(), data.Schema())
for {
row, ok := data.NextRow()
if !ok {
break
}
fmt.Fprintf(&stmts, "INSERT INTO %s VALUES (%s);\n", data.Name(), strings.Join(row, `,`))
}
// TODO(dan): The csv load will be less overhead, use it when we have it.
ts := hlc.Timestamp{WallTime: hlc.UnixNano()}
desc, err := sqlccl.Load(ctx, db, &stmts, `data`, `nodelocal://`+dir, ts, chunkBytes, tempDir)
if err != nil {
return nil, err
}
return &Backup{BaseDir: dir, Desc: desc}, nil
}
// Backup is a representation of an enterprise BACKUP.
type Backup struct {
// BaseDir can be used for a RESTORE. All paths in the descriptor are
// relative to this.
BaseDir string
Desc sqlccl.BackupDescriptor
fileIdx int
iterIdx int
}
// ResetKeyValueIteration resets the NextKeyValues iteration to the first kv.
func (b *Backup) ResetKeyValueIteration() {
b.fileIdx = 0
b.iterIdx = 0
}
// NextKeyValues iterates and returns every *user table data* key-value in the
// backup. At least `count` kvs will be returned, but rows are not broken up, so
// slightly more than `count` may come back. If fewer than `count` are
// available, err will be `io.EOF` and kvs may be partially filled with the
// remainer.
func (b *Backup) NextKeyValues(
count int, newTableID sqlbase.ID,
) ([]engine.MVCCKeyValue, roachpb.Span, error) {
var userTables []*sqlbase.TableDescriptor
for _, d := range b.Desc.Descriptors {
if t := d.GetTable(); t != nil && t.ParentID != keys.SystemDatabaseID {
userTables = append(userTables, t)
}
}
if len(userTables) != 1 {
return nil, roachpb.Span{}, errors.Errorf(
"only backups of one table are currently supported, got %d", len(userTables))
}
tableDesc := userTables[0]
newDesc := *tableDesc
newDesc.ID = newTableID
newDescBytes, err := sqlbase.WrapDescriptor(&newDesc).Marshal()
if err != nil {
return nil, roachpb.Span{}, err
}
kr, err := storageccl.MakeKeyRewriter([]roachpb.ImportRequest_TableRekey{{
OldID: uint32(tableDesc.ID), NewDesc: newDescBytes,
}})
if err != nil {
return nil, roachpb.Span{}, err
}
var kvs []engine.MVCCKeyValue
span := roachpb.Span{Key: keys.MaxKey}
for ; b.fileIdx < len(b.Desc.Files); b.fileIdx++ {
file := b.Desc.Files[b.fileIdx]
sst := engine.MakeRocksDBSstFileReader()
defer sst.Close()
fileContents, err := ioutil.ReadFile(filepath.Join(b.BaseDir, file.Path))
if err != nil {
return nil, roachpb.Span{}, err
}
if err := sst.IngestExternalFile(fileContents); err != nil {
return nil, roachpb.Span{}, err
}
it := sst.NewIterator(false)
defer it.Close()
it.Seek(engine.MVCCKey{Key: file.Span.Key})
iterIdx := 0
for ; ; it.Next() {
if len(kvs) >= count {
break
}
if iterIdx < b.iterIdx {
iterIdx++
continue
}
ok, err := it.Valid()
if err != nil {
return nil, roachpb.Span{}, err
}
if !ok || it.UnsafeKey().Key.Compare(file.Span.EndKey) >= 0 {
break
}
if iterIdx < b.iterIdx {
break
}
b.iterIdx = iterIdx
key := it.Key()
key.Key, ok, err = kr.RewriteKey(key.Key)
if err != nil {
return nil, roachpb.Span{}, err
}
if !ok {
return nil, roachpb.Span{}, errors.Errorf("rewriter did not match key: %s", key.Key)
}
v := roachpb.Value{RawBytes: it.Value()}
v.ClearChecksum()
v.InitChecksum(key.Key)
kvs = append(kvs, engine.MVCCKeyValue{Key: key, Value: v.RawBytes})
if key.Key.Compare(span.Key) < 0 {
span.Key = append(span.Key[:0], key.Key...)
}
if key.Key.Compare(span.EndKey) > 0 {
span.EndKey = append(span.EndKey[:0], key.Key...)
}
iterIdx++
}
b.iterIdx = iterIdx
if len(kvs) >= count {
break
}
if ok, _ := it.Valid(); !ok {
b.iterIdx = 0
}
}
span.EndKey = span.EndKey.Next()
if len(kvs) < count {
return kvs, span, io.EOF
}
return kvs, span, nil
}
type bankData struct {
Rows int
PayloadBytes int
Ranges int
Rng *rand.Rand
rowIdx int
splitIdx int
}
var _ Data = &bankData{}
// bankConfigDefault will trigger the default for any of the parameters for
// `Bank`.
const bankConfigDefault = -1
// BankRows returns Bank testdata with the given number of rows and default
// payload size and range count.
func BankRows(rows int) Data {
return Bank(rows, bankConfigDefault, bankConfigDefault)
}
// Bank returns a bank table with three columns: an `id INT PRIMARY KEY`
// representing an account number, a `balance` INT, and a `payload` BYTES to pad
// the size of the rows for various tests.
func Bank(rows int, payloadBytes int, ranges int) Data {
// TODO(dan): This interface is a little wonky, but it's a bit annoying to
// replace it with a struct because that would make most of the callsites
// wrap. Complicating things is that there needs to be a distinction between
// the default value and an explicit 0 for ranges and payloadBytes.
if rows == bankConfigDefault {
rows = 1000
}
if payloadBytes == bankConfigDefault {
payloadBytes = 100
}
if ranges == bankConfigDefault {
ranges = 10
}
if ranges > rows {
ranges = rows
}
rng, _ := randutil.NewPseudoRand()
return &bankData{Rows: rows, PayloadBytes: payloadBytes, Ranges: ranges, Rng: rng}
}
// Name implements the Data interface.
func (d *bankData) Name() string {
return `data.bank`
}
// Schema implements the Data interface.
func (d *bankData) Schema() string {
return `(
id INT PRIMARY KEY,
balance INT,
payload STRING,
FAMILY (id, balance, payload)
)`
}
// NextRow implements the Data interface.
func (d *bankData) NextRow() ([]string, bool) {
if d.rowIdx >= d.Rows {
return nil, false
}
payload := fmt.Sprintf(`'initial-%x'`, randutil.RandBytes(d.Rng, d.PayloadBytes))
row := []string{
strconv.Itoa(d.rowIdx), // id
`0`, // balance
payload, // payload
}
d.rowIdx++
return row, true
}
// NextSplit implements the Data interface.
func (d *bankData) NextSplit() ([]string, bool) {
if d.splitIdx+1 >= d.Ranges {
return nil, false
}
split := []string{strconv.Itoa((d.splitIdx + 1) * (d.Rows / d.Ranges))}
d.splitIdx++
return split, true
}