From 1b7b06a9eab59cb6e2ae80f2741bc3f51f304e4a Mon Sep 17 00:00:00 2001 From: "yukkit.zhang" Date: Wed, 9 Aug 2023 10:29:57 +0800 Subject: [PATCH] feat: support aggregate function first --- Cargo.lock | 18 +- .../query/benches/aggregate_function.rs | 10 ++ .../expr/aggregate_function/first.rs | 170 ++++++++++++++++++ .../extension/expr/aggregate_function/mod.rs | 3 + .../sqllogicaltests/cases/ddl/user.slt | 4 +- .../cases/function/compact_state_agg.slt | 4 +- .../sqllogicaltests/cases/function/first.slt | 25 +++ .../cases/function/gauge/gauge_agg.slt | 2 +- .../sqllogicaltests/cases/stream/syntax.slt | 2 +- 9 files changed, 223 insertions(+), 15 deletions(-) create mode 100644 query_server/query/src/extension/expr/aggregate_function/first.rs create mode 100644 query_server/sqllogicaltests/cases/function/first.slt diff --git a/Cargo.lock b/Cargo.lock index 73ccd0bbf4..83692617f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,7 +1543,7 @@ dependencies = [ [[package]] name = "datafusion" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "ahash 0.8.3", "arrow", @@ -1591,7 +1591,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "arrow", "arrow-array", @@ -1605,7 +1605,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "dashmap", "datafusion-common", @@ -1622,7 +1622,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "ahash 0.8.3", "arrow", @@ -1636,7 +1636,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "arrow", "async-trait", @@ -1653,7 +1653,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "ahash 0.8.3", "arrow", @@ -1685,7 +1685,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "arrow", "chrono", @@ -1699,7 +1699,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "arrow", "datafusion-common", @@ -1710,7 +1710,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#65dc4a97e48a10ad3bdf9173731c47cdfd3adb2e" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#c7379ecc59f4674496bdffa7a6ad953fbd491c00" dependencies = [ "arrow", "arrow-schema", diff --git a/query_server/query/benches/aggregate_function.rs b/query_server/query/benches/aggregate_function.rs index b006c7d5dc..160012b497 100644 --- a/query_server/query/benches/aggregate_function.rs +++ b/query_server/query/benches/aggregate_function.rs @@ -42,6 +42,16 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| data_utils::query(ctx.clone(), "select gauge_agg(ts, f64) FROM t")) }); group.finish(); + + c.bench_function("aggregate_query_no_group_by_first", |b| { + b.iter(|| { + data_utils::query( + ctx.clone(), + "SELECT first(ts, f64) \ + FROM t", + ) + }) + }); } criterion_group!(benches, criterion_benchmark); diff --git a/query_server/query/src/extension/expr/aggregate_function/first.rs b/query_server/query/src/extension/expr/aggregate_function/first.rs new file mode 100644 index 0000000000..eacf65e6f6 --- /dev/null +++ b/query_server/query/src/extension/expr/aggregate_function/first.rs @@ -0,0 +1,170 @@ +use std::cmp::Ordering; +use std::sync::Arc; + +use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::compute::{sort_to_indices, SortOptions}; +use datafusion::arrow::datatypes::DataType; +use datafusion::common::Result as DFResult; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::type_coercion::aggregates::{ + DATES, NUMERICS, STRINGS, TIMES, TIMESTAMPS, +}; +use datafusion::logical_expr::{ + AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature, StateTypeFunction, + TypeSignature, Volatility, +}; +use datafusion::physical_plan::Accumulator; +use datafusion::scalar::ScalarValue; +use spi::query::function::FunctionMetadataManager; +use spi::{QueryError, Result}; + +use super::TSPoint; +use crate::extension::expr::aggregate_function::FIRST_UDAF_NAME; +use crate::extension::expr::BINARYS; + +pub fn register_udaf(func_manager: &mut dyn FunctionMetadataManager) -> Result { + let udf = new(); + func_manager.register_udaf(udf.clone())?; + Ok(udf) +} + +fn new() -> AggregateUDF { + let return_type_func: ReturnTypeFunction = + Arc::new(move |input| Ok(Arc::new(input[1].clone()))); + + let state_type_func: StateTypeFunction = Arc::new(move |input, _| Ok(Arc::new(input.to_vec()))); + + let accumulator: AccumulatorFactoryFunction = Arc::new(|input, _| { + let time_data_type = input[0].clone(); + let value_data_type = input[1].clone(); + + Ok(Box::new(FirstAccumulator::try_new( + time_data_type, + value_data_type, + )?)) + }); + + // first( + // time TIMESTAMP, + // value ANY + // ) + let type_signatures = STRINGS + .iter() + .chain(NUMERICS.iter()) + .chain(TIMESTAMPS.iter()) + .chain(DATES.iter()) + .chain(BINARYS.iter()) + .chain(TIMES.iter()) + .flat_map(|t| { + TIMESTAMPS + .iter() + .map(|s_t| TypeSignature::Exact(vec![s_t.clone(), t.clone()])) + }) + .collect(); + + AggregateUDF::new( + FIRST_UDAF_NAME, + &Signature::one_of(type_signatures, Volatility::Immutable), + &return_type_func, + &accumulator, + &state_type_func, + ) +} + +#[derive(Debug)] +struct FirstAccumulator { + first: TSPoint, + + sort_opts: SortOptions, +} + +impl FirstAccumulator { + fn try_new(time_data_type: DataType, value_data_type: DataType) -> DFResult { + let null = TSPoint::try_new_null(time_data_type, value_data_type)?; + Ok(Self { + first: null, + sort_opts: SortOptions { + descending: false, + nulls_first: false, + }, + }) + } + + fn update_inner(&mut self, point: TSPoint) -> DFResult<()> { + if point.ts().is_null() || point.val().is_null() { + return Ok(()); + } + + if self.first.ts().is_null() { + self.first = point; + return Ok(()); + } + + match point.ts().partial_cmp(self.first.ts()) { + Some(ordering) => { + if ordering == Ordering::Less { + self.first = point; + } + } + None => { + return Err(DataFusionError::External(Box::new(QueryError::Internal { + reason: format!("cannot compare {:?} with {:?}", point.ts(), self.first.ts()), + }))) + } + } + + Ok(()) + } +} + +impl Accumulator for FirstAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> DFResult<()> { + trace::trace!("update_batch: {:?}", values); + + if values.is_empty() { + return Ok(()); + } + + debug_assert!( + values.len() == 2, + "gauge_agg can only take 2 param, but found {}", + values.len() + ); + + let times_records = values[0].as_ref(); + let value_records = values[1].as_ref(); + + let indices = sort_to_indices(times_records, Some(self.sort_opts), Some(1))?; + + if !indices.is_empty() { + let idx = indices.value(0) as usize; + let ts = ScalarValue::try_from_array(times_records, idx)?; + let val = ScalarValue::try_from_array(value_records, idx)?; + let point = TSPoint { ts, val }; + self.update_inner(point)?; + } + + Ok(()) + } + + fn evaluate(&self) -> DFResult { + Ok(self.first.val().clone()) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(self.first.ts()) + + self.first.ts().size() + - std::mem::size_of_val(self.first.ts()) + + self.first.ts().size() + } + + fn state(&self) -> DFResult> { + Ok(vec![self.first.ts().clone(), self.first.val().clone()]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> DFResult<()> { + trace::trace!("merge_batch: {:?}", states); + + self.update_batch(states) + } +} diff --git a/query_server/query/src/extension/expr/aggregate_function/mod.rs b/query_server/query/src/extension/expr/aggregate_function/mod.rs index 09fafd5b27..c010fad977 100644 --- a/query_server/query/src/extension/expr/aggregate_function/mod.rs +++ b/query_server/query/src/extension/expr/aggregate_function/mod.rs @@ -1,5 +1,6 @@ #[cfg(test)] mod example; +mod first; mod gauge; mod sample; mod state_agg; @@ -17,6 +18,7 @@ use spi::{QueryError, Result}; pub const SAMPLE_UDAF_NAME: &str = "sample"; pub const COMPACT_STATE_AGG_UDAF_NAME: &str = "compact_state_agg"; pub const GAUGE_AGG_UDAF_NAME: &str = "gauge_agg"; +pub const FIRST_UDAF_NAME: &str = "first"; pub use gauge::GaugeData; pub fn register_udafs(func_manager: &mut dyn FunctionMetadataManager) -> Result<()> { @@ -26,6 +28,7 @@ pub fn register_udafs(func_manager: &mut dyn FunctionMetadataManager) -> Result< sample::register_udaf(func_manager)?; state_agg::register_udafs(func_manager)?; gauge::register_udafs(func_manager)?; + first::register_udaf(func_manager)?; Ok(()) } diff --git a/query_server/sqllogicaltests/cases/ddl/user.slt b/query_server/sqllogicaltests/cases/ddl/user.slt index fedfa8116e..06de2763fb 100644 --- a/query_server/sqllogicaltests/cases/ddl/user.slt +++ b/query_server/sqllogicaltests/cases/ddl/user.slt @@ -38,5 +38,5 @@ select * from cluster_schema.users where user_name = 'test_alter_options_u'; test_alter_options_u false {"password":"*****","must_change_password":true,"comment":"ooo ooo","granted_admin":false} # table not found -statement error .*Table not found: \\"a_non_existent_table\\".* -drop table a_non_existent_table; \ No newline at end of file +statement error .*Table not found: \\"a_non_existent_table\\".* +drop table a_non_existent_table; diff --git a/query_server/sqllogicaltests/cases/function/compact_state_agg.slt b/query_server/sqllogicaltests/cases/function/compact_state_agg.slt index b7fd28a8ad..2f1f6f6fe9 100644 --- a/query_server/sqllogicaltests/cases/function/compact_state_agg.slt +++ b/query_server/sqllogicaltests/cases/function/compact_state_agg.slt @@ -8,8 +8,8 @@ statement ok with tmp as (select compact_state_agg(time, f1) as state from func_tbl) select state.state_duration, state.state_periods from tmp; -statement error Arrow error: Io error: Status \{ code: Internal, message: "Build logical plan: Failed to do analyze. err: The function \\"compact_state_agg\\" expects 2 arguments, but 3 were provided",.* +statement error Arrow error: Io error: Status \{ code: Internal, message: "Build logical plan: Datafusion: Error during planning: No function matches the given name and argument types 'compact_state_agg\(Timestamp\(Nanosecond, None\), Timestamp\(Nanosecond, None\), Timestamp\(Nanosecond, None\)\)'\. You might need to add explicit type casts\.\\n\\tCandidate functions:\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Utf8\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Utf8\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Utf8\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Utf8\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), LargeUtf8\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), LargeUtf8\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), LargeUtf8\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), LargeUtf8\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Int8\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Int8\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Int8\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Int8\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Int16\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Int16\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Int16\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Int16\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Int32\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Int32\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Int32\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Int32\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Int64\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Int64\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Int64\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Int64\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), UInt8\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), UInt8\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), UInt8\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), UInt8\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), UInt16\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), UInt16\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), UInt16\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), UInt16\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), UInt32\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), UInt32\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), UInt32\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), UInt32\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), UInt64\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), UInt64\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), UInt64\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), UInt64\)", .* select compact_state_agg(time, time, time) as state from func_tbl; -statement error Arrow error: Io error: Status \{ code: Internal, message: "Build logical plan: Failed to do analyze. err: The function \\"compact_state_agg\\" expects 2 arguments, but 1 were provided",.* +statement error Arrow error: Io error: Status \{ code: Internal, message: "Build logical plan: Datafusion: Error during planning: No function matches the given name and argument types 'compact_state_agg\(Timestamp\(Nanosecond, None\)\)'\. You might need to add explicit type casts\.\\n\\tCandidate functions:\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Utf8\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Utf8\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Utf8\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Utf8\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), LargeUtf8\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), LargeUtf8\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), LargeUtf8\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), LargeUtf8\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Int8\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Int8\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Int8\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Int8\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Int16\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Int16\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Int16\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Int16\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Int32\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Int32\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Int32\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Int32\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), Int64\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), Int64\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), Int64\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), Int64\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), UInt8\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), UInt8\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), UInt8\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), UInt8\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), UInt16\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), UInt16\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), UInt16\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), UInt16\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), UInt32\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), UInt32\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), UInt32\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), UInt32\)\\n\\tcompact_state_agg\(Timestamp\(Second, None\), UInt64\)\\n\\tcompact_state_agg\(Timestamp\(Millisecond, None\), UInt64\)\\n\\tcompact_state_agg\(Timestamp\(Microsecond, None\), UInt64\)\\n\\tcompact_state_agg\(Timestamp\(Nanosecond, None\), UInt64\)", .* select compact_state_agg(time) as state from func_tbl; diff --git a/query_server/sqllogicaltests/cases/function/first.slt b/query_server/sqllogicaltests/cases/function/first.slt new file mode 100644 index 0000000000..4393fa8b52 --- /dev/null +++ b/query_server/sqllogicaltests/cases/function/first.slt @@ -0,0 +1,25 @@ +include ./setup.slt + +########## +## Query +########## + +query +select first(time, f1) from func_tbl; +---- +444 + +query +select first(time, f0) from func_tbl; +---- +111 + +query +select first(time, f0), t0 from func_tbl group by t0 order by t0; +---- +111 tag11 +222 tag12 +444 tag14 + +query error Arrow error: Io error: Status \{ code: Internal, message: "Build logical plan: Datafusion: Error during planning: No function matches the given name and argument types 'first\(Timestamp\(Nanosecond, None\)\)'\. You might need to add explicit type casts\.\\n\\tCandidate functions:\\n\\tfirst\(Timestamp\(Second, None\), Utf8\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Utf8\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Utf8\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Utf8\)\\n\\tfirst\(Timestamp\(Second, None\), LargeUtf8\)\\n\\tfirst\(Timestamp\(Millisecond, None\), LargeUtf8\)\\n\\tfirst\(Timestamp\(Microsecond, None\), LargeUtf8\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), LargeUtf8\)\\n\\tfirst\(Timestamp\(Second, None\), Int8\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Int8\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Int8\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Int8\)\\n\\tfirst\(Timestamp\(Second, None\), Int16\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Int16\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Int16\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Int16\)\\n\\tfirst\(Timestamp\(Second, None\), Int32\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Int32\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Int32\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Int32\)\\n\\tfirst\(Timestamp\(Second, None\), Int64\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Int64\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Int64\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Int64\)\\n\\tfirst\(Timestamp\(Second, None\), UInt8\)\\n\\tfirst\(Timestamp\(Millisecond, None\), UInt8\)\\n\\tfirst\(Timestamp\(Microsecond, None\), UInt8\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), UInt8\)\\n\\tfirst\(Timestamp\(Second, None\), UInt16\)\\n\\tfirst\(Timestamp\(Millisecond, None\), UInt16\)\\n\\tfirst\(Timestamp\(Microsecond, None\), UInt16\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), UInt16\)\\n\\tfirst\(Timestamp\(Second, None\), UInt32\)\\n\\tfirst\(Timestamp\(Millisecond, None\), UInt32\)\\n\\tfirst\(Timestamp\(Microsecond, None\), UInt32\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), UInt32\)\\n\\tfirst\(Timestamp\(Second, None\), UInt64\)\\n\\tfirst\(Timestamp\(Millisecond, None\), UInt64\)\\n\\tfirst\(Timestamp\(Microsecond, None\), UInt64\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), UInt64\)\\n\\tfirst\(Timestamp\(Second, None\), Float32\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Float32\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Float32\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Float32\)\\n\\tfirst\(Timestamp\(Second, None\), Float64\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Float64\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Float64\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Float64\)\\n\\tfirst\(Timestamp\(Second, None\), Timestamp\(Second, None\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Timestamp\(Second, None\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Timestamp\(Second, None\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Timestamp\(Second, None\)\)\\n\\tfirst\(Timestamp\(Second, None\), Timestamp\(Millisecond, None\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Timestamp\(Millisecond, None\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Timestamp\(Millisecond, None\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Timestamp\(Millisecond, None\)\)\\n\\tfirst\(Timestamp\(Second, None\), Timestamp\(Microsecond, None\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Timestamp\(Microsecond, None\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Timestamp\(Microsecond, None\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Timestamp\(Microsecond, None\)\)\\n\\tfirst\(Timestamp\(Second, None\), Timestamp\(Nanosecond, None\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Timestamp\(Nanosecond, None\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Timestamp\(Nanosecond, None\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Timestamp\(Nanosecond, None\)\)\\n\\tfirst\(Timestamp\(Second, None\), Date32\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Date32\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Date32\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Date32\)\\n\\tfirst\(Timestamp\(Second, None\), Date64\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Date64\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Date64\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Date64\)\\n\\tfirst\(Timestamp\(Second, None\), Binary\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Binary\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Binary\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Binary\)\\n\\tfirst\(Timestamp\(Second, None\), LargeBinary\)\\n\\tfirst\(Timestamp\(Millisecond, None\), LargeBinary\)\\n\\tfirst\(Timestamp\(Microsecond, None\), LargeBinary\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), LargeBinary\)\\n\\tfirst\(Timestamp\(Second, None\), FixedSizeBinary\(2147483647\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), FixedSizeBinary\(2147483647\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), FixedSizeBinary\(2147483647\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), FixedSizeBinary\(2147483647\)\)\\n\\tfirst\(Timestamp\(Second, None\), Time32\(Second\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Time32\(Second\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Time32\(Second\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Time32\(Second\)\)\\n\\tfirst\(Timestamp\(Second, None\), Time32\(Millisecond\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Time32\(Millisecond\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Time32\(Millisecond\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Time32\(Millisecond\)\)\\n\\tfirst\(Timestamp\(Second, None\), Time64\(Microsecond\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Time64\(Microsecond\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Time64\(Microsecond\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Time64\(Microsecond\)\)\\n\\tfirst\(Timestamp\(Second, None\), Time64\(Nanosecond\)\)\\n\\tfirst\(Timestamp\(Millisecond, None\), Time64\(Nanosecond\)\)\\n\\tfirst\(Timestamp\(Microsecond, None\), Time64\(Nanosecond\)\)\\n\\tfirst\(Timestamp\(Nanosecond, None\), Time64\(Nanosecond\)\)", .* +select first(time) from func_tbl; diff --git a/query_server/sqllogicaltests/cases/function/gauge/gauge_agg.slt b/query_server/sqllogicaltests/cases/function/gauge/gauge_agg.slt index f829e56e68..bafc7381b1 100644 --- a/query_server/sqllogicaltests/cases/function/gauge/gauge_agg.slt +++ b/query_server/sqllogicaltests/cases/function/gauge/gauge_agg.slt @@ -14,5 +14,5 @@ select gauge_agg(time, f0 order by time) from func_tbl; ---- {first: {ts: 1999-12-31T00:00:00, val: 111.0}, second: {ts: 1999-12-31T00:00:00.005, val: 222.0}, penultimate: {ts: 1999-12-31T00:10:00.030, val: 444.0}, last: {ts: 1999-12-31T01:00:00.035, val: 555.0}, num_elements: 8} -statement error Arrow error: Io error: Status \{ code: Internal, message: "Execute logical plan: Datafusion: type_coercion\\ncaused by\\nError during planning: Coercion from \[Timestamp\(Nanosecond, None\), Utf8\] to the signature OneOf\(\[Exact\(\[Timestamp\(Second, None\), Float64\]\), Exact\(\[Timestamp\(Millisecond, None\), Float64\]\), Exact\(\[Timestamp\(Microsecond, None\), Float64\]\), Exact\(\[Timestamp\(Nanosecond, None\), Float64\]\)\]\) failed\.", .* +statement error Arrow error: Io error: Status \{ code: Internal, message: "Build logical plan: Datafusion: Error during planning: No function matches the given name and argument types 'gauge_agg\(Timestamp\(Nanosecond, None\), Utf8\)'\. You might need to add explicit type casts\.\\n\\tCandidate functions:\\n\\tgauge_agg\(Timestamp\(Second, None\), Float64\)\\n\\tgauge_agg\(Timestamp\(Millisecond, None\), Float64\)\\n\\tgauge_agg\(Timestamp\(Microsecond, None\), Float64\)\\n\\tgauge_agg\(Timestamp\(Nanosecond, None\), Float64\)", .* select gauge_agg(time, t0 order by time) from func_tbl; diff --git a/query_server/sqllogicaltests/cases/stream/syntax.slt b/query_server/sqllogicaltests/cases/stream/syntax.slt index cbbcaff2aa..1bb0bd8c70 100644 --- a/query_server/sqllogicaltests/cases/stream/syntax.slt +++ b/query_server/sqllogicaltests/cases/stream/syntax.slt @@ -25,7 +25,7 @@ statement ok DROP TABLE IF EXISTS TskvTable; # field not found -statement error .*Unable to get field named \\"time_x\\".* +statement error .*Unable to get field named \\"time_x\\".* CREATE STREAM TABLE TskvTable ( time_x TIMESTAMP, name STRING,