/
provider.go
446 lines (399 loc) · 14.7 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
// Copyright 2023 The Cockroach Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
// Package base provides enough functionality to connect to a database,
// but does not provide any other services. This package is primarily used
// to break dependency cycles in tests.
package base
import (
	"context"
	"flag"
	"fmt"
	"net/url"
	"os"
	"sync/atomic"
	"testing"
	"time"

	"github.com/cockroachdb/cdc-sink/internal/sinktest"
	"github.com/cockroachdb/cdc-sink/internal/types"
	"github.com/cockroachdb/cdc-sink/internal/util/diag"
	"github.com/cockroachdb/cdc-sink/internal/util/ident"
	"github.com/cockroachdb/cdc-sink/internal/util/retry"
	"github.com/cockroachdb/cdc-sink/internal/util/stdpool"
	"github.com/cockroachdb/cdc-sink/internal/util/stmtcache"
	"github.com/cockroachdb/cdc-sink/internal/util/stopper"
	"github.com/google/wire"
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"
)
const (
	defaultConnString = "postgresql://root@localhost:26257/defaultdb?sslmode=disable"
	envSourceString   = "TEST_SOURCE_CONNECT"
	envStagingString  = "TEST_STAGING_CONNECT"
	envTargetString   = "TEST_TARGET_CONNECT"

	// statementCacheSize bounds the prepared-statement cache. The value
	// is arbitrary: tests don't generate many distinct SQL statements.
	statementCacheSize = 128
)

// Connection strings for the source, staging, and target databases.
// Each points at the storage of a command-line flag registered in init.
var (
	sourceConn   *string
	stagingConn  *string
	targetString *string
)

// defaultConnFromEnv returns the value of the environment variable
// envVar, or defaultConnString when that variable is unset. A variable
// that is defined but empty is treated the same as an unset one.
func defaultConnFromEnv(envVar string) string {
	if value := os.Getenv(envVar); value != "" {
		return value
	}
	return defaultConnString
}

// init registers the -testSourceConnect, -testStagingConnect and
// -testTargetConnect flags. Each flag's default is taken from its
// TEST_*_CONNECT environment variable when non-empty.
func init() {
	sourceConn = flag.String("testSourceConnect",
		defaultConnFromEnv(envSourceString),
		"the connection string to use for the source db")
	stagingConn = flag.String("testStagingConnect",
		defaultConnFromEnv(envStagingString),
		"the connection string to use for the staging db")
	targetString = flag.String("testTargetConnect",
		defaultConnFromEnv(envTargetString),
		"the connection string to use for the target db")
}
// TestSet is the Wire provider set used to build a Fixture. It supplies
// the per-test context, diagnostics, the source, staging, and target
// pools with their uniquely-named schemas, and the target statement
// cache.
var TestSet = wire.NewSet(
	ProvideContext,
	diag.New,

	ProvideSourcePool,
	ProvideSourceSchema,

	ProvideStagingPool,
	ProvideStagingSchema,

	ProvideTargetPool,
	ProvideTargetSchema,
	ProvideTargetStatements,

	// Let consumers that ask for a plain context.Context receive the
	// *stopper.Context produced by ProvideContext.
	wire.Bind(new(context.Context), new(*stopper.Context)),
	wire.Struct(new(Fixture), "*"),
)
// Fixture can be used for tests that "just need a database",
// without the other services provided by the target package. One can be
// constructed by calling NewFixture.
//
// TestSet populates every field via wire.Struct(new(Fixture), "*").
type Fixture struct {
	Context      *stopper.Context        // The context for the test.
	SourcePool   *types.SourcePool       // Access to user-data tables and changefeed creation.
	SourceSchema sinktest.SourceSchema   // A uniquely-named container for tables within SourcePool.
	StagingPool  *types.StagingPool      // Access to the staging database.
	StagingDB    ident.StagingSchema     // A uniquely-named container for tables within StagingPool.
	TargetCache  *types.TargetStatements // Prepared-statement cache bound to TargetPool.
	TargetPool   *types.TargetPool       // Access to the destination.
	TargetSchema sinktest.TargetSchema   // A uniquely-named container for tables within TargetPool.
}
// CreateSourceTable creates a uniquely-named test table inside the
// fixture's SourceSchema, executing the DDL through SourcePool.
// schemaSpec must contain exactly one %s verb, which is replaced by the
// table's name qualified with its enclosing schema.
func (f *Fixture) CreateSourceTable(
	ctx context.Context, schemaSpec string,
) (TableInfo[*types.SourcePool], error) {
	enclosing := f.SourceSchema.Schema()
	return CreateTable(ctx, f.SourcePool, enclosing, schemaSpec)
}
// CreateTargetTable creates a uniquely-named test table inside the
// fixture's TargetSchema, executing the DDL through TargetPool.
// schemaSpec must contain exactly one %s verb, which is replaced by the
// table's name qualified with its enclosing schema.
func (f *Fixture) CreateTargetTable(
	ctx context.Context, schemaSpec string,
) (TableInfo[*types.TargetPool], error) {
	enclosing := f.TargetSchema.Schema()
	return CreateTable(ctx, f.TargetPool, enclosing, schemaSpec)
}
// caseTimout bounds how long the context from ProvideContext may run
// before it is stopped. It defaults to two minutes. The flag keeps its
// historical spelling ("caseTimout") because the name is part of the
// command-line interface.
var caseTimout = flag.Duration("caseTimout", 2*time.Minute,
	"raise this value when debugging to allow individual tests to run longer")
