changefeeccl: Projections and Filters in CDC. #82562

Merged
merged 2 commits into from
Jun 26, 2022

Contributor

@miretskiy miretskiy commented Jun 7, 2022

Add a variant of CHANGEFEED statement that allows specification
of predicates and projections.

CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT .... FROM t WHERE ...

This changefeed variant can target at most 1 table (and 1 column
family) at a time. The expressions used as the projections and
filters can be almost any supported expression with some restrictions:

  • Volatile functions not allowed.
  • Sub-selects not allowed.
  • Aggregate and window functions (i.e. functions operating over many
    rows) not allowed.
  • Some stable functions, notably functions which return MVCC
    timestamp, are overridden to return MVCC timestamp of the event.

In addition, some CDC specific functions are provided:

  • cdc_is_delete: returns true if the event is a deletion event.
  • cdc_prev: returns JSON representation of the previous row state.
  • cdc_updated_timestamp: returns event update timestamp (usually MVCC
    timestamp, but can be different if e.g. undergoing schema changes).

Additional CDC specific functions will be added in follow-on PRs.

A few examples:

  • Emit all but the deletion events:
      CREATE CHANGEFEED INTO 'kafka://'
      AS SELECT * FROM table
      WHERE NOT cdc_is_delete()
  • Emit all events that modified the important_col column:
      CREATE CHANGEFEED INTO 'kafka://' WITH diff
      AS SELECT *, cdc_prev() AS previous
      FROM important_table
      WHERE important_col != cdc_prev()->'important_col'
  • Emit a few columns, as well as computed expressions:
      CREATE CHANGEFEED INTO 'kafka://' WITH diff
      AS SELECT warehouseID, (totalItems - orderedItems) as itemsAvailable
      FROM warehouse
      WHERE region='US/east';

When a filter expression is specified, the changefeed now consults the
optimizer so that the set of spans scanned by the changefeed can be
restricted based on the predicate.

For example, given the following table and a changefeed:

CREATE TABLE warehouse (
  region STRING,
  warehouseID int,
  ....
  PRIMARY KEY (region, warehouseID)
);

CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *
FROM warehouse
WHERE region='US/east';

This changefeed will only scan the table spans that contain the US/east
region (and ignore all other table spans).
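
To illustrate the idea of restricting the scanned spans by a predicate on the leading primary key column, here is a minimal sketch. It is not the code in this PR: the `span` type, the string key layout `<tablePrefix><region>/<warehouseID>`, and the helpers `prefixEnd` and `constrainToRegion` are all hypothetical and exist only for this example. Real table spans are byte-encoded and the real constraint is derived by the optimizer.

```go
package main

import "fmt"

// span is a hypothetical half-open key range [start, end) over string keys.
// Real table spans are byte-encoded; strings are used here only to keep the
// sketch readable.
type span struct {
	start, end string
}

// contains reports whether key falls inside the half-open range.
func (s span) contains(key string) bool {
	return key >= s.start && key < s.end
}

// prefixEnd returns the smallest key greater than every key that starts
// with prefix. Assumes prefix is non-empty and its last byte is not 0xff.
func prefixEnd(prefix string) string {
	b := []byte(prefix)
	b[len(b)-1]++
	return string(b)
}

// constrainToRegion narrows a table span to the keys whose leading primary
// key column equals region. It assumes the made-up key layout
// "<tablePrefix><region>/<warehouseID>".
func constrainToRegion(tablePrefix, region string) span {
	p := tablePrefix + region + "/"
	return span{start: p, end: prefixEnd(p)}
}

func main() {
	s := constrainToRegion("/warehouse/", "US/east")
	fmt.Printf("scan [%s, %s)\n", s.start, s.end)
	for _, k := range []string{
		"/warehouse/EU/west/7",
		"/warehouse/US/east/42",
		"/warehouse/US/west/3",
	} {
		fmt.Printf("%-24s in span: %t\n", k, s.contains(k))
	}
}
```

Only keys under the `US/east` prefix fall inside the constrained span, so rows for other regions never need to be scanned or evaluated against the filter.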


For foundational work, see:

Addresses:


Release Notes (enterprise):
CHANGEFEED statement now supports general expressions -- predicates and projections.
Projections allow customers to emit only the data that they care about,
including computed columns, while predicates (i.e. filters) allow them
to restrict the data that's emitted only to those events that match the
filter.

CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM t WHERE NOT cdc_is_delete()

@miretskiy miretskiy requested review from a team as code owners June 7, 2022 23:13
@miretskiy miretskiy requested review from stevendanna and removed request for a team June 7, 2022 23:13
@miretskiy miretskiy requested review from rytaft and HonoreDB June 7, 2022 23:14
@miretskiy
Contributor Author

Note to the reviewers: I plan on expanding changefeed_test with more end2end tests.
In the meantime, want to start the review and see what TC has to say.

Collaborator

@rytaft rytaft left a comment

Cool! I left a few nits. I think I reviewed the optimizer-related code, but if you could point me to specific files you want me to look at more closely that would be helpful.

Reviewed 8 of 8 files at r1, 34 of 34 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, and @stevendanna)


-- commits line 92 at r2:
now supports?


pkg/ccl/changefeedccl/cdceval/constraint.go line 25 at r2 (raw file):

// ConstrainPrimaryIndexSpanByFilter attempts to constrain table primary index span via specified
// filter.  Returns constrained span, and a possibly empty remaining filter that needs to be evaluated.

nit: comment should be <= 80 columns wide


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 48 at r2 (raw file):

}

// NewEvaluatorForExpressions returns evaluator configured to process specified projection and filter expressions.

nit: wrap comment lines


pkg/ccl/changefeedccl/cdceval/validation.go line 85 at r2 (raw file):

		return nil, pgerror.Newf(pgcode.InvalidParameterValue,
			"projections and filter cannot be used when running against multifamily table (table has %d families)",
			desc.NumFamilies())

I think you can relax this restriction in the future if the filters and projections only use columns from the target family


pkg/ccl/changefeedccl/schemafeed/schema_feed.go line 534 at r2 (raw file):

func (tf *schemaFeed) validatePredicates(ctx context.Context, desc catalog.TableDescriptor) error {
	if len(tf.projection) == 0 {

did you mean predicate here?


pkg/clusterversion/cockroach_versions.go line 368 at r2 (raw file):

	// keys at the Pebble layer.
	EnablePebbleFormatVersionRangeKeys
	// EnablePredicateChangefeed indicates that changefeeds support predicates and projections.

nit: wrap comment


pkg/clusterversion/cockroach_versions.go line 369 at r2 (raw file):

	EnablePebbleFormatVersionRangeKeys
	// EnablePredicateChangefeed indicates that changefeeds support predicates and projections.
	EnablePredicateChangefeed

nit: maybe call this EnablePredicateProjectionChangefeed?


pkg/sql/execinfrapb/processors_changefeeds.proto line 61 at r2 (raw file):

  // Projection is the "select clause" for predicate changefeed.
  optional Expression projection = 6 [(gogoproto.nullable) = false];

should this be repeated instead of optional?


pkg/sql/opt/exec/execbuilder/builder.go line 132 at r1 (raw file):

	ContainsMutation bool

	// wrapFunction returns resolvable function referenced for function with

nit: referenced -> reference


pkg/ccl/changefeedccl/alter_changefeed_test.go line 1285 at r2 (raw file):

	// and removed 'primary_key_filter'.  Since predicates and projections are no longer
	// a "string" option, alter statement implementation (and grammar) needs to be updated, and
	// this test modified and re-enabled.

nit: wrap comment

Contributor Author

@miretskiy miretskiy left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @rytaft, and @stevendanna)


-- commits line 92 at r2:

Previously, rytaft (Rebecca Taft) wrote…

now supports?

Done.


pkg/ccl/changefeedccl/cdceval/validation.go line 85 at r2 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

I think you can relax this restriction in the future if the filters and projections only use columns from the target family

I agree; I spoke about this very problem before with @HonoreDB. I left a TODO. I think we should drop this restriction, but for now I'm keeping it.


pkg/ccl/changefeedccl/schemafeed/schema_feed.go line 534 at r2 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

did you mean predicate here?

Nah.. Projection is right because you must have at least 1 expression in the projection, while the filter is optional.
I renamed this method to validateProjectionAndFilter to better reflect its purpose.


pkg/clusterversion/cockroach_versions.go line 369 at r2 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

nit: maybe call this EnablePredicateProjectionChangefeed?

Done.


pkg/sql/execinfrapb/processors_changefeeds.proto line 61 at r2 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

should this be repeated instead of optional?

Nah.. I was actually putting the entire list into that... I've been going back and forth on this... but basically
I decided to change the SQL grammar, and update this file appropriately to have a tree.SelectClause
(and store the entire select clause here, instead of splitting into projection/filter).
This way, I improve the Go API (no more functions that take 2 strings where you can get the order wrong), and I also get the ability to have "aliases".


pkg/sql/opt/exec/execbuilder/builder.go line 132 at r1 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

nit: referenced -> reference

Done.


pkg/ccl/changefeedccl/alter_changefeed_test.go line 1285 at r2 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

nit: wrap comment

Done.

@miretskiy miretskiy force-pushed the expressions branch 13 times, most recently from 26c7cb1 to 8146460 Compare June 14, 2022 00:50
Contributor

@HonoreDB HonoreDB left a comment

Reviewed 1 of 8 files at r1, 8 of 36 files at r5, 2 of 37 files at r6.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, @rytaft, and @stevendanna)


pkg/ccl/changefeedccl/changefeed_stmt.go line 899 at r6 (raw file):

	{
		if details.Select != "" {
			if len(details.TargetSpecifications) > 1 {

Suggest `len(details.TargetSpecifications) != 1` and "requires exactly one table", so that we'll catch it if this check happens before TargetSpecifications is populated.
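
A small sketch of why the `!= 1` form is the safer check. `validateTargets` and its error text are hypothetical stand-ins, not the code in changefeed_stmt.go; the point is only that a `> 1` comparison lets an empty (not yet populated) target list through, while `!= 1` rejects it.

```go
package main

import "fmt"

// validateTargets is a hypothetical stand-in for the check discussed above.
// selectClause is the AS SELECT text (empty for an ordinary changefeed) and
// targets is the list of target tables.
func validateTargets(selectClause string, targets []string) error {
	if selectClause == "" {
		// Ordinary changefeeds may target any number of tables.
		return nil
	}
	// Using != 1 instead of > 1 also rejects an empty list, which would
	// otherwise slip through if targets had not been populated yet.
	if len(targets) != 1 {
		return fmt.Errorf(
			"AS SELECT requires exactly one table, got %d", len(targets))
	}
	return nil
}

func main() {
	for _, targets := range [][]string{nil, {"warehouse"}, {"a", "b"}} {
		err := validateTargets("SELECT * FROM warehouse", targets)
		fmt.Printf("%d target(s): err=%v\n", len(targets), err)
	}
}
```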


pkg/ccl/changefeedccl/event_processing.go line 171 at r6 (raw file):

		}
		updatedRow = projection
		prevRow = cdcevent.Row{}

Could you say more about why you're clearing this here? Maybe an alternative would be to call c.evaluator.Projection(ctx, prevRow, mvccTimestamp, prevRow) even though if the projection already references cdc_prev it's semantically a little messy.


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 730 at r6 (raw file):

	for _, c := range d.ResultColumns() {
		if c.Typ.UserDefined() {
			r.byName[c.Typ.Name()] = c.Typ

Annoying corner case here. You can't use enums across databases, but you can have enums with the same name from different schemas in the same table.

root@localhost:26257/one> show create table both_switches;
   table_name   |                      create_statement
----------------+--------------------------------------------------------------
  both_switches | CREATE TABLE public.both_switches (
                |     a public.switch NULL,
                |     b alt.switch NULL,
                |     rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
                |     CONSTRAINT both_switches_pkey PRIMARY KEY (rowid ASC)
                | )

Name() looks like it returns the unqualified name (see types.go:1500) so this can collide. I think c.TypeMeta.Name.FQName() should do it.
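
To make the collision concrete, here is a minimal sketch. `enumType` and `indexBy` are hypothetical and only mimic the shape of the `byName` map above; they do not use the real `types.T` API.

```go
package main

import "fmt"

// enumType is a hypothetical, simplified stand-in for a user-defined type
// that lives in a schema.
type enumType struct {
	schema, name string
}

// unqualified returns just the type name, e.g. "switch".
func (e enumType) unqualified() string { return e.name }

// qualified returns the schema-qualified name, e.g. "public.switch".
func (e enumType) qualified() string { return e.schema + "." + e.name }

// indexBy builds a lookup map keyed by whatever key returns for each type.
// Later entries silently overwrite earlier ones with the same key.
func indexBy(types []enumType, key func(enumType) string) map[string]enumType {
	m := make(map[string]enumType, len(types))
	for _, t := range types {
		m[key(t)] = t
	}
	return m
}

func main() {
	// Column types of both_switches: public.switch and alt.switch.
	cols := []enumType{{"public", "switch"}, {"alt", "switch"}}

	fmt.Println(len(indexBy(cols, enumType.unqualified))) // prints 1: the entries collide
	fmt.Println(len(indexBy(cols, enumType.qualified)))   // prints 2: both types survive
}
```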


pkg/sql/parser/sql.y line 3977 at r6 (raw file):

  }
| CREATE CHANGEFEED /*$3=*/ opt_changefeed_sink /*$4=*/ opt_with_options
  AS SELECT /*$7=*/target_list FROM /*$9=*/insert_target /*$10=*/opt_where_clause

Ooh, these arg number comments are handy.

insert_target feels wrong here, I think we should move away from coupling changefeed target syntax to other statements' target syntax even if they happen to be the same right now. from_list might be more appropriate or just add a changefeed_from_clause token.


pkg/ccl/changefeedccl/changefeed_test.go line 4016 at r6 (raw file):

	sqlDB := sqlutils.MakeSQLRunner(s.DB)
	// Create enum to ensure enum values displayed correctly in the summary.
	sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`)

Couple of enum tests that might fail right now:

  • Two enums with the same name, same database, different schemas
  • Create a changefeed with WHERE status='open' and then ALTER TYPE status RENAME VALUE 'open' to 'active'.

@HonoreDB
Contributor

pkg/ccl/changefeedccl/cdceval/expr_eval.go line 730 at r6 (raw file):

Previously, HonoreDB (Aaron Zinger) wrote…

Annoying corner case here. You can't use enums across databases, but you can have enums with the same name from different schemas in the same table.

root@localhost:26257/one> show create table both_switches;
   table_name   |                      create_statement
----------------+--------------------------------------------------------------
  both_switches | CREATE TABLE public.both_switches (
                |     a public.switch NULL,
                |     b alt.switch NULL,
                |     rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
                |     CONSTRAINT both_switches_pkey PRIMARY KEY (rowid ASC)
                | )

Name() looks like it returns the unqualified name (see types.go:1500) so this can collide. I think c.TypeMeta.Name.FQName() should do it.

Oh and also ALTER type switch RENAME TO status is a thing and arguably wouldn't be expected to break a running changefeed.

@jordanlewis
Member

Thanks. That improves things, but it wasn't really what I was after. I see this:

root@localhost:26257/defaultdb> create changefeed with schema_change_policy='stop' as select a, b from a;
table,key,value
a,[1],"{""after"": {""a"": 1, ""b"": 2}}"
a,[2],"{""after"": {""a"": 2, ""b"": 2}}"
a,[3],"{""after"": {""a"": 3, ""b"": 2}}"
a,[4],"{""after"": {""a"": 4, ""b"": 2}}"
a,[5],"{""after"": {""a"": 5, ""b"": 2}}"
a,[6],"{""after"": {""a"": 6, ""b"": 2}}"

With the new hotness (projections), I'd expect to see this:

a,b
1,2
2,2
3,2

etc. I'm sure this is different semantics to an ordinary changefeed, and perhaps we'd need a different way to ask for this output. It's also likely I'm wandering into an old debate so apologies if this has already been covered at length elsewhere (and please feel free to point me to old conversations and so on). But comparing to the kind of output you see from other streaming query systems like ksqldb, the table/key/value output is harder to consume and understand for a novice user.

Take a look at EMIT CHANGES output from ksqldb. As expected, it would "stream updates" to the given projections, returning the projected columns as native SQL, rather than wrapped in JSON.

reference: https://www.confluent.io/blog/intro-to-ksqldb-sql-database-streaming/

@miretskiy
Contributor Author

miretskiy commented Aug 29, 2022 via email

@miretskiy
Contributor Author

miretskiy commented Aug 29, 2022 via email

@jordanlewis
Member

Cool, thank you.

miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Nov 16, 2022
Previous PRs (cockroachdb#82562) introduced CDC expressions.

This PR builds on that and replaces majority of hand written
evaluation logic in favor of tighter integration with
optimizer and dist SQL processors.

A CDC expression, which is really a simplified `SELECT` statement,
is now planned by the optimizer (`sql.PlanCDCExpression`).
The resulting plan is then fed to the distSQL, to produce
a specialized CDC execution plan (`sql.RunCDCEvaluation`).

The execution plan is special in that it is guaranteed to be
a local execution plan, and changefeed is expected to "feed"
the data (encoded row) directly into the execution pipeline,
with change aggregators consuming resulting projection.

The benefit of this approach is that expression optimization,
and evaluation is now handled by optimizer and distSQL.
The responsibility of CDC evaluation package is to make sure
that CDC expression is "sane" and to setup CDC specific functions.

Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete.  In particular, this
PR does not replace current distSQL execution for CDC --
namely, we still keep the same execution model using hand planned
`ChangeFrontierProcessor` and `ChangeAggregatorProcessor`.
It augments existing model, while tightening the integration.

Still, this is an improvement over previous state.  The follow on work
will complete integration.

Some changes enabled by this implementation include the replacement
of `cdc_prev()` function which returned JSONb representation of the
previous row, with a `cdc_prev` tuple.  This makes changefeed
expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col
```

In addition, by using tuple to represent the state of the previous row,
we can now leverage existing SQL functions.  For example, to emit
the previous row as JSONb we can do:

```
SELECT *, row_to_json(cdc_prev) AS prevJson FROM tbl
```

Fixes cockroachdb#90416
Fixes cockroachdb#90714
Fixes cockroachdb#90421
Informs cockroachdb#90442

Release note (enterprise change): CDC expressions are now planned and
evaluated using SQL optimizer and distSQL execution. The state
of the previous row is now exposed as `cdc_prev` tuple.

Release note (backward-incompatible change): The replacement of
the cdc_prev() function with a cdc_prev tuple is an incompatible
change that may break changefeeds that use the old cdc_prev() function.
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Nov 16, 2022
Previous PRs (cockroachdb#82562) introduced CDC expressions.

This PR builds on that and replaces majority of hand written
evaluation logic in favor of tighter integration with
optimizer and dist SQL processors.

A CDC expression, which is really a simplified `SELECT` statement,
is now planned by the optimizer (`sql.PlanCDCExpression`).
The resulting plan is then fed to the distSQL, to produce
a specialized CDC execution plan (`sql.RunCDCEvaluation`).

The execution plan is special in that it is guaranteed to be
a local execution plan, and changefeed is expected to "feed"
the data (encoded row) directly into the execution pipeline,
with change aggregators consuming resulting projection.

The benefit of this approach is that expression optimization,
and evaluation is now handled by optimizer and distSQL.
The responsibility of CDC evaluation package is to make sure
that CDC expression is "sane" and to setup CDC specific functions.

Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete.  In particular, this
PR does not replace current distSQL execution for CDC --
namely, we still keep the same execution model using hand planned
`ChangeFrontierProcessor` and `ChangeAggregatorProcessor`.
It augments existing model, while tightening the integration.

Still, this is an improvement over previous state.  The follow on work
will complete integration.

Some changes enabled by this implementation include the replacement
of `cdc_prev()` function which returned JSONb representation of the
previous row, with a `cdc_prev` tuple.  This makes changefeed
expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col
```

In addition, by using tuple to represent the state of the previous row,
we can now leverage existing SQL functions.  For example, to emit
the previous row as JSONb we can do:

```
SELECT *, row_to_json(cdc_prev) AS prevJson FROM tbl
```

Fixes cockroachdb#90416
Fixes cockroachdb#90714
Fixes cockroachdb#90421
Fixes cockroachdb#90455
Informs cockroachdb#90442

Release note (enterprise change): CDC expressions are now planned and
evaluated using SQL optimizer and distSQL execution. The state
of the previous row is now exposed as `cdc_prev` tuple.

Release note (backward-incompatible change): The replacement of
the cdc_prev() function with a cdc_prev tuple is an incompatible
change that may break changefeeds that use the old cdc_prev() function.
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Nov 17, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Nov 29, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Nov 29, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Nov 30, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Nov 30, 2022
Previous PRs (cockroachdb#82562) introduced CDC expressions.

This PR builds on that and replaces majority of hand written
evaluation logic in favor of tighter integration with
optimizer and dist SQL processors.

CDC expression, which is really a simplified `SELECT` statement,
is now  planned by the optimizer `sql.PlanCDCExpression`.
The resulting plan is then fed to the distSQL, to produce
a specialized CDC execution plan (`sql.RunCDCEvaluation`).

The execution plan is special in that it is guaranteed to be
a local execution plan, and changefeed is expected to "feed"
the data (encoded row) directly into the execution pipeline,
with change aggregators consuming resulting projection.

The benefit of this approach is that expression optimization,
and evaluation is now handled by optimizer and distSQL.
The responsibility of CDC evaluation package is to make sure
that CDC expression is "sane" and to setup CDC specific functions.

Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have a changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete.  In particular, this
PR does not replace the current distSQL execution for CDC --
namely, we still keep the same execution model using the hand-planned
`ChangeFrontierProcessor` and `ChangeAggregatorProcessor`.
It augments the existing model, while tightening the integration.

Still, this is an improvement over previous state.  The follow on work
will complete integration.

Some changes enabled by this implementation include the replacement
of `cdc_prev()` function which returned JSONb representation of the
previous row, with a `cdc_prev` tuple.  This makes changefeed
expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col
```

In addition, by using tuple to represent the state of the previous row,
we can now leverage existing SQL functions.  For example, to emit
previous row as JSONb we can do:

```
SELECT *, row_to_json(cdc_prev) AS prevJson FROM tbl
```

Fixes cockroachdb#90416
Fixes cockroachdb#90714
Fixes cockroachdb#90421
Fixes cockroachdb#90455
Informs cockroachdb#90442

Release note (enterprise change): CDC expressions are now planned and
evaluated using SQL optimizer and distSQL execution. The state
of the previous row is now exposed as `cdc_prev` tuple.

Release note (backward-incompatible change): The replacement of the
cdc_prev() function with a cdc_prev tuple is an incompatible
change that may break changefeeds that use the old cdc_prev() function.
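
As a rough illustration of the incompatibility described in the release note, the sketch below rewrites the `important_col` example from the original PR description (#82562) from the function form to the tuple form. The table name, column name, and `'kafka://'` sink placeholder are taken from that example; whether `WITH diff` is still needed and whether the statement is accepted unchanged on a given release are assumptions, not something this commit message confirms.

```sql
-- Before: cdc_prev() is a function returning the previous row as JSONb,
-- so the previous column value is extracted with the -> operator.
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *
FROM important_table
WHERE important_col != cdc_prev()->'important_col';

-- After: cdc_prev is a strongly typed tuple, so the previous column
-- value is referenced directly by name.
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *
FROM important_table
WHERE important_col != cdc_prev.important_col;
```

A changefeed created with the first form would presumably need to be recreated or altered to use the second form once `cdc_prev()` is no longer available.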
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Dec 1, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Dec 1, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Dec 2, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Dec 2, 2022
Previous PRs (cockroachdb#82562) introduced CDC expressions.

This PR builds on that and replaces the majority of the hand-written
evaluation logic with tighter integration with the
optimizer and distSQL processors.

A CDC expression, which is really a simplified `SELECT` statement,
is now planned by the optimizer (`sql.PlanCDCExpression`).
The resulting plan is then fed to distSQL to produce
a specialized CDC execution plan (`sql.RunCDCEvaluation`).

The execution plan is special in that it is guaranteed to be
a local execution plan, and changefeed is expected to "feed"
the data (encoded row) directly into the execution pipeline,
with change aggregators consuming resulting projection.

The benefit of this approach is that expression optimization
and evaluation are now handled by the optimizer and distSQL.
The responsibility of the CDC evaluation package is to make sure
that the CDC expression is "sane" and to set up CDC-specific functions.

Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have a changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete.  In particular, this
PR does not replace the current distSQL execution for CDC --
namely, we still keep the same execution model using the hand-planned
`ChangeFrontierProcessor` and `ChangeAggregatorProcessor`.
It augments the existing model, while tightening the integration.

Still, this is an improvement over previous state.  The follow on work
will complete integration.

Some changes enabled by this implementation include the replacement
of `cdc_prev()` function which returned JSONb representation of the
previous row, with a `cdc_prev` tuple.  This makes changefeed
expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col
```

In addition, by using tuple to represent the state of the previous row,
we can now leverage existing SQL functions.  For example, to emit
previous row as JSONb we can do:

```
SELECT *, row_to_json(cdc_prev) AS prevJson FROM tbl
```

Fixes cockroachdb#90416
Fixes cockroachdb#90714
Fixes cockroachdb#90455
Informs cockroachdb#90442
Informs CRDB-18978
Informs CRDB-17161

Release note (enterprise change): CDC expressions are now planned and
evaluated using SQL optimizer and distSQL execution. The state
of the previous row is now exposed as `cdc_prev` tuple.

Release note (backward-incompatible change): The replacement of the
cdc_prev() function with a cdc_prev tuple is an incompatible
change that may break changefeeds that use the old cdc_prev() function.
craig bot pushed a commit that referenced this pull request Dec 2, 2022
85177: changefeedccl: Rely on optimizer and distSQL when evaluating CDC expressions. r=miretskiy a=miretskiy

Previous PRs (#82562) introduced CDC expressions.

This PR builds on that and replaces the majority of the hand-written
evaluation logic with tighter integration with the
optimizer and distSQL processors.

A CDC expression, which is really a simplified `SELECT` statement,
is now planned by the optimizer (`sql.PlanCDCExpression`).
The resulting plan is then fed to distSQL to produce
a specialized CDC execution plan (`sql.RunCDCEvaluation`).

The execution plan is special in that it is guaranteed to be
a local execution plan, and changefeed is expected to "feed"
the data (encoded row) directly into the execution pipeline,
with change aggregators consuming resulting projection.

The benefit of this approach is that expression optimization
and evaluation are now handled by the optimizer and distSQL.
The responsibility of the CDC evaluation package is to make sure
that the CDC expression is "sane" and to set up CDC-specific functions.

Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have a changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete.  In particular, this
PR does not replace the current distSQL execution for CDC --
namely, we still keep the same execution model using the hand-planned
`ChangeFrontierProcessor` and `ChangeAggregatorProcessor`.
It augments the existing model, while tightening the integration.

Still, this is an improvement over previous state.  The follow on work
will complete integration.

Some changes enabled by this implementation include the replacement
of `cdc_prev()` function which returned JSONb representation of the
previous row, with a `cdc_prev` tuple.  This makes changefeed
expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col
```

In addition, by using tuple to represent the state of the previous row,
we can now leverage existing SQL functions.  For example, to emit
previous row as JSONb we can do:

```
SELECT *, row_to_json(cdc_prev.*) AS prevJson FROM tbl
```

Fixes #90416
Fixes #90714
Fixes #90455
Informs #90442
Informs CRDB-18978
Informs CRDB-17161

Release note (enterprise change): CDC expressions are now planned and
evaluated using SQL optimizer and distSQL execution. The state
of the previous row is now exposed as `cdc_prev` tuple.

Release note (backward incompatible change): The replacement of the
cdc_prev() function with a cdc_prev tuple is an incompatible
change that may break changefeeds that use the old cdc_prev() function.

92951: ui: show txn idle time in txn detail page r=matthewtodd a=matthewtodd

Part of #86667

|Before|After|
|--|--|
|<img width="1628" alt="Screen Shot 2022-12-02 at 3 16 08 PM" src="https://user-images.githubusercontent.com/5261/205379330-e1990261-9ca4-4d73-878f-a21ed0a90412.png">|<img width="1628" alt="Screen Shot 2022-12-02 at 3 16 15 PM" src="https://user-images.githubusercontent.com/5261/205379338-956a1801-07db-48e8-9a9c-bbe9e84fb5b6.png">|

Release note (ui change): The "Transaction resource usage" card on the transaction fingerprint page now includes an "Idle latency" row, representing the time spent by the application performing other work while holding this transaction open.

Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: Matthew Todd <todd@cockroachlabs.com>