diff --git a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs index 37b8db6b01b84..ae1feb85f6dea 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs @@ -34,6 +34,10 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> { .default_session() .get_settings() .set_data_retention_time_in_days(0)?; + fixture + .default_session() + .get_settings() + .set_enable_experimental_virtual_column(1)?; fixture.create_default_database().await?; fixture.create_variant_table().await?; diff --git a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns_builder.rs b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns_builder.rs index a63824e810f0a..b514f3a63cb3b 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns_builder.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns_builder.rs @@ -37,6 +37,10 @@ use jsonb::OwnedJsonb; async fn test_virtual_column_builder() -> Result<()> { let fixture = TestFixture::setup_with_custom(EESetup::new()).await?; + fixture + .default_session() + .get_settings() + .set_enable_experimental_virtual_column(1)?; fixture.create_default_database().await?; fixture.create_variant_table().await?; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 6c19f19645f7e..031f05dc15783 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1299,6 +1299,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=100)), }), + ("enable_experimental_virtual_column", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Enables experimental virtual column", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index fcd26c87b4614..193f9f225dc36 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -952,4 +952,12 @@ impl Settings { pub fn get_trace_sample_rate(&self) -> Result { self.try_get_u64("trace_sample_rate") } + + pub fn set_enable_experimental_virtual_column(&self, val: u64) -> Result<()> { + self.try_set_u64("enable_experimental_virtual_column", val) + } + + pub fn get_enable_experimental_virtual_column(&self) -> Result { + Ok(self.try_get_u64("enable_experimental_virtual_column")? == 1) + } } diff --git a/src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs b/src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs index 87475488a78d3..3d7c79a427dd4 100644 --- a/src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs +++ b/src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs @@ -114,9 +114,11 @@ impl MutationExpression { .ok_or_else(|| ErrorCode::Internal("Can't get target table index"))?; // Remove stream columns in source context. - source_context - .columns - .retain(|v| v.visibility == Visibility::Visible); + source_context.columns.retain(|col| { + let read_guard = binder.metadata.read(); + let column_entry = read_guard.column(col.index); + !column_entry.is_stream_column() + }); // Add source table columns to required columns. 
for column_index in source_context.column_set().iter() { @@ -199,9 +201,12 @@ impl MutationExpression { let from_s_expr = if let Some(from) = from { let (from_s_expr, mut from_context) = binder.bind_table_reference(&mut bind_context, from)?; - from_context - .columns - .retain(|v| v.visibility == Visibility::Visible); + // Remove stream columns in source context. + let read_guard = binder.metadata.read(); + from_context.columns.retain(|col| { + let column_entry = read_guard.column(col.index); + !column_entry.is_stream_column() + }); for column in from_context.columns.iter() { required_columns.insert(column.index); bind_context.add_column_binding(column.clone()); diff --git a/src/query/sql/src/planner/binder/bind_query/bind_select.rs b/src/query/sql/src/planner/binder/bind_query/bind_select.rs index a6b50a7daadff..12650ec9b02dc 100644 --- a/src/query/sql/src/planner/binder/bind_query/bind_select.rs +++ b/src/query/sql/src/planner/binder/bind_query/bind_select.rs @@ -75,10 +75,14 @@ impl Binder { } // whether allow rewrite virtual column and pushdown - let allow_virtual_column = LicenseManagerSwitch::instance() - .check_enterprise_enabled(self.ctx.get_license_key(), Feature::VirtualColumn) - .is_ok(); - bind_context.allow_virtual_column = allow_virtual_column; + bind_context.allow_virtual_column = self + .ctx + .get_settings() + .get_enable_experimental_virtual_column() + .unwrap_or_default() + && LicenseManagerSwitch::instance() + .check_enterprise_enabled(self.ctx.get_license_key(), Feature::VirtualColumn) + .is_ok(); let (mut s_expr, mut from_context) = if stmt.from.is_empty() { let select_list = &stmt.select_list; diff --git a/src/query/sql/src/planner/metadata/metadata.rs b/src/query/sql/src/planner/metadata/metadata.rs index 9142d4f486faa..11e16b1541f7f 100644 --- a/src/query/sql/src/planner/metadata/metadata.rs +++ b/src/query/sql/src/planner/metadata/metadata.rs @@ -26,6 +26,7 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::InternalColumn; use databend_common_catalog::table::Table; use databend_common_expression::display::display_tuple_field_name; +use databend_common_expression::is_stream_column_id; use databend_common_expression::types::DataType; use databend_common_expression::ComputedExpr; use databend_common_expression::TableDataType; @@ -766,6 +767,19 @@ impl ColumnEntry { ColumnEntry::VirtualColumn(VirtualColumn { table_index, .. }) => Some(*table_index), } } + + pub fn is_stream_column(&self) -> bool { + if let ColumnEntry::BaseTableColumn(BaseTableColumn { + column_id: Some(column_id), + .. + }) = self + { + if is_stream_column_id(*column_id) { + return true; + } + } + false + } } pub fn optimize_remove_count_args(name: &str, distinct: bool, args: &[&Expr]) -> bool { diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index fd02cca6a4882..2ac26e344c57f 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -171,12 +171,10 @@ use crate::plans::WindowOrderBy; use crate::BaseTableColumn; use crate::BindContext; use crate::ColumnBinding; -use crate::ColumnBindingBuilder; use crate::ColumnEntry; use crate::DefaultExprBinder; use crate::IndexType; use crate::MetadataRef; -use crate::Visibility; /// A helper for type checking. /// @@ -601,15 +599,6 @@ impl<'a> TypeChecker<'a> { // as we cast JSON null to SQL NULL. 
let target_type = if data_type.remove_nullable() == DataType::Variant { let target_type = checked_expr.data_type().nest_wrap_nullable(); - - if let Some(new_scalar) = self.try_rewrite_virtual_column_cast( - expr.span(), - &scalar, - &target_type, - false, - ) { - return Ok(Box::new((new_scalar, target_type))); - } target_type // if the source type is nullable, cast target type should also be nullable. } else if data_type.is_nullable_or_null() { @@ -659,15 +648,6 @@ impl<'a> TypeChecker<'a> { // as we cast JSON null to SQL NULL. let target_type = if data_type.remove_nullable() == DataType::Variant { let target_type = checked_expr.data_type().nest_wrap_nullable(); - - if let Some(new_scalar) = self.try_rewrite_virtual_column_cast( - expr.span(), - &scalar, - &target_type, - true, - ) { - return Ok(Box::new((new_scalar, target_type))); - } target_type } else { checked_expr.data_type().clone() @@ -1194,105 +1174,6 @@ impl<'a> TypeChecker<'a> { Ok(Box::new((scalar, data_type))) } - fn try_rewrite_virtual_column_cast( - &mut self, - span: Span, - scalar: &ScalarExpr, - target_type: &DataType, - is_try: bool, - ) -> Option { - let Ok(cast_ty) = infer_schema_type(target_type) else { - return None; - }; - let ScalarExpr::BoundColumnRef(BoundColumnRef { ref column, .. }) = scalar else { - return None; - }; - let table_index = column.table_index?; - - if column.index >= self.metadata.read().columns().len() { - return None; - } - - // Change the type of virtual column to user specified cast type avoids additional casting overhead, - // since the user usually knows the real type. - let column_entry = self.metadata.read().column(column.index).clone(); - let ColumnEntry::VirtualColumn(virtual_column) = column_entry else { - return None; - }; - - let virtual_column_name = if is_try { - format!( - "try_cast({} as {})", - column.column_name, - target_type.remove_nullable().to_string().to_lowercase() - ) - } else { - format!( - "{}::{}", - column.column_name, - target_type.remove_nullable().to_string().to_lowercase() - ) - }; - - // Try resolve the virtual column with the cast type. - if let Ok(box (new_scalar, _)) = self.resolve(&Expr::ColumnRef { - span, - column: ColumnRef { - database: column - .database_name - .as_ref() - .map(|name| Identifier::from_name(span, name)), - table: column - .table_name - .as_ref() - .map(|name| Identifier::from_name(span, name)), - column: ColumnID::Name(Identifier::from_name(span, &virtual_column_name)), - }, - }) { - return Some(new_scalar); - } - - // Generate a new virtual column with the cast type. - let database_name = column.database_name.clone(); - let table_name = column.table_name.clone(); - - let mut guard = self.metadata.write(); - let new_column_index = guard.add_virtual_column( - virtual_column.table_index, - virtual_column.source_column_name.clone(), - virtual_column.source_column_id, - virtual_column.column_id, - virtual_column_name.clone(), - cast_ty, - is_try, - ); - - let new_column_binding = ColumnBindingBuilder::new( - virtual_column_name, - new_column_index, - Box::new(target_type.clone()), - Visibility::InVisible, - ) - .table_name(table_name) - .database_name(database_name) - .table_index(Some(table_index)) - .build(); - // Add virtual column with the cast type to the context. 
- self.bind_context - .add_column_binding(new_column_binding.clone()); - - if let Some(scan_id) = guard.base_column_scan_id(virtual_column.column_index) { - let mut base_column_scan_id = HashMap::new(); - base_column_scan_id.insert(new_column_index, scan_id); - guard.add_base_column_scan_id(base_column_scan_id); - } - - Some(ScalarExpr::BoundColumnRef(BoundColumnRef { - span, - column: new_column_binding, - })) - } - // TODO: remove this function fn rewrite_substring(args: &mut [ScalarExpr]) { if let ScalarExpr::ConstantExpr(expr) = &args[1] { @@ -3996,22 +3877,12 @@ impl<'a> TypeChecker<'a> { { if data_type.remove_nullable() == DataType::Variant { let target_type = DataType::Nullable(Box::new(DataType::String)); - let new_scalar = if let Some(new_scalar) = self - .try_rewrite_virtual_column_cast( - scalar.span(), - &scalar, - &target_type, - false, - ) { - new_scalar - } else { - ScalarExpr::CastExpr(CastExpr { - span: scalar.span(), - is_try: false, - argument: Box::new(scalar), - target_type: Box::new(target_type.clone()), - }) - }; + let new_scalar = ScalarExpr::CastExpr(CastExpr { + span: scalar.span(), + is_try: false, + argument: Box::new(scalar), + target_type: Box::new(target_type.clone()), + }); return Some(Ok(Box::new((new_scalar, target_type)))); } } @@ -4161,21 +4032,12 @@ impl<'a> TypeChecker<'a> { ) { if func_name == "get_by_keypath_string" { let target_type = DataType::Nullable(Box::new(DataType::String)); - let new_scalar = if let Some(new_scalar) = self.try_rewrite_virtual_column_cast( - scalar.span(), - &scalar, - &target_type, - false, - ) { - new_scalar - } else { - ScalarExpr::CastExpr(CastExpr { - span: scalar.span(), - is_try: false, - argument: Box::new(scalar), - target_type: Box::new(target_type.clone()), - }) - }; + let new_scalar = ScalarExpr::CastExpr(CastExpr { + span: scalar.span(), + is_try: false, + argument: Box::new(scalar), + target_type: Box::new(target_type.clone()), + }); return Some(Ok(Box::new((new_scalar, target_type)))); } else { return Some(Ok(Box::new((scalar, data_type)))); diff --git a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs index ca91c226e7133..639944e66d759 100644 --- a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs +++ b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs @@ -76,6 +76,13 @@ impl VirtualColumnBuilder { ctx: Arc, table_info: &TableInfo, ) -> Option { + if !ctx + .get_settings() + .get_enable_experimental_virtual_column() + .unwrap_or_default() + { + return None; + } if LicenseManagerSwitch::instance() .check_enterprise_enabled(ctx.get_license_key(), Feature::VirtualColumn) .is_err() diff --git a/src/query/storages/fuse/src/pruning/block_pruner.rs b/src/query/storages/fuse/src/pruning/block_pruner.rs index 002aa07de5ec0..a08e270689fc1 100644 --- a/src/query/storages/fuse/src/pruning/block_pruner.rs +++ b/src/query/storages/fuse/src/pruning/block_pruner.rs @@ -235,7 +235,10 @@ impl BlockPruner { // Check whether can read virtual columns, // and ignore the source columns. 
let virtual_block_meta = virtual_column_pruner - .prune_virtual_columns(&block_meta.virtual_block_meta) + .prune_virtual_columns( + block_meta.row_count, + &block_meta.virtual_block_meta, + ) .await?; prune_result.virtual_block_meta = virtual_block_meta; } diff --git a/src/query/storages/fuse/src/pruning/virtual_column_pruner.rs b/src/query/storages/fuse/src/pruning/virtual_column_pruner.rs index f76d77b1762a7..3c79832efc8af 100644 --- a/src/query/storages/fuse/src/pruning/virtual_column_pruner.rs +++ b/src/query/storages/fuse/src/pruning/virtual_column_pruner.rs @@ -21,6 +21,7 @@ use databend_common_catalog::plan::VirtualColumnInfo; use databend_common_exception::Result; use databend_storages_common_pruner::VirtualBlockMetaIndex; use databend_storages_common_table_meta::meta::VirtualBlockMeta; +use log::warn; pub struct VirtualColumnPruner { virtual_column: VirtualColumnInfo, @@ -42,41 +43,55 @@ impl VirtualColumnPruner { #[async_backtrace::framed] pub async fn prune_virtual_columns( &self, + row_count: u64, virtual_block_meta: &Option, ) -> Result> { - if let Some(virtual_block_meta) = virtual_block_meta { - let mut virtual_column_metas = BTreeMap::new(); - let mut need_source_column_ids = HashSet::new(); - for virtual_column_field in &self.virtual_column.virtual_column_fields { - if let Some(virtual_column_meta) = virtual_block_meta - .virtual_column_metas - .get(&virtual_column_field.column_id) - { - virtual_column_metas - .insert(virtual_column_field.column_id, virtual_column_meta.clone()); - continue; - } - // The virtual column does not exist and must be generated from the source column. - need_source_column_ids.insert(virtual_column_field.source_column_id); - } - // The remaining source column can be ignored. - let mut ignored_source_column_ids = HashSet::new(); - for column_id in self - .virtual_column - .source_column_ids - .difference(&need_source_column_ids) - { - ignored_source_column_ids.insert(*column_id); - } + let Some(virtual_block_meta) = virtual_block_meta else { + return Ok(None); + }; + if virtual_block_meta.virtual_column_size == 0 { + return Ok(None); + } - if !virtual_column_metas.is_empty() { - let virtual_block_meta = VirtualBlockMetaIndex { - virtual_block_location: virtual_block_meta.virtual_location.0.clone(), - virtual_column_metas, - ignored_source_column_ids, - }; - return Ok(Some(virtual_block_meta)); + let mut virtual_column_metas = BTreeMap::new(); + let mut need_source_column_ids = HashSet::new(); + for virtual_column_field in &self.virtual_column.virtual_column_fields { + if let Some(virtual_column_meta) = virtual_block_meta + .virtual_column_metas + .get(&virtual_column_field.column_id) + { + // If the number of rows is not same, the virtual column data has been corrupted, ignore the data. + if virtual_column_meta.num_values != row_count { + warn!("Virtual column row count mismatch, location: {}, block rows: {}, virtual column rows: {}", + virtual_block_meta.virtual_location.0, + row_count, + virtual_column_meta.num_values + ); + return Ok(None); + } + virtual_column_metas + .insert(virtual_column_field.column_id, virtual_column_meta.clone()); + continue; } + // The virtual column does not exist and must be generated from the source column. + need_source_column_ids.insert(virtual_column_field.source_column_id); + } + // The remaining source column can be ignored. 
+ let mut ignored_source_column_ids = HashSet::new(); + for column_id in self + .virtual_column + .source_column_ids + .difference(&need_source_column_ids) + { + ignored_source_column_ids.insert(*column_id); + } + if !virtual_column_metas.is_empty() { + let virtual_block_meta = VirtualBlockMetaIndex { + virtual_block_location: virtual_block_meta.virtual_location.0.clone(), + virtual_column_metas, + ignored_source_column_ids, + }; + return Ok(Some(virtual_block_meta)); } Ok(None) } diff --git a/src/query/storages/fuse/src/statistics/accumulator.rs b/src/query/storages/fuse/src/statistics/accumulator.rs index 131f48eaab03b..84edd9aabe499 100644 --- a/src/query/storages/fuse/src/statistics/accumulator.rs +++ b/src/query/storages/fuse/src/statistics/accumulator.rs @@ -78,6 +78,13 @@ impl VirtualColumnAccumulator { schema: &Arc, virtual_schema: &Option, ) -> Option { + if !ctx + .get_settings() + .get_enable_experimental_virtual_column() + .unwrap_or_default() + { + return None; + } if LicenseManagerSwitch::instance() .check_enterprise_enabled(ctx.get_license_key(), Feature::VirtualColumn) .is_err() diff --git a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test index 4a7f734e2942e..55d820ee4cb18 100644 --- a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test +++ b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test @@ -21,6 +21,9 @@ CREATE DATABASE test_virtual_column statement ok USE test_virtual_column +statement ok +set enable_experimental_virtual_column = 1; + statement ok drop table if exists t1 @@ -406,6 +409,9 @@ select id, data['id'], data['create'], data['text'], data['user']['id'], data['r 12 12 "3/19" "vv" 16 10 {"lat":4.0} 4.0 NULL NULL 13 13 "4/18" "u" 13 6 {"lat":3.0} 3.0 NULL NULL +statement ok +set enable_experimental_virtual_column = 0; + statement ok DROP DATABASE test_virtual_column diff --git a/tests/sqllogictests/suites/mode/standalone/ee/explain_virtual_column.test b/tests/sqllogictests/suites/mode/standalone/ee/explain_virtual_column.test index 8c6bcb5a71401..6e3be13c63f48 100644 --- a/tests/sqllogictests/suites/mode/standalone/ee/explain_virtual_column.test +++ b/tests/sqllogictests/suites/mode/standalone/ee/explain_virtual_column.test @@ -21,6 +21,9 @@ CREATE DATABASE test_virtual_db statement ok USE test_virtual_db +statement ok +set enable_experimental_virtual_column = 1; + statement ok drop table if exists t1 @@ -112,22 +115,25 @@ Filter query T explain select a, get_by_keypath_string(v, '{"a",0}') from t2 where get_by_keypath_string(v, '{"b","c"}') like '%10%' ---- -Filter -├── output columns: [t2.a (#0), t2.v['a'][0]::string (#7)] -├── filters: [is_true(like(t2.v['b']['c']::string (#8), '%10%'))] +EvalScalar +├── output columns: [t2.a (#0), get_by_keypath_string(v, '{"a",0}') (#7)] +├── expressions: [CAST(t2.v['a'][0] (#2) AS String NULL)] ├── estimated rows: 0.50 -└── TableScan - ├── table: default.test_virtual_db.t2 - ├── output columns: [a (#0), v['a'][0]::string (#7), v['b']['c']::string (#8)] - ├── read rows: 1 - ├── read size: < 1 KiB - ├── partitions total: 1 - ├── partitions scanned: 1 - ├── pruning stats: [segments: , blocks: ] - ├── push downs: [filters: [is_true(like(t2.v['b']['c']::string (#8), '%10%'))], limit: NONE] - ├── virtual columns: [v['a'][0]::string, v['b']['c']::string] - └── estimated rows: 1.00 - +└── Filter + ├── output columns: [t2.a (#0), t2.v['a'][0] (#2)] + ├── filters: [is_true(like(CAST(t2.v['b']['c'] 
(#5) AS String NULL), '%10%'))] + ├── estimated rows: 0.50 + └── TableScan + ├── table: default.test_virtual_db.t2 + ├── output columns: [a (#0), v['a'][0] (#2), v['b']['c'] (#5)] + ├── read rows: 1 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [is_true(like(CAST(t2.v['b']['c'] (#5) AS String NULL), '%10%'))], limit: NONE] + ├── virtual columns: [v['a'][0], v['b']['c']] + └── estimated rows: 1.00 query T explain select a, get(v, 'd') from t2 where get(v, 'd') = 20 @@ -166,17 +172,21 @@ TableScan query T explain select v['d']::string, v['d']::int from t2; ---- -TableScan -├── table: default.test_virtual_db.t2 -├── output columns: [v['d']::string (#7), v['d']::int32 (#8)] -├── read rows: 1 -├── read size: < 1 KiB -├── partitions total: 1 -├── partitions scanned: 1 -├── pruning stats: [segments: , blocks: ] -├── push downs: [filters: [], limit: NONE] -├── virtual columns: [v['d']::int32, v['d']::string] -└── estimated rows: 1.00 +EvalScalar +├── output columns: [v['d']::STRING (#7), v['d']::Int32 (#8)] +├── expressions: [CAST(t2.v['d'] (#6) AS String NULL), CAST(t2.v['d'] (#6) AS Int32 NULL)] +├── estimated rows: 1.00 +└── TableScan + ├── table: default.test_virtual_db.t2 + ├── output columns: [v['d'] (#6)] + ├── read rows: 1 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + ├── virtual columns: [v['d']] + └── estimated rows: 1.00 query T select v['d']::string, v['d']::int from t2; @@ -186,21 +196,25 @@ select v['d']::string, v['d']::int from t2; query T explain select a, get_string(v, 'd') from t2 where get_string(v, 'd') like '%20%' ---- -Filter -├── output columns: [t2.a (#0), t2.v['d']::string (#7)] -├── filters: [is_true(like(t2.v['d']::string (#7), '%20%'))] +EvalScalar +├── output columns: [t2.a (#0), get_string(v, 'd') (#7)] +├── expressions: [CAST(t2.v['d'] (#6) AS String NULL)] ├── estimated rows: 0.50 -└── TableScan - ├── table: default.test_virtual_db.t2 - ├── output columns: [a (#0), v['d']::string (#7)] - ├── read rows: 1 - ├── read size: < 1 KiB - ├── partitions total: 1 - ├── partitions scanned: 1 - ├── pruning stats: [segments: , blocks: ] - ├── push downs: [filters: [is_true(like(t2.v['d']::string (#7), '%20%'))], limit: NONE] - ├── virtual columns: [v['d']::string] - └── estimated rows: 1.00 +└── Filter + ├── output columns: [t2.a (#0), t2.v['d'] (#6)] + ├── filters: [is_true(like(CAST(t2.v['d'] (#6) AS String NULL), '%20%'))] + ├── estimated rows: 0.50 + └── TableScan + ├── table: default.test_virtual_db.t2 + ├── output columns: [a (#0), v['d'] (#6)] + ├── read rows: 1 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [is_true(like(CAST(t2.v['d'] (#6) AS String NULL), '%20%'))], limit: NONE] + ├── virtual columns: [v['d']] + └── estimated rows: 1.00 query T explain select t2.a, t2.v['b'] from t2 left outer join t1 on t2.v['b']['c'] = t1.a @@ -399,12 +413,339 @@ AggregateFinal ├── virtual columns: [v['b']['c']] └── estimated rows: 1.00 + +statement ok +CREATE OR REPLACE TABLE data_source_a ( + entity_id VARCHAR, + source_id VARCHAR, + metadata_object VARIANT, + content_object VARIANT, + refresh_time TIMESTAMP +); + +statement ok +CREATE OR REPLACE TABLE config_table ( + entity_id VARCHAR, + source_id VARCHAR, + process_mode VARCHAR +); + +statement ok 
+INSERT INTO data_source_a VALUES +('ENTITY1', 'SRC1', '{"type": "T1"}', '{"event_date": 1609459200000, "category_a": "CA1", "category_b": "CB1"}', CURRENT_TIMESTAMP()); + +statement ok +INSERT INTO config_table VALUES('ENTITY1', 'SRC1', 'standard_mode'); + +query T +EXPLAIN WITH processed_dates AS ( + SELECT + a.entity_id, + a.source_id, + TO_TIMESTAMP(a.content_object:event_date::BIGINT)::DATE AS event_date + FROM + data_source_a a + JOIN + config_table c + ON + c.entity_id = a.entity_id + AND c.source_id = a.source_id + AND c.process_mode = 'standard_mode' + GROUP BY + 1, 2, 3 +), +data_aggregation AS ( + SELECT + a.entity_id, + a.source_id, + COALESCE(a.metadata_object:type::VARCHAR, 'Unknown') AS type_code, + a.content_object:category_a::VARCHAR AS primary_category, + a.content_object:category_b::VARCHAR AS secondary_category, + p.event_date AS event_date + FROM + data_source_a a + JOIN + processed_dates p + ON + p.entity_id = a.entity_id + AND p.source_id = a.source_id + AND TO_TIMESTAMP(a.content_object:event_date::BIGINT)::DATE = p.event_date + WHERE + a.content_object:category_a::VARCHAR IS NOT NULL + AND a.content_object:category_b::VARCHAR IS NOT NULL + AND TO_TIMESTAMP(a.content_object:event_date::BIGINT)::DATE IS NOT NULL + GROUP BY + 1, 2, 3, 4, 5, 6 +) +SELECT * FROM data_aggregation; +---- +EvalScalar +├── output columns: [a.entity_id (#0), a.source_id (#1), event_date (#22), type_code (#26), primary_category (#27), secondary_category (#28)] +├── expressions: [group_item (#23), group_item (#24), group_item (#25)] +├── estimated rows: 0.04 +└── AggregateFinal + ├── output columns: [a.entity_id (#0), a.source_id (#1), type_code (#23), primary_category (#24), secondary_category (#25), event_date (#22)] + ├── group by: [entity_id, source_id, type_code, primary_category, secondary_category, event_date] + ├── aggregate functions: [] + ├── estimated rows: 0.04 + └── AggregatePartial + ├── group by: [entity_id, source_id, type_code, primary_category, secondary_category, event_date] + ├── aggregate functions: [] + ├── estimated rows: 0.04 + └── EvalScalar + ├── output columns: [a.entity_id (#0), a.source_id (#1), event_date (#22), type_code (#23), primary_category (#24), secondary_category (#25)] + ├── expressions: [if(CAST(is_not_null(CAST(a.metadata_object['type'] (#5) AS String NULL)) AS Boolean NULL), CAST(assume_not_null(CAST(a.metadata_object['type'] (#5) AS String NULL)) AS String NULL), true, 'Unknown', NULL), CAST(a.content_object['category_a'] (#6) AS String NULL), CAST(a.content_object['category_b'] (#7) AS String NULL)] + ├── estimated rows: 0.04 + └── HashJoin + ├── output columns: [a.entity_id (#0), a.source_id (#1), a.metadata_object['type'] (#5), a.content_object['category_a'] (#6), a.content_object['category_b'] (#7), event_date (#22)] + ├── join type: INNER + ├── build keys: [p.entity_id (#9), p.source_id (#10), p.event_date (#22)] + ├── probe keys: [a.entity_id (#0), a.source_id (#1), CAST(CAST(CAST(a.content_object['event_date'] (#8) AS Int64 NULL) AS Timestamp NULL) AS Date NULL)] + ├── keys is null equal: [false, false, false] + ├── filters: [] + ├── build join filters: + │ ├── filter id:2, build key:p.entity_id (#9), probe key:a.entity_id (#0), filter type:inlist,min_max + │ └── filter id:3, build key:p.source_id (#10), probe key:a.source_id (#1), filter type:inlist,min_max + ├── estimated rows: 0.04 + ├── EvalScalar(Build) + │ ├── output columns: [a.entity_id (#9), a.source_id (#10), event_date (#22)] + │ ├── expressions: [group_item (#21)] + │ ├── estimated 
rows: 0.20 + │ └── AggregateFinal + │ ├── output columns: [a.entity_id (#9), a.source_id (#10), event_date (#21)] + │ ├── group by: [entity_id, source_id, event_date] + │ ├── aggregate functions: [] + │ ├── estimated rows: 0.20 + │ └── AggregatePartial + │ ├── group by: [entity_id, source_id, event_date] + │ ├── aggregate functions: [] + │ ├── estimated rows: 0.20 + │ └── EvalScalar + │ ├── output columns: [a.entity_id (#9), a.source_id (#10), event_date (#21)] + │ ├── expressions: [CAST(CAST(CAST(a.content_object['event_date'] (#17) AS Int64 NULL) AS Timestamp NULL) AS Date NULL)] + │ ├── estimated rows: 0.20 + │ └── HashJoin + │ ├── output columns: [a.content_object['event_date'] (#17), a.entity_id (#9), a.source_id (#10)] + │ ├── join type: INNER + │ ├── build keys: [a.entity_id (#9), a.source_id (#10)] + │ ├── probe keys: [c.entity_id (#18), c.source_id (#19)] + │ ├── keys is null equal: [false, false] + │ ├── filters: [] + │ ├── build join filters: + │ │ ├── filter id:0, build key:a.entity_id (#9), probe key:c.entity_id (#18), filter type:inlist,min_max + │ │ └── filter id:1, build key:a.source_id (#10), probe key:c.source_id (#19), filter type:inlist,min_max + │ ├── estimated rows: 0.20 + │ ├── Filter(Build) + │ │ ├── output columns: [a.entity_id (#9), a.source_id (#10), a.content_object['event_date'] (#17)] + │ │ ├── filters: [is_not_null(CAST(CAST(CAST(a.content_object['event_date'] (#17) AS Int64 NULL) AS Timestamp NULL) AS Date NULL))] + │ │ ├── estimated rows: 0.20 + │ │ └── TableScan + │ │ ├── table: default.test_virtual_db.data_source_a + │ │ ├── output columns: [entity_id (#9), source_id (#10), content_object['event_date'] (#17)] + │ │ ├── read rows: 1 + │ │ ├── read size: < 1 KiB + │ │ ├── partitions total: 1 + │ │ ├── partitions scanned: 1 + │ │ ├── pruning stats: [segments: , blocks: ] + │ │ ├── push downs: [filters: [is_not_null(CAST(CAST(CAST(a.content_object['event_date'] (#17) AS Int64 NULL) AS Timestamp NULL) AS Date NULL))], limit: NONE] + │ │ ├── virtual columns: [content_object['event_date']] + │ │ └── estimated rows: 1.00 + │ └── Filter(Probe) + │ ├── output columns: [c.entity_id (#18), c.source_id (#19)] + │ ├── filters: [is_true(c.process_mode (#20) = 'standard_mode')] + │ ├── estimated rows: 1.00 + │ └── TableScan + │ ├── table: default.test_virtual_db.config_table + │ ├── output columns: [entity_id (#18), source_id (#19), process_mode (#20)] + │ ├── read rows: 1 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── pruning stats: [segments: , blocks: ] + │ ├── push downs: [filters: [is_true(config_table.process_mode (#20) = 'standard_mode')], limit: NONE] + │ ├── apply join filters: [#0, #1] + │ └── estimated rows: 1.00 + └── Filter(Probe) + ├── output columns: [a.entity_id (#0), a.source_id (#1), a.metadata_object['type'] (#5), a.content_object['category_a'] (#6), a.content_object['category_b'] (#7), a.content_object['event_date'] (#8)] + ├── filters: [is_not_null(CAST(a.content_object['category_a'] (#6) AS String NULL)), is_not_null(CAST(a.content_object['category_b'] (#7) AS String NULL)), is_not_null(CAST(CAST(CAST(a.content_object['event_date'] (#8) AS Int64 NULL) AS Timestamp NULL) AS Date NULL))] + ├── estimated rows: 0.20 + └── TableScan + ├── table: default.test_virtual_db.data_source_a + ├── output columns: [entity_id (#0), source_id (#1), metadata_object['type'] (#5), content_object['category_a'] (#6), content_object['category_b'] (#7), content_object['event_date'] (#8)] + ├── read rows: 1 + ├── read size: < 1 
KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [and_filters(and_filters(is_not_null(CAST(a.content_object['category_a'] (#6) AS String NULL)), is_not_null(CAST(a.content_object['category_b'] (#7) AS String NULL))), is_not_null(CAST(CAST(CAST(a.content_object['event_date'] (#8) AS Int64 NULL) AS Timestamp NULL) AS Date NULL)))], limit: NONE] + ├── apply join filters: [#2, #3] + ├── virtual columns: [content_object['category_a'], content_object['category_b'], content_object['event_date'], metadata_object['type']] + └── estimated rows: 1.00 + +query TTTTTT +WITH processed_dates AS ( + SELECT + a.entity_id, + a.source_id, + TO_TIMESTAMP(a.content_object:event_date::BIGINT)::DATE AS event_date + FROM + data_source_a a + JOIN + config_table c + ON + c.entity_id = a.entity_id + AND c.source_id = a.source_id + AND c.process_mode = 'standard_mode' + GROUP BY + 1, 2, 3 +), +data_aggregation AS ( + SELECT + a.entity_id, + a.source_id, + COALESCE(a.metadata_object:type::VARCHAR, 'Unknown') AS type_code, + a.content_object:category_a::VARCHAR AS primary_category, + a.content_object:category_b::VARCHAR AS secondary_category, + p.event_date AS event_date + FROM + data_source_a a + JOIN + processed_dates p + ON + p.entity_id = a.entity_id + AND p.source_id = a.source_id + AND TO_TIMESTAMP(a.content_object:event_date::BIGINT)::DATE = p.event_date + WHERE + a.content_object:category_a::VARCHAR IS NOT NULL + AND a.content_object:category_b::VARCHAR IS NOT NULL + AND TO_TIMESTAMP(a.content_object:event_date::BIGINT)::DATE IS NOT NULL + GROUP BY + 1, 2, 3, 4, 5, 6 +) +SELECT * FROM data_aggregation; +---- +ENTITY1 SRC1 T1 CA1 CB1 2021-01-01 + +statement ok +CREATE OR REPLACE TABLE data_main ( + record_id VARCHAR, + category_id VARCHAR, + data_object VARIANT +); + +statement ok +CREATE OR REPLACE TABLE data_staging ( + data_object VARIANT +); + +statement ok +INSERT INTO data_main (record_id, category_id, data_object) VALUES +('rec1', 'cat1', '{"timestamp": 1625000000000, "metadata": {"category_id": "cat1", "timestamp": 1625000000000}}'); + +statement ok +INSERT INTO data_staging (data_object) VALUES +('{"unique_key": "rec1", "metadata": {"category_id": "cat1"}, "timestamp": 1625100000000}'); + +query T +EXPLAIN MERGE INTO data_main target +USING ( + SELECT + data_object:unique_key AS record_id, + data_object:metadata.category_id AS category_id, + 1624900000000 AS reference_time + FROM data_staging +) source +ON target.record_id = source.record_id AND target.category_id = source.category_id +WHEN MATCHED THEN +UPDATE SET + data_object = json_object_insert( + target.data_object, + 'metadata', + json_object_insert( + json_object_insert( + target.data_object:metadata, + 'reference_time', + source.reference_time::variant, + true + ), + 'time_difference', + coalesce((target.data_object:metadata.timestamp - source.reference_time) / 24 / 60 / 60 / 1000::variant, null::variant), + true + ), + true + ); +---- +CommitSink +└── DataMutation + ├── target table: [catalog: default] [database: test_virtual_db] [table: data_main] + ├── matched update: [condition: None, update set data_object = if(CAST(_predicate (#18446744073709551615) AS Boolean NULL), json_object_insert(target.data_object (#7), 'metadata', json_object_insert(json_object_insert(get_by_keypath(target.data_object (#7), '{"metadata"}'), 'reference_time', CAST(source.reference_time (#4) AS Variant), true), 'time_difference', 
if(CAST(is_not_null((TRY_CAST(get_by_keypath(target.data_object (#7), '{"metadata","timestamp"}') AS UInt8 NULL) - CAST(source.reference_time (#4) AS UInt64 NULL)) / 24 / 60 / 60 / NULL) AS Boolean NULL), CAST(assume_not_null((TRY_CAST(get_by_keypath(target.data_object (#7), '{"metadata","timestamp"}') AS UInt8 NULL) - CAST(source.reference_time (#4) AS UInt64 NULL)) / 24 / 60 / 60 / NULL) AS Float64 NULL), NULL), true), true), target.data_object (#7))] + └── RowFetch + ├── output columns: [target.record_id (#5), target.category_id (#6), target._row_id (#8), data_staging.data_object['metadata']['category_id'] (#1), data_staging.data_object['unique_key'] (#3), reference_time (#4), target.data_object (#7)] + ├── columns to fetch: [data_object] + └── HashJoin + ├── output columns: [target.record_id (#5), target.category_id (#6), target._row_id (#8), data_staging.data_object['metadata']['category_id'] (#1), data_staging.data_object['unique_key'] (#3), reference_time (#4)] + ├── join type: INNER + ├── build keys: [CAST(source.record_id (#3) AS String NULL), CAST(source.category_id (#1) AS String NULL)] + ├── probe keys: [target.record_id (#5), target.category_id (#6)] + ├── keys is null equal: [false, false] + ├── filters: [] + ├── build join filters: + │ ├── filter id:0, build key:CAST(source.record_id (#3) AS String NULL), probe key:target.record_id (#5), filter type:inlist,min_max + │ └── filter id:1, build key:CAST(source.category_id (#1) AS String NULL), probe key:target.category_id (#6), filter type:inlist,min_max + ├── estimated rows: 1.00 + ├── EvalScalar(Build) + │ ├── output columns: [data_staging.data_object['metadata']['category_id'] (#1), data_staging.data_object['unique_key'] (#3), reference_time (#4)] + │ ├── expressions: [1624900000000] + │ ├── estimated rows: 1.00 + │ └── TableScan + │ ├── table: default.test_virtual_db.data_staging + │ ├── output columns: [data_object['metadata']['category_id'] (#1), data_object['unique_key'] (#3)] + │ ├── read rows: 1 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── pruning stats: [segments: , blocks: ] + │ ├── push downs: [filters: [], limit: NONE] + │ ├── virtual columns: [data_object['metadata']['category_id'], data_object['unique_key']] + │ └── estimated rows: 1.00 + └── TableScan(Probe) + ├── table: default.test_virtual_db.data_main + ├── output columns: [record_id (#5), category_id (#6), _row_id (#8)] + ├── read rows: 1 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + ├── apply join filters: [#0, #1] + └── estimated rows: 1.00 + +query TTT +SELECT * FROM data_main; +---- +rec1 cat1 {"metadata":{"category_id":"cat1","timestamp":1625000000000},"timestamp":1625000000000} + statement ok drop table t1 statement ok drop table t2 +statement ok +drop table data_source_a + +statement ok +drop table config_table + +statement ok +drop table data_main + +statement ok +drop table data_staging + +statement ok +set enable_experimental_virtual_column = 0; + statement ok USE default
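The recurring pattern in this patch is a two-part gate: virtual columns are only rewritten in the binder, built at write time, or accumulated in statistics when the new enable_experimental_virtual_column setting is 1 and the enterprise VirtualColumn feature check passes. A minimal standalone sketch of that gate follows; Settings and License stand in for Databend's Settings and LicenseManagerSwitch, and the free function virtual_column_enabled is illustrative, not an API introduced by the patch.

// Illustrative sketch (not part of the patch) of the gate applied in
// bind_select, VirtualColumnBuilder::try_create and VirtualColumnAccumulator.
struct Settings {
    // Mirrors the new setting, which defaults to 0 (off) with range 0..=1.
    enable_experimental_virtual_column: bool,
}

struct License {
    // Mirrors LicenseManagerSwitch::check_enterprise_enabled(.., Feature::VirtualColumn).
    virtual_column_feature: bool,
}

/// Virtual columns are used only when BOTH the experimental setting is on
/// AND the enterprise VirtualColumn feature is licensed.
fn virtual_column_enabled(settings: &Settings, license: &License) -> bool {
    settings.enable_experimental_virtual_column && license.virtual_column_feature
}

fn main() {
    let license = License { virtual_column_feature: true };

    let enabled = Settings { enable_experimental_virtual_column: true };
    assert!(virtual_column_enabled(&enabled, &license));

    // With the setting left at its default of 0, the gated code paths return
    // early (None / allow_virtual_column = false) and the query behaves as if
    // no virtual columns exist.
    let disabled = Settings { enable_experimental_virtual_column: false };
    assert!(!virtual_column_enabled(&disabled, &license));
}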
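The pruner change adds a consistency check on top of the gate: a stored virtual-column meta is only usable when its num_values equals the block's row count; on a mismatch the data is treated as corrupted, a warning is logged, and the scan falls back to reading the source VARIANT column. A small sketch of that predicate, using only the two counts passed to prune_virtual_columns; the helper name is hypothetical.

// Illustrative sketch (not part of the patch) of the row-count validation in
// VirtualColumnPruner::prune_virtual_columns.
fn virtual_column_meta_usable(block_row_count: u64, virtual_num_values: u64) -> bool {
    // A mismatch means the persisted virtual column data is stale or corrupted,
    // so the pruner ignores it and reads the source column instead.
    virtual_num_values == block_row_count
}

fn main() {
    assert!(virtual_column_meta_usable(1000, 1000));
    assert!(!virtual_column_meta_usable(1000, 998)); // fall back to the source column
}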