// ProvideContext returns a stopper context scoped to a single test case.
//
// When t's cleanup functions run, the context is stopped (passing 10ms
// to Stop) and then waited upon. Independently, a background task stops
// the context immediately (Stop(0)) if it has not already begun
// stopping once the -caseTimout duration has elapsed.
func ProvideContext(t testing.TB) *stopper.Context {
	ctx := stopper.WithContext(context.Background())
	t.Cleanup(func() {
		ctx.Stop(10 * time.Millisecond)
		// Best-effort shutdown: cleanup has no use for the result, so it
		// is deliberately discarded.
		_ = ctx.Wait()
	})
	ctx.Go(func() error {
		deadline := time.NewTimer(*caseTimout)
		defer deadline.Stop()
		select {
		case <-deadline.C:
			// The test overran its budget; cancel without delay.
			ctx.Stop(0)
		case <-ctx.Stopping():
			// Clean shutdown, nothing to do.
		}
		return nil
	})
	return ctx
}
// ProvideSourcePool connects to the source database named by the
// -testSourceConnect flag (default taken from TEST_SOURCE_CONNECT).
//
// If the source is a CockroachDB cluster whose kv.rangefeed.enabled
// setting is false, this function enables rangefeeds. If the
// COCKROACH_DEV_LICENSE environment variable is set to a non-empty
// value, it first sets cluster.organization and enterprise.license.
// Clusters that already have rangefeeds enabled are left untouched.
func ProvideSourcePool(ctx *stopper.Context, diags *diag.Diagnostics) (*types.SourcePool, error) {
	tgt := *sourceConn
	log.Infof("source connect string: %s", tgt)
	ret, err := stdpool.OpenTarget(ctx, tgt,
		stdpool.WithDiagnostics(diags, "source"),
		stdpool.WithTestControls(stdpool.TestControls{
			WaitForStartup: true,
		}),
	)
	if err != nil {
		return nil, err
	}
	// The cluster settings below only exist in CockroachDB.
	if ret.Product != types.ProductCockroachDB {
		return (*types.SourcePool)(ret), nil
	}

	// Set the cluster settings once, if we need to.
	var enabled bool
	if err := retry.Retry(ctx, ret, func(ctx context.Context) error {
		return ret.QueryRowContext(ctx, "SHOW CLUSTER SETTING kv.rangefeed.enabled").Scan(&enabled)
	}); err != nil {
		return nil, errors.Wrap(err, "could not check cluster setting")
	}
	if enabled {
		return (*types.SourcePool)(ret), nil
	}

	// A defined-but-empty variable (e.g. an unavailable CI secret) is
	// treated as unset, consistent with the TEST_*_CONNECT variables.
	if lic := os.Getenv("COCKROACH_DEV_LICENSE"); len(lic) > 0 {
		if err := retry.Execute(ctx, ret,
			"SET CLUSTER SETTING cluster.organization = $1",
			"Cockroach Labs - Production Testing",
		); err != nil {
			return nil, errors.Wrap(err, "could not set cluster.organization")
		}
		if err := retry.Execute(ctx, ret,
			"SET CLUSTER SETTING enterprise.license = $1", lic,
		); err != nil {
			return nil, errors.Wrap(err, "could not set enterprise.license")
		}
	}
	if err := retry.Execute(ctx, ret,
		"SET CLUSTER SETTING kv.rangefeed.enabled = true"); err != nil {
		return nil, errors.Wrap(err, "could not enable rangefeeds")
	}
	return (*types.SourcePool)(ret), nil
}
// ProvideStagingSchema creates a globally-unique container for tables in
// the staging database. The container's name uses the "cdc" prefix.
func ProvideStagingSchema(
	ctx *stopper.Context, pool *types.StagingPool,
) (ident.StagingSchema, error) {
	sch, err := provideSchema(ctx, pool, "cdc")
	staging := ident.StagingSchema(sch)
	return staging, err
}
// ProvideStagingPool opens a connection pool to the staging database
// named by the -testStagingConnect flag (default taken from
// TEST_STAGING_CONNECT). It waits for the database to start up.
func ProvideStagingPool(ctx *stopper.Context) (*types.StagingPool, error) {
	connString := *stagingConn
	log.Infof("staging connect string: %s", connString)
	staging, err := stdpool.OpenPgxAsStaging(ctx, connString,
		stdpool.WithTestControls(stdpool.TestControls{WaitForStartup: true}),
		stdpool.WithConnectionLifetime(time.Minute, 15*time.Second, 5*time.Second),
		stdpool.WithPoolSize(32),
		// Matches the default -caseTimout of two minutes.
		stdpool.WithTransactionTimeout(2*time.Minute),
	)
	if err == nil {
		return staging, nil
	}
	return nil, err
}
// ProvideTargetPool connects to the target database named by the
// -testTargetConnect flag (default taken from TEST_TARGET_CONNECT). When
// that connection string equals the source pool's, the source pool is
// returned as the target instead of opening a second pool.
func ProvideTargetPool(
	ctx *stopper.Context, source *types.SourcePool, diags *diag.Diagnostics,
) (*types.TargetPool, error) {
	connString := *targetString
	if connString == source.ConnectionString {
		log.Info("reusing SourcePool as TargetPool")
		return (*types.TargetPool)(source), nil
	}

	log.Infof("target connect string: %s", connString)
	target, err := stdpool.OpenTarget(ctx, connString,
		stdpool.WithDiagnostics(diags, "target"),
		stdpool.WithTestControls(stdpool.TestControls{WaitForStartup: true}),
		stdpool.WithConnectionLifetime(time.Minute, 15*time.Second, 5*time.Second),
		stdpool.WithPoolSize(32),
		// Matches the default -caseTimout of two minutes.
		stdpool.WithTransactionTimeout(2*time.Minute),
	)
	if err == nil {
		return target, nil
	}
	return nil, err
}
// ProvideSourceSchema creates a globally-unique container for tables in
// the source database, using the "src" name prefix. The schema is
// logged only once it has been created successfully.
func ProvideSourceSchema(
	ctx *stopper.Context, pool *types.SourcePool,
) (sinktest.SourceSchema, error) {
	sch, err := provideSchema(ctx, pool, "src")
	ret := sinktest.SourceSchema(sch)
	if err != nil {
		return ret, err
	}
	log.Infof("source schema: %s", sch)
	return ret, nil
}
// ProvideTargetSchema creates a globally-unique container for tables in
// the target database, using the "tgt" name prefix.
//
// For PostgreSQL targets, pool and stmts are updated in place so that
// they use a new connection to the freshly-created database.
//
// NOTE(review): ProvideTargetPool returns the source pool itself when
// the connection strings match. In that case the in-place update below
// also changes the source pool's ConnectionString and DB. Confirm this
// is intended.
func ProvideTargetSchema(
	ctx *stopper.Context,
	diags *diag.Diagnostics,
	pool *types.TargetPool,
	stmts *types.TargetStatements,
) (sinktest.TargetSchema, error) {
	sch, err := provideSchema(ctx, pool, "tgt")
	ret := sinktest.TargetSchema(sch)
	if err != nil {
		return ret, err
	}
	log.Infof("target schema: %s", sch)
	if pool.Info().Product != types.ProductPostgreSQL {
		return ret, nil
	}

	// In PostgresSQL, connections are tightly coupled to the target
	// database. Cross-database queries are generally unsupported, as
	// opposed to CockroachDB, which allows any database to be queried
	// from any connection.
	//
	// To resolve this, we're going to re-open the target database
	// connection so that the connection uses the schema that we have
	// just created. We also need to recreate the statement cache so
	// that it's associated with the newly-constructed database
	// connection.
	db, _ := sch.Split()
	conn := connStringForDatabase(pool.ConnectionString, db.Raw())
	next, err := stdpool.OpenPgxAsTarget(ctx, conn, stdpool.WithDiagnostics(diags, "target_reopened"))
	if err != nil {
		return sinktest.TargetSchema{}, err
	}
	nextCache := ProvideTargetStatements(ctx, next)
	pool.ConnectionString = conn
	pool.DB = next.DB
	stmts.Cache = nextCache.Cache
	return ret, nil
}

