Skip to content

Commit

Permalink
fix: add check of parameter event_time_column of stream table
Browse files Browse the repository at this point in the history
  • Loading branch information
yukkit authored and ZuoTiJia committed Nov 21, 2023
1 parent ad640cf commit 1bda378
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
15 changes: 15 additions & 0 deletions query_server/query/src/data_source/stream/tskv/factory.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;

use coordinator::service::CoordinatorRef;
use datafusion::logical_expr::type_coercion::is_timestamp;
use meta::error::MetaError;
use meta::model::MetaClientRef;
use models::schema::StreamTable;
Expand All @@ -12,6 +13,7 @@ use super::provider::TskvStreamProvider;
use super::{get_target_db_name, get_target_table_name, STREAM_DB_KEY, STREAM_TABLE_KEY};
use crate::data_source::batch::tskv::ClusterTable;
use crate::data_source::split::SplitManagerRef;
use crate::data_source::stream::EVENT_TIME_COLUMN_OPTION;

pub const TSKV_STREAM_PROVIDER: &str = "tskv";

Expand Down Expand Up @@ -68,6 +70,19 @@ impl SchemaChecker<StreamTable> for TskvStreamProviderFactory {
target_schema.field_with_name(f.name())?;
}

// check 'event_time_column'
let field = target_schema.field_with_name(&table.watermark().column)?;
if !is_timestamp(field.data_type()) {
return Err(QueryError::InvalidTableOption {
option_name: EVENT_TIME_COLUMN_OPTION.to_string(),
table_name: table_name.to_string(),
reason: format!(
"The data type of column '{}' is not timestamp.",
table.watermark().column
),
});
}

Ok(())
}
}
Expand Down
26 changes: 26 additions & 0 deletions query_server/sqllogicaltests/cases/stream/syntax.slt
Expand Up @@ -85,3 +85,29 @@ CREATE STREAM TABLE TskvTable (
table = 'readings_kv',
event_time_column = 'time'
) engine = xxx;

# event_time_column does not match
statement error .*Schema error: Unable to get field named \\"timex\\"\. Valid fields: \[\\"time\\", \\"name\\", \\"fleet\\", \\"driver\\", \\"model\\", \\"device_version\\", \\"latitude\\", \\"longitude\\", \\"elevation\\", \\"velocity\\", \\"heading\\", \\"grade\\", \\"fuel_consumption\\", \\"load_capacity\\", \\"fuel_capacity\\", \\"nominal_fuel_consumption\\"\]", .*
CREATE STREAM TABLE TskvTable (
time TIMESTAMP,
name STRING,
driver STRING,
elevation DOUBLE
) WITH (
db = 'public',
table = 'readings_kv',
event_time_column = 'timex'
) engine = tskv;

# data type of event_time_column does not match
statement error .*Invalid option \[event_time_column\] of table tskvtable: The data type of column 'latitude' is not timestamp.*
CREATE STREAM TABLE TskvTable (
time TIMESTAMP,
name STRING,
driver STRING,
elevation DOUBLE
) WITH (
db = 'public',
table = 'readings_kv',
event_time_column = 'latitude'
) engine = tskv;

0 comments on commit 1bda378

Please sign in to comment.