Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SQL insertion and column default constraint aware of timezone #3266

Merged
merged 7 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ pub fn check_permission(
}

fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx.clone())
let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl GrpcQueryHandler for Instance {
// TODO(weny): supports to create multiple region table.
let _ = self
.statement_executor
.create_table_inner(&mut expr, None)
.create_table_inner(&mut expr, None, &ctx)
.await?;
Output::AffectedRows(0)
}
Expand Down
128 changes: 119 additions & 9 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use api::v1::{
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use common_time::Timezone;
use datatypes::schema::{ColumnSchema, COMMENT_KEY};
use file_engine::FileOptions;
use query::sql::{
Expand Down Expand Up @@ -122,7 +123,7 @@ pub(crate) async fn create_external_expr(
query_ctx: QueryContextRef,
) -> Result<CreateTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name, query_ctx)
table_idents_to_full_name(&create.name, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Expand All @@ -141,7 +142,8 @@ pub(crate) async fn create_external_expr(
// expanded form
let time_index = find_time_index(&create.constraints)?;
let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
let column_schemas = columns_to_column_schemas(&create.columns, &time_index)?;
let column_schemas =
columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
(time_index, primary_keys, column_schemas)
} else {
// inferred form
Expand Down Expand Up @@ -182,7 +184,7 @@ pub(crate) async fn create_external_expr(
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Result<CreateTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name, query_ctx)
table_idents_to_full_name(&create.name, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Expand All @@ -199,7 +201,12 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul
schema_name,
table_name,
desc: String::default(),
column_defs: columns_to_expr(&create.columns, &time_index, &primary_keys)?,
column_defs: columns_to_expr(
&create.columns,
&time_index,
&primary_keys,
Some(&query_ctx.timezone()),
)?,
time_index,
primary_keys,
create_if_not_exists: create.if_not_exists,
Expand Down Expand Up @@ -293,18 +300,23 @@ fn columns_to_expr(
column_defs: &[ColumnDef],
time_index: &str,
primary_keys: &[String],
timezone: Option<&Timezone>,
) -> Result<Vec<api::v1::ColumnDef>> {
let column_schemas = columns_to_column_schemas(column_defs, time_index)?;
let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
column_schemas_to_defs(column_schemas, primary_keys)
}

fn columns_to_column_schemas(
column_defs: &[ColumnDef],
time_index: &str,
timezone: Option<&Timezone>,
) -> Result<Vec<ColumnSchema>> {
column_defs
.iter()
.map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu))
.map(|c| {
column_def_to_schema(c, c.name.to_string() == time_index, timezone)
.context(ParseSqlSnafu)
})
.collect::<Result<Vec<ColumnSchema>>>()
}

Expand Down Expand Up @@ -365,7 +377,7 @@ pub(crate) fn to_alter_expr(
query_ctx: QueryContextRef,
) -> Result<AlterExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(alter_table.table_name(), query_ctx)
table_idents_to_full_name(alter_table.table_name(), &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Expand All @@ -382,7 +394,7 @@ pub(crate) fn to_alter_expr(
} => Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(
sql_column_def_to_grpc_column_def(column_def)
sql_column_def_to_grpc_column_def(column_def, Some(&query_ctx.timezone()))
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
),
Expand All @@ -409,10 +421,12 @@ pub(crate) fn to_alter_expr(

#[cfg(test)]
mod tests {
use session::context::QueryContext;
use datatypes::value::Value;
use session::context::{QueryContext, QueryContextBuilder};
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use store_api::storage::ColumnDefaultConstraint;

use super::*;

Expand All @@ -435,4 +449,100 @@ mod tests {
expr.table_options.get("write_buffer_size").unwrap()
);
}

#[test]
fn test_create_to_expr_with_default_timestamp_value() {
let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::CreateTable(create_table) = stmt else {
unreachable!()
};

// query context with system timezone UTC.
let expr = create_to_expr(&create_table, QueryContext::arc()).unwrap();
let ts_column = &expr.column_defs[1];
let constraint = assert_ts_column(ts_column);
assert!(
matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
);

// query context with timezone `+08:00`
let ctx = QueryContextBuilder::default()
.timezone(Timezone::from_tz_string("+08:00").unwrap().into())
.build();
let expr = create_to_expr(&create_table, ctx).unwrap();
let ts_column = &expr.column_defs[1];
let constraint = assert_ts_column(ts_column);
assert!(
matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
);
}

fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
assert_eq!("ts", ts_column.name);
assert_eq!(
ColumnDataType::TimestampMillisecond as i32,
ts_column.data_type
);
assert!(!ts_column.default_constraint.is_empty());

ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
}

#[test]
fn test_to_alter_expr() {
let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::Alter(alter_table) = stmt else {
unreachable!()
};

// query context with system timezone UTC.
let expr = to_alter_expr(alter_table.clone(), QueryContext::arc()).unwrap();
let kind = expr.kind.unwrap();

let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
unreachable!()
};

assert_eq!(1, add_columns.len());
let ts_column = add_columns[0].column_def.clone().unwrap();
let constraint = assert_ts_column(&ts_column);
assert!(
matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
);

//
// query context with timezone `+08:00`
let ctx = QueryContextBuilder::default()
.timezone(Timezone::from_tz_string("+08:00").unwrap().into())
.build();
let expr = to_alter_expr(alter_table, ctx).unwrap();
let kind = expr.kind.unwrap();

let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
unreachable!()
};

assert_eq!(1, add_columns.len());
let ts_column = add_columns[0].column_def.clone().unwrap();
let constraint = assert_ts_column(&ts_column);
assert!(
matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
);
}
}
6 changes: 3 additions & 3 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl Inserter {
) -> Result<Output> {
let inserts =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert)
.convert(insert, ctx)
.await?;

