Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Introduce cdcevent package. #81249

Merged
merged 1 commit into from
May 17, 2022

Conversation

miretskiy
Copy link
Contributor

Event processing in CDC is a fairly involved procedure, consisting
of multiple stages. A low level KeyValue event is received from
rangefeeds. Then this low level event needs to be decoded into
encoded datums, along with type information describing those datums,
representing the "row" in the table.
Thereafter those datums must be further encoded into appropriate format.

Historically, this pipeline operated on structures representing
fairly low level objects -- such as catalog.Descriptor, EncDatumRow, etc.

This is problematic for multiple reasons:

  • A tight coupling to the underlying table/family descriptors makes it
    hard to introduce layers into pipeline that may not be directly tied
    to those descriptors -- such as projections and predicates.
  • Exposing low level descriptor information meant that every encoder
    implementation had to have repeated, and very subtle and error prone code,
    to process columns in the correct order -- filtering unwanted
    families, processing primary index column in the correct order, etc.
  • The catalog.Descriptor interfaces are fairly large interfaces,
    understandably so. However, CDC does not need access to the
    entirety of those interfaces -- mostly, CDC needs to access to very few
    methods. It is desirable therefore to pass in the smallest
    interface necessary to make CDC work.

This is the purpose of cdcevent package. It introduces several
abstractions to facilitate conversion from the low level KeyValue event
into cdcevent.EventRow event. The cdcevent.EventDecoder interface
defines conversion from low level event into cdcevent.EventRow. Event
row has associated event descriptor describing the set of columns needed
to perform CDC activities. The rest of the CDC processing operates
on these higher level interfaces instead of having direct depency on
low level catalog interfaces.

Release Notes: None

@miretskiy miretskiy requested a review from HonoreDB May 13, 2022 20:44
@miretskiy miretskiy requested a review from a team as a code owner May 13, 2022 20:44
@cockroach-teamcity
Copy link
Member

This change is Reviewable

Copy link
Contributor

@HonoreDB HonoreDB left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for taking this on! Couple of minor comments. Also, is there an easy way to verify that introducing this extra layer of abstraction doesn't result in more allocs on the hot path?

Reviewed 3 of 3 files at r1, 2 of 2 files at r2, 3 of 3 files at r3, 1 of 1 files at r4, 20 of 20 files at r5, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/cdcevent/event.go line 47 at r5 (raw file):

	FamilyName() string
	// NumFamilies returns number of families in the table.
	NumFamilies() int

nit: could this be TableHasOtherFamilies() bool? Both because the EventSource always only has one family, and the only information we actually need is whether there are others.


pkg/ccl/changefeedccl/cdcevent/event.go line 380 at r5 (raw file):

		return EventRow{}, err
	}
	// Copy datums since row fetcher reuses alloc.

nit: can remove this comment

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subway first pass.

pkg/ccl/changefeedccl/cdcevent/event.go Outdated Show resolved Hide resolved
pkg/ccl/changefeedccl/cdcevent/event.go Outdated Show resolved Hide resolved
pkg/ccl/changefeedccl/cdcevent/event.go Outdated Show resolved Hide resolved
pkg/ccl/changefeedccl/cdcevent/event.go Show resolved Hide resolved
@miretskiy
Copy link
Contributor Author

nit: could this be TableHasOtherFamilies() bool? Both because the EventSource always only has one family, and the only information we actually need is whether there are others.

Done.

nit: can remove this comment

Done.

@miretskiy
Copy link
Contributor Author

Also, is there an easy way to verify that introducing this extra layer of abstraction doesn't result in more allocs on the hot path?

I don't think there is an easy way; I've tried not to do anything too silly. However, this should not prevent from merging (eventually) this pr.. We can improve things if needed.

@miretskiy
Copy link
Contributor Author

@HonoreDB @ajwerner all comments addressed.

@miretskiy miretskiy force-pushed the constrain branch 4 times, most recently from c21459c to feb8dfa Compare May 16, 2022 13:04
Copy link
Contributor

@HonoreDB HonoreDB left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 11 of 15 files at r6, 8 of 8 files at r7, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @miretskiy)


