Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: standard stream #14272

Merged
merged 10 commits into from Jan 20, 2024
Merged

feat: standard stream #14272

merged 10 commits into from Jan 20, 2024

Conversation

zhyass
Copy link
Member

@zhyass zhyass commented Jan 8, 2024

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

  1. Adding Change Tracking Field _row_version:
  • A new change tracking field, _row_version, has been introduced in the database to record the version information of each row. It starts with an initial value of 0, and increments by 1 with every update to the row. This addition effectively tracks the change history of each row.
  1. Enhancing CREATE STREAM Syntax with append_only Option:
CREATE STREAM [IF NOT EXISTS] [<database_name>.]<stream_name>
  ON TABLE [<database_name>.]<table_name>
    [AT (STREAM => <stream_name>)]
    [append_only = true|false]
    [COMMENT = '<comment>']
  • If append_only=true, the created stream is defined as an append_only stream, meaning it is used solely for appending data and does not track updates or deletions.
  • If append_only=false, the stream created is a standard stream. This type of stream is capable of capturing all types of data changes, including updates and deletions, offering a comprehensive tracking of data modifications.
  1. Stream Query Update

"The change field is no longer treated as an internal field; it will be returned in the results when performing a SELECT * query on a stream."

mysql> select * from s;
+------+---------------+----------------------------------------+------------------+
| a    | change$action | change$row_id                          | change$is_update |
+------+---------------+----------------------------------------+------------------+
|    1 | DELETE        | 1abf6fbd82fc46caa57660d6f41c5bb6000000 |                0 |
|    0 | INSERT        | 1abf6fbd82fc46caa57660d6f41c5bb6000001 |                1 |
|    2 | DELETE        | 1abf6fbd82fc46caa57660d6f41c5bb6000001 |                1 |
|    4 | INSERT        | fd837066ab7c422d9abac273c85ce9f9000000 |                0 |
|    5 | INSERT        | fd837066ab7c422d9abac273c85ce9f9000001 |                0 |
+------+---------------+----------------------------------------+------------------+
5 rows in set (0.26 sec)
Read 14 rows, 931.00 B in 0.042 sec., 336.15 rows/sec., 21.83 KiB/sec.
  1. Features of Standard Stream:
  • The Standard stream can capture all types of data changes, including updates and deletions. This makes the standard stream a more versatile and suitable option for complex data change scenarios.

  • Design

    Design Assumptions:

    • The foundational version of the stream is labeled as v0, corresponding to snapshot s0.
    • At the transaction execution moment of querying the stream, the observed version of the table is v1, correlating to snapshot s1.

    Definitions:

    • add_blocks (A): This denotes the set of blocks newly added in snapshot s1.
    • delete_blocks (D): This refers to the set of blocks present in snapshot s0 but removed in snapshot s1.

Capture data changes through the following SQL to obtain standard stream data.

WITH change AS (
    SELECT *
    FROM 
        (SELECT xx, change$row_id, 'insert' AS change$action
         FROM add_blocks ) as A
        FULL OUTER JOIN 
        (SELECT xx AS D_xx, change$row_id AS D_change$row_id, 'delete' AS D_change$action
         FROM delete_blocks ) as D
        ON A.change$row_id == D.D_change$row_id
    WHERE ( A.change$row_id is null OR D.change$row_id is null OR A._row_version > D._row_version )  
)
SELECT xx, change$row_id, change$action, is_not_null(D_change$action) as change$is_update
FROM change
WHERE is_not_null(change$action)
UNION ALL
SELECT D_xx, D_change$row_id, D_change$action, is_not_null(change$action) as D_change$is_update
FROM change
WHERE is_not_null(D_change$action)
  1. Example:
mysql> create table t(a int);
Query OK, 0 rows affected (0.03 sec)

mysql> insert into t values(1),(2),(3);
Query OK, 3 rows affected (0.04 sec)

mysql> create stream s on table t append_only=false;
Query OK, 0 rows affected (0.04 sec)

mysql> insert into t values(4),(5);
Query OK, 2 rows affected (0.05 sec)

mysql> delete from t where a=1;
Query OK, 1 row affected (0.08 sec)

mysql> update t set a=0 where a=2;
Query OK, 1 row affected (0.08 sec)