let affected_rows = self.do_request(inserts, ctx).await?;
Expand Down Expand Up @@ -334,7 +334,7 @@ impl Inserter {

// create physical table
let res = statement_executor
.create_table_inner(create_table_expr, None)
.create_table_inner(create_table_expr, None, ctx)
.await;

match res {
Expand Down Expand Up @@ -431,7 +431,7 @@ impl Inserter {

// TODO(weny): multiple regions table.
let res = statement_executor
.create_table_inner(create_table_expr, None)
.create_table_inner(create_table_expr, None, ctx)
.await;

match res {
Expand Down
23 changes: 18 additions & 5 deletions src/operator/src/req_convert/insert/stmt_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
use catalog::CatalogManager;
use common_time::Timezone;
use datatypes::schema::{ColumnSchema, SchemaRef};
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
Expand Down Expand Up @@ -54,7 +55,11 @@ impl<'a> StatementToRegion<'a> {
}
}

pub async fn convert(&self, stmt: &Insert) -> Result<RegionInsertRequests> {
pub async fn convert(
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
) -> Result<RegionInsertRequests> {
let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
Expand Down Expand Up @@ -110,7 +115,11 @@ impl<'a> StatementToRegion<'a> {
schema.push(grpc_column_schema);

for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) {
let value = sql_value_to_grpc_value(column_schema, &sql_row[i])?;
let value = sql_value_to_grpc_value(
column_schema,
&sql_row[i],
Some(&query_ctx.timezone()),
)?;
grpc_row.values.push(value);
}
}
Expand Down Expand Up @@ -169,7 +178,11 @@ fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a St
}
}

fn sql_value_to_grpc_value(column_schema: &ColumnSchema, sql_val: &SqlValue) -> Result<GrpcValue> {
fn sql_value_to_grpc_value(
column_schema: &ColumnSchema,
sql_val: &SqlValue,
timezone: Option<&Timezone>,
) -> Result<GrpcValue> {
let column = &column_schema.name;
let value = if replace_default(sql_val) {
let default_value = column_schema
Expand All @@ -182,7 +195,7 @@ fn sql_value_to_grpc_value(column_schema: &ColumnSchema, sql_val: &SqlValue) ->
column: column.clone(),
})?
} else {
statements::sql_value_to_value(column, &column_schema.data_type, sql_val)
statements::sql_value_to_value(column, &column_schema.data_type, sql_val, timezone)
.context(ParseSqlSnafu)?
};

Expand Down
13 changes: 7 additions & 6 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ impl StatementExecutor {
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
table_idents_to_full_name(stmt.table_name(), &query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.drop_table(table_name, stmt.drop_if_exists()).await
}
Statement::TruncateTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
table_idents_to_full_name(stmt.table_name(), &query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
Expand All @@ -186,7 +186,7 @@ impl StatementExecutor {

Statement::ShowCreateTable(show) => {
let (catalog, schema, table) =
table_idents_to_full_name(&show.table_name, query_ctx.clone())
table_idents_to_full_name(&show.table_name, &query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;

Expand Down Expand Up @@ -298,9 +298,10 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
CopyTable::To(arg) => arg,
CopyTable::From(arg) => arg,
};
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&table_name, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

let pattern = with
.get(common_datasource::file_format::FILE_PATTERN)
Expand Down