58436: server: add /api/v2/ tree with auth/pagination, port listSessions endpoint r=itsbilal a=itsbilal

This change adds the skeleton for a new API tree that lives in
`/api/v2/` in the http listener, and currently reimplements the `/sessions/`
endpoint that is also implemented in `/_status/`. The new v2 API tree avoids
the need to use GRPC Gateway, as well as cookie-based authentication, which is
neither intuitive nor idiomatic for REST APIs. Instead, for authentication,
it uses a new session header that needs to be set on every request.
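
For illustration, here is a minimal Go client sketch of the header-based flow. The header name and the token-acquisition step are assumptions made for the example, not names taken from this change:

```
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	req, err := http.NewRequest("GET", "https://localhost:8080/api/v2/sessions/", nil)
	if err != nil {
		panic(err)
	}
	// Hypothetical header name; the point is that the v2 tree expects the
	// session token on every request instead of in a cookie.
	req.Header.Set("X-Cockroach-API-Session", "<token from a prior login call>")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```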

As many RPC fan-out APIs use statusServer.iterateNodes, this change
implements a pagination-aware method, paginatedIterateNodes, that
works on a sorted set of node IDs and arranges results so that it can
return the next `limit` results of an arbitrary slice after the `next`
cursor passed in. An example of how this works in practice
is the new `/api/v2/sessions/` endpoint.
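
A simplified, self-contained sketch of the cursor arithmetic follows; the real paginatedIterateNodes also fans RPCs out to each node, and the function signature below is invented for the example:

```
package main

import (
	"fmt"
	"sort"
)

// paginate returns up to limit IDs strictly after the cursor, plus the new
// cursor to pass back as `next` on the following request (0 means done).
func paginate(nodeIDs []int, cursor, limit int) (page []int, next int) {
	sort.Ints(nodeIDs)
	// Find the first ID after the cursor.
	start := sort.SearchInts(nodeIDs, cursor+1)
	end := start + limit
	if end > len(nodeIDs) {
		end = len(nodeIDs)
	}
	page = nodeIDs[start:end]
	if end < len(nodeIDs) {
		next = nodeIDs[end-1]
	}
	return page, next
}

func main() {
	ids := []int{1, 2, 3, 5, 8, 13}
	page, next := paginate(ids, 0, 3) // first page
	fmt.Println(page, next)           // [1 2 3] 3
	page, next = paginate(ids, next, 3)
	fmt.Println(page, next) // [5 8 13] 0
}
```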

A dependency on gorilla/mux is added to be able to pattern-match
arguments in the URL. This was already an indirect dependency; now it's
a direct dependency of cockroach.
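
For example, this is the kind of URL pattern-matching gorilla/mux provides (the route and variable names here are illustrative, not the commit's exact routes):

```
package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func main() {
	r := mux.NewRouter()
	// {username} is matched as a path segment and exposed via mux.Vars.
	r.HandleFunc("/api/v2/sessions/{username}", func(w http.ResponseWriter, req *http.Request) {
		fmt.Fprintf(w, "sessions for %s\n", mux.Vars(req)["username"])
	}).Methods("GET")
	log.Fatal(http.ListenAndServe(":8080", r))
}
```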

TODOs that are likely to spill over into future PRs:
 - API documentation; need to explore using Swagger here.
 - Porting over remaining /_admin/ and /_status/ APIs, incl. SQL-based ones

Part of #55947.

Release note (api change): Adds a new API tree, in /api/v2/*, currently
undocumented, that avoids the use of GRPC Gateway and cookie-based
authentication in favour of sessions in headers, and adds support for pagination.

58906: sql: improve interface for injecting descriptors into internal executor r=lucy-zhang a=lucy-zhang

As part of validating new schema elements (unique indexes, constraints)
in the schema changer, we inject in-memory descriptors to be used by the
internal executor that are never written to disk.

This is currently done by creating an entirely new dummy
`descs.Collection`, adding the injected descriptors to it as uncommitted
descriptors, and then setting this collection as the `tcModifier` on the
internal executor. All subsequent queries using the internal executor,
which each get their own child `connExecutor` and `descs.Collection`,
then have their collection's uncommitted descriptors replaced by the
ones in the dummy collection.

This commit introduces the concept of a "synthetic descriptor" to refer
to these injected descriptors, and slightly improves the interface in
two ways:

1. Instead of creating a new `descs.Collection` to hold synthetic
   descriptors to copy, we now just use a slice of
   `catalog.Descriptor`s.
2. Synthetic descriptors are now made explicit in the `descs.Collection`
   and precede uncommitted descriptors when resolving immutable
   descriptors. Resolving these descriptors mutably is now illegal and
   will return an assertion error.

This commit doesn't change the fact that the synthetic descriptors to be
injected into each query/statement's child `descs.Collection` are set
on the internal executor itself. This is still not threadsafe. It would
make more sense for these descriptors to be scoped at the level of
individual queries.
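
To make the resolution order concrete, here is a toy sketch of the precedence rules described above; the types are simplified stand-ins for `catalog.Descriptor` and `descs.Collection`, not the actual interfaces:

```
package main

import (
	"errors"
	"fmt"
)

type descriptor struct {
	id   int
	name string
}

type collection struct {
	synthetic   []descriptor // injected in-memory, never written to disk
	uncommitted []descriptor
}

func (c *collection) getImmutableByID(id int) (descriptor, error) {
	for _, d := range c.synthetic { // synthetic descriptors take precedence
		if d.id == id {
			return d, nil
		}
	}
	for _, d := range c.uncommitted {
		if d.id == id {
			return d, nil
		}
	}
	return descriptor{}, fmt.Errorf("descriptor %d not found", id)
}

func (c *collection) getMutableByID(id int) (descriptor, error) {
	for _, d := range c.synthetic {
		if d.id == id {
			// Mirrors the assertion error for mutable resolution of synthetics.
			return descriptor{}, errors.New("assertion failure: cannot resolve synthetic descriptor mutably")
		}
	}
	// ... normal mutable resolution would continue here ...
	return descriptor{}, fmt.Errorf("descriptor %d not found", id)
}

func main() {
	c := &collection{
		synthetic:   []descriptor{{1, "injected_tbl"}},
		uncommitted: []descriptor{{1, "stale_tbl"}},
	}
	d, _ := c.getImmutableByID(1)
	fmt.Println(d.name) // injected_tbl
	_, err := c.getMutableByID(1)
	fmt.Println(err)
}
```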

Related to #34304.

Release note: None

59258: changefeedccl: Freeze table name to the (optionally fully qualified) statement time name r=[miretskiy] a=HonoreDB

Previously, Kafka topics and top-level keys were always derived from the
table name in changefeeds. If the table name changed, the feed
eventually failed, and if the table name was non-unique across
databases, collisions were unavoidable. This PR adds a WITH
full_table_name option to changefeeds, and honors it by serializing the
fully qualified statement time name (e.g. movr.public.drivers) and relying on that.

There are probably more things that need to change downstream.

Release note (sql change): Added "WITH full_table_name" option to create
a changefeed on "movr.public.drivers" instead of
"drivers".

59281: sql: prevent DROP SCHEMA CASCADE from dropping types with references r=ajwerner a=ajwerner

Before this patch, a DROP SCHEMA CASCADE could cause database corruption
by dropping types which were referenced by other tables. This would lead to
bad errors like:

```
ERROR: object already exists: desc 687: type ID 685 in descriptor not found: descriptor not found
SQLSTATE: 42710
```

And doctor errors like:
```
   Table 687: ParentID  50, ParentSchemaID 29, Name 't': type ID 685 in descriptor not found: descriptor not found
```

Fixes #59021.

Release note (bug fix): Fixed a bug where `DROP SCHEMA ... CASCADE` could
drop types that were still referenced by other objects.
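
A sketch of the scenario the fix guards against; the object names, connection string, and exact post-fix error are assumptions, but the shape of the repro follows the description above:

```
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for _, stmt := range []string{
		`CREATE SCHEMA sc`,
		`CREATE TYPE sc.status AS ENUM ('open', 'closed')`,
		// A table outside sc keeps a reference to sc.status.
		`CREATE TABLE public.t (s sc.status)`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}

	// Before the patch this could drop sc.status out from under public.t,
	// corrupting descriptors; afterwards it should fail cleanly instead.
	_, err = db.Exec(`DROP SCHEMA sc CASCADE`)
	fmt.Println(err)
}
```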

59591: sql: hook up multi-region DDL to new zone config attributes r=aayushshah15 a=aayushshah15

After the introduction of #57184, we can constrain voting replicas
separately from non-voting replicas using the new attributes
`num_voters` and `voter_constraints`. This commit connects our
multi-region DDL statements to leverage these new attributes.

Broadly speaking,
- The existing `constraints` and `num_replicas` fields are set at the
database level, to be inherited by table/partition level zone configs.
- The new attributes, `num_voters` and `voter_constraints` (along with
accompanying `lease_preferences` for these voters), are set at the
table/partition level.

This brings the implementation of these DDL statements in line with the
functional specification.
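
A hand-written illustration of this division of labor; the multi-region DDL generates these zone configs itself, and the constraint values below are invented for the example:

```
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Database level: overall replication and non-voter placement,
	// inherited by table/partition zone configs.
	if _, err := db.Exec(`ALTER DATABASE mr CONFIGURE ZONE USING
		num_replicas = 5,
		constraints = '{"+region=us-east1": 1, "+region=us-west1": 1, "+region=europe-west1": 1}'`); err != nil {
		log.Fatal(err)
	}

	// Table level: where the voting replicas (and leaseholder) live.
	if _, err := db.Exec(`ALTER TABLE mr.t CONFIGURE ZONE USING
		num_voters = 3,
		voter_constraints = '[+region=us-east1]',
		lease_preferences = '[[+region=us-east1]]'`); err != nil {
		log.Fatal(err)
	}
}
```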

Fixes #57663.

Release note: None


59659: sql: fix bug preventing adding FKs referencing hidden columns r=lucy-zhang a=lucy-zhang

The validation query for adding foreign keys had a pointless `SELECT *`
on the referenced table that caused hidden columns to be omitted,
so attempting to add foreign key constraints referencing hidden columns
would fail. This PR fixes the query.

Fixes #59582.

Release note (bug fix): Fixed a bug preventing foreign key constraints
referencing hidden columns (e.g., `rowid`) from being added.
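
A quick illustration of the now-working case, using the hidden `rowid` primary key column; the connection string and names are placeholders:

```
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for _, stmt := range []string{
		`CREATE TABLE parent (v INT)`, // no explicit PK, so parent gets a hidden rowid column
		`CREATE TABLE child (parent_rowid INT8)`,
		// This used to fail validation because SELECT * omitted hidden columns.
		`ALTER TABLE child ADD CONSTRAINT fk FOREIGN KEY (parent_rowid) REFERENCES parent (rowid)`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}
}
```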

Co-authored-by: Bilal Akhtar <bilal@cockroachlabs.com>
Co-authored-by: Lucy Zhang <lucy@cockroachlabs.com>
Co-authored-by: Aaron Zinger <zinger@cockroachlabs.com>
Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
Co-authored-by: Aayush Shah <aayush.shah15@gmail.com>
6 people committed Feb 1, 2021
7 parents 90aad1a + a889429 + 77cfe96 + adb5528 + 8ba36eb + 14fb683 + 0f5be09 commit f37712a
Showing 46 changed files with 3,807 additions and 622 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -1169,8 +1169,8 @@ def go_deps():
name = "com_github_gorilla_mux",
build_file_proto_mode = "disable_global",
importpath = "github.com/gorilla/mux",
sum = "h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=",
version = "v1.7.4",
sum = "h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=",
version = "v1.8.0",
)
go_repository(
name = "com_github_gorilla_securecookie",
2 changes: 1 addition & 1 deletion go.mod
@@ -81,7 +81,7 @@ require (
github.com/google/pprof v0.0.0-20190109223431-e84dfd68c163
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
github.com/gorhill/cronexpr v0.0.0-20140423231348-a557574d6c02
github.com/gorilla/mux v1.7.4 // indirect
github.com/gorilla/mux v1.8.0
github.com/goware/modvendor v0.3.0
github.com/grpc-ecosystem/grpc-gateway v1.13.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
4 changes: 2 additions & 2 deletions go.sum
@@ -358,8 +358,8 @@ github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorhill/cronexpr v0.0.0-20140423231348-a557574d6c02 h1:Spo+4PFAGDqULAsZ7J69MOxq4/fwgZ0zvmDTBqpq7yU=
github.com/gorhill/cronexpr v0.0.0-20140423231348-a557574d6c02/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.0 h1:S7P+1Hm5V/AT9cjEcUD5uDaQSX0OE577aCXgoaKpYbQ=
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -50,6 +50,7 @@ go_library(
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/hydratedtables",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/resolver",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/avro.go
@@ -402,12 +402,13 @@ func columnDescToAvroSchema(colDesc *descpb.ColumnDescriptor) (*avroSchemaField,

// indexToAvroSchema converts an index descriptor into its corresponding avro
// record schema. The fields are kept in the same order as columns in the index.
// sqlName can be any string but should uniquely identify a schema.
func indexToAvroSchema(
tableDesc catalog.TableDescriptor, indexDesc *descpb.IndexDescriptor,
tableDesc catalog.TableDescriptor, indexDesc *descpb.IndexDescriptor, sqlName string,
) (*avroDataRecord, error) {
schema := &avroDataRecord{
avroRecord: avroRecord{
Name: SQLNameToAvroName(tableDesc.GetName()),
Name: SQLNameToAvroName(sqlName),
SchemaType: `record`,
},
fieldIdxByName: make(map[string]int),
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/avro_test.go
@@ -285,7 +285,7 @@ func TestAvroSchema(t *testing.T) {
`{"type":["null","long"],"name":"_u0001f366_","default":null,`+
`"__crdb__":"🍦 INT8 NOT NULL"}]}`,
tableSchema.codec.Schema())
indexSchema, err := indexToAvroSchema(tableDesc, tableDesc.GetPrimaryIndex().IndexDesc())
indexSchema, err := indexToAvroSchema(tableDesc, tableDesc.GetPrimaryIndex().IndexDesc(), tableDesc.GetName())
require.NoError(t, err)
require.Equal(t,
`{"type":"record","name":"_u2603_","fields":[`+
46 changes: 44 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -34,8 +34,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -86,7 +88,6 @@ func changefeedPlanHook(
); err != nil {
return nil, nil, nil, false, err
}

var sinkURIFn func() (string, error)
var header colinfo.ResultColumns
unspecifiedSink := changefeedStmt.SinkURI == nil
@@ -198,8 +199,13 @@ func changefeedPlanHook(
targets := make(jobspb.ChangefeedTargets, len(targetDescs))
for _, desc := range targetDescs {
if table, isTable := desc.(catalog.TableDescriptor); isTable {
_, qualified := opts[changefeedbase.OptFullTableName]
name, err := getChangefeedTargetName(ctx, table, *p.ExecCfg(), p.Txn(), qualified)
if err != nil {
return err
}
targets[table.GetID()] = jobspb.ChangefeedTarget{
StatementTimeName: table.GetName(),
StatementTimeName: name,
}
if err := changefeedbase.ValidateTable(targets, table); err != nil {
return err
@@ -675,3 +681,39 @@ func (b *changefeedResumer) OnPauseRequest(
return createProtectedTimestampRecord(ctx, execCfg.Codec, pts, txn, *b.job.ID(),
details.Targets, *resolved, cp)
}

// getQualifiedTableName returns the database-qualified name of the table
// or view represented by the provided descriptor.
func getQualifiedTableName(
ctx context.Context, execCfg sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (string, error) {
dbDesc, err := catalogkv.MustGetDatabaseDescByID(ctx, txn, execCfg.Codec, desc.GetParentID())
if err != nil {
return "", err
}
schemaID := desc.GetParentSchemaID()
schemaName, err := resolver.ResolveSchemaNameByID(ctx, txn, execCfg.Codec, desc.GetParentID(), schemaID)
if err != nil {
return "", err
}
tbName := tree.MakeTableNameWithSchema(
tree.Name(dbDesc.GetName()),
tree.Name(schemaName),
tree.Name(desc.GetName()),
)
return tbName.String(), nil
}

// getChangefeedTargetName returns the target table's name, fully qualified
// if the full_table_name option is set.
func getChangefeedTargetName(
ctx context.Context,
desc catalog.TableDescriptor,
execCfg sql.ExecutorConfig,
txn *kv.Txn,
qualified bool,
) (string, error) {
if qualified {
return getQualifiedTableName(ctx, execCfg, txn, desc)
}
return desc.GetName(), nil
}
36 changes: 22 additions & 14 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -195,6 +195,26 @@ func TestChangefeedEnvelope(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestChangefeedFullTableName(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)

t.Run(`envelope=row`, func(t *testing.T) {
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH full_table_name`)
defer closeFeed(t, foo)
assertPayloads(t, foo, []string{`d.public.foo: [1]->{"after": {"a": 1, "b": "a"}}`})
})
}
// TODO(zinger): Plumb this option through to all encoders so it works in sinkless mode.
// t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestChangefeedMultiTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1404,7 +1424,7 @@ func TestChangefeedUpdatePrimaryKey(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestChangefeedTruncateRenameDrop(t *testing.T) {
func TestChangefeedTruncateOrDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -1450,18 +1470,6 @@ func TestChangefeedTruncateRenameDrop(t *testing.T) {
}
assertFailuresCounter(t, metrics, 2)

sqlDB.Exec(t, `CREATE TABLE rename (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO rename VALUES (1)`)
rename := feed(t, f, `CREATE CHANGEFEED FOR rename`)
defer closeFeed(t, rename)
assertPayloads(t, rename, []string{`rename: [1]->{"after": {"a": 1}}`})
sqlDB.Exec(t, `ALTER TABLE rename RENAME TO renamed`)
sqlDB.Exec(t, `INSERT INTO renamed VALUES (2)`)
if _, err := rename.Next(); !testutils.IsError(err, `"rename" was renamed to "renamed"`) {
t.Errorf(`expected ""rename" was renamed to "renamed"" error got: %+v`, err)
}
assertFailuresCounter(t, metrics, 3)

sqlDB.Exec(t, `CREATE TABLE drop (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO drop VALUES (1)`)
drop := feed(t, f, `CREATE CHANGEFEED FOR drop`)
@@ -1471,7 +1479,7 @@ func TestChangefeedTruncateRenameDrop(t *testing.T) {
if _, err := drop.Next(); !testutils.IsError(err, `"drop" was dropped`) {
t.Errorf(`expected ""drop" was dropped" error got: %+v`, err)
}
assertFailuresCounter(t, metrics, 4)
assertFailuresCounter(t, metrics, 3)
}

t.Run(`sinkless`, sinklessTest(testFn))
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -30,6 +30,7 @@ const (
OptCursor = `cursor`
OptEnvelope = `envelope`
OptFormat = `format`
OptFullTableName = `full_table_name`
OptKeyInValue = `key_in_value`
OptResolvedTimestamps = `resolved`
OptUpdatedTimestamps = `updated`
@@ -100,6 +101,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptCursor: sql.KVStringOptRequireValue,
OptEnvelope: sql.KVStringOptRequireValue,
OptFormat: sql.KVStringOptRequireValue,
OptFullTableName: sql.KVStringOptRequireNoValue,
OptKeyInValue: sql.KVStringOptRequireNoValue,
OptResolvedTimestamps: sql.KVStringOptAny,
OptUpdatedTimestamps: sql.KVStringOptRequireNoValue,
3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
@@ -49,9 +49,6 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc
if tableDesc.GetState() == descpb.DescriptorState_DROP {
return errors.Errorf(`"%s" was dropped`, t.StatementTimeName)
}
if tableDesc.GetName() != t.StatementTimeName {
return errors.Errorf(`"%s" was renamed to "%s"`, t.StatementTimeName, tableDesc.GetName())
}

return nil
}
31 changes: 25 additions & 6 deletions pkg/ccl/changefeedccl/encoder.go
@@ -13,6 +13,7 @@ import (
"context"
"encoding/binary"
gojson "encoding/json"
"fmt"
"io/ioutil"
"net/url"
"path/filepath"
@@ -273,8 +274,8 @@ func (e *jsonEncoder) EncodeResolvedTimestamp(
// JSON format. Keys are the primary key columns in a record. Values are all
// columns in a record.
type confluentAvroEncoder struct {
registryURL string
updatedField, beforeField, keyOnly bool
registryURL string
updatedField, beforeField, keyOnly, useFullTableName bool

keyCache map[tableIDAndVersion]confluentRegisteredKeySchema
valueCache map[tableIDAndVersionPair]confluentRegisteredEnvelopeSchema
@@ -326,32 +327,50 @@ func newConfluentAvroEncoder(opts map[string]string) (*confluentAvroEncoder, err
return nil, errors.Errorf(`%s is not supported with %s=%s`,
changefeedbase.OptKeyInValue, changefeedbase.OptFormat, changefeedbase.OptFormatAvro)
}
_, e.useFullTableName = opts[changefeedbase.OptFullTableName]

if len(e.registryURL) == 0 {
return nil, errors.Errorf(`WITH option %s is required for %s=%s`,
changefeedbase.OptConfluentSchemaRegistry, changefeedbase.OptFormat, changefeedbase.OptFormatAvro)
}

e.keyCache = make(map[tableIDAndVersion]confluentRegisteredKeySchema)
e.valueCache = make(map[tableIDAndVersionPair]confluentRegisteredEnvelopeSchema)
e.resolvedCache = make(map[string]confluentRegisteredEnvelopeSchema)
return e, nil
}

// rawTableName returns the raw SQL-formatted name for the table, applying the full_table_name option.
func (e *confluentAvroEncoder) rawTableName(desc catalog.TableDescriptor) string {
tableName := desc.GetName()
if e.useFullTableName {
// We can't use the statement-time name here because schemas are version-specific,
// and the fully qualified table name is either hard to get or undefined
// without a current transaction and execution context.
// But this just needs to avoid collisions, so we can use IDs in place of names.
tableName = fmt.Sprintf("db%d.schema%d.%s",
desc.GetParentID(),
desc.GetParentSchemaID(),
tableName)
}
return tableName
}

// EncodeKey implements the Encoder interface.
func (e *confluentAvroEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) {
cacheKey := makeTableIDAndVersion(row.tableDesc.GetID(), row.tableDesc.GetVersion())

registered, ok := e.keyCache[cacheKey]
if !ok {
var err error
registered.schema, err = indexToAvroSchema(row.tableDesc, row.tableDesc.GetPrimaryIndex().IndexDesc())
tableName := e.rawTableName(row.tableDesc)
registered.schema, err = indexToAvroSchema(row.tableDesc, row.tableDesc.GetPrimaryIndex().IndexDesc(), tableName)
if err != nil {
return nil, err
}

// NB: This uses the kafka name escaper because it has to match the name
// of the kafka topic.
subject := SQLNameToKafkaName(row.tableDesc.GetName()) + confluentSubjectSuffixKey
subject := SQLNameToKafkaName(tableName) + confluentSubjectSuffixKey
registered.registryID, err = e.register(ctx, &registered.schema.avroRecord, subject)
if err != nil {
return nil, err
@@ -404,7 +423,7 @@ func (e *confluentAvroEncoder) EncodeValue(ctx context.Context, row encodeRow) (

// NB: This uses the kafka name escaper because it has to match the name
// of the kafka topic.
subject := SQLNameToKafkaName(row.tableDesc.GetName()) + confluentSubjectSuffixValue
subject := SQLNameToKafkaName(e.rawTableName(row.tableDesc)) + confluentSubjectSuffixValue
registered.registryID, err = e.register(ctx, &registered.schema.avroRecord, subject)
if err != nil {
return nil, err
56 changes: 56 additions & 0 deletions pkg/ccl/changefeedccl/encoder_test.go
@@ -388,6 +388,62 @@ func TestAvroEncoder(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestTableNameCollision(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
reg := makeTestSchemaRegistry()
defer reg.Close()

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE DATABASE movr`)
sqlDB.Exec(t, `CREATE DATABASE printr`)
sqlDB.Exec(t, `CREATE TABLE movr.drivers (id INT PRIMARY KEY, name STRING)`)
sqlDB.Exec(t, `CREATE TABLE printr.drivers (id INT PRIMARY KEY, version INT)`)
sqlDB.Exec(t,
`INSERT INTO movr.drivers VALUES (1, 'Alice'), (2, NULL)`,
)
sqlDB.Exec(t,
`INSERT INTO printr.drivers VALUES (1, 100), (2, NULL)`,
)

movrFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers `+
`WITH format=$1, confluent_schema_registry=$2, diff, resolved`,
changefeedbase.OptFormatAvro, reg.server.URL)
defer closeFeed(t, movrFeed)

printrFeed := feed(t, f, `CREATE CHANGEFEED FOR printr.drivers `+
`WITH format=$1, confluent_schema_registry=$2, diff, resolved`,
changefeedbase.OptFormatAvro, reg.server.URL)
defer closeFeed(t, printrFeed)

comboFeed := feed(t, f, `CREATE CHANGEFEED FOR printr.drivers, movr.drivers `+
`WITH format=$1, confluent_schema_registry=$2, diff, resolved, full_table_name`,
changefeedbase.OptFormatAvro, reg.server.URL)
defer closeFeed(t, comboFeed)

assertPayloadsAvro(t, reg, movrFeed, []string{
`drivers: {"id":{"long":1}}->{"after":{"drivers":{"id":{"long":1},"name":{"string":"Alice"}}},"before":null}`,
`drivers: {"id":{"long":2}}->{"after":{"drivers":{"id":{"long":2},"name":null}},"before":null}`,
})

assertPayloadsAvro(t, reg, printrFeed, []string{
`drivers: {"id":{"long":1}}->{"after":{"drivers":{"id":{"long":1},"version":{"long":100}}},"before":null}`,
`drivers: {"id":{"long":2}}->{"after":{"drivers":{"id":{"long":2},"version":null}},"before":null}`,
})

assertPayloadsAvro(t, reg, comboFeed, []string{
`movr.public.drivers: {"id":{"long":1}}->{"after":{"drivers":{"id":{"long":1},"name":{"string":"Alice"}}},"before":null}`,
`movr.public.drivers: {"id":{"long":2}}->{"after":{"drivers":{"id":{"long":2},"name":null}},"before":null}`,
`printr.public.drivers: {"id":{"long":1}}->{"after":{"drivers":{"id":{"long":1},"version":{"long":100}}},"before":null}`,
`printr.public.drivers: {"id":{"long":2}}->{"after":{"drivers":{"id":{"long":2},"version":null}},"before":null}`,
})
}

t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestAvroMigrateToUnsupportedColumn(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)