// connStringForDatabase returns connString rewritten so that it selects
// the named database. For URL-form connection strings, the path is
// replaced. This preserves query parameters such as sslmode, which
// naive appending would corrupt, and drops any previously-selected
// database. Strings that do not parse as a URL with a scheme fall back
// to appending "/<database>".
func connStringForDatabase(connString, database string) string {
	u, err := url.Parse(connString)
	if err != nil || u.Scheme == "" {
		return fmt.Sprintf("%s/%s", connString, database)
	}
	u.Path = "/" + database
	u.RawPath = ""
	return u.String()
}
// ProvideTargetStatements is called by Wire to construct a
// prepared-statement cache over pool.DB, sized by statementCacheSize.
// Anywhere the associated TargetPool is reused should also reuse the
// cache.
func ProvideTargetStatements(ctx *stopper.Context, pool *types.TargetPool) *types.TargetStatements {
	return &types.TargetStatements{
		Cache: stmtcache.New[string](ctx, pool.DB, statementCacheSize),
	}
}
// provideSchema creates a uniquely-named schema whose name starts with
// prefix. The way it does this depends on the pool's product:
//   - CockroachDB, MariaDB, MySQL, and PostgreSQL delegate to
//     CreateSchema.
//   - Oracle creates a new user with an unlimited quota on the USERS
//     tablespace and returns a schema named after that user.
//
// In both cases, a callback that drops the schema is registered with
// ctx.Defer. Any other product yields an error.
func provideSchema[P types.AnyPool](
	ctx *stopper.Context, pool P, prefix string,
) (ident.Schema, error) {
	switch pool.Info().Product {
	case types.ProductCockroachDB, types.ProductMariaDB, types.ProductMySQL, types.ProductPostgreSQL:
		return CreateSchema(ctx, pool, prefix)
	case types.ProductOracle:
		// Each package tests run in a separate binary, so we need a
		// "globally" unique ID. While PIDs do recycle, they're highly
		// unlikely to do so during a single run of the test suite.
		name := ident.New(fmt.Sprintf(
			"%s_%d_%d", prefix, os.Getpid(), dbIdentCounter.Add(1)))
		if err := retry.Execute(ctx, pool, fmt.Sprintf("CREATE USER %s", name)); err != nil {
			return ident.Schema{}, errors.Wrapf(err, "could not create user %s", name)
		}
		// Register cleanup as soon as the user exists, so that a failure
		// in a later setup step doesn't leak it.
		ctx.Defer(func() {
			// Use background since stopper context has stopped.
			err := retry.Execute(context.Background(), pool, fmt.Sprintf("DROP USER %s CASCADE", name))
			if err != nil {
				log.WithError(err).Warnf("could not clean up schema %s", name)
			}
		})
		if err := retry.Execute(ctx, pool, fmt.Sprintf("ALTER USER %s QUOTA UNLIMITED ON USERS", name)); err != nil {
			return ident.Schema{}, errors.Wrapf(err, "could not grant quota to %s", name)
		}
		return ident.MustSchema(name), nil
	default:
		return ident.Schema{},
			errors.Errorf("cannot create test db for %s", pool.Info().Product)
	}
}
// Ensure unique database identifiers within a test run.
var dbIdentCounter atomic.Int32