pkg/ccl/changefeedccl/cdcevent/event.go line 183 at r7 (raw file):

	desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, includeVirtualColumns bool,
) (*eventDescriptor, error) {
	var inFamily catalog.TableColSet

golf: inFamily := catalog.MakeTableColSet(family.ColumnIDs...)

@miretskiy
Copy link
Contributor Author

inFamily := catalog.MakeTableColSet(family.ColumnIDs...)

Nice; thanks

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursory second pass :lgtm:

Reviewed 1 of 15 files at r6, 1 of 1 files at r8.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/cdcevent/event.go line 244 at r8 (raw file):

// DebugString returns event descriptor debug information.
func (d *eventDescriptor) DebugString() string {

if you felt fancy, you could implement SafeFormat or whatever the method name is so that this object and even the row itself can be logged. Might come in handy

Code quote:

// DebugString returns event descriptor debug information.
func (d *eventDescriptor) DebugString() string {
	return fmt.Sprintf("eventDescriptor{table: %q(%d) family: %q(%d) pkCols=%v valCols=%v",
		d.TableName, d.TableID, d.FamilyName, d.FamilyID, d.keyCols, d.familyCols)
}

@miretskiy
Copy link
Contributor Author

if you felt fancy, you could implement SafeFormat

added

Event processing in CDC is a fairly involved procedure, consisting
of multiple stages.  A low level KeyValue event is received from
rangefeeds.  Then this low level event needs to be decoded into
encoded datums, along with type information describing those datums,
representing the "row" in the table.
Thereafter those datums must be further encoded into appropriate format.

Historically, this pipeline operated on structures representing
fairly low level objects -- such as catalog.Descriptor, EncDatumRow, etc.

This is problematic for multiple reasons:
  * A tight coupling to the underlying table/family descriptors makes it
    hard to introduce layers into pipeline that may not be directly tied
    to those descriptors -- such as projections and predicates.
  * Exposing low level descriptor information meant that every encoder
    implementation had to have repeated, and very subtle and error prone code,
    to process columns in the correct order -- filtering unwanted
    families, processing primary index column in the correct order, etc.
  * The catalog.Descriptor interfaces are fairly large interfaces,
    understandably so.  However, CDC does not need access to the
    entirety of those interfaces -- mostly, CDC needs to access to very few
    methods.  It is desirable therefore to pass in the smallest
    interface necessary to make CDC work.

This is the purpose of `cdcevent` package.  It introduces several
abstractions to facilitate conversion from the low level KeyValue event
into `cdcevent.EventRow` event.  The `cdcevent.EventDecoder` interface
defines conversion from low level event into `cdcevent.EventRow`.  Event
row has associated event descriptor describing the set of columns needed
to perform CDC activities.  The rest of the CDC processing operates
on these higher level interfaces instead of having direct depency on
low level catalog interfaces.

Release Notes: None
@miretskiy
Copy link
Contributor Author

bors r+

@craig
Copy link
Contributor

craig bot commented May 17, 2022

Build succeeded:

@craig craig bot merged commit 8e4b1a2 into cockroachdb:master May 17, 2022
@miretskiy
Copy link
Contributor Author

miretskiy commented May 17, 2022 via email

craig bot pushed a commit that referenced this pull request Jun 26, 2022
82562: changefeeccl: Projections and Filters in CDC. r=miretskiy a=miretskiy

Add a variant of CHANGEFEED statement that allows specification
of predicates and projections.

```
CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT .... FROM t WHERE ...
```

This changefeed variant can target at most 1 table (and 1 column
family) at a time. The expressions used as the projections and
filters can be almost any supported expression with some restrictions:
  * Volatile functions not allowed.
  * Sub-selects not allowed.
  * Aggregate and window functions (i.e. functions operating over many
    rows) not allowed.
  * Some stable functions, notably functions which return MVCC
    timestamp, are overridden to return MVCC timestamp of the event.

In addition, some CDC specific functions are provided:
  * cdc_is_delete: returns true if the event is a deletion event.
  * cdc_prev: returns JSON representation of the previous row state.
  * cdc_updated_timestamp: returns event update timestamp (usually MVCC
    timestamp, but can be different if e.g. undergoing schema changes)
Additional CDC specific functions will be added in the follow on PRs.

Few examples:

* Emit all but the deletion events:
```
CREATE CHANGEFEED INTO 'kafka://'
AS SELECT * FROM table
WHERE NOT cdc_is_delete()
```

* Emit all events that modified `important_col` column:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *, cdc_prev() AS previous
FROM important_table
WHERE important_col != cdc_prev()->'important_col'
```

* Emit few colums, as well as computed expresions:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT warehouseID, (totalItems - orderedItems) as itemsAvailable
FROM warehouse
WHERE region='US/east';
```

When filter expression is specified, changefeed will now consult
optimizer so that the set of spans scanned by changefeed can be
restricted based on the predicate.

For example, given the following table and a changefeed:
```
CREATE TABLE warehouse (
  region STRING,
  warehouseID int,
  ....
  PRIMARY KEY (region, warehouseID)
);

CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *
FROM warehouse
WHERE region='US/east';
```

The create changefeed will only scan table spans that contain `US/east`
region (and ignore all other table spans).

---

For foundational work, see:

- #81676
- #81249
- #80499

Addresses:
- #56949
- #31214


---

Release Notes (enterprise):
CHANGEFEED statement now supports general expressions -- predicates and projections.
Projections allow customers to emit only the data that they care about,
including computed columns, while predicates (i.e. filters) allow them
to restrict the data that's emitted only to those events that match the
filter.

```
CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM t WHERE NOT cdc_is_delete()
```


Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants