feat(generated_columns): support select generated columns from source (risingwavelabs#8841)

Co-authored-by: Bugen Zhao <i@bugenzhao.com>
Co-authored-by: stonepage <40830455+st1page@users.noreply.github.com>
3 people committed Mar 31, 2023
1 parent e51b240 commit dbbd1cb
Showing 21 changed files with 314 additions and 171 deletions.
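In short, this commit lets generated columns be declared on a source (not only on a table) and evaluated when the source is queried. A minimal sketch of the new usage, adapted from the e2e tests added below; the source name and topic here are placeholders:

create source my_src (v1 int as v2 - 1, v2 int, v3 int as v2 + 1) with (
    connector = 'kafka',
    topic = 'my_topic',
    properties.bootstrap.server = '127.0.0.1:29092',
    scan.startup.mode = 'earliest'
) row format json;

select v1, v2, v3 from my_src;

Here v1 and v3 are not expected in the Kafka payload; they are computed from v2 when rows are read from the source.
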
53 changes: 52 additions & 1 deletion e2e_test/source/basic/kafka.slt
@@ -244,7 +244,6 @@ create source s18 with (
) row format avro
row schema location 'file:///risingwave/avro-complex-schema.avsc'


# we cannot use confluent schema registry when connector is not kafka
statement error
create table s19
@@ -256,6 +255,38 @@ with (
row format avro
row schema location confluent schema registry 'http://127.0.0.1:8081'

# we cannot create debezium source with generated column
statement error
create table s20 (
id integer primary key,
first_name varchar,
last_name varchar,
email varchar,
gen_id integer as id+1
) with (
connector = 'kafka',
topic = 'debezium_log',
properties.bootstrap.server = '127.0.0.1:29092'
) row format debezium_json

# create kafka source table with generated column
statement ok
create table s21 (v1 int as v2-1, v2 int, v3 int as v2+1) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic_generated_columns',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) row format json

# create kafka source with generated column
statement ok
create source s22 (v1 int as v2-1, v2 int, v3 int as v2+1) with (
connector = 'kafka',
topic = 'kafka_4_partition_topic_generated_columns',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) row format json

statement ok
flush;

@@ -422,6 +453,20 @@ select count(*) from s16
----
0

query III rowsort
select * from s21;
----
19 20 21
20 21 22
NULL NULL NULL

query III rowsort
select * from s22;
----
19 20 21
20 21 22
NULL NULL NULL

statement ok
drop materialized view source_mv1

@@ -463,3 +508,9 @@ drop source s17

statement ok
drop source s18

statement ok
drop table s21

statement ok
drop source s22
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
@@ -127,7 +127,8 @@ message StreamSource {
catalog.Table state_table = 2;
optional uint32 row_id_index = 3;
repeated plan_common.ColumnCatalog columns = 4;
repeated int32 pk_column_ids = 5;
reserved "pk_column_ids";
reserved 5;
map<string, string> properties = 6;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
@@ -0,0 +1,3 @@
{"v1": 10, "v2": 20}
{"v2": 21, "v3": "10"}
{"v1": 0}
2 changes: 1 addition & 1 deletion src/common/src/catalog/column.rs
@@ -207,7 +207,7 @@ impl ColumnDesc {
Self::from_field_with_column_id(field, 0)
}

pub fn is_generated_column(&self) -> bool {
pub fn is_generated(&self) -> bool {
self.generated_column.is_some()
}
}
10 changes: 2 additions & 8 deletions src/compute/tests/integration_tests.rs
@@ -121,15 +121,9 @@ async fn test_table_materialize() -> StreamResult<()> {
"fields.v1.max" => "1000",
"fields.v1.seed" => "12345",
));
let pk_column_ids = vec![0];
let row_id_index: usize = 0;
let source_builder = create_source_desc_builder(
&schema,
pk_column_ids,
Some(row_id_index),
source_info,
properties,
);
let source_builder =
create_source_desc_builder(&schema, Some(row_id_index), source_info, properties);

// Ensure the source exists.
let source_desc = source_builder.build().await.unwrap();
@@ -9,3 +9,11 @@
└─StreamProject { exprs: [(v2 - 1:Int32) as $expr1, v2, (v2 + 1:Int32) as $expr2, _row_id] }
└─StreamDml { columns: [v2, _row_id] }
└─StreamSource
- name: source with generated columns
sql: |
create source s1 (v1 int as v2-1, v2 int, v3 int as v2+1) with (connector = 'kinesis') ROW FORMAT JSON;;
select v3 from s1
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [(v2 + 1:Int32) as $expr1] }
└─BatchSource { source: "s1", columns: ["v2", "_row_id"], filter: (None, None) }
2 changes: 1 addition & 1 deletion src/frontend/src/binder/insert.rs
@@ -120,7 +120,7 @@ impl Binder {

// TODO(yuhao): refine this if row_id is always the last column.
//
// `row_id_index` in bin insert operation should rule out generated column
// `row_id_index` in insert operation should rule out generated column
let row_id_index = {
if let Some(row_id_index) = table_catalog.row_id_index {
let mut cnt = 0;
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_source.rs
@@ -693,10 +693,10 @@ pub async fn handle_create_source(

bind_sql_column_constraints(&session, name.clone(), &mut columns, stmt.columns)?;

if columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow generated columns on source
if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow delete from a non append only source
return Err(RwError::from(ErrorCode::BindError(
"Generated columns on source has not been implemented.".to_string(),
"Generated columns are only allowed in an append only source.".to_string(),
)));
}

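Per the updated check above, generated columns on a source are accepted only when the source is append-only (i.e. it still carries a hidden row id); connectors or formats that imply updates and deletes are rejected with the new error message. A hedged illustration based on the slt tests in this commit; the object names are placeholders:

-- rejected: a debezium table has a user-defined primary key and no row id,
-- so a generated column triggers the BindError introduced above
create table t_dbz (id int primary key, gen_id int as id + 1) with (
    connector = 'kafka',
    topic = 'debezium_log',
    properties.bootstrap.server = '127.0.0.1:29092'
) row format debezium_json;

-- accepted: plain json over kafka is append-only, so generated columns are allowed
create source s_ok (v2 int, v3 int as v2 + 1) with (
    connector = 'kafka',
    topic = 'my_topic',
    properties.bootstrap.server = '127.0.0.1:29092'
) row format json;
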
17 changes: 11 additions & 6 deletions src/frontend/src/handler/create_table.rs
@@ -400,6 +400,14 @@ pub(crate) async fn gen_create_table_plan_with_source(

bind_sql_column_constraints(session, table_name.real_value(), &mut columns, column_defs)?;

if row_id_index.is_none() && columns.iter().any(|c| c.is_generated()) {
// TODO(yuhao): allow delete from a non append only source
return Err(ErrorCode::BindError(
"Generated columns are only allowed in an append only source.".to_string(),
)
.into());
}

gen_table_plan_inner(
context.into(),
table_name,
@@ -532,19 +540,15 @@ fn gen_table_plan_inner(
let source_catalog = source.as_ref().map(|source| Rc::new((source).into()));
let source_node: PlanRef = LogicalSource::new(
source_catalog,
columns
.iter()
.map(|column| column.column_desc.clone())
.collect_vec(),
pk_column_ids,
columns.clone(),
row_id_index,
false,
true,
context.clone(),
)
.into();

let required_cols = FixedBitSet::with_capacity(source_node.schema().len());
let required_cols = FixedBitSet::with_capacity(columns.len());
let mut plan_root = PlanRoot::new(
source_node,
RequiredDist::Any,
@@ -572,6 +576,7 @@ fn gen_table_plan_inner(
name,
columns,
definition,
pk_column_ids,
row_id_index,
append_only,
watermark_descs,
30 changes: 27 additions & 3 deletions src/frontend/src/optimizer/mod.rs
@@ -1,3 +1,4 @@
use std::collections::HashMap;
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -35,7 +36,7 @@ pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::catalog::{ColumnCatalog, ConflictBehavior, Field, Schema};
use risingwave_common::catalog::{ColumnCatalog, ColumnId, ConflictBehavior, Field, Schema};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
@@ -392,6 +393,7 @@ impl PlanRoot {
table_name: String,
columns: Vec<ColumnCatalog>,
definition: String,
pk_column_ids: Vec<ColumnId>,
row_id_index: Option<usize>,
append_only: bool,
watermark_descs: Vec<WatermarkDesc>,
@@ -433,14 +435,36 @@
true => ConflictBehavior::NoCheck,
false => ConflictBehavior::Overwrite,
};

let pk_column_indices = {
let mut id_to_idx = HashMap::new();

columns.iter().enumerate().for_each(|(idx, c)| {
id_to_idx.insert(c.column_id(), idx);
});
pk_column_ids
.iter()
.map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
.collect_vec()
};

let table_required_dist = {
let mut bitset = FixedBitSet::with_capacity(columns.len());
for idx in &pk_column_indices {
bitset.insert(*idx);
}
RequiredDist::ShardByKey(bitset)
};

StreamMaterialize::create_for_table(
stream_plan,
table_name,
self.required_dist.clone(),
self.required_order.clone(),
table_required_dist,
Order::any(),
columns,
definition,
conflict_behavior,
pk_column_indices,
row_id_index,
version,
)
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -97,8 +97,10 @@ impl ToBatchPb for BatchSource {
NodeBody::Source(SourceNode {
source_id: source_catalog.id,
info: Some(source_catalog.info.clone()),
columns: source_catalog
.columns
columns: self
.logical
.core
.column_catalog
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
38 changes: 10 additions & 28 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
@@ -15,14 +15,13 @@ use std::collections::HashMap;
use std::rc::Rc;

use derivative::Derivative;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;

use super::super::utils::TableCatalogBuilder;
use super::GenericPlanNode;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::ColumnId;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;
use crate::{TableCatalog, WithOptions};
@@ -35,8 +34,7 @@ pub struct Source {
pub catalog: Option<Rc<SourceCatalog>>,
/// NOTE(Yuanxin): Here we store column descriptions, pk column ids, and row id index for plan
/// generating, even if there is no external stream source.
pub column_descs: Vec<ColumnDesc>,
pub pk_col_ids: Vec<ColumnId>,
pub column_catalog: Vec<ColumnCatalog>,
pub row_id_index: Option<usize>,
/// Whether the "SourceNode" should generate the row id column for append only source
pub gen_row_id: bool,
@@ -49,24 +47,16 @@

impl GenericPlanNode for Source {
fn schema(&self) -> Schema {
let fields = self.non_generated_columns().map(Into::into).collect();
// let fields = self.column_descs.iter().map(Into::into).collect();
let fields = self
.column_catalog
.iter()
.map(|c| (&c.column_desc).into())
.collect();
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
let mut id_to_idx = HashMap::new();
// self.column_descs.iter().filter(|c| !c.is_generated_column()).enumerate().for_each(|(idx,
// c)| {
self.non_generated_columns()
.enumerate()
.for_each(|(idx, c)| {
id_to_idx.insert(c.column_id, idx);
});
self.pk_col_ids
.iter()
.map(|c| id_to_idx.get(c).copied())
.collect::<Option<Vec<_>>>()
self.row_id_index.map(|idx| vec![idx])
}

fn ctx(&self) -> OptimizerContextRef {
@@ -75,12 +65,11 @@ impl GenericPlanNode for Source {

fn functional_dependency(&self) -> FunctionalDependencySet {
let pk_indices = self.logical_pk();
let non_generated_columns_count = self.non_generated_columns().count();
match pk_indices {
Some(pk_indices) => {
FunctionalDependencySet::with_key(non_generated_columns_count, &pk_indices)
FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
}
None => FunctionalDependencySet::new(non_generated_columns_count),
None => FunctionalDependencySet::new(self.column_catalog.len()),
}
}
}
@@ -114,11 +103,4 @@ impl Source {

builder.build(vec![], 1)
}

/// Non-generated columns
fn non_generated_columns(&self) -> impl Iterator<Item = &ColumnDesc> {
self.column_descs
.iter()
.filter(|c| !c.is_generated_column())
}
}