// CreateSchema creates a database with a unique name and returns a
// schema within it. For MariaDB and MySQL, the schema is the database
// itself; for other products it is the database's public schema. A
// callback that drops the database is registered with ctx.Defer.
//
// For CockroachDB, the database's zone is also configured with
// gc.ttlseconds = 600.
func CreateSchema[P types.AnyPool](
	ctx *stopper.Context, pool P, prefix string,
) (ident.Schema, error) {
	product := pool.Info().Product
	dbNum := dbIdentCounter.Add(1)
	// Each package tests run in a separate binary, so we need a
	// "globally" unique ID. While PIDs do recycle, they're highly
	// unlikely to do so during a single run of the test suite.
	// We use dashes in the name to ensure that the identifier is always
	// correctly quoted when sent in SQL commands.
	name := ident.New(fmt.Sprintf("%s-%d-%d", prefix, os.Getpid(), dbNum))

	// Registered before CREATE DATABASE. This is safe because the drop
	// uses IF EXISTS.
	ctx.Defer(func() {
		option := "CASCADE"
		switch product {
		case types.ProductMariaDB, types.ProductMySQL:
			// MySQL-family DROP DATABASE has no CASCADE option.
			option = ""
		default:
			// nothing to do.
		}
		// Called from stopper.Context.Defer, so we must use Background.
		dropCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		err := retry.Execute(dropCtx, pool,
			fmt.Sprintf("DROP DATABASE IF EXISTS %s %s", name, option))
		if err != nil {
			log.WithError(err).WithField("target", name).Warn("could not drop database")
			return
		}
		log.WithField("target", name).Debug("dropped database")
	})

	if err := retry.Execute(ctx, pool, fmt.Sprintf(
		"CREATE DATABASE %s", name)); err != nil {
		return ident.Schema{}, errors.WithStack(err)
	}
	if product == types.ProductCockroachDB {
		if err := retry.Execute(ctx, pool, fmt.Sprintf(
			`ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds = 600`, name)); err != nil {
			return ident.Schema{}, errors.WithStack(err)
		}
	}

	var sch ident.Schema
	var err error
	switch product {
	case types.ProductMariaDB, types.ProductMySQL:
		sch, err = ident.NewSchema(name)
	default:
		sch, err = ident.NewSchema(name, ident.Public)
	}
	if err != nil {
		return ident.Schema{}, err
	}
	return sch, nil
}
// tempTable numbers every temp table created during a test run. The
// enclosing database already has a unique name, but globally unique
// table names are convenient as well.
var tempTable atomic.Int32

// CreateTable creates a test table named "tbl-<N>" inside enclosing and
// returns it together with pool. schemaSpec must contain exactly one %s
// verb, which is replaced by the table's name qualified with its
// enclosing schema. The returned TableInfo is populated even when the
// returned error is non-nil.
func CreateTable[P types.AnyPool](
	ctx context.Context, pool P, enclosing ident.Schema, schemaSpec string,
) (TableInfo[P], error) {
	// The dash forces the table name to be quoted whenever it is
	// rendered into a SQL command.
	table := ident.NewTable(enclosing,
		ident.New(fmt.Sprintf("tbl-%d", tempTable.Add(1))))
	ddl := fmt.Sprintf(schemaSpec, table)
	execErr := retry.Execute(ctx, pool, ddl)
	return TableInfo[P]{pool, table}, errors.WithStack(execErr)
}