Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
.default_session()
.get_settings()
.set_data_retention_time_in_days(0)?;
fixture
.default_session()
.get_settings()
.set_enable_experimental_virtual_column(1)?;
fixture.create_default_database().await?;
fixture.create_variant_table().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ use jsonb::OwnedJsonb;
async fn test_virtual_column_builder() -> Result<()> {
let fixture = TestFixture::setup_with_custom(EESetup::new()).await?;

fixture
.default_session()
.get_settings()
.set_enable_experimental_virtual_column(1)?;
fixture.create_default_database().await?;
fixture.create_variant_table().await?;

Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=100)),
}),
("enable_experimental_virtual_column", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables experimental virtual column",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -952,4 +952,12 @@ impl Settings {
pub fn get_trace_sample_rate(&self) -> Result<u64> {
self.try_get_u64("trace_sample_rate")
}

pub fn set_enable_experimental_virtual_column(&self, val: u64) -> Result<()> {
self.try_set_u64("enable_experimental_virtual_column", val)
}

pub fn get_enable_experimental_virtual_column(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_experimental_virtual_column")? == 1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ impl MutationExpression {
.ok_or_else(|| ErrorCode::Internal("Can't get target table index"))?;

// Remove stream columns in source context.
source_context
.columns
.retain(|v| v.visibility == Visibility::Visible);
source_context.columns.retain(|col| {
let read_guard = binder.metadata.read();
let column_entry = read_guard.column(col.index);
!column_entry.is_stream_column()
});

// Add source table columns to required columns.
for column_index in source_context.column_set().iter() {
Expand Down Expand Up @@ -199,9 +201,12 @@ impl MutationExpression {
let from_s_expr = if let Some(from) = from {
let (from_s_expr, mut from_context) =
binder.bind_table_reference(&mut bind_context, from)?;
from_context
.columns
.retain(|v| v.visibility == Visibility::Visible);
// Remove stream columns in source context.
let read_guard = binder.metadata.read();
from_context.columns.retain(|col| {
let column_entry = read_guard.column(col.index);
!column_entry.is_stream_column()
});
for column in from_context.columns.iter() {
required_columns.insert(column.index);
bind_context.add_column_binding(column.clone());
Expand Down
12 changes: 8 additions & 4 deletions src/query/sql/src/planner/binder/bind_query/bind_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,14 @@ impl Binder {
}

// whether allow rewrite virtual column and pushdown
let allow_virtual_column = LicenseManagerSwitch::instance()
.check_enterprise_enabled(self.ctx.get_license_key(), Feature::VirtualColumn)
.is_ok();
bind_context.allow_virtual_column = allow_virtual_column;
bind_context.allow_virtual_column = self
.ctx
.get_settings()
.get_enable_experimental_virtual_column()
.unwrap_or_default()
&& LicenseManagerSwitch::instance()
.check_enterprise_enabled(self.ctx.get_license_key(), Feature::VirtualColumn)
.is_ok();

let (mut s_expr, mut from_context) = if stmt.from.is_empty() {
let select_list = &stmt.select_list;
Expand Down
14 changes: 14 additions & 0 deletions src/query/sql/src/planner/metadata/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::InternalColumn;
use databend_common_catalog::table::Table;
use databend_common_expression::display::display_tuple_field_name;
use databend_common_expression::is_stream_column_id;
use databend_common_expression::types::DataType;
use databend_common_expression::ComputedExpr;
use databend_common_expression::TableDataType;
Expand Down Expand Up @@ -766,6 +767,19 @@ impl ColumnEntry {
ColumnEntry::VirtualColumn(VirtualColumn { table_index, .. }) => Some(*table_index),
}
}

pub fn is_stream_column(&self) -> bool {
if let ColumnEntry::BaseTableColumn(BaseTableColumn {
column_id: Some(column_id),
..
}) = self
{
if is_stream_column_id(*column_id) {
return true;
}
}
false
}
}

pub fn optimize_remove_count_args(name: &str, distinct: bool, args: &[&Expr]) -> bool {
Expand Down
162 changes: 12 additions & 150 deletions src/query/sql/src/planner/semantic/type_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,10 @@ use crate::plans::WindowOrderBy;
use crate::BaseTableColumn;
use crate::BindContext;
use crate::ColumnBinding;
use crate::ColumnBindingBuilder;
use crate::ColumnEntry;
use crate::DefaultExprBinder;
use crate::IndexType;
use crate::MetadataRef;
use crate::Visibility;

/// A helper for type checking.
///
Expand Down Expand Up @@ -601,15 +599,6 @@ impl<'a> TypeChecker<'a> {
// as we cast JSON null to SQL NULL.
let target_type = if data_type.remove_nullable() == DataType::Variant {
let target_type = checked_expr.data_type().nest_wrap_nullable();

if let Some(new_scalar) = self.try_rewrite_virtual_column_cast(
expr.span(),
&scalar,
&target_type,
false,
) {
return Ok(Box::new((new_scalar, target_type)));
}
target_type
// if the source type is nullable, cast target type should also be nullable.
} else if data_type.is_nullable_or_null() {
Expand Down Expand Up @@ -659,15 +648,6 @@ impl<'a> TypeChecker<'a> {
// as we cast JSON null to SQL NULL.
let target_type = if data_type.remove_nullable() == DataType::Variant {
let target_type = checked_expr.data_type().nest_wrap_nullable();

if let Some(new_scalar) = self.try_rewrite_virtual_column_cast(
expr.span(),
&scalar,
&target_type,
true,
) {
return Ok(Box::new((new_scalar, target_type)));
}
target_type
} else {
checked_expr.data_type().clone()
Expand Down Expand Up @@ -1194,105 +1174,6 @@ impl<'a> TypeChecker<'a> {
Ok(Box::new((scalar, data_type)))
}

fn try_rewrite_virtual_column_cast(
&mut self,
span: Span,
scalar: &ScalarExpr,
target_type: &DataType,
is_try: bool,
) -> Option<ScalarExpr> {
let Ok(cast_ty) = infer_schema_type(target_type) else {
return None;
};
let ScalarExpr::BoundColumnRef(BoundColumnRef { ref column, .. }) = scalar else {
return None;
};
let table_index = column.table_index?;

if column.index >= self.metadata.read().columns().len() {
return None;
}

// Change the type of virtual column to user specified cast type avoids additional casting overhead,
// since the user usually knows the real type.
let column_entry = self.metadata.read().column(column.index).clone();
let ColumnEntry::VirtualColumn(virtual_column) = column_entry else {
return None;
};

let virtual_column_name = if is_try {
format!(
"try_cast({} as {})",
column.column_name,
target_type.remove_nullable().to_string().to_lowercase()
)
} else {
format!(
"{}::{}",
column.column_name,
target_type.remove_nullable().to_string().to_lowercase()
)
};

// Try resolve the virtual column with the cast type.
if let Ok(box (new_scalar, _)) = self.resolve(&Expr::ColumnRef {
span,
column: ColumnRef {
database: column
.database_name
.as_ref()
.map(|name| Identifier::from_name(span, name)),
table: column
.table_name
.as_ref()
.map(|name| Identifier::from_name(span, name)),
column: ColumnID::Name(Identifier::from_name(span, &virtual_column_name)),
},
}) {
return Some(new_scalar);
}

// Generate a new virtual column with the cast type.
let database_name = column.database_name.clone();
let table_name = column.table_name.clone();

let mut guard = self.metadata.write();
let new_column_index = guard.add_virtual_column(
virtual_column.table_index,
virtual_column.source_column_name.clone(),
virtual_column.source_column_id,
virtual_column.column_id,
virtual_column_name.clone(),
cast_ty,
is_try,
);

let new_column_binding = ColumnBindingBuilder::new(
virtual_column_name,
new_column_index,
Box::new(target_type.clone()),
Visibility::InVisible,
)
.table_name(table_name)
.database_name(database_name)
.table_index(Some(table_index))
.build();
// Add virtual column with the cast type to the context.
self.bind_context
.add_column_binding(new_column_binding.clone());

if let Some(scan_id) = guard.base_column_scan_id(virtual_column.column_index) {
let mut base_column_scan_id = HashMap::new();
base_column_scan_id.insert(new_column_index, scan_id);
guard.add_base_column_scan_id(base_column_scan_id);
}

Some(ScalarExpr::BoundColumnRef(BoundColumnRef {
span,
column: new_column_binding,
}))
}

// TODO: remove this function
fn rewrite_substring(args: &mut [ScalarExpr]) {
if let ScalarExpr::ConstantExpr(expr) = &args[1] {
Expand Down Expand Up @@ -3996,22 +3877,12 @@ impl<'a> TypeChecker<'a> {
{
if data_type.remove_nullable() == DataType::Variant {
let target_type = DataType::Nullable(Box::new(DataType::String));
let new_scalar = if let Some(new_scalar) = self
.try_rewrite_virtual_column_cast(
scalar.span(),
&scalar,
&target_type,
false,
) {
new_scalar
} else {
ScalarExpr::CastExpr(CastExpr {
span: scalar.span(),
is_try: false,
argument: Box::new(scalar),
target_type: Box::new(target_type.clone()),
})
};
let new_scalar = ScalarExpr::CastExpr(CastExpr {
span: scalar.span(),
is_try: false,
argument: Box::new(scalar),
target_type: Box::new(target_type.clone()),
});
return Some(Ok(Box::new((new_scalar, target_type))));
}
}
Expand Down Expand Up @@ -4161,21 +4032,12 @@ impl<'a> TypeChecker<'a> {
) {
if func_name == "get_by_keypath_string" {
let target_type = DataType::Nullable(Box::new(DataType::String));
let new_scalar = if let Some(new_scalar) = self.try_rewrite_virtual_column_cast(
scalar.span(),
&scalar,
&target_type,
false,
) {
new_scalar
} else {
ScalarExpr::CastExpr(CastExpr {
span: scalar.span(),
is_try: false,
argument: Box::new(scalar),
target_type: Box::new(target_type.clone()),
})
};
let new_scalar = ScalarExpr::CastExpr(CastExpr {
span: scalar.span(),
is_try: false,
argument: Box::new(scalar),
target_type: Box::new(target_type.clone()),
});
return Some(Ok(Box::new((new_scalar, target_type))));
} else {
return Some(Ok(Box::new((scalar, data_type))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ impl VirtualColumnBuilder {
ctx: Arc<dyn TableContext>,
table_info: &TableInfo,
) -> Option<VirtualColumnBuilder> {
if !ctx
.get_settings()
.get_enable_experimental_virtual_column()
.unwrap_or_default()
{
return None;
}
if LicenseManagerSwitch::instance()
.check_enterprise_enabled(ctx.get_license_key(), Feature::VirtualColumn)
.is_err()
Expand Down
5 changes: 4 additions & 1 deletion src/query/storages/fuse/src/pruning/block_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ impl BlockPruner {
// Check whether can read virtual columns,
// and ignore the source columns.
let virtual_block_meta = virtual_column_pruner
.prune_virtual_columns(&block_meta.virtual_block_meta)
.prune_virtual_columns(
block_meta.row_count,
&block_meta.virtual_block_meta,
)
.await?;
prune_result.virtual_block_meta = virtual_block_meta;
}
Expand Down
Loading
Loading