mysql> select _row_version, _origin_block_row_num, _origin_block_id, _origin_version,a from t order by a;
+--------------+-----------------------+----------------------------------------+-----------------+------+
| _row_version | _origin_block_row_num | _origin_block_id                       | _origin_version | a    |
+--------------+-----------------------+----------------------------------------+-----------------+------+
|            1 |                     1 | 35553922952514720191061844481455971254 |            1188 |    0 |
|            0 |                     2 | 35553922952514720191061844481455971254 |            1188 |    3 |
|            0 |                  NULL |                                   NULL |            NULL |    4 |
|            0 |                  NULL |                                   NULL |            NULL |    5 |
+--------------+-----------------------+----------------------------------------+-----------------+------+
4 rows in set (0.03 sec)
Read 4 rows, 357.00 B in 0.007 sec., 559.35 rows/sec., 48.75 KiB/sec.

mysql> select * from s order by change$row_id;
+------+---------------+----------------------------------------+------------------+
| a    | change$action | change$row_id                          | change$is_update |
+------+---------------+----------------------------------------+------------------+
|    1 | DELETE        | 1abf6fbd82fc46caa57660d6f41c5bb6000000 |                0 |
|    0 | INSERT        | 1abf6fbd82fc46caa57660d6f41c5bb6000001 |                1 |
|    2 | DELETE        | 1abf6fbd82fc46caa57660d6f41c5bb6000001 |                1 |
|    4 | INSERT        | fd837066ab7c422d9abac273c85ce9f9000000 |                0 |
|    5 | INSERT        | fd837066ab7c422d9abac273c85ce9f9000001 |                0 |
+------+---------------+----------------------------------------+------------------+
5 rows in set (0.26 sec)
Read 14 rows, 931.00 B in 0.042 sec., 336.15 rows/sec., 21.83 KiB/sec.

root@localhost:8000/default> explain select * from s;
-[ EXPLAIN ]-----------------------------------
UnionAll
├── output columns: [_change_insert.a (#0), change$action (#6), change$row_id (#7), change$is_update (#16)]
├── estimated rows: 0.40
├── EvalScalar
│   ├── output columns: [_change_insert.a (#0), change$action (#6), change$row_id (#7), change$is_update (#16)]
│   ├── expressions: [is_not_null(_change.d_change$action (#14))]
│   ├── estimated rows: 0.20
│   └── Filter
│       ├── output columns: [_change_insert.a (#0), change$action (#6), change$row_id (#7), d_change$action (#14)]
│       ├── filters: [is_not_null(a.change$action (#6)), is_true(CAST(NOT is_not_null(a.change$row_id (#7)) OR NOT is_not_null(d.d_change$row_id (#15)) AS Boolean NULL) OR a._row_version (#4) > d._row_version (#12))]
│       ├── estimated rows: 0.20
│       └── HashJoin
│           ├── output columns: [_change_insert.a (#0), _change_insert._row_version (#4), change$action (#6), change$row_id (#7), _change_delete._row_version (#12), d_change$action (#14), d_change$row_id (#15)]
│           ├── join type: FULL OUTER
│           ├── build keys: [d.d_change$row_id (#15)]
│           ├── probe keys: [a.change$row_id (#7)]
│           ├── filters: []
│           ├── estimated rows: 1.00
│           ├── EvalScalar(Build)
│           │   ├── output columns: [_change_delete._row_version (#12), d_change$action (#14), d_change$row_id (#15)]
│           │   ├── expressions: ['DELETE', if(CAST(is_not_null(_change_delete._origin_block_id (#10)) AS Boolean NULL), concat(to_uuid(_change_delete._origin_block_id (#10)), lpad(to_hex(CAST(_change_delete._origin_block_row_num (#11) AS Int64 NULL)), 6, '0')), CAST(s._base_row_id (#13) AS String NULL))]
│           │   ├── estimated rows: 1.00
│           │   └── TableScan
│           │       ├── table: default.default.s
│           │       ├── output columns: [_origin_block_id (#10), _origin_block_row_num (#11), _row_version (#12), _base_row_id (#13)]
│           │       ├── read rows: 3
│           │       ├── read bytes: 0
│           │       ├── partitions total: 1
│           │       ├── partitions scanned: 1
│           │       ├── push downs: [filters: [], limit: NONE]
│           │       └── estimated rows: 1.00
│           └── EvalScalar(Probe)
│               ├── output columns: [_change_insert.a (#0), _change_insert._row_version (#4), change$action (#6), change$row_id (#7)]
│               ├── expressions: ['INSERT', if(CAST(is_not_null(_change_insert._origin_block_id (#2)) AS Boolean NULL), concat(to_uuid(_change_insert._origin_block_id (#2)), lpad(to_hex(CAST(_change_insert._origin_block_row_num (#3) AS Int64 NULL)), 6, '0')), CAST(s._base_row_id (#5) AS String NULL))]
│               ├── estimated rows: 1.00
│               └── TableScan
│                   ├── table: default.default.s
│                   ├── output columns: [a (#0), _origin_block_id (#2), _origin_block_row_num (#3), _row_version (#4), _base_row_id (#5)]
│                   ├── read rows: 4
│                   ├── read bytes: 225
│                   ├── partitions total: 2
│                   ├── partitions scanned: 2
│                   ├── push downs: [filters: [], limit: NONE]
│                   └── estimated rows: 1.00
└── EvalScalar
    ├── output columns: [_change_delete.a (#25), d_change$action (#31), d_change$row_id (#32), change$is_update (#33)]
    ├── expressions: [is_not_null(_change.change$action (#23))]
    ├── estimated rows: 0.20
    └── Filter
        ├── output columns: [change$action (#23), _change_delete.a (#25), d_change$action (#31), d_change$row_id (#32)]
        ├── filters: [is_not_null(d.d_change$action (#31)), is_true(CAST(NOT is_not_null(a.change$row_id (#24)) OR NOT is_not_null(d.d_change$row_id (#32)) AS Boolean NULL) OR a._row_version (#21) > d._row_version (#29))]
        ├── estimated rows: 0.20
        └── HashJoin
            ├── output columns: [_change_insert._row_version (#21), change$action (#23), change$row_id (#24), _change_delete.a (#25), _change_delete._row_version (#29), d_change$action (#31), d_change$row_id (#32)]
            ├── join type: FULL OUTER
            ├── build keys: [d.d_change$row_id (#32)]
            ├── probe keys: [a.change$row_id (#24)]
            ├── filters: []
            ├── estimated rows: 1.00
            ├── EvalScalar(Build)
            │   ├── output columns: [_change_delete.a (#25), _change_delete._row_version (#29), d_change$action (#31), d_change$row_id (#32)]
            │   ├── expressions: ['DELETE', if(CAST(is_not_null(_change_delete._origin_block_id (#27)) AS Boolean NULL), concat(to_uuid(_change_delete._origin_block_id (#27)), lpad(to_hex(CAST(_change_delete._origin_block_row_num (#28) AS Int64 NULL)), 6, '0')), CAST(s._base_row_id (#30) AS String NULL))]
            │   ├── estimated rows: 1.00
            │   └── TableScan
            │       ├── table: default.default.s
            │       ├── output columns: [a (#25), _origin_block_id (#27), _origin_block_row_num (#28), _row_version (#29), _base_row_id (#30)]
            │       ├── read rows: 3
            │       ├── read bytes: 45
            │       ├── partitions total: 1
            │       ├── partitions scanned: 1
            │       ├── push downs: [filters: [], limit: NONE]
            │       └── estimated rows: 1.00
            └── EvalScalar(Probe)
                ├── output columns: [_change_insert._row_version (#21), change$action (#23), change$row_id (#24)]
                ├── expressions: ['INSERT', if(CAST(is_not_null(_change_insert._origin_block_id (#19)) AS Boolean NULL), concat(to_uuid(_change_insert._origin_block_id (#19)), lpad(to_hex(CAST(_change_insert._origin_block_row_num (#20) AS Int64 NULL)), 6, '0')), CAST(s._base_row_id (#22) AS String NULL))]
                ├── estimated rows: 1.00
                └── TableScan
                    ├── table: default.default.s
                    ├── output columns: [_origin_block_id (#19), _origin_block_row_num (#20), _row_version (#21), _base_row_id (#22)]
                    ├── read rows: 4
                    ├── read bytes: 143
                    ├── partitions total: 2
                    ├── partitions scanned: 2
                    ├── push downs: [filters: [], limit: NONE]
                    └── estimated rows: 1.00

85 rows explain in 0.220 sec. Processed 0 rows, 0 B (0 rows/s, 0 B/s)

Fixes #14271

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test - Explain why

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):

This change is Reviewable

@zhyass zhyass marked this pull request as draft January 8, 2024 16:39
@github-actions github-actions bot added the pr-feature this PR introduces a new feature to the codebase label Jan 8, 2024
@zhyass
Copy link
Member Author

zhyass commented Jan 16, 2024

Need wait for the bug #14334 fix

@JackTan25
Copy link
Collaborator

JackTan25 commented Jan 16, 2024

should we support the replace and merge into for this actions track? cc @zhyass

@zhyass
Copy link
Member Author

zhyass commented Jan 16, 2024

should we support the replace and merge into for this actions track? cc @zhyass

Not supported in current pr, planned to be supported in a later.

@zhyass zhyass force-pushed the feat_stream branch 2 times, most recently from 3d7ecff to d596cbc Compare January 17, 2024 04:27
@zhyass zhyass marked this pull request as ready for review January 17, 2024 06:13
@zhyass zhyass marked this pull request as draft January 17, 2024 08:20
@zhyass zhyass added the ci-cloud Build docker image for cloud test label Jan 17, 2024
@JackTan25
Copy link
Collaborator

I'm not familiar with this pr's logic, can we give a logic picture for all cases if possible? I think that will be more clear.

@zhyass zhyass requested a review from sundy-li January 17, 2024 09:35
@zhyass zhyass force-pushed the feat_stream branch 3 times, most recently from 024d57c to b004cee Compare January 17, 2024 16:24
@zhyass zhyass marked this pull request as ready for review January 17, 2024 17:31
@zhyass
Copy link
Member Author

zhyass commented Jan 17, 2024

I'm not familiar with this pr's logic, can we give a logic picture for all cases if possible? I think that will be more clear.

Added a design description in the summary.

@BohuTANG BohuTANG added ci-cloud Build docker image for cloud test and removed ci-cloud Build docker image for cloud test labels Jan 18, 2024
@zhyass zhyass marked this pull request as draft January 18, 2024 07:18
@zhyass zhyass added ci-cloud Build docker image for cloud test and removed ci-cloud Build docker image for cloud test labels Jan 18, 2024

This comment was marked as outdated.

@zhyass zhyass removed the ci-cloud Build docker image for cloud test label Jan 18, 2024
Copy link
Contributor

Docker Image for PR

  • tag: pr-14272-7c1e13c

note: this image tag is only available for internal use,
please check the internal doc for more details.

@zhyass zhyass added ci-cloud Build docker image for cloud test and removed ci-cloud Build docker image for cloud test labels Jan 18, 2024

This comment was marked as outdated.

@zhyass zhyass added ci-cloud Build docker image for cloud test and removed ci-cloud Build docker image for cloud test labels Jan 18, 2024
Copy link
Contributor

Docker Image for PR

  • tag: pr-14272-646b1a7

note: this image tag is only available for internal use,
please check the internal doc for more details.

@zhyass zhyass added ci-cloud Build docker image for cloud test and removed ci-cloud Build docker image for cloud test labels Jan 19, 2024
Copy link
Contributor

Docker Image for PR

  • tag: pr-14272-c129dbd

note: this image tag is only available for internal use,
please check the internal doc for more details.

@dantengsky

This comment was marked as off-topic.

@zhyass zhyass marked this pull request as ready for review January 20, 2024 02:58
@dantengsky
Copy link
Member

dantengsky commented Jan 20, 2024

👍

LGTM

  • The following PR chore: minor refactor zhyass/databend#11

    • format the SQL template used during the binding phase of the stream
      hope it makes the code easier to read
    • rename ChangeAction to ChangeType

    Hope these minor refactors make sense (and please feel free to reject it, if not appropriate)

  • the way that binding the stream is handy, but it may introduce some unexpected results, e.g.

    The results of query select * from s _change_delete and select * from s may differ, where s is a standard stream.

    This issue should be resolvable, let's address it in another PR.

@BohuTANG BohuTANG merged commit e56422d into datafuselabs:main Jan 20, 2024
73 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-cloud Build docker image for cloud test pr-feature this PR introduces a new feature to the codebase
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat: stream support track update and delete
4 participants