diff --git a/Cargo.lock b/Cargo.lock
index a2939f425712..002c3ff93296 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2219,6 +2219,7 @@ dependencies = [
  "async-ffi",
  "async-trait",
  "datafusion",
+ "datafusion-common",
  "datafusion-functions-aggregate-common",
  "datafusion-proto",
  "datafusion-proto-common",
diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs
index 77bc8d3d2000..58c19160d12b 100644
--- a/datafusion-cli/src/command.rs
+++ b/datafusion-cli/src/command.rs
@@ -26,9 +26,9 @@ use clap::ValueEnum;
 use datafusion::arrow::array::{ArrayRef, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::common::exec_err;
 use datafusion::common::instant::Instant;
-use datafusion::error::{DataFusionError, Result};
+use datafusion::common::{exec_datafusion_err, exec_err};
+use datafusion::error::Result;
 use std::fs::File;
 use std::io::BufReader;
 use std::str::FromStr;
@@ -84,9 +84,7 @@ impl Command {
             Self::Include(filename) => {
                 if let Some(filename) = filename {
                     let file = File::open(filename).map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Error opening {filename:?} {e}"
-                        ))
+                        exec_datafusion_err!("Error opening {filename:?} {e}")
                     })?;
                     exec_from_lines(ctx, &mut BufReader::new(file), print_options)
                         .await?;
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index a7c5c970f23a..533ac3ba03d3 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -295,10 +295,7 @@ pub fn get_gcs_object_store_builder(
 fn get_bucket_name(url: &Url) -> Result<&str> {
     url.host_str().ok_or_else(|| {
-        DataFusionError::Execution(format!(
-            "Not able to parse bucket name from url: {}",
-            url.as_str()
-        ))
+        exec_datafusion_err!("Not able to parse bucket name from url: {}", url.as_str())
     })
 }
diff --git a/datafusion-examples/examples/function_factory.rs b/datafusion-examples/examples/function_factory.rs
index adc88f4305d2..d4312ae59409 100644
--- a/datafusion-examples/examples/function_factory.rs
+++ b/datafusion-examples/examples/function_factory.rs
@@ -17,7 +17,7 @@ use arrow::datatypes::DataType;
 use datafusion::common::tree_node::{Transformed, TreeNode};
-use datafusion::common::{exec_err, internal_err, DataFusionError};
+use datafusion::common::{exec_datafusion_err, exec_err, internal_err, DataFusionError};
 use datafusion::error::Result;
 use datafusion::execution::context::{
     FunctionFactory, RegisterFunction, SessionContext, SessionState,
@@ -185,9 +185,7 @@ impl ScalarFunctionWrapper {
     fn parse_placeholder_identifier(placeholder: &str) -> Result<usize> {
         if let Some(value) = placeholder.strip_prefix('$') {
             Ok(value.parse().map(|v: usize| v - 1).map_err(|e| {
-                DataFusionError::Execution(format!(
-                    "Placeholder `{placeholder}` parsing error: {e}!"
-                ))
+                exec_datafusion_err!("Placeholder `{placeholder}` parsing error: {e}!")
             })?)
         } else {
             exec_err!("Placeholder should start with `$`!")
diff --git a/datafusion-examples/examples/json_shredding.rs b/datafusion-examples/examples/json_shredding.rs
index 44ea62d04c09..c7d0146a001f 100644
--- a/datafusion-examples/examples/json_shredding.rs
+++ b/datafusion-examples/examples/json_shredding.rs
@@ -25,7 +25,7 @@ use datafusion::assert_batches_eq;
 use datafusion::common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
-use datafusion::common::{assert_contains, Result};
+use datafusion::common::{assert_contains, exec_datafusion_err, Result};
 use datafusion::datasource::listing::{
     ListingTable, ListingTableConfig, ListingTableUrl,
 };
@@ -230,8 +230,8 @@ impl ScalarUDFImpl for JsonGetStr {
         let key = match &args.args[0] {
             ColumnarValue::Scalar(ScalarValue::Utf8(Some(key))) => key,
             _ => {
-                return Err(datafusion::error::DataFusionError::Execution(
-                    "json_get_str first argument must be a string".to_string(),
+                return Err(exec_datafusion_err!(
+                    "json_get_str first argument must be a string"
                 ))
             }
         };
@@ -241,13 +241,13 @@ impl ScalarUDFImpl for JsonGetStr {
                 .as_any()
                 .downcast_ref::<StringArray>()
                 .ok_or_else(|| {
-                    datafusion::error::DataFusionError::Execution(
-                        "json_get_str second argument must be a string array".to_string(),
+                    exec_datafusion_err!(
+                        "json_get_str second argument must be a string array"
                     )
                 })?,
             _ => {
-                return Err(datafusion::error::DataFusionError::Execution(
-                    "json_get_str second argument must be a string array".to_string(),
+                return Err(exec_datafusion_err!(
+                    "json_get_str second argument must be a string array"
                 ))
             }
         };
diff --git a/datafusion-examples/examples/memory_pool_execution_plan.rs b/datafusion-examples/examples/memory_pool_execution_plan.rs
index 7a77e99691ef..3258cde17625 100644
--- a/datafusion-examples/examples/memory_pool_execution_plan.rs
+++ b/datafusion-examples/examples/memory_pool_execution_plan.rs
@@ -27,8 +27,9 @@ use arrow::record_batch::RecordBatch;
 use arrow_schema::SchemaRef;
 use datafusion::common::record_batch;
+use datafusion::common::{exec_datafusion_err, internal_err};
 use datafusion::datasource::{memory::MemTable, DefaultTableSource};
-use datafusion::error::{DataFusionError, Result};
+use datafusion::error::Result;
 use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -247,9 +248,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
                 children[0].clone(),
             )))
         } else {
-            Err(DataFusionError::Internal(
-                "BufferingExecutionPlan must have exactly one child".to_string(),
-            ))
+            internal_err!("BufferingExecutionPlan must have exactly one child")
         }
     }
@@ -289,9 +288,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
             // In a real implementation, you would create a batch stream from the processed results
             record_batch!(("id", Int32, vec![5]), ("name", Utf8, vec!["Eve"]))
                 .map_err(|e| {
-                    DataFusionError::Execution(format!(
-                        "Failed to create final RecordBatch: {e}",
-                    ))
+                    exec_datafusion_err!("Failed to create final RecordBatch: {e}")
                 })
         }),
     )))
diff --git a/datafusion/catalog/src/listing_schema.rs b/datafusion/catalog/src/listing_schema.rs
index 7e19c1ecaab0..af96cfc15fc8 100644
--- a/datafusion/catalog/src/listing_schema.rs
+++ b/datafusion/catalog/src/listing_schema.rs
@@ -25,7 +25,9 @@ use std::sync::{Arc, Mutex};
 use crate::{SchemaProvider, TableProvider, TableProviderFactory};
 use crate::Session;
-use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference};
+use datafusion_common::{
+    internal_datafusion_err, DFSchema, DataFusionError, HashMap, TableReference,
+};
 use datafusion_expr::CreateExternalTable;
 use async_trait::async_trait;
@@ -109,17 +111,13 @@ impl ListingSchemaProvider {
             let file_name = table
                 .path
                 .file_name()
-                .ok_or_else(|| {
-                    DataFusionError::Internal("Cannot parse file name!".to_string())
-                })?
+                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?
                 .to_str()
-                .ok_or_else(|| {
-                    DataFusionError::Internal("Cannot parse file name!".to_string())
-                })?;
+                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
             let table_name = file_name.split('.').collect_vec()[0];
-            let table_path = table.to_string().ok_or_else(|| {
-                DataFusionError::Internal("Cannot parse file name!".to_string())
-            })?;
+            let table_path = table
+                .to_string()
+                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
             if !self.table_exist(table_name) {
                 let table_url = format!("{}/{}", self.authority, table_path);
diff --git a/datafusion/common/src/utils/memory.rs b/datafusion/common/src/utils/memory.rs
index 7ac081e0beb8..29e523996cf4 100644
--- a/datafusion/common/src/utils/memory.rs
+++ b/datafusion/common/src/utils/memory.rs
@@ -17,7 +17,8 @@
 //! This module provides a function to estimate the memory size of a HashTable prior to allocation
-use crate::{DataFusionError, Result};
+use crate::error::_exec_datafusion_err;
+use crate::Result;
 use std::mem::size_of;
 /// Estimates the memory size required for a hash table prior to allocation.
@@ -36,7 +37,7 @@ use std::mem::size_of;
 ///   buckets.
 /// - One byte overhead for each bucket.
 /// - The fixed size overhead of the collection.
-/// - If the estimation overflows, we return a [`DataFusionError`]
+/// - If the estimation overflows, we return a [`crate::error::DataFusionError`]
 ///
 /// # Examples
 /// ---
@@ -94,9 +95,7 @@ pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result<usize>
             .checked_add(fixed_size)
     })
     .ok_or_else(|| {
-        DataFusionError::Execution(
-            "usize overflow while estimating the number of buckets".to_string(),
-        )
+        _exec_datafusion_err!("usize overflow while estimating the number of buckets")
     })
 }
diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs
index 7443625276d0..c72e3b3a8df7 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -22,8 +22,8 @@ pub mod memory;
 pub mod proxy;
 pub mod string_utils;
-use crate::error::{_exec_datafusion_err, _internal_err};
-use crate::{DataFusionError, Result, ScalarValue};
+use crate::error::{_exec_datafusion_err, _internal_datafusion_err, _internal_err};
+use crate::{Result, ScalarValue};
 use arrow::array::{
     cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
     OffsetSizeTrait,
 };
@@ -147,9 +147,7 @@ pub fn bisect<const SIDE: bool>(
     let low: usize = 0;
     let high: usize = item_columns
         .first()
-        .ok_or_else(|| {
-            DataFusionError::Internal("Column array shouldn't be empty".to_string())
-        })?
+        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
         .len();
     let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
         let cmp = compare_rows(current, target, sort_options)?;
@@ -198,9 +196,7 @@ pub fn linear_search<const SIDE: bool>(
     let low: usize = 0;
     let high: usize = item_columns
         .first()
-        .ok_or_else(|| {
-            DataFusionError::Internal("Column array shouldn't be empty".to_string())
-        })?
+        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
         .len();
     let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
         let cmp = compare_rows(current, target, sort_options)?;
@@ -365,9 +361,7 @@ pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
         .map(|idx| items.get(*idx.borrow()).cloned())
         .collect::<Option<Vec<T>>>()
         .ok_or_else(|| {
-            DataFusionError::Execution(
-                "Expects indices to be in the range of searched vector".to_string(),
-            )
+            _exec_datafusion_err!("Expects indices to be in the range of searched vector")
         })
 }
@@ -808,7 +802,7 @@ pub fn find_indices<T: PartialEq, S: Borrow<T>>(
         .into_iter()
         .map(|target| items.iter().position(|e| target.borrow().eq(e)))
         .collect::<Option<Vec<_>>>()
-        .ok_or_else(|| DataFusionError::Execution("Target not found".to_string()))
+        .ok_or_else(|| _exec_datafusion_err!("Target not found"))
 }
 /// Transposes the given vector of vectors.
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 02c2c81ad54b..287a133273d8 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -51,9 +51,9 @@ use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::config::{CsvOptions, JsonOptions};
 use datafusion_common::{
-    exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema,
-    DataFusionError, ParamValues, ScalarValue, SchemaError, TableReference,
-    UnnestOptions,
+    exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
+    Column, DFSchema, DataFusionError, ParamValues, ScalarValue, SchemaError,
+    TableReference, UnnestOptions,
 };
 use datafusion_expr::select_expr::SelectExpr;
 use datafusion_expr::{
@@ -1347,9 +1347,9 @@ impl DataFrame {
             .and_then(|r| r.columns().first())
             .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
             .and_then(|a| a.values().first())
-            .ok_or(DataFusionError::Internal(
-                "Unexpected output when collecting for count()".to_string(),
-            ))? as usize;
+            .ok_or_else(|| {
+                internal_datafusion_err!("Unexpected output when collecting for count()")
+            })? as usize;
         Ok(len)
     }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 5ce70e32843d..25bc166d657a 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -44,7 +44,8 @@ use arrow::ipc::{root_as_message, CompressionType};
 use datafusion_catalog::Session;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
-    not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
+    internal_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics,
+    DEFAULT_ARROW_EXTENSION,
 };
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_datasource::display::FileGroupDisplay;
@@ -128,8 +129,8 @@ impl FileFormat for ArrowFormat {
         let ext = self.get_ext();
         match file_compression_type.get_variant() {
             CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
-            _ => Err(DataFusionError::Internal(
-                "Arrow FileFormat does not support compression.".into(),
+            _ => Err(internal_datafusion_err!(
+                "Arrow FileFormat does not support compression."
             )),
         }
     }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index de58740bcd05..3ce58938d77e 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -31,8 +31,8 @@ use arrow_schema::Schema;
 use async_trait::async_trait;
 use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
 use datafusion_common::{
-    config_datafusion_err, config_err, internal_err, plan_err, project_schema,
-    stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
+    config_datafusion_err, config_err, internal_datafusion_err, internal_err, plan_err,
+    project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
 };
 use datafusion_datasource::{
     compute_all_files_statistics,
@@ -249,7 +249,7 @@ impl ListingTableConfig {
             .await?
             .next()
             .await
-            .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
+            .ok_or_else(|| internal_datafusion_err!("No files for table"))??;
         let (file_extension, maybe_compression_type) =
             ListingTableConfig::infer_file_extension_and_compression_type(
@@ -984,11 +984,11 @@ impl ListingTable {
         let file_schema = config
             .file_schema
-            .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
+            .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
-        let options = config.options.ok_or_else(|| {
-            DataFusionError::Internal("No ListingOptions provided".into())
-        })?;
+        let options = config
+            .options
+            .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
         // Add the partition columns to the file schema
         let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 23329830f20c..a8148b80495e 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -34,7 +34,7 @@ use crate::{
         ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
     },
     datasource::{provider_as_source, MemTable, ViewTable},
-    error::{DataFusionError, Result},
+    error::Result,
     execution::{
         options::ArrowReadOptions,
         runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
@@ -66,9 +66,10 @@ use datafusion_catalog::{
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{
     config::{ConfigExtension, TableOptions},
-    exec_datafusion_err, exec_err, not_impl_err, plan_datafusion_err, plan_err,
+    exec_datafusion_err, exec_err, internal_datafusion_err, not_impl_err,
+    plan_datafusion_err, plan_err,
     tree_node::{TreeNodeRecursion, TreeNodeVisitor},
-    DFSchema, ParamValues, ScalarValue, SchemaReference, TableReference,
+    DFSchema, DataFusionError, ParamValues, ScalarValue, SchemaReference, TableReference,
 };
 pub use datafusion_execution::config::SessionConfig;
 use datafusion_execution::registry::SerializerRegistry;
@@ -297,13 +298,13 @@ impl SessionContext {
     pub async fn refresh_catalogs(&self) -> Result<()> {
         let cat_names = self.catalog_names().clone();
         for cat_name in cat_names.iter() {
-            let cat = self.catalog(cat_name.as_str()).ok_or_else(|| {
-                DataFusionError::Internal("Catalog not found!".to_string())
-            })?;
+            let cat = self
+                .catalog(cat_name.as_str())
+                .ok_or_else(|| internal_datafusion_err!("Catalog not found!"))?;
             for schema_name in cat.schema_names() {
-                let schema = cat.schema(schema_name.as_str()).ok_or_else(|| {
-                    DataFusionError::Internal("Schema not found!".to_string())
-                })?;
+                let schema = cat
+                    .schema(schema_name.as_str())
+                    .ok_or_else(|| internal_datafusion_err!("Schema not found!"))?;
                 let lister = schema.as_any().downcast_ref::<ListingSchemaProvider>();
                 if let Some(lister) = lister {
                     lister.refresh(&self.state()).await?;
@@ -949,17 +950,15 @@ impl SessionContext {
                 let state = self.state.read();
                 let name = &state.config().options().catalog.default_catalog;
                 let catalog = state.catalog_list().catalog(name).ok_or_else(|| {
-                    DataFusionError::Execution(format!(
-                        "Missing default catalog '{name}'"
-                    ))
+                    exec_datafusion_err!("Missing default catalog '{name}'")
                 })?;
                 (catalog, tokens[0])
             }
             2 => {
                 let name = &tokens[0];
-                let catalog = self.catalog(name).ok_or_else(|| {
-                    DataFusionError::Execution(format!("Missing catalog '{name}'"))
-                })?;
+                let catalog = self
+                    .catalog(name)
+                    .ok_or_else(|| exec_datafusion_err!("Missing catalog '{name}'"))?;
                 (catalog, tokens[1])
             }
             _ => return exec_err!("Unable to parse catalog from {schema_name}"),
@@ -1099,11 +1098,7 @@ impl SessionContext {
                 let limit = Self::parse_memory_limit(value)?;
                 builder.with_metadata_cache_limit(limit)
             }
-            _ => {
-                return Err(DataFusionError::Plan(format!(
-                    "Unknown runtime configuration: {variable}"
-                )))
-            }
+            _ => return plan_err!("Unknown runtime configuration: {variable}"),
         };
         *state = SessionStateBuilder::from(state.clone())
@@ -1126,18 +1121,14 @@ impl SessionContext {
     pub fn parse_memory_limit(limit: &str) -> Result<usize> {
         let (number, unit) = limit.split_at(limit.len() - 1);
         let number: f64 = number.parse().map_err(|_| {
-            DataFusionError::Plan(format!(
-                "Failed to parse number from memory limit '{limit}'"
-            ))
+            plan_datafusion_err!("Failed to parse number from memory limit '{limit}'")
         })?;
         match unit {
             "K" => Ok((number * 1024.0) as usize),
             "M" => Ok((number * 1024.0 * 1024.0) as usize),
             "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
-            _ => Err(DataFusionError::Plan(format!(
-                "Unsupported unit '{unit}' in memory limit '{limit}'"
-            ))),
+            _ => plan_err!("Unsupported unit '{unit}' in memory limit '{limit}'"),
         }
     }
@@ -1152,10 +1143,7 @@ impl SessionContext {
             .table_factories()
             .get(file_type.as_str())
             .ok_or_else(|| {
-                DataFusionError::Execution(format!(
-                    "Unable to find factory for {}",
-                    cmd.file_type
-                ))
+                exec_datafusion_err!("Unable to find factory for {}", cmd.file_type)
             })?;
         let table = (*factory).create(&state, cmd).await?;
         Ok(table)
@@ -1196,9 +1184,11 @@ impl SessionContext {
             match function_factory {
                 Some(f) => f.create(&state, stmt).await?,
-                _ => Err(DataFusionError::Configuration(
-                    "Function factory has not been configured".into(),
-                ))?,
+                _ => {
+                    return Err(DataFusionError::Configuration(
+                        "Function factory has not been configured".to_string(),
+                    ))
+                }
             }
         };
@@ -1974,6 +1964,7 @@ mod tests {
     use crate::test;
     use crate::test_util::{plan_and_collect, populate_csv_partitions};
     use arrow::datatypes::{DataType, TimeUnit};
+    use datafusion_common::DataFusionError;
     use std::error::Error;
     use std::path::PathBuf;
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index e9b531723fd6..aa538f6dee81 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -64,8 +64,8 @@ use datafusion::test_util::{
 use datafusion_catalog::TableProvider;
 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
 use datafusion_common::{
-    assert_contains, Constraint, Constraints, DFSchema, DataFusionError, ParamValues,
-    ScalarValue, TableReference, UnnestOptions,
+    assert_contains, internal_datafusion_err, Constraint, Constraints, DFSchema,
+    DataFusionError, ParamValues, ScalarValue, TableReference, UnnestOptions,
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_datasource::file_format::format_as_file_type;
@@ -5446,11 +5446,11 @@ async fn union_literal_is_null_and_not_null() -> Result<()> {
     for batch in batches {
         // Verify schema is the same for all batches
         if !schema.contains(&batch.schema()) {
-            return Err(DataFusionError::Internal(format!(
+            return Err(internal_datafusion_err!(
                 "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
                 &schema,
                 batch.schema()
-            )));
+            ));
         }
     }
diff --git a/datafusion/core/tests/execution/coop.rs b/datafusion/core/tests/execution/coop.rs
index a775ef4270f3..b6f406e96750 100644
--- a/datafusion/core/tests/execution/coop.rs
+++ b/datafusion/core/tests/execution/coop.rs
@@ -28,7 +28,7 @@ use datafusion::physical_plan::aggregates::{
 use datafusion::physical_plan::execution_plan::Boundedness;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
-use datafusion_common::{DataFusionError, JoinType, ScalarValue};
+use datafusion_common::{exec_datafusion_err, DataFusionError, JoinType, ScalarValue};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr_common::operator::Operator;
 use datafusion_expr_common::operator::Operator::{Divide, Eq, Gt, Modulo};
@@ -783,7 +783,7 @@ async fn stream_yields(
             Ok(Pending) => Yielded::ReadyOrPending,
             Ok(Ready(Ok(_))) => Yielded::ReadyOrPending,
             Ok(Ready(Err(e))) => Yielded::Err(e),
-            Err(_) => Yielded::Err(DataFusionError::Execution("join error".into())),
+            Err(_) => Yielded::Err(exec_datafusion_err!("join error")),
         }
     },
     _ = tokio::time::sleep(Duration::from_secs(10)) => {
diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
index 564232651424..b90b3e5e32df 100644
--- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
 use arrow::array::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{internal_datafusion_err, Result};
 use datafusion_common_runtime::JoinSet;
 use rand::{rng, Rng};
@@ -197,7 +197,7 @@ impl AggregationFuzzer {
         while let Some(join_handle) = join_set.join_next().await {
             // propagate errors
             join_handle.map_err(|e| {
-                DataFusionError::Internal(format!("AggregationFuzzer task error: {e:?}"))
+                internal_datafusion_err!("AggregationFuzzer task error: {e:?}")
             })??;
         }
         Ok(())
@@ -308,7 +308,7 @@ impl AggregationFuzzTestTask {
                 format_batches_with_limit(expected_result),
                 format_batches_with_limit(&self.dataset_ref.batches),
             );
-            DataFusionError::Internal(message)
+            internal_datafusion_err!("{message}")
         })
     }
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
index 3e8ab82f9a63..be35ddca8f02 100644
--- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
+++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
@@ -23,7 +23,7 @@ use arrow::array::{ArrayRef, Float32Array, Float64Array, RecordBatch, UInt32Array};
 use arrow::compute::{lexsort_to_indices, take_record_batch, SortColumn, SortOptions};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::utils::{compare_rows, get_row_at_idx};
-use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
+use datafusion_common::{exec_err, internal_datafusion_err, plan_err, Result};
 use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_expr::{
     ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
@@ -562,11 +562,11 @@ impl ScalarUDFImpl for TestScalarUDF {
             DataType::Float64 => Arc::new({
                 let arg = &args[0].as_any().downcast_ref::<Float64Array>().ok_or_else(
                     || {
-                        DataFusionError::Internal(format!(
+                        internal_datafusion_err!(
                             "could not cast {} to {}",
                             self.name(),
                             std::any::type_name::<Float64Array>()
-                        ))
+                        )
                     },
                 )?;
@@ -577,11 +577,11 @@ impl ScalarUDFImpl for TestScalarUDF {
             DataType::Float32 => Arc::new({
                 let arg = &args[0].as_any().downcast_ref::<Float32Array>().ok_or_else(
                     || {
-                        DataFusionError::Internal(format!(
+                        internal_datafusion_err!(
                             "could not cast {} to {}",
                             self.name(),
                             std::any::type_name::<Float32Array>()
-                        ))
+                        )
                     },
                 )?;
diff --git a/datafusion/core/tests/fuzz_cases/once_exec.rs b/datafusion/core/tests/fuzz_cases/once_exec.rs
index ec77c1e64c74..49e2caaa7417 100644
--- a/datafusion/core/tests/fuzz_cases/once_exec.rs
+++ b/datafusion/core/tests/fuzz_cases/once_exec.rs
@@ -16,7 +16,7 @@
 // under the License.
 use arrow_schema::SchemaRef;
-use datafusion_common::DataFusionError;
+use datafusion_common::internal_datafusion_err;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
@@ -108,8 +108,6 @@ impl ExecutionPlan for OnceExec {
         let stream = self.stream.lock().unwrap().take();
-        stream.ok_or(DataFusionError::Internal(
-            "Stream already consumed".to_string(),
-        ))
+        stream.ok_or_else(|| internal_datafusion_err!("Stream already consumed"))
     }
 }
diff --git a/datafusion/core/tests/parquet/encryption.rs b/datafusion/core/tests/parquet/encryption.rs
index 034e99b4408f..819d8bf3a283 100644
--- a/datafusion/core/tests/parquet/encryption.rs
+++ b/datafusion/core/tests/parquet/encryption.rs
@@ -25,7 +25,7 @@ use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
 use datafusion_common::config::{EncryptionFactoryOptions, TableParquetOptions};
-use datafusion_common::{assert_batches_sorted_eq, DataFusionError};
+use datafusion_common::{assert_batches_sorted_eq, exec_datafusion_err, DataFusionError};
 use datafusion_datasource_parquet::ParquetFormat;
 use datafusion_execution::parquet_encryption::EncryptionFactory;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
@@ -359,9 +359,9 @@ impl EncryptionFactory for MockEncryptionFactory {
             Some(&"test value".to_string())
         );
         let keys = self.encryption_keys.lock().unwrap();
-        let key = keys.get(file_path).ok_or_else(|| {
-            DataFusionError::Execution(format!("No key for file {file_path:?}"))
-        })?;
+        let key = keys
+            .get(file_path)
+            .ok_or_else(|| exec_datafusion_err!("No key for file {file_path:?}"))?;
         let decryption_properties =
             FileDecryptionProperties::builder(key.clone()).build()?;
         Ok(Some(decryption_properties))
diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index b0d0c94c1737..f1af66de9b59 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -37,8 +37,8 @@ use datafusion_common::cast::{as_float64_array, as_int32_array};
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::utils::take_function_args;
 use datafusion_common::{
-    assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err, not_impl_err,
-    plan_err, DFSchema, DataFusionError, Result, ScalarValue,
+    assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_datafusion_err,
+    exec_err, not_impl_err, plan_err, DFSchema, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr::expr::FieldMetadata;
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
@@ -1009,9 +1009,7 @@ impl ScalarFunctionWrapper {
     fn parse_placeholder_identifier(placeholder: &str) -> Result<usize> {
         if let Some(value) = placeholder.strip_prefix('$') {
             Ok(value.parse().map(|v: usize| v - 1).map_err(|e| {
-                DataFusionError::Execution(format!(
-                    "Placeholder `{placeholder}` parsing error: {e}!"
-                ))
+                exec_datafusion_err!("Placeholder `{placeholder}` parsing error: {e}!")
             })?)
         } else {
             exec_err!("Placeholder should start with `$`!")
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 79f0082d514f..7389a52b3a99 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -37,8 +37,8 @@ use arrow::{
 };
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{
-    exec_err, ColumnStatistics, Constraints, DataFusionError, Result, ScalarValue,
-    Statistics,
+    exec_datafusion_err, exec_err, internal_datafusion_err, ColumnStatistics,
+    Constraints, Result, ScalarValue, Statistics,
 };
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
@@ -876,8 +876,8 @@ impl FileScanConfig {
         target_partitions: usize,
     ) -> Result<Vec<FileGroup>> {
         if target_partitions == 0 {
-            return Err(DataFusionError::Internal(
-                "target_partitions must be greater than 0".to_string(),
+            return Err(internal_datafusion_err!(
+                "target_partitions must be greater than 0"
             ));
         }
@@ -1123,12 +1123,9 @@ impl PartitionColumnProjector {
         let mut cols = file_batch.columns().to_vec();
         for &(pidx, sidx) in &self.projected_partition_indexes {
-            let p_value =
-                partition_values
-                    .get(pidx)
-                    .ok_or(DataFusionError::Execution(
-                        "Invalid partitioning found on disk".to_string(),
-                    ))?;
+            let p_value = partition_values.get(pidx).ok_or_else(|| {
+                exec_datafusion_err!("Invalid partitioning found on disk")
+            })?;
             let mut partition_value = Cow::Borrowed(p_value);
diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs
index 75fb557b63d2..e80099823054 100644
--- a/datafusion/datasource/src/write/demux.rs
+++ b/datafusion/datasource/src/write/demux.rs
@@ -38,7 +38,7 @@ use datafusion_common::cast::{
     as_int8_array, as_string_array, as_string_view_array, as_uint16_array,
     as_uint32_array, as_uint64_array, as_uint8_array,
 };
-use datafusion_common::{exec_datafusion_err, not_impl_err, DataFusionError};
+use datafusion_common::{exec_datafusion_err, internal_datafusion_err, not_impl_err};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
@@ -203,9 +203,7 @@ async fn row_count_demuxer(
                 .send(rb)
                 .await
                 .map_err(|_| {
-                    DataFusionError::Execution(
-                        "Error sending RecordBatch to file stream!".into(),
-                    )
+                    exec_datafusion_err!("Error sending RecordBatch to file stream!")
                 })?;
             next_send_steam = (next_send_steam + 1) % minimum_parallel_files;
@@ -248,9 +246,8 @@ fn create_new_file_stream(
         single_file_output,
     );
     let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
-    tx.send((file_path, rx_file)).map_err(|_| {
-        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
-    })?;
+    tx.send((file_path, rx_file))
+        .map_err(|_| exec_datafusion_err!("Error sending RecordBatch to file stream!"))?;
     Ok(tx_file)
 }
@@ -307,17 +304,13 @@ async fn hive_style_partitions_demuxer(
                     );
                     tx.send((file_path, part_rx)).map_err(|_| {
-                        DataFusionError::Execution(
-                            "Error sending new file stream!".into(),
-                        )
+                        exec_datafusion_err!("Error sending new file stream!")
                     })?;
                     value_map.insert(part_key.clone(), part_tx);
-                    value_map
-                        .get_mut(&part_key)
-                        .ok_or(DataFusionError::Internal(
-                            "Key must exist since it was just inserted!".into(),
-                        ))?
+                    value_map.get_mut(&part_key).ok_or_else(|| {
+                        exec_datafusion_err!("Key must exist since it was just inserted!")
+                    })?
                 }
             };
@@ -329,7 +322,7 @@ async fn hive_style_partitions_demuxer(
             // Finally send the partial batch partitioned by distinct value!
             part_tx.send(final_batch_to_send).await.map_err(|_| {
-                DataFusionError::Internal("Unexpected error sending parted batch!".into())
+                internal_datafusion_err!("Unexpected error sending parted batch!")
             })?;
         }
     }
diff --git a/datafusion/datasource/src/write/orchestration.rs b/datafusion/datasource/src/write/orchestration.rs
index c6d4b25cbccd..ab836b7b7f38 100644
--- a/datafusion/datasource/src/write/orchestration.rs
+++ b/datafusion/datasource/src/write/orchestration.rs
@@ -27,7 +27,9 @@ use crate::file_compression_type::FileCompressionType;
 use datafusion_common::error::Result;
 use arrow::array::RecordBatch;
-use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError};
+use datafusion_common::{
+    exec_datafusion_err, internal_datafusion_err, internal_err, DataFusionError,
+};
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_execution::TaskContext;
@@ -117,9 +119,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
             Err(e) => {
                 return SerializedRecordBatchResult::failure(
                     None,
-                    DataFusionError::Execution(format!(
-                        "Error writing to object store: {e}"
-                    )),
+                    exec_datafusion_err!("Error writing to object store: {e}"),
                 )
             }
         };
@@ -133,9 +133,9 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
             // Handle task panic or cancellation
             return SerializedRecordBatchResult::failure(
                 Some(writer),
-                DataFusionError::Execution(format!(
+                exec_datafusion_err!(
                     "Serialization task panicked or was cancelled: {e}"
-                )),
+                ),
             );
         }
     }
diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs
index cd75c9f3c49e..ef83128ac681 100644
--- a/datafusion/execution/src/object_store.rs
+++ b/datafusion/execution/src/object_store.rs
@@ -20,7 +20,7 @@
 //! and query data inside these systems.
 use dashmap::DashMap;
-use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_common::{exec_err, internal_datafusion_err, DataFusionError, Result};
 #[cfg(not(target_arch = "wasm32"))]
 use object_store::local::LocalFileSystem;
 use object_store::ObjectStore;
@@ -236,9 +236,7 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
             .get(&s)
             .map(|o| Arc::clone(o.value()))
             .ok_or_else(|| {
-                DataFusionError::Internal(format!(
-                    "No suitable object store found for {url}. See `RuntimeEnv::register_object_store`"
-                ))
+                internal_datafusion_err!("No suitable object store found for {url}. See `RuntimeEnv::register_object_store`")
             })
     }
 }
diff --git a/datafusion/execution/src/parquet_encryption.rs b/datafusion/execution/src/parquet_encryption.rs
index c06764a0eb55..73881e11ca72 100644
--- a/datafusion/execution/src/parquet_encryption.rs
+++ b/datafusion/execution/src/parquet_encryption.rs
@@ -20,7 +20,7 @@ use async_trait::async_trait;
 use dashmap::DashMap;
 use datafusion_common::config::EncryptionFactoryOptions;
 use datafusion_common::error::Result;
-use datafusion_common::DataFusionError;
+use datafusion_common::internal_datafusion_err;
 use object_store::path::Path;
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
@@ -75,9 +75,9 @@ impl EncryptionFactoryRegistry {
             .get(id)
             .map(|f| Arc::clone(f.value()))
             .ok_or_else(|| {
-                DataFusionError::Internal(format!(
+                internal_datafusion_err!(
                     "No Parquet encryption factory found for id '{id}'"
-                ))
+                )
             })
     }
 }
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 19f97f9e79ec..c2a6cfe2c833 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -19,7 +19,7 @@ use crate::{
     config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
     runtime_env::RuntimeEnv,
 };
-use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
+use datafusion_common::{internal_datafusion_err, plan_datafusion_err, Result};
 use datafusion_expr::planner::ExprPlanner;
 use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 use std::collections::HashSet;
@@ -168,9 +168,9 @@ impl FunctionRegistry for TaskContext {
         let result = self.window_functions.get(name);
         result.cloned().ok_or_else(|| {
-            DataFusionError::Internal(format!(
+            internal_datafusion_err!(
                 "There is no UDWF named \"{name}\" in the TaskContext"
-            ))
+            )
         })
     }
     fn register_udaf(
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 42eda4aea75c..7a283b0420d3 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -54,9 +54,9 @@ use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::{
-    exec_err, get_target_functional_dependencies, not_impl_err, plan_datafusion_err,
-    plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, NullEquality,
-    Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
+    exec_err, get_target_functional_dependencies, internal_datafusion_err, not_impl_err,
+    plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
+    NullEquality, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
 };
 use datafusion_expr_common::type_coercion::binary::type_union_resolution;
@@ -1188,7 +1188,7 @@ impl LogicalPlanBuilder {
         if join_on.is_empty() {
             let join = Self::from(self.plan).cross_join(right)?;
             join.filter(filters.ok_or_else(|| {
-                DataFusionError::Internal("filters should not be None here".to_string())
+                internal_datafusion_err!("filters should not be None here")
             })?)
         } else {
             let join = Join::try_new(
@@ -2191,7 +2191,9 @@ mod tests {
     use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
     use crate::test::function_stub::sum;
-    use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};
+    use datafusion_common::{
+        Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
+    };
     use insta::assert_snapshot;
     #[test]
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index 174ab28a1e6d..ea08c223e8f4 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -31,7 +31,7 @@ use crate::dml::CopyTo;
 use arrow::datatypes::Schema;
 use datafusion_common::display::GraphvizBuilder;
 use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::{Column, DataFusionError};
+use datafusion_common::{internal_datafusion_err, Column, DataFusionError};
 use serde_json::json;
 /// Formats plans with a single line per node. For example:
@@ -200,14 +200,14 @@ impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
         self.graphviz_builder
             .add_node(self.f, id, &label, None)
-            .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
+            .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
         // Create an edge to our parent node, if any
         //  parent_id -> id
         if let Some(parent_id) = self.parent_ids.last() {
             self.graphviz_builder
                 .add_edge(self.f, *parent_id, id)
-                .map_err(|_e| DataFusionError::Internal("Fail to format".to_string()))?;
+                .map_err(|_e| internal_datafusion_err!("Fail to format"))?;
         }
         self.parent_ids.push(id);
@@ -221,7 +221,7 @@ impl<'n> TreeNodeVisitor<'n> for GraphvizVisitor<'_, '_> {
         // always be non-empty as pre_visit always pushes
         // So it should always be Ok(true)
         let res = self.parent_ids.pop();
-        res.ok_or(DataFusionError::Internal("Fail to format".to_string()))
+        res.ok_or(internal_datafusion_err!("Fail to format"))
             .map(|_| TreeNodeRecursion::Continue)
     }
 }
@@ -686,9 +686,10 @@ impl<'n> TreeNodeVisitor<'n> for PgJsonVisitor<'_, '_> {
     ) -> datafusion_common::Result<TreeNodeRecursion> {
         let id = self.parent_ids.pop().unwrap();
-        let current_node = self.objects.remove(&id).ok_or_else(|| {
-            DataFusionError::Internal("Missing current node!".to_string())
-        })?;
+        let current_node = self
+            .objects
+            .remove(&id)
+            .ok_or_else(|| internal_datafusion_err!("Missing current node!"))?;
         if let Some(parent_id) = self.parent_ids.last() {
             let parent_node = self
diff --git a/datafusion/expr/src/simplify.rs b/datafusion/expr/src/simplify.rs
index 467ce8bf53e2..02794271a9ee 100644
--- a/datafusion/expr/src/simplify.rs
+++ b/datafusion/expr/src/simplify.rs
@@ -18,7 +18,7 @@
 //! Structs and traits to provide the information needed for expression simplification.
 use arrow::datatypes::DataType;
-use datafusion_common::{DFSchemaRef, DataFusionError, Result};
+use datafusion_common::{internal_datafusion_err, DFSchemaRef, Result};
 use crate::{execution_props::ExecutionProps, Expr, ExprSchemable};
@@ -86,9 +86,7 @@ impl SimplifyInfo for SimplifyContext<'_> {
     /// Returns true if expr is nullable
     fn nullable(&self, expr: &Expr) -> Result<bool> {
         let schema = self.schema.as_ref().ok_or_else(|| {
-            DataFusionError::Internal(
-                "attempt to get nullability without schema".to_string(),
-            )
+            internal_datafusion_err!("attempt to get nullability without schema")
         })?;
         expr.nullable(schema.as_ref())
     }
@@ -96,9 +94,7 @@ impl SimplifyInfo for SimplifyContext<'_> {
     /// Returns data type of this expr needed for determining optimized int type of a value
     fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
         let schema = self.schema.as_ref().ok_or_else(|| {
-            DataFusionError::Internal(
-                "attempt to get data type without schema".to_string(),
-            )
+            internal_datafusion_err!("attempt to get data type without schema")
         })?;
         expr.get_type(schema)
     }
diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs
index 12eda9a745db..cdfb18ee1ddd 100644
--- a/datafusion/expr/src/window_state.rs
+++ b/datafusion/expr/src/window_state.rs
@@ -28,9 +28,9 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_common::{
-    internal_err,
+    internal_datafusion_err, internal_err,
     utils::{compare_rows, get_row_at_idx, search_in_slice},
-    DataFusionError, Result, ScalarValue,
+    Result, ScalarValue,
 };
 /// Holds the state of evaluating a window function
@@ -402,8 +402,8 @@ impl WindowFrameStateRange {
             .sort_options
             .first()
             .ok_or_else(|| {
-                DataFusionError::Internal(
-                    "Sort options unexpectedly absent in a window frame".to_string(),
+                internal_datafusion_err!(
+                    "Sort options unexpectedly absent in a window frame"
                 )
             })?
             .descending;
diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml
index 21822cf2a94b..babfe28ad557 100644
--- a/datafusion/ffi/Cargo.toml
+++ b/datafusion/ffi/Cargo.toml
@@ -44,6 +44,7 @@ arrow-schema = { workspace = true }
 async-ffi = { version = "0.5.0", features = ["abi_stable"] }
 async-trait = { workspace = true }
 datafusion = { workspace = true, default-features = false }
+datafusion-common = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
 datafusion-proto = { workspace = true }
 datafusion-proto-common = { workspace = true }
diff --git a/datafusion/ffi/src/record_batch_stream.rs b/datafusion/ffi/src/record_batch_stream.rs
index 6c2282df88dd..1739235d1703 100644
--- a/datafusion/ffi/src/record_batch_stream.rs
+++ b/datafusion/ffi/src/record_batch_stream.rs
@@ -32,6 +32,7 @@ use datafusion::{
     error::DataFusionError,
     execution::{RecordBatchStream, SendableRecordBatchStream},
 };
+use datafusion_common::{exec_datafusion_err, exec_err};
 use futures::{Stream, TryStreamExt};
 use tokio::runtime::Handle;
@@ -163,9 +164,8 @@ fn wrapped_array_to_record_batch(array: WrappedArray) -> Result<RecordBatch> {
     let struct_array = array
         .as_any()
         .downcast_ref::<StructArray>()
-        .ok_or(DataFusionError::Execution(
-            "Unexpected array type during record batch collection in FFI_RecordBatchStream"
-                .to_string(),
+        .ok_or_else(|| exec_datafusion_err!(
+            "Unexpected array type during record batch collection in FFI_RecordBatchStream - expected StructArray"
         ))?;
     Ok(struct_array.into())
@@ -178,9 +178,7 @@ fn maybe_wrapped_array_to_record_batch(
         ROption::RSome(RResult::ROk(wrapped_array)) => {
             Some(wrapped_array_to_record_batch(wrapped_array))
         }
-        ROption::RSome(RResult::RErr(e)) => {
-            Some(Err(DataFusionError::Execution(e.to_string())))
-        }
+        ROption::RSome(RResult::RErr(e)) => Some(exec_err!("FFI error: {e}")),
         ROption::RNone => None,
     }
 }
@@ -200,9 +198,9 @@ impl Stream for FFI_RecordBatchStream {
                 Poll::Ready(maybe_wrapped_array_to_record_batch(array))
             }
             FfiPoll::Pending => Poll::Pending,
-            FfiPoll::Panicked => Poll::Ready(Some(Err(DataFusionError::Execution(
-                "Error occurred during poll_next on FFI_RecordBatchStream".to_string(),
-            )))),
+            FfiPoll::Panicked => Poll::Ready(Some(exec_err!(
+                "Panic occurred during poll_next on FFI_RecordBatchStream"
+            ))),
         }
     }
 }
diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs
index 60434a7dda12..cef4161d8c1f 100644
--- a/datafusion/ffi/src/tests/async_provider.rs
+++ b/datafusion/ffi/src/tests/async_provider.rs
@@ -33,12 +33,13 @@ use arrow::datatypes::Schema;
 use async_trait::async_trait;
 use datafusion::{
     catalog::{Session, TableProvider},
-    error::{DataFusionError, Result},
+    error::Result,
     execution::RecordBatchStream,
     physical_expr::EquivalenceProperties,
     physical_plan::{ExecutionPlan, Partitioning},
     prelude::Expr,
 };
+use datafusion_common::exec_err;
 use futures::Stream;
 use tokio::{
     runtime::Handle,
@@ -259,9 +260,9 @@ impl Stream for AsyncTestRecordBatchStream {
         });
         if let Err(e) = this.batch_request.try_send(true) {
-            return std::task::Poll::Ready(Some(Err(DataFusionError::Execution(
-                format!("Unable to send batch request, {e}"),
-            ))));
+            return std::task::Poll::Ready(Some(exec_err!(
+                "Failed to send batch request: {e}"
+            )));
         }
         match this.batch_receiver.blocking_recv() {
             Ok(batch) => match batch {
                 Some(batch) => std::task::Poll::Ready(Some(Ok(batch))),
                 None => std::task::Poll::Ready(None),
             },
-            Err(e) => std::task::Poll::Ready(Some(Err(DataFusionError::Execution(
-                format!("Unable to receive record batch: {e}"),
-            )))),
+            Err(e) => std::task::Poll::Ready(Some(exec_err!(
+                "Failed to receive record batch: {e}"
+            ))),
         }
     }
 }
diff --git a/datafusion/ffi/src/udaf/accumulator_args.rs b/datafusion/ffi/src/udaf/accumulator_args.rs
index 594b839458b0..0302c26a2e6b 100644
--- a/datafusion/ffi/src/udaf/accumulator_args.rs
+++ b/datafusion/ffi/src/udaf/accumulator_args.rs
@@ -30,6 +30,7 @@ use datafusion::{
     physical_expr::{PhysicalExpr, PhysicalSortExpr},
     prelude::SessionContext,
 };
+use datafusion_common::exec_datafusion_err;
 use datafusion_proto::{
     physical_plan::{
         from_proto::{parse_physical_exprs, parse_physical_sort_exprs},
@@ -108,9 +109,12 @@ impl TryFrom<FFI_AccumulatorArgs> for ForeignAccumulatorArgs {
     type Error = DataFusionError;
     fn try_from(value: FFI_AccumulatorArgs) -> Result<Self, Self::Error> {
-        let proto_def =
-            PhysicalAggregateExprNode::decode(value.physical_expr_def.as_ref())
-                .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+        let proto_def = PhysicalAggregateExprNode::decode(
+            value.physical_expr_def.as_ref(),
+        )
+        .map_err(|e| {
+            exec_datafusion_err!("Failed to decode PhysicalAggregateExprNode: {e}")
+        })?;
         let return_field = Arc::new((&value.return_field.0).try_into()?);
         let schema = Schema::try_from(&value.schema.0)?;
diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs
index e46c15512aab..1ea1798c7c8b 100644
--- a/datafusion/ffi/src/udaf/mod.rs
+++ b/datafusion/ffi/src/udaf/mod.rs
@@ -37,6 +37,7 @@ use datafusion::{
     error::Result,
     logical_expr::{AggregateUDF, AggregateUDFImpl, Signature},
 };
+use datafusion_common::exec_datafusion_err;
 use datafusion_proto_common::from_proto::parse_proto_fields_to_fields;
 use groups_accumulator::{FFI_GroupsAccumulator, ForeignGroupsAccumulator};
 use std::hash::{Hash, Hasher};
@@ -487,13 +488,13 @@ impl AggregateUDFImpl for ForeignAggregateUDF {
             .into_iter()
             .map(|field_bytes| {
                 datafusion_proto_common::Field::decode(field_bytes.as_ref())
-                    .map_err(|e| DataFusionError::Execution(e.to_string()))
+                    .map_err(|e| exec_datafusion_err!("{e}"))
             })
             .collect::<Result<Vec<_>>>()?;
         parse_proto_fields_to_fields(fields.iter())
             .map(|fields| fields.into_iter().map(Arc::new).collect())
-            .map_err(|e| DataFusionError::Execution(e.to_string()))
+            .map_err(|e| exec_datafusion_err!("{e}"))
     }
 }
diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs
index 5362734db2f7..2bede01bb69f 100644
--- a/datafusion/ffi/src/udwf/mod.rs
+++ b/datafusion/ffi/src/udwf/mod.rs
@@ -36,6 +36,7 @@ use datafusion::{
     error::Result,
     logical_expr::{Signature, WindowUDF, WindowUDFImpl},
 };
+use datafusion_common::exec_err;
 use partition_evaluator::{FFI_PartitionEvaluator, ForeignPartitionEvaluator};
 use partition_evaluator_args::{
     FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs,
@@ -336,9 +337,9 @@ impl WindowUDFImpl for ForeignWindowUDF {
         let schema: SchemaRef = schema.into();
         match schema.fields().is_empty() {
-            true => Err(DataFusionError::Execution(
-                "Unable to retrieve field in WindowUDF via FFI".to_string(),
-            )),
+            true => exec_err!(
+                "Unable to retrieve field in WindowUDF via FFI - schema has no fields"
+            ),
             false => Ok(schema.field(0).to_owned().into()),
         }
     }
diff --git a/datafusion/ffi/src/udwf/partition_evaluator_args.rs b/datafusion/ffi/src/udwf/partition_evaluator_args.rs
index b6f9d2a13ece..cd2641256437 100644
--- a/datafusion/ffi/src/udwf/partition_evaluator_args.rs
+++ b/datafusion/ffi/src/udwf/partition_evaluator_args.rs
@@ -31,6 +31,7 @@ use datafusion::{
     physical_plan::{expressions::Column, PhysicalExpr},
     prelude::SessionContext,
 };
+use datafusion_common::exec_datafusion_err;
 use datafusion_proto::{
     physical_plan::{
         from_proto::parse_physical_expr, to_proto::serialize_physical_exprs,
@@ -145,7 +146,7 @@ impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs {
         .into_iter()
         .map(|input_expr_bytes| PhysicalExprNode::decode(input_expr_bytes.as_ref()))
         .collect::<std::result::Result<Vec<_>, prost::DecodeError>>()
-        .map_err(|e| DataFusionError::Execution(e.to_string()))?
+        .map_err(|e| exec_datafusion_err!("Failed to decode PhysicalExprNode: {e}"))?
         .iter()
         .map(|expr_node| {
             parse_physical_expr(expr_node, &default_ctx.task_ctx(), &schema, &codec)
diff --git a/datafusion/ffi/src/util.rs b/datafusion/ffi/src/util.rs
index 5588996c3c6a..151464dc9745 100644
--- a/datafusion/ffi/src/util.rs
+++ b/datafusion/ffi/src/util.rs
@@ -30,7 +30,7 @@ macro_rules! df_result {
         match $x {
             abi_stable::std_types::RResult::ROk(v) => Ok(v),
             abi_stable::std_types::RResult::RErr(e) => {
-                Err(datafusion::error::DataFusionError::Execution(e.to_string()))
+                datafusion_common::exec_err!("FFI error: {}", e)
             }
         }
     };
@@ -142,21 +142,21 @@ mod tests {
         let returned_err_result = df_result!(err_r_result);
         assert!(returned_err_result.is_err());
         assert!(
-            returned_err_result.unwrap_err().to_string()
-                == format!("Execution error: {ERROR_VALUE}")
+            returned_err_result.unwrap_err().strip_backtrace()
+                == format!("Execution error: FFI error: {ERROR_VALUE}")
         );
         let ok_result: Result<String> = Ok(VALID_VALUE.to_string());
         let err_result: Result<String> =
-            Err(DataFusionError::Execution(ERROR_VALUE.to_string()));
+            datafusion_common::exec_err!("{ERROR_VALUE}");
         let returned_ok_r_result = wrap_result(ok_result);
         assert!(returned_ok_r_result == RResult::ROk(VALID_VALUE.into()));
         let returned_err_r_result = wrap_result(err_result);
-        assert!(
-            returned_err_r_result
-                == RResult::RErr(format!("Execution error: {ERROR_VALUE}").into())
-        );
+        assert!(returned_err_r_result.is_err());
+        assert!(returned_err_r_result
+            .unwrap_err()
+            .starts_with(format!("Execution error: {ERROR_VALUE}").as_str()));
     }
 }
diff --git a/datafusion/functions-aggregate-common/src/utils.rs b/datafusion/functions-aggregate-common/src/utils.rs
index 3e527bafe9f0..b01f2c8629c9 100644
--- a/datafusion/functions-aggregate-common/src/utils.rs
+++ b/datafusion/functions-aggregate-common/src/utils.rs
@@ -20,7 +20,7 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::{
     ArrowNativeType, DataType, DecimalType, Field, FieldRef, ToByteSlice,
 };
-use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_common::{exec_err, internal_datafusion_err, Result};
 use datafusion_expr_common::accumulator::Accumulator;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
 use std::sync::Arc;
@@ -112,15 +112,17 @@ impl<T: DecimalType> DecimalAverager<T> {
     ) -> Result<Self> {
         let sum_mul = T::Native::from_usize(10_usize)
             .map(|b| b.pow_wrapping(sum_scale as u32))
-            .ok_or(DataFusionError::Internal(
-                "Failed to compute sum_mul in DecimalAverager".to_string(),
-            ))?;
+            .ok_or_else(|| {
+                internal_datafusion_err!("Failed to compute sum_mul in DecimalAverager")
+            })?;
         let target_mul = T::Native::from_usize(10_usize)
             .map(|b| b.pow_wrapping(target_scale as u32))
-            .ok_or(DataFusionError::Internal(
-                "Failed to compute target_mul in DecimalAverager".to_string(),
-            ))?;
+            .ok_or_else(|| {
+                internal_datafusion_err!(
+                    "Failed to compute target_mul in DecimalAverager"
+                )
+            })?;
         if target_mul >= sum_mul {
             Ok(Self {
diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs
index abb144c045c5..9affdb3ee5f6 100644
--- a/datafusion/functions-aggregate/src/approx_distinct.rs
+++ b/datafusion/functions-aggregate/src/approx_distinct.rs
@@ -32,7 +32,8 @@ use arrow::datatypes::{
 use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
 use datafusion_common::ScalarValue;
 use datafusion_common::{
-    downcast_value, internal_err, not_impl_err, DataFusionError, Result,
+    downcast_value, internal_datafusion_err, internal_err, not_impl_err, DataFusionError,
+    Result,
 };
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::utils::format_state_name;
@@ -64,9 +65,7 @@ impl<T> TryFrom<&[u8]> for HyperLogLog<T> {
     type Error = DataFusionError;
     fn try_from(v: &[u8]) -> Result<HyperLogLog<T>> {
         let arr: [u8; 16384] = v.try_into().map_err(|_| {
-            DataFusionError::Internal(
-                "Impossibly got invalid binary array from states".into(),
-            )
+            internal_datafusion_err!("Impossibly got invalid binary array from states")
         })?;
         Ok(HyperLogLog::<T>::new_with_registers(arr))
     }
@@ -182,8 +181,8 @@ macro_rules! default_accumulator_impl {
             let binary_array = downcast_value!(states[0], BinaryArray);
             for v in binary_array.iter() {
                 let v = v.ok_or_else(|| {
-                    DataFusionError::Internal(
-                        "Impossibly got empty binary array from states".into(),
+                    internal_datafusion_err!(
+                        "Impossibly got empty binary array from states"
                     )
                 })?;
                 let other = v.try_into()?;
diff --git a/datafusion/functions-nested/src/extract.rs b/datafusion/functions-nested/src/extract.rs
index 7aad167f1da4..a46c9c75094c 100644
--- a/datafusion/functions-nested/src/extract.rs
+++ b/datafusion/functions-nested/src/extract.rs
@@ -32,8 +32,8 @@ use datafusion_common::cast::as_large_list_array;
 use datafusion_common::cast::as_list_array;
 use datafusion_common::utils::ListCoercion;
 use datafusion_common::{
-    exec_err, internal_datafusion_err, plan_err, utils::take_function_args,
-    DataFusionError, Result,
+    exec_datafusion_err, exec_err, internal_datafusion_err, plan_err,
+    utils::take_function_args, Result,
 };
 use datafusion_expr::{
     ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
@@ -237,9 +237,7 @@ where
     i64: TryInto<O>,
 {
     let index: O = index.try_into().map_err(|_| {
-        DataFusionError::Execution(format!(
-            "array_element got invalid index: {index}"
-        ))
+        exec_datafusion_err!("array_element got invalid index: {index}")
     })?;
     // 0 ~ len - 1
     let adjusted_zero_index = if index < O::usize_as(0) {
diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs
index 82be31a15837..d00f3d734d76 100644
--- a/datafusion/functions-table/src/generate_series.rs
+++ b/datafusion/functions-table/src/generate_series.rs
@@ -269,9 +269,9 @@ impl GenerateSeriesTable {
             .map(|s| Tz::from_str(s.as_ref()))
             .transpose()
             .map_err(|e| {
-                datafusion_common::DataFusionError::Internal(format!(
+                datafusion_common::internal_datafusion_err!(
                     "Failed to parse timezone: {e}"
-                ))
+                )
             })?
             .unwrap_or_else(|| Tz::from_str("+00:00").unwrap());
         Arc::new(RwLock::new(GenericSeriesState {
diff --git a/datafusion/functions-window/src/ntile.rs b/datafusion/functions-window/src/ntile.rs
index f8deac6b3365..14055b885023 100644
--- a/datafusion/functions-window/src/ntile.rs
+++ b/datafusion/functions-window/src/ntile.rs
@@ -23,7 +23,7 @@ use crate::utils::{
 use arrow::datatypes::FieldRef;
 use datafusion_common::arrow::array::{ArrayRef, UInt64Array};
 use datafusion_common::arrow::datatypes::{DataType, Field};
-use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_common::{exec_datafusion_err, exec_err, Result};
 use datafusion_expr::{
     Documentation, Expr, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
 };
@@ -129,9 +129,7 @@ impl WindowUDFImpl for Ntile {
         let scalar_n =
             get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 0)?
                 .ok_or_else(|| {
-                    DataFusionError::Execution(
-                        "NTILE requires a positive integer".to_string(),
-                    )
+                    exec_datafusion_err!("NTILE requires a positive integer")
                 })?;
         if scalar_n.is_null() {
diff --git a/datafusion/functions/src/datetime/common.rs b/datafusion/functions/src/datetime/common.rs
index df7de0083d31..65f9c9323925 100644
--- a/datafusion/functions/src/datetime/common.rs
+++ b/datafusion/functions/src/datetime/common.rs
@@ -29,7 +29,8 @@ use chrono::{DateTime, TimeZone, Utc};
 use datafusion_common::cast::as_generic_string_array;
 use datafusion_common::{
-    exec_err, unwrap_or_internal_err, DataFusionError, Result, ScalarType, ScalarValue,
+    exec_datafusion_err, exec_err, unwrap_or_internal_err, DataFusionError, Result,
+    ScalarType, ScalarValue,
 };
 use datafusion_expr::ColumnarValue;
@@ -83,9 +84,9 @@ pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
     format: &str,
 ) -> Result<DateTime<T>, DataFusionError> {
     let err = |err_ctx: &str| {
-        DataFusionError::Execution(format!(
+        exec_datafusion_err!(
             "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
-        ))
+        )
     };
     let mut parsed = Parsed::new();
@@ -149,9 +150,7 @@ pub(crate) fn string_to_timestamp_nanos_formatted(
         .naive_utc()
         .and_utc()
         .timestamp_nanos_opt()
-        .ok_or_else(|| {
-            DataFusionError::Execution(ERR_NANOSECONDS_NOT_SUPPORTED.to_string())
-        })
+        .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
 }
 /// Accepts a string with a `chrono` format and converts it to a
diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs
index 36af504c9609..405aabfde991 100644
--- a/datafusion/functions/src/datetime/date_trunc.rs
+++ b/datafusion/functions/src/datetime/date_trunc.rs
@@ -32,7 +32,9 @@ use arrow::array::{Array, ArrayRef, Int64Array, PrimitiveArray};
 use arrow::datatypes::DataType::{self, Null, Timestamp, Utf8, Utf8View};
 use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second};
 use datafusion_common::cast::as_primitive_array;
-use datafusion_common::{exec_err, plan_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, plan_err, DataFusionError, Result, ScalarValue,
+};
 use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
 use datafusion_expr::TypeSignature::Exact;
 use datafusion_expr::{
@@ -427,16 +429,13 @@ fn date_trunc_coarse(granularity: &str, value: i64, tz: Option<Tz>) -> Result<i64>
             // Use chrono DateTime<Tz> to clear the various fields because need to clear per timezone,
             // and NaiveDateTime (ISO 8601) has no concept of timezones
             let value = as_datetime_with_timezone::<TimestampNanosecondType>(value, tz)
-                .ok_or(DataFusionError::Execution(format!(
-                    "Timestamp {value} out of range"
-                )))?;
+                .ok_or(exec_datafusion_err!("Timestamp {value} out of range"))?;
             _date_trunc_coarse_with_tz(granularity, Some(value))
         }
         None => {
             // Use chrono NaiveDateTime to clear the various fields, if we don't have a timezone.
-            let value = timestamp_ns_to_datetime(value).ok_or_else(|| {
-                DataFusionError::Execution(format!("Timestamp {value} out of range"))
-            })?;
+            let value = timestamp_ns_to_datetime(value)
+                .ok_or_else(|| exec_datafusion_err!("Timestamp {value} out of range"))?;
             _date_trunc_coarse_without_tz(granularity, Some(value))
         }
     }?;
@@ -541,9 +540,8 @@ fn general_date_trunc(
 fn parse_tz(tz: &Option<Arc<str>>) -> Result<Option<Tz>> {
     tz.as_ref()
         .map(|tz| {
-            Tz::from_str(tz).map_err(|op| {
-                DataFusionError::Execution(format!("failed on timezone {tz}: {op:?}"))
-            })
+            Tz::from_str(tz)
+                .map_err(|op| exec_datafusion_err!("failed on timezone {tz}: {op:?}"))
         })
         .transpose()
 }
diff --git a/datafusion/functions/src/datetime/to_local_time.rs b/datafusion/functions/src/datetime/to_local_time.rs
index a8c413dd0eaf..a2a54398a33b 100644
--- a/datafusion/functions/src/datetime/to_local_time.rs
+++ b/datafusion/functions/src/datetime/to_local_time.rs
@@ -31,7 +31,8 @@ use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
 use datafusion_common::cast::as_primitive_array;
 use datafusion_common::{
-    exec_err, plan_err, utils::take_function_args, DataFusionError, Result, ScalarValue,
+    exec_err, internal_datafusion_err, plan_err, utils::take_function_args, Result,
+    ScalarValue,
 };
 use datafusion_expr::{
     ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
@@ -326,15 +327,15 @@ fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
             // This should not fail under normal circumstances as the
             // maximum possible offset is 26 hours (93,600 seconds)
             TimeDelta::try_seconds(offset_seconds)
-                .ok_or(DataFusionError::Internal("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000".to_string()))?,
+                .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
         );
     // convert the naive datetime back to i64
     match T::UNIT {
-        Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or(
-            DataFusionError::Internal(
-                "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807".to_string(),
-            ),
+        Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(||
+            internal_datafusion_err!(
+                "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
+            )
         ),
         Microsecond => Ok(adjusted_date_time.timestamp_micros()),
         Millisecond => Ok(adjusted_date_time.timestamp_millis()),
diff --git a/datafusion/functions/src/datetime/to_timestamp.rs b/datafusion/functions/src/datetime/to_timestamp.rs
index d2a5f8102b37..dcd52aa07be3 100644
--- a/datafusion/functions/src/datetime/to_timestamp.rs
+++ b/datafusion/functions/src/datetime/to_timestamp.rs
@@ -953,7 +953,7 @@ mod tests {
             let expected = format!("Execution error: Error parsing timestamp from '{s}' using format '{f}': {ctx}");
             let actual = string_to_datetime_formatted(&Utc, s, f)
                 .unwrap_err()
-                .to_string();
+                .strip_backtrace();
             assert_eq!(actual, expected)
         }
     }
@@ -981,7 +981,7 @@ mod tests {
             let expected = format!("Execution error: Error parsing timestamp from '{s}' using format '{f}': {ctx}");
             let actual = string_to_datetime_formatted(&Utc, s, f)
                 .unwrap_err()
-                .to_string();
+                .strip_backtrace();
             assert_eq!(actual, expected)
         }
     }
diff --git a/datafusion/functions/src/encoding/inner.rs b/datafusion/functions/src/encoding/inner.rs
index 721088c58e06..5baa91936320 100644
--- a/datafusion/functions/src/encoding/inner.rs
+++ b/datafusion/functions/src/encoding/inner.rs
@@ -30,7 +30,7 @@ use datafusion_common::{
     not_impl_err, plan_err,
     utils::take_function_args,
 };
-use datafusion_common::{exec_err, ScalarValue};
+use datafusion_common::{exec_err, internal_datafusion_err, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{ColumnarValue, Documentation};
 use std::sync::Arc;
@@ -309,18 +309,15 @@ fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
     // only write input / 2 bytes to buf
     let out_len = input.len() / 2;
     let buf = &mut buf[..out_len];
-    hex::decode_to_slice(input, buf).map_err(|e| {
-        DataFusionError::Internal(format!("Failed to decode from hex: {e}"))
-    })?;
+    hex::decode_to_slice(input, buf)
+        .map_err(|e| internal_datafusion_err!("Failed to decode from hex: {e}"))?;
     Ok(out_len)
 }
 fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
     general_purpose::STANDARD_NO_PAD
         .decode_slice(input, buf)
-        .map_err(|e| {
-            DataFusionError::Internal(format!("Failed to decode from base64: {e}"))
-        })
+        .map_err(|e| internal_datafusion_err!("Failed to decode from base64: {e}"))
 }
 macro_rules! encode_to_array {
@@ -418,15 +415,13 @@ impl Encoding {
                 general_purpose::STANDARD_NO_PAD
                     .decode(value)
                     .map_err(|e| {
-                        DataFusionError::Internal(format!(
+                        internal_datafusion_err!(
                             "Failed to decode value using base64: {e}"
-                        ))
+                        )
                     })?
             }
             Self::Hex => hex::decode(value).map_err(|e| {
-                DataFusionError::Internal(format!(
-                    "Failed to decode value using hex: {e}"
-                ))
+                internal_datafusion_err!("Failed to decode value using hex: {e}")
             })?,
         };
@@ -444,15 +439,13 @@ impl Encoding {
                 general_purpose::STANDARD_NO_PAD
                     .decode(value)
                     .map_err(|e| {
-                        DataFusionError::Internal(format!(
+                        internal_datafusion_err!(
                             "Failed to decode value using base64: {e}"
-                        ))
+                        )
                     })?
} Self::Hex => hex::decode(value).map_err(|e| { - DataFusionError::Internal(format!( - "Failed to decode value using hex: {e}" - )) + internal_datafusion_err!("Failed to decode value using hex: {e}") })?, }; diff --git a/datafusion/functions/src/math/round.rs b/datafusion/functions/src/math/round.rs index de5c0930e0af..837f0be43240 100644 --- a/datafusion/functions/src/math/round.rs +++ b/datafusion/functions/src/math/round.rs @@ -313,6 +313,6 @@ mod test { let result = round(&args); assert!(result.is_err()); - assert!(matches!(result, Err(DataFusionError::Execution { .. }))); + assert!(matches!(result, Err(DataFusionError::Execution(_)))); } } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index fffa1cbe6f21..3d5dee3a7255 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -29,8 +29,9 @@ use crate::utils::NamePreserver; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_common::{ - exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, - DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference, + exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, + plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, + TableReference, }; use datafusion_expr::expr::{ self, AggregateFunctionParams, Alias, Between, BinaryExpr, Case, Exists, InList, @@ -440,23 +441,23 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> { let low_type = low.get_type(self.schema)?; let low_coerced_type = comparison_coercion(&expr_type, &low_type) .ok_or_else(|| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "Failed to coerce types {expr_type} and {low_type} in BETWEEN expression" - )) + ) })?; let high_type = high.get_type(self.schema)?; let high_coerced_type = comparison_coercion(&expr_type, &high_type) .ok_or_else(|| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "Failed to coerce types {expr_type} and {high_type} in BETWEEN expression" - )) + ) })?; let coercion_type = comparison_coercion(&low_coerced_type, &high_coerced_type) .ok_or_else(|| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "Failed to coerce types {expr_type} and {high_type} in BETWEEN expression" - )) + ) })?; Ok(Transformed::yes(Expr::Between(Between::new( Box::new(expr.cast_to(&coercion_type, self.schema)?), diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index de8cc5a94aa5..4710ece24f91 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -31,7 +31,9 @@ use datafusion_common::{ cast::{as_large_list_array, as_list_array}, tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter}, }; -use datafusion_common::{internal_err, DFSchema, DataFusionError, Result, ScalarValue}; +use datafusion_common::{ + exec_datafusion_err, internal_err, DFSchema, DataFusionError, Result, ScalarValue, +}; use datafusion_expr::{ and, binary::BinaryTypeCoercer, lit, or, BinaryExpr, Case, ColumnarValue, Expr, Like, Operator, Volatility, @@ -695,7 +697,7 @@ impl<'a> ConstEvaluator<'a> { ColumnarValue::Array(a) => { if a.len() != 1 { ConstSimplifyResult::SimplifyRuntimeError( - 
DataFusionError::Execution(format!("Could not evaluate the expression, found a result of length {}", a.len())), + exec_datafusion_err!("Could not evaluate the expression, found a result of length {}", a.len()), expr, ) } else if as_list_array(&a).is_ok() { diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 7536def35385..745ae855efee 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -275,7 +275,7 @@ pub(crate) mod tests { use arrow::array::{ArrayRef, Float32Array, Float64Array}; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_common::{exec_err, DataFusionError, ScalarValue}; + use datafusion_common::{exec_err, internal_datafusion_err, ScalarValue}; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, @@ -335,11 +335,11 @@ pub(crate) mod tests { .as_any() .downcast_ref::() .ok_or_else(|| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "could not cast {} to {}", self.name(), std::any::type_name::() - )) + ) })?; arg.iter() @@ -351,11 +351,11 @@ pub(crate) mod tests { .as_any() .downcast_ref::() .ok_or_else(|| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "could not cast {} to {}", self.name(), std::any::type_name::() - )) + ) })?; arg.iter() diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 7b51300d30e7..2ed9770902d5 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -33,7 +33,7 @@ use arrow::array::ArrayRef; use arrow::array::BooleanArray; use arrow::datatypes::FieldRef; use arrow::record_batch::RecordBatch; -use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_common::{exec_datafusion_err, Result, ScalarValue}; use datafusion_expr::{Accumulator, WindowFrame, WindowFrameBound, WindowFrameUnits}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; @@ -157,10 +157,9 @@ impl WindowExpr for PlainAggregateWindowExpr { // This enables us to run queries involving UNBOUNDED PRECEDING frames // using bounded memory for suitable aggregations. 
for partition_row in partition_batches.keys() { - let window_state = - window_agg_state.get_mut(partition_row).ok_or_else(|| { - DataFusionError::Execution("Cannot find state".to_string()) - })?; + let window_state = window_agg_state + .get_mut(partition_row) + .ok_or_else(|| exec_datafusion_err!("Cannot find state"))?; let state = &mut window_state.state; if self.window_frame.start_bound.is_unbounded() { state.window_frame_range.start = diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index c783862d579d..a6b5bf187116 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -32,7 +32,8 @@ use arrow::record_batch::RecordBatch; use datafusion_common::cast::as_boolean_array; use datafusion_common::utils::compare_rows; use datafusion_common::{ - arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue, + arrow_datafusion_err, exec_datafusion_err, internal_err, DataFusionError, Result, + ScalarValue, }; use datafusion_expr::window_state::{ PartitionBatchState, WindowAggState, WindowFrameContext, WindowFrameStateGroups, @@ -244,10 +245,9 @@ pub trait AggregateWindowExpr: WindowExpr { }, ); }; - let window_state = - window_agg_state.get_mut(partition_row).ok_or_else(|| { - DataFusionError::Execution("Cannot find state".to_string()) - })?; + let window_state = window_agg_state + .get_mut(partition_row) + .ok_or_else(|| exec_datafusion_err!("Cannot find state"))?; let accumulator = match &mut window_state.window_fn { WindowFn::Aggregate(accumulator) => accumulator, _ => unreachable!(), diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index d19e62550ef6..5ee7023ff6ee 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -36,7 +36,7 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; -use datafusion_common::{config::ConfigOptions, Result}; +use datafusion_common::{config::ConfigOptions, internal_err, Result}; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::physical_expr::is_volatile; use datafusion_physical_plan::filter_pushdown::{ @@ -462,24 +462,20 @@ fn push_down_filters( let filter_description_parent_filters = filter_description.parent_filters(); let filter_description_self_filters = filter_description.self_filters(); if filter_description_parent_filters.len() != children.len() { - return Err(datafusion_common::DataFusionError::Internal( - format!( - "Filter pushdown expected FilterDescription to have parent filters for {expected_num_children}, but got {actual_num_children} for node {node_name}", - expected_num_children = children.len(), - actual_num_children = filter_description_parent_filters.len(), - node_name = node.name(), - ), - )); + return internal_err!( + "Filter pushdown expected FilterDescription to have parent filters for {}, but got {} for node {}", + children.len(), + filter_description_parent_filters.len(), + node.name() + ); } if filter_description_self_filters.len() != children.len() { - return Err(datafusion_common::DataFusionError::Internal( - format!( - "Filter pushdown expected FilterDescription to have self filters for {expected_num_children}, but got {actual_num_children} for node {node_name}", - expected_num_children = children.len(), - actual_num_children = 
filter_description_self_filters.len(), - node_name = node.name(), - ), - )); + return internal_err!( + "Filter pushdown expected FilterDescription to have self filters for {}, but got {} for node {}", + children.len(), + filter_description_self_filters.len(), + node.name() + ); } for (child_idx, (child, parent_filters, self_filters)) in izip!( @@ -529,15 +525,13 @@ fn push_down_filters( // this since we do need to distinguish between them. let mut all_filters = result.filters.into_iter().collect_vec(); if all_filters.len() != num_self_filters + num_parent_filters { - return Err(datafusion_common::DataFusionError::Internal( - format!( - "Filter pushdown did not return the expected number of filters: expected {num_self_filters} self filters and {num_parent_filters} parent filters, but got {num_filters_from_child}. Likely culprit is {child}", - num_self_filters = num_self_filters, - num_parent_filters = num_parent_filters, - num_filters_from_child = all_filters.len(), - child = child.name(), - ), - )); + return internal_err!( + "Filter pushdown did not return the expected number of filters: expected {} self filters and {} parent filters, but got {}. Likely culprit is {}", + num_self_filters, + num_parent_filters, + all_filters.len(), + child.name() + ); } let parent_filters = all_filters .split_off(num_self_filters) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs index be1f68ea453f..860905663e47 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs @@ -24,7 +24,7 @@ use arrow::array::{ use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType}; use datafusion_common::utils::proxy::VecAllocExt; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::{exec_datafusion_err, Result}; use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY}; use itertools::izip; use std::mem::size_of; @@ -201,10 +201,10 @@ where self.buffer.append_slice(value); if self.buffer.len() > self.max_buffer_size { - return Err(DataFusionError::Execution(format!( + return Err(exec_datafusion_err!( "offset overflow, buffer size > {}", self.max_buffer_size - ))); + )); } self.offsets.push(O::usize_as(self.buffer.len())); diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index 5d96ac6dcced..aeb7775fe258 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -40,7 +40,7 @@ use arrow::datatypes::{ UInt8Type, }; use datafusion_common::hash_utils::create_hashes; -use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_common::{internal_datafusion_err, not_impl_err, Result}; use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt}; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; @@ -1170,9 +1170,9 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> { if let DataType::Dictionary(_, v) = expected { let actual = array.data_type(); if v.as_ref() != actual { - return Err(DataFusionError::Internal(format!( + return Err(internal_datafusion_err!( "Converted group rows expected dictionary of {v} got 
{actual}" - ))); + )); } *array = cast(array.as_ref(), expected)?; } diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 00b65b7e770d..974aea3b6292 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -26,7 +26,7 @@ use arrow::array::{ ArrowPrimitiveType, LargeStringArray, PrimitiveArray, StringArray, StringViewArray, }; use arrow::datatypes::{i256, DataType}; -use datafusion_common::DataFusionError; +use datafusion_common::exec_datafusion_err; use datafusion_common::Result; use half::f16; use hashbrown::raw::RawTable; @@ -432,9 +432,9 @@ pub fn new_hash_table( _ => {} } - Err(DataFusionError::Execution(format!( + Err(exec_datafusion_err!( "Can't create HashTable for type: {kt:?}" - ))) + )) } #[cfg(test)] diff --git a/datafusion/physical-plan/src/aggregates/topk/heap.rs b/datafusion/physical-plan/src/aggregates/topk/heap.rs index ce47504daf03..23ccf5e17ef6 100644 --- a/datafusion/physical-plan/src/aggregates/topk/heap.rs +++ b/datafusion/physical-plan/src/aggregates/topk/heap.rs @@ -24,7 +24,7 @@ use arrow::array::{ use arrow::array::{downcast_primitive, ArrayRef, ArrowPrimitiveType, PrimitiveArray}; use arrow::buffer::ScalarBuffer; use arrow::datatypes::{i256, DataType}; -use datafusion_common::DataFusionError; +use datafusion_common::exec_datafusion_err; use datafusion_common::Result; use half::f16; @@ -478,9 +478,7 @@ pub fn new_heap( _ => {} } - Err(DataFusionError::Execution(format!( - "Can't group type: {vt:?}" - ))) + Err(exec_datafusion_err!("Can't group type: {vt:?}")) } #[cfg(test)] diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index bf02692486cc..9aaadfd52b96 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -26,7 +26,7 @@ use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; use arrow::util::pretty::print_batches; -use datafusion_common::DataFusionError; +use datafusion_common::internal_datafusion_err; use datafusion_common::Result; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -61,7 +61,7 @@ impl GroupedTopKAggregateStream { aggregate_expressions(&aggr.aggr_expr, &aggr.mode, group_by.expr.len())?; let (val_field, desc) = aggr .get_minmax_desc() - .ok_or_else(|| DataFusionError::Internal("Min/max required".to_string()))?; + .ok_or_else(|| internal_datafusion_err!("Min/max required"))?; let (expr, _) = &aggr.group_expr().expr()[0]; let kt = expr.data_type(&aggr.input().schema())?; diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index e386f79e428a..2420edfc743d 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1087,7 +1087,7 @@ mod tests { use std::fmt::Write; use std::sync::Arc; - use datafusion_common::{DataFusionError, Result, Statistics}; + use datafusion_common::{internal_datafusion_err, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use crate::{DisplayAs, ExecutionPlan, PlanProperties}; @@ -1153,9 +1153,7 @@ mod tests { } match self { Self::Panic => panic!("expected panic"), - Self::Error => { - Err(DataFusionError::Internal("expected error".to_string())) - } + Self::Error => 
Err(internal_datafusion_err!("expected error")), Self::Ok => Ok(Statistics::new_unknown(self.schema().as_ref())), } } diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 700a9076fecf..b4cdf2dff2bf 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -32,7 +32,7 @@ use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_common::{internal_datafusion_err, not_impl_err, Result}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; @@ -183,9 +183,9 @@ impl ExecutionPlan for RecursiveQueryExec { ) -> Result { // TODO: we might be able to handle multiple partitions in the future. if partition != 0 { - return Err(DataFusionError::Internal(format!( + return Err(internal_datafusion_err!( "RecursiveQueryExec got an invalid partition {partition} (expected 0)" - ))); + )); } let static_stream = self.static_term.execute(partition, Arc::clone(&context))?; diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index a0ccada2f9e7..72bdebdf7de4 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -43,7 +43,7 @@ use crate::stream::ObservedStream; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; -use datafusion_common::{exec_err, internal_err, DataFusionError, Result}; +use datafusion_common::{exec_err, internal_datafusion_err, internal_err, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::{calculate_union, EquivalenceProperties}; @@ -213,11 +213,9 @@ impl ExecutionPlan for UnionExec { fn check_invariants(&self, check: InvariantLevel) -> Result<()> { check_default_invariants(self, check)?; - (self.inputs().len() >= 2) - .then_some(()) - .ok_or(DataFusionError::Internal( - "UnionExec should have at least 2 children".into(), - )) + (self.inputs().len() >= 2).then_some(()).ok_or_else(|| { + internal_datafusion_err!("UnionExec should have at least 2 children") + }) } fn children(&self) -> Vec<&Arc> { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 8b6a40625b4f..cb8bd0a3cf19 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -52,7 +52,7 @@ use datafusion_common::utils::{ evaluate_partition_ranges, get_at_indices, get_row_at_idx, }; use datafusion_common::{ - arrow_datafusion_err, exec_err, DataFusionError, HashMap, Result, + arrow_datafusion_err, exec_datafusion_err, exec_err, DataFusionError, HashMap, Result, }; use datafusion_execution::TaskContext; use datafusion_expr::window_state::{PartitionBatchState, WindowAggState}; @@ -1223,8 +1223,7 @@ fn get_aggregate_result_out_column( "Generated row number should be {len_to_show}, it is {running_length}" ); } - result - .ok_or_else(|| DataFusionError::Execution("Should contain something".to_string())) + result.ok_or_else(|| exec_datafusion_err!("Should contain something")) } /// Constructs a batch from 
the last row of batch in the argument. diff --git a/datafusion/proto/src/common.rs b/datafusion/proto/src/common.rs index 2b052a31b8b7..2aa12dd3504b 100644 --- a/datafusion/proto/src/common.rs +++ b/datafusion/proto/src/common.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::{internal_err, DataFusionError, Result}; +use datafusion_common::{internal_datafusion_err, internal_err, Result}; pub(crate) fn str_to_byte(s: &String, description: &str) -> Result<u8> { if s.len() != 1 { @@ -29,9 +29,9 @@ pub(crate) fn str_to_byte(s: &String, description: &str) -> Result<u8> { pub(crate) fn byte_to_string(b: u8, description: &str) -> Result<String> { let b = &[b]; let b = std::str::from_utf8(b).map_err(|_| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "Invalid CSV {description}: can not represent {b:0x?} as utf8" - )) + ) })?; Ok(b.to_owned()) } diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index f9989bdb2c64..0e76e19ecb1a 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -26,7 +26,7 @@ use datafusion::{ prelude::SessionContext, }; use datafusion_common::{ - exec_err, not_impl_err, parsers::CompressionTypeVariant, DataFusionError, + exec_datafusion_err, exec_err, not_impl_err, parsers::CompressionTypeVariant, TableReference, }; use prost::Message; @@ -203,7 +203,7 @@ impl LogicalExtensionCodec for CsvLogicalExtensionCodec { _ctx: &SessionContext, ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> { let proto = CsvOptionsProto::decode(buf).map_err(|e| { - DataFusionError::Execution(format!("Failed to decode CsvOptionsProto: {e:?}")) + exec_datafusion_err!("Failed to decode CsvOptionsProto: {e:?}") })?; let options: CsvOptions = (&proto).into(); Ok(Arc::new(CsvFormatFactory { @@ -227,9 +227,9 @@ impl LogicalExtensionCodec for CsvLogicalExtensionCodec { options: Some(options), }); - proto.encode(buf).map_err(|e| { - DataFusionError::Execution(format!("Failed to encode CsvOptions: {e:?}")) - })?; + proto + .encode(buf) + .map_err(|e| exec_datafusion_err!("Failed to encode CsvOptions: {e:?}"))?; Ok(()) } @@ -310,9 +310,7 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec { _ctx: &SessionContext, ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> { let proto = JsonOptionsProto::decode(buf).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to decode JsonOptionsProto: {e:?}" - )) + exec_datafusion_err!("Failed to decode JsonOptionsProto: {e:?}") })?; let options: JsonOptions = (&proto).into(); Ok(Arc::new(JsonFormatFactory { @@ -330,18 +328,16 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec { { json_factory.options.clone().unwrap_or_default() } else { - return Err(DataFusionError::Execution( - "Unsupported FileFormatFactory type".to_string(), - )); + return exec_err!("Unsupported FileFormatFactory type"); }; let proto = JsonOptionsProto::from_factory(&JsonFormatFactory { options: Some(options), }); - proto.encode(buf).map_err(|e| { - DataFusionError::Execution(format!("Failed to encode JsonOptions: {e:?}")) - })?; + proto + .encode(buf) + .map_err(|e| exec_datafusion_err!("Failed to encode JsonOptions: {e:?}"))?; Ok(()) } @@ -638,9 +634,7 @@ mod parquet { _ctx: &SessionContext, ) -> datafusion_common::Result<Arc<dyn FileFormatFactory>> { let proto = TableParquetOptionsProto::decode(buf).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to decode TableParquetOptionsProto: {e:?}" - )) + 
exec_datafusion_err!("Failed to decode TableParquetOptionsProto: {e:?}") })?; let options: TableParquetOptions = (&proto).into(); Ok(Arc::new( @@ -662,9 +656,7 @@ mod parquet { { parquet_factory.options.clone().unwrap_or_default() } else { - return Err(DataFusionError::Execution( - "Unsupported FileFormatFactory type".to_string(), - )); + return exec_err!("Unsupported FileFormatFactory type"); }; let proto = TableParquetOptionsProto::from_factory(&ParquetFormatFactory { @@ -672,9 +664,7 @@ mod parquet { }); proto.encode(buf).map_err(|e| { - DataFusionError::Execution(format!( - "Failed to encode TableParquetOptionsProto: {e:?}" - )) + exec_datafusion_err!("Failed to encode TableParquetOptionsProto: {e:?}") })?; Ok(()) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 6687cc31a3b0..5cfcbc459be3 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -57,8 +57,8 @@ use datafusion::{ }; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ - context, internal_datafusion_err, internal_err, not_impl_err, plan_err, - DataFusionError, Result, TableReference, ToDFSchema, + context, internal_datafusion_err, internal_err, not_impl_err, plan_err, Result, + TableReference, ToDFSchema, }; use datafusion_expr::{ dml, @@ -229,9 +229,9 @@ fn from_table_reference( error_context: &str, ) -> Result { let table_ref = table_ref.ok_or_else(|| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "Protobuf deserialization error, {error_context} was missing required field name." - )) + ) })?; Ok(table_ref.clone().try_into()?) @@ -281,9 +281,8 @@ impl AsLogicalPlan for LogicalPlanNode { where Self: Sized, { - LogicalPlanNode::decode(buf).map_err(|e| { - DataFusionError::Internal(format!("failed to decode logical plan: {e:?}")) - }) + LogicalPlanNode::decode(buf) + .map_err(|e| internal_datafusion_err!("failed to decode logical plan: {e:?}")) } fn try_encode(&self, buf: &mut B) -> Result<()> @@ -291,9 +290,8 @@ impl AsLogicalPlan for LogicalPlanNode { B: BufMut, Self: Sized, { - self.encode(buf).map_err(|e| { - DataFusionError::Internal(format!("failed to encode logical plan: {e:?}")) - }) + self.encode(buf) + .map_err(|e| internal_datafusion_err!("failed to encode logical plan: {e:?}")) } fn try_into_logical_plan( @@ -581,15 +579,15 @@ impl AsLogicalPlan for LogicalPlanNode { } LogicalPlanType::CreateExternalTable(create_extern_table) => { let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| { - DataFusionError::Internal(String::from( + internal_datafusion_err!( "Protobuf deserialization error, CreateExternalTableNode was missing required field schema." - )) + ) })?; let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| { - DataFusionError::Internal(String::from( - "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.", - )) + internal_datafusion_err!( + "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints." + ) })?; let definition = if !create_extern_table.definition.is_empty() { Some(create_extern_table.definition.clone()) @@ -644,9 +642,9 @@ impl AsLogicalPlan for LogicalPlanNode { } LogicalPlanType::CreateView(create_view) => { let plan = create_view - .input.clone().ok_or_else(|| DataFusionError::Internal(String::from( - "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.", - )))? 
+ .input.clone().ok_or_else(|| internal_datafusion_err!( + "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input." + ))? .try_into_logical_plan(ctx, extension_codec)?; let definition = if !create_view.definition.is_empty() { Some(create_view.definition.clone()) @@ -664,9 +662,9 @@ impl AsLogicalPlan for LogicalPlanNode { } LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => { let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| { - DataFusionError::Internal(String::from( - "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.", - )) + internal_datafusion_err!( + "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema." + ) })?; Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema( @@ -679,9 +677,9 @@ impl AsLogicalPlan for LogicalPlanNode { } LogicalPlanType::CreateCatalog(create_catalog) => { let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| { - DataFusionError::Internal(String::from( - "Protobuf deserialization error, CreateCatalogNode was missing required field schema.", - )) + internal_datafusion_err!( + "Protobuf deserialization error, CreateCatalogNode was missing required field schema." + ) })?; Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog( @@ -790,9 +788,9 @@ impl AsLogicalPlan for LogicalPlanNode { } LogicalPlanType::Union(union) => { if union.inputs.len() < 2 { - return Err( DataFusionError::Internal(String::from( - "Protobuf deserialization error, Union was require at least two input.", - ))); + return internal_err!( + "Protobuf deserialization error, Union requires at least two inputs." + ); } let (first, rest) = union.inputs.split_first().unwrap(); let mut builder = LogicalPlanBuilder::from( @@ -932,17 +930,17 @@ impl AsLogicalPlan for LogicalPlanNode { let static_term = recursive_query_node .static_term .as_ref() - .ok_or_else(|| DataFusionError::Internal(String::from( - "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.", - )))? + .ok_or_else(|| internal_datafusion_err!( + "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term." + ))? .try_into_logical_plan(ctx, extension_codec)?; let recursive_term = recursive_query_node .recursive_term .as_ref() - .ok_or_else(|| DataFusionError::Internal(String::from( - "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.", - )))? + .ok_or_else(|| internal_datafusion_err!( + "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term." + ))? 
.try_into_logical_plan(ctx, extension_codec)?; Ok(LogicalPlan::RecursiveQuery(RecursiveQuery { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index ee69ab75b25c..693adc6da03a 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -47,7 +47,7 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field}; use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; -use datafusion_common::{not_impl_err, DataFusionError, Result}; +use datafusion_common::{internal_datafusion_err, not_impl_err, DataFusionError, Result}; use datafusion_proto_common::common::proto_error; use crate::convert_required; @@ -138,11 +138,9 @@ pub fn parse_physical_window_expr( .as_ref() .map(|wf| wf.clone().try_into()) .transpose() - .map_err(|e| DataFusionError::Internal(format!("{e}")))? + .map_err(|e| internal_datafusion_err!("{e}"))? .ok_or_else(|| { - DataFusionError::Internal( - "Missing required field 'window_frame' in protobuf".to_string(), - ) + internal_datafusion_err!("Missing required field 'window_frame' in protobuf") })?; let fun = if let Some(window_func) = proto.window_function.as_ref() { @@ -421,9 +419,7 @@ fn parse_required_physical_expr( ) -> Result<Arc<dyn PhysicalExpr>> { expr.map(|e| parse_physical_expr(e, ctx, input_schema, codec)) .transpose()? - .ok_or_else(|| { - DataFusionError::Internal(format!("Missing required field {field:?}")) - }) + .ok_or_else(|| internal_datafusion_err!("Missing required field {field:?}")) } pub fn parse_protobuf_hash_partitioning( diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index e7d8479c1405..9c7397d00a29 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -94,7 +94,9 @@ use datafusion::physical_plan::{ ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr, }; use datafusion_common::config::TableParquetOptions; -use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; +use datafusion_common::{ + internal_datafusion_err, internal_err, not_impl_err, DataFusionError, Result, +}; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; use prost::bytes::BufMut; @@ -109,7 +111,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { Self: Sized, { protobuf::PhysicalPlanNode::decode(buf).map_err(|e| { - DataFusionError::Internal(format!("failed to decode physical plan: {e:?}")) + internal_datafusion_err!("failed to decode physical plan: {e:?}") }) } @@ -119,7 +121,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { Self: Sized, { self.encode(buf).map_err(|e| { - DataFusionError::Internal(format!("failed to encode physical plan: {e:?}")) + internal_datafusion_err!("failed to encode physical plan: {e:?}") }) } @@ -556,8 +558,8 @@ impl protobuf::PhysicalPlanNode { }) .transpose()? .ok_or_else(|| { - DataFusionError::Internal( - "filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(), + internal_datafusion_err!( + "filter (FilterExecNode) in PhysicalPlanNode is missing." 
) })?; @@ -580,8 +582,8 @@ impl protobuf::PhysicalPlanNode { Ok(filter_selectivity) => Ok(Arc::new( filter.with_default_selectivity(filter_selectivity)?, )), - Err(_) => Err(DataFusionError::Internal( - "filter_selectivity in PhysicalPlanNode is invalid ".to_owned(), + Err(_) => Err(internal_datafusion_err!( + "filter_selectivity in PhysicalPlanNode is invalid " )), } } @@ -746,9 +748,7 @@ impl protobuf::PhysicalPlanNode { .collect::<Result<Vec<_>>>()?; let proto_schema = scan.schema.as_ref().ok_or_else(|| { - DataFusionError::Internal( - "schema in MemoryScanExecNode is missing.".to_owned(), - ) + internal_datafusion_err!("schema in MemoryScanExecNode is missing.") })?; let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?); @@ -983,9 +983,7 @@ impl protobuf::PhysicalPlanNode { }; let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| { - DataFusionError::Internal( - "input_schema in AggregateNode is missing.".to_owned(), - ) + internal_datafusion_err!("input_schema in AggregateNode is missing.") })?; let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?); @@ -1903,7 +1901,7 @@ impl protobuf::PhysicalPlanNode { } Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => { let step_proto = args.step.as_ref().ok_or_else(|| { - DataFusionError::Internal("Missing step in TimestampArgs".to_string()) + internal_datafusion_err!("Missing step in TimestampArgs") })?; let step = IntervalMonthDayNanoType::make_value( step_proto.months, @@ -1921,7 +1919,7 @@ impl protobuf::PhysicalPlanNode { } Some(protobuf::generate_series_node::Args::DateArgs(args)) => { let step_proto = args.step.as_ref().ok_or_else(|| { - DataFusionError::Internal("Missing step in DateArgs".to_string()) + internal_datafusion_err!("Missing step in DateArgs") })?; let step = IntervalMonthDayNanoType::make_value( step_proto.months, @@ -3340,13 +3338,11 @@ impl ComposedPhysicalExtensionCodec { buf: &[u8], decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<T>, ) -> Result<T> { - let proto = DataEncoderTuple::decode(buf) - .map_err(|e| DataFusionError::Internal(e.to_string()))?; + let proto = + DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?; let codec = self.codecs.get(proto.encoder_position as usize).ok_or( - DataFusionError::Internal( - "Can't find required codec in codec list".to_owned(), - ), + internal_datafusion_err!("Can't find required codec in codec list"), )?; decode(codec.as_ref(), &proto.blob) @@ -3387,7 +3383,7 @@ impl ComposedPhysicalExtensionCodec { }; proto .encode(buf) - .map_err(|e| DataFusionError::Internal(e.to_string())) + .map_err(|e| internal_datafusion_err!("{e}")) } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 11db88cacb0d..28c3f84c5c7e 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -42,7 +42,9 @@ use datafusion::{ }, physical_plan::expressions::LikeExpr, }; -use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; +use datafusion_common::{ + internal_datafusion_err, internal_err, not_impl_err, DataFusionError, Result, +}; use datafusion_expr::WindowFrame; use crate::protobuf::{ @@ -159,7 +161,7 @@ pub fn serialize_physical_window_expr( let window_frame: protobuf::WindowFrame = window_frame .as_ref() .try_into() - .map_err(|e| DataFusionError::Internal(format!("{e}")))?; + .map_err(|e| internal_datafusion_err!("{e}"))?; Ok(protobuf::PhysicalWindowExprNode { args, diff --git 
a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index d0f25a85f798..72e9123b299b 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -191,9 +191,8 @@ impl LogicalExtensionCodec for TestTableProviderCodec { schema: SchemaRef, _ctx: &SessionContext, ) -> Result<Arc<dyn TableProvider>> { - let msg = TestTableProto::decode(buf).map_err(|_| { - DataFusionError::Internal("Error decoding test table".to_string()) - })?; + let msg = TestTableProto::decode(buf) + .map_err(|_| internal_datafusion_err!("Error decoding test table"))?; assert_eq!(msg.table_name, table_ref.to_string()); let provider = TestTableProvider { url: msg.url, @@ -217,9 +216,8 @@ impl LogicalExtensionCodec for TestTableProviderCodec { url: table.url.clone(), table_name: table_ref.to_string(), }; - msg.encode(buf).map_err(|_| { - DataFusionError::Internal("Error encoding test table".to_string()) - }) + msg.encode(buf) + .map_err(|_| internal_datafusion_err!("Error encoding test table")) } } @@ -1164,7 +1162,7 @@ impl LogicalExtensionCodec for TopKExtensionCodec { ) -> Result<Extension> { if let Some((input, _)) = inputs.split_first() { let proto = proto::TopKPlanProto::decode(buf).map_err(|e| { - DataFusionError::Internal(format!("failed to decode logical plan: {e:?}")) + internal_datafusion_err!("failed to decode logical plan: {e:?}") })?; if let Some(expr) = proto.expr.as_ref() { @@ -1193,7 +1191,7 @@ impl LogicalExtensionCodec for TopKExtensionCodec { }; proto.encode(buf).map_err(|e| { - DataFusionError::Internal(format!("failed to encode logical plan: {e:?}")) + internal_datafusion_err!("failed to encode logical plan: {e:?}") })?; Ok(()) @@ -1261,7 +1259,7 @@ impl LogicalExtensionCodec for UDFExtensionCodec { fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> { if name == "regex_udf" { let proto = MyRegexUdfNode::decode(buf).map_err(|err| { - DataFusionError::Internal(format!("failed to decode regex_udf: {err}")) + internal_datafusion_err!("failed to decode regex_udf: {err}") })?; Ok(Arc::new(ScalarUDF::from(MyRegexUdf::new(proto.pattern)))) @@ -1276,18 +1274,16 @@ impl LogicalExtensionCodec for UDFExtensionCodec { let proto = MyRegexUdfNode { pattern: udf.pattern.clone(), }; - proto.encode(buf).map_err(|err| { - DataFusionError::Internal(format!("failed to encode udf: {err}")) - })?; + proto + .encode(buf) + .map_err(|err| internal_datafusion_err!("failed to encode udf: {err}"))?; Ok(()) } fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> { if name == "aggregate_udf" { let proto = MyAggregateUdfNode::decode(buf).map_err(|err| { - DataFusionError::Internal(format!( - "failed to decode aggregate_udf: {err}" - )) + internal_datafusion_err!("failed to decode aggregate_udf: {err}") })?; Ok(Arc::new(AggregateUDF::from(MyAggregateUDF::new( @@ -1304,9 +1300,9 @@ impl LogicalExtensionCodec for UDFExtensionCodec { let proto = MyAggregateUdfNode { result: udf.result.clone(), }; - proto.encode(buf).map_err(|err| { - DataFusionError::Internal(format!("failed to encode udf: {err}")) - })?; + proto + .encode(buf) + .map_err(|err| internal_datafusion_err!("failed to encode udf: {err}"))?; Ok(()) } } @@ -2280,7 +2276,7 @@ fn roundtrip_scalar_udf() { if name == "dummy" { Ok(Arc::new(dummy_udf())) } else { - Err(DataFusionError::Internal(format!("UDF {name} not found"))) + Err(internal_datafusion_err!("UDF {name} not found")) } } } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index c88c62952a50..825fc8e7bf64 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -99,7 +99,8 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ - internal_err, not_impl_err, DataFusionError, NullEquality, Result, UnnestOptions, + internal_datafusion_err, internal_err, not_impl_err, DataFusionError, NullEquality, + Result, UnnestOptions, }; use datafusion_expr::{ Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF, @@ -1146,7 +1147,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> { if name == "regex_udf" { let proto = MyRegexUdfNode::decode(buf).map_err(|err| { - DataFusionError::Internal(format!("failed to decode regex_udf: {err}")) + internal_datafusion_err!("failed to decode regex_udf: {err}") })?; Ok(Arc::new(ScalarUDF::from(MyRegexUdf::new(proto.pattern)))) @@ -1161,9 +1162,9 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { let proto = MyRegexUdfNode { pattern: udf.pattern.clone(), }; - proto.encode(buf).map_err(|err| { - DataFusionError::Internal(format!("failed to encode udf: {err}")) - })?; + proto + .encode(buf) + .map_err(|err| internal_datafusion_err!("failed to encode udf: {err}"))?; } Ok(()) } @@ -1171,9 +1172,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> { if name == "aggregate_udf" { let proto = MyAggregateUdfNode::decode(buf).map_err(|err| { - DataFusionError::Internal(format!( - "failed to decode aggregate_udf: {err}" - )) + internal_datafusion_err!("failed to decode aggregate_udf: {err}") })?; Ok(Arc::new(AggregateUDF::from(MyAggregateUDF::new( @@ -1191,7 +1190,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { result: udf.result.clone(), }; proto.encode(buf).map_err(|err| { - DataFusionError::Internal(format!("failed to encode udf: {err:?}")) + internal_datafusion_err!("failed to encode udf: {err:?}") })?; } Ok(()) @@ -1200,7 +1199,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> { if name == "custom_udwf" { let proto = CustomUDWFNode::decode(buf).map_err(|err| { - DataFusionError::Internal(format!("failed to decode custom_udwf: {err}")) + internal_datafusion_err!("failed to decode custom_udwf: {err}") })?; Ok(Arc::new(WindowUDF::from(CustomUDWF::new(proto.payload)))) @@ -1218,7 +1217,7 @@ impl PhysicalExtensionCodec for UDFExtensionCodec { payload: udwf.payload.clone(), }; proto.encode(buf).map_err(|err| { - DataFusionError::Internal(format!("failed to encode udwf: {err:?}")) + internal_datafusion_err!("failed to encode udwf: {err:?}") })?; } Ok(()) diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index f0f87ffe1e60..fa3454ce5644 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -34,10 +34,10 @@ use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_plan::metrics::Count; use log::{debug, trace}; -use datafusion_common::error::{DataFusionError, Result}; +use datafusion_common::error::Result; use datafusion_common::tree_node::TransformedResult; use datafusion_common::{ - internal_err, 
plan_datafusion_err, plan_err, + internal_datafusion_err, internal_err, plan_datafusion_err, plan_err, tree_node::{Transformed, TreeNode}, ScalarValue, }; @@ -1218,9 +1218,9 @@ fn rewrite_column_expr( fn reverse_operator(op: Operator) -> Result<Operator> { op.swap().ok_or_else(|| { - DataFusionError::Internal(format!( + internal_datafusion_err!( "Could not reverse operator {op} while building pruning predicate" - )) + ) }) } diff --git a/datafusion/spark/src/function/datetime/make_dt_interval.rs b/datafusion/spark/src/function/datetime/make_dt_interval.rs index c44ab69b8b30..bbfba4486134 100644 --- a/datafusion/spark/src/function/datetime/make_dt_interval.rs +++ b/datafusion/spark/src/function/datetime/make_dt_interval.rs @@ -211,7 +211,7 @@ mod tests { use arrow::datatypes::DataType::Duration; use arrow::datatypes::Field; use arrow::datatypes::TimeUnit::Microsecond; - use datafusion_common::{DataFusionError, Result}; + use datafusion_common::{internal_datafusion_err, DataFusionError, Result}; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; use super::*; @@ -267,7 +267,7 @@ mod tests { .as_any() .downcast_ref::<DurationMicrosecondArray>() .ok_or_else(|| { - DataFusionError::Internal("expected DurationMicrosecondArray".into()) + internal_datafusion_err!("expected DurationMicrosecondArray") })?; for i in 0..out.len() { @@ -293,7 +293,7 @@ mod tests { .as_any() .downcast_ref::<DurationMicrosecondArray>() .ok_or_else(|| { - DataFusionError::Internal("expected DurationMicrosecondArray".into()) + internal_datafusion_err!("expected DurationMicrosecondArray") })?; for i in 0..out.len() { @@ -331,7 +331,7 @@ mod tests { .as_any() .downcast_ref::<DurationMicrosecondArray>() .ok_or_else(|| { - DataFusionError::Internal("expected DurationMicrosecondArray".into()) + internal_datafusion_err!("expected DurationMicrosecondArray") })?; assert_eq!(arr.len(), number_rows); @@ -355,7 +355,7 @@ mod tests { .as_any() .downcast_ref::<DurationMicrosecondArray>() .ok_or_else(|| { - DataFusionError::Internal("expected DurationMicrosecondArray".into()) + internal_datafusion_err!("expected DurationMicrosecondArray") })?; assert_eq!(out.len(), 2); @@ -378,7 +378,7 @@ mod tests { .as_any() .downcast_ref::<DurationMicrosecondArray>() .ok_or_else(|| { - DataFusionError::Internal("expected DurationMicrosecondArray".into()) + internal_datafusion_err!("expected DurationMicrosecondArray") })?; assert_eq!(out.len(), 2); @@ -401,7 +401,7 @@ mod tests { .as_any() .downcast_ref::<DurationMicrosecondArray>() .ok_or_else(|| { - DataFusionError::Internal("expected DurationMicrosecondArray".into()) + internal_datafusion_err!("expected DurationMicrosecondArray") })?; assert_eq!(out.len(), 2); @@ -427,7 +427,7 @@ mod tests { .as_any() .downcast_ref::<DurationMicrosecondArray>() .ok_or_else(|| { - DataFusionError::Internal("expected DurationMicrosecondArray".into()) + internal_datafusion_err!("expected DurationMicrosecondArray") })?; assert_eq!(out.len(), 2); @@ -452,7 +452,7 @@ mod tests { .as_any() .downcast_ref::<DurationMicrosecondArray>() .ok_or_else(|| { - DataFusionError::Internal("expected DurationMicrosecondArray".into()) + internal_datafusion_err!("expected DurationMicrosecondArray") })?; assert_eq!(out.len(), 2); @@ -477,7 +477,7 @@ mod tests { assert!( matches!(res, Err(DataFusionError::Execution(_))), - "make_dt_interval expects between 0 and 4 arguments, got 5" + "make_dt_interval should return execution error for too many arguments" ); Ok(()) diff --git a/datafusion/spark/src/function/datetime/make_interval.rs b/datafusion/spark/src/function/datetime/make_interval.rs index c66f97ff5c22..8e3169556b95 100644 --- a/datafusion/spark/src/function/datetime/make_interval.rs +++ 
b/datafusion/spark/src/function/datetime/make_interval.rs @@ -238,7 +238,7 @@ mod tests { use arrow::array::{Float64Array, Int32Array, IntervalMonthDayNanoArray}; use arrow::datatypes::Field; use datafusion_common::config::ConfigOptions; - use datafusion_common::Result; + use datafusion_common::{internal_datafusion_err, internal_err, Result}; use super::*; fn run_make_interval_month_day_nano(arrs: Vec<ArrayRef>) -> Result<ArrayRef> { @@ -332,9 +332,7 @@ mod tests { let out = out .as_any() .downcast_ref::<IntervalMonthDayNanoArray>() - .ok_or_else(|| { - DataFusionError::Internal("expected IntervalMonthDayNano".into()) - }) + .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano")) .unwrap(); for i in 0..out.len() { @@ -360,9 +358,7 @@ mod tests { let out = out .as_any() .downcast_ref::<IntervalMonthDayNanoArray>() - .ok_or_else(|| { - DataFusionError::Internal("expected IntervalMonthDayNano".into()) - }) + .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano")) .unwrap(); for i in 0..out.len() { @@ -387,9 +383,7 @@ mod tests { let out = out .as_any() .downcast_ref::<IntervalMonthDayNanoArray>() - .ok_or_else(|| { - DataFusionError::Internal("expected IntervalMonthDayNano".into()) - }) + .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano")) .unwrap(); for i in 0..out.len() { @@ -413,9 +407,7 @@ mod tests { let out = out .as_any() .downcast_ref::<IntervalMonthDayNanoArray>() - .ok_or_else(|| { - DataFusionError::Internal("expected IntervalMonthDayNano".into()) - }) + .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano")) .unwrap(); for i in 0..out.len() { @@ -439,9 +431,7 @@ mod tests { let out = out .as_any() .downcast_ref::<IntervalMonthDayNanoArray>() - .ok_or_else(|| { - DataFusionError::Internal("expected IntervalMonthDayNano".into()) - }) + .ok_or_else(|| internal_datafusion_err!("expected IntervalMonthDayNano")) .unwrap(); for i in 0..out.len() { @@ -541,38 +531,40 @@ .as_any() .downcast_ref::<IntervalMonthDayNanoArray>() .ok_or_else(|| { - DataFusionError::Internal( - "expected IntervalMonthDayNanoArray".into(), - ) + internal_datafusion_err!("expected IntervalMonthDayNanoArray") })?; if arr.len() != number_rows { - return Err(DataFusionError::Internal(format!( + return internal_err!( "expected array length {number_rows}, got {}", arr.len() - ))); + ); } for i in 0..number_rows { let iv = arr.value(i); if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) { - return Err(DataFusionError::Internal(format!( + return internal_err!( "row {i}: expected (0,0,0), got ({},{},{})", - iv.months, iv.days, iv.nanoseconds - ))); + iv.months, + iv.days, + iv.nanoseconds + ); } } } ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(iv))) => { if (iv.months, iv.days, iv.nanoseconds) != (0, 0, 0) { - return Err(DataFusionError::Internal(format!( + return internal_err!( "expected scalar 0s, got ({},{},{})", - iv.months, iv.days, iv.nanoseconds - ))); + iv.months, + iv.days, + iv.nanoseconds + ); } } other => { - return Err(DataFusionError::Internal(format!( + return internal_err!( "expected Array or Scalar IntervalMonthDayNano, got {other:?}" - ))); + ); } } diff --git a/datafusion/spark/src/function/math/factorial.rs b/datafusion/spark/src/function/math/factorial.rs index ebb489d4ceb7..4921e73d262a 100644 --- a/datafusion/spark/src/function/math/factorial.rs +++ b/datafusion/spark/src/function/math/factorial.rs @@ -22,7 +22,7 @@ use arrow::array::{Array, Int64Array}; use arrow::datatypes::DataType; use arrow::datatypes::DataType::{Int32, Int64}; use datafusion_common::cast::as_int32_array; -use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue}; +use 
datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::Signature; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Volatility}; @@ -100,9 +100,7 @@ const FACTORIALS: [i64; 21] = [ pub fn spark_factorial(args: &[ColumnarValue]) -> Result<ColumnarValue> { if args.len() != 1 { - return Err(DataFusionError::Internal( - "`factorial` expects exactly one argument".to_string(), - )); + return internal_err!("`factorial` expects exactly one argument"); } match &args[0] { diff --git a/datafusion/spark/src/function/math/hex.rs b/datafusion/spark/src/function/math/hex.rs index 120a053bb4f9..cdd13e903326 100644 --- a/datafusion/spark/src/function/math/hex.rs +++ b/datafusion/spark/src/function/math/hex.rs @@ -30,7 +30,7 @@ use arrow::{ use datafusion_common::cast::as_string_view_array; use datafusion_common::{ cast::{as_binary_array, as_fixed_size_binary_array, as_int64_array}, - exec_err, DataFusionError, + exec_err, internal_err, DataFusionError, }; use datafusion_expr::Signature; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Volatility}; @@ -185,9 +185,7 @@ pub fn compute_hex( lowercase: bool, ) -> Result<ColumnarValue> { if args.len() != 1 { - return Err(DataFusionError::Internal( - "hex expects exactly one argument".to_string(), - )); + return internal_err!("hex expects exactly one argument"); } let input = match &args[0] { diff --git a/datafusion/spark/src/function/math/modulus.rs b/datafusion/spark/src/function/math/modulus.rs index 146cd2789840..fea0297a7ae9 100644 --- a/datafusion/spark/src/function/math/modulus.rs +++ b/datafusion/spark/src/function/math/modulus.rs @@ -18,7 +18,7 @@ use arrow::compute::kernels::numeric::add; use arrow::compute::kernels::{cmp::lt, numeric::rem, zip::zip}; use arrow::datatypes::DataType; -use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_common::{internal_err, Result, ScalarValue}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; @@ -28,9 +28,7 @@ use std::any::Any; /// This function directly uses Arrow's arithmetic_op function for modulo operations pub fn spark_mod(args: &[ColumnarValue]) -> Result<ColumnarValue> { if args.len() != 2 { - return Err(DataFusionError::Internal( - "mod expects exactly two arguments".to_string(), - )); + return internal_err!("mod expects exactly two arguments"); } let args = ColumnarValue::values_to_arrays(args)?; let result = rem(&args[0], &args[1])?; @@ -41,9 +39,7 @@ pub fn spark_mod(args: &[ColumnarValue]) -> Result<ColumnarValue> { /// This function directly uses Arrow's arithmetic_op function for modulo operations pub fn spark_pmod(args: &[ColumnarValue]) -> Result<ColumnarValue> { if args.len() != 2 { - return Err(DataFusionError::Internal( - "pmod expects exactly two arguments".to_string(), - )); + return internal_err!("pmod expects exactly two arguments"); } let args = ColumnarValue::values_to_arrays(args)?; let left = &args[0]; @@ -92,9 +88,7 @@ impl ScalarUDFImpl for SparkMod { fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { if arg_types.len() != 2 { - return Err(DataFusionError::Internal( - "mod expects exactly two arguments".to_string(), - )); + return internal_err!("mod expects exactly two arguments"); } // Return the same type as the first argument for simplicity @@ -142,9 +136,7 @@ impl ScalarUDFImpl for SparkPmod { fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { if arg_types.len() != 2 { - return Err(DataFusionError::Internal( - "pmod expects exactly two arguments".to_string(), - )); + return 
internal_err!("pmod expects exactly two arguments"); } // Return the same type as the first argument for simplicity diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 46b0aae7565e..3c57d195ade6 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -17,8 +17,8 @@ use arrow::datatypes::Field; use datafusion_common::{ - internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, - DataFusionError, Result, Span, TableReference, + exec_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, + Column, DFSchema, Result, Span, TableReference, }; use datafusion_expr::planner::PlannerResult; use datafusion_expr::{Case, Expr}; @@ -117,9 +117,7 @@ impl SqlToRel<'_, S> { .context_provider .get_variable_type(&var_names) .ok_or_else(|| { - DataFusionError::Execution(format!( - "variable {var_names:?} has no type information" - )) + exec_datafusion_err!("variable {var_names:?} has no type information") })?; Ok(Expr::ScalarVariable(ty, var_names)) } else { diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index daee722526d9..3c86d2d04905 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -26,8 +26,8 @@ use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, }; use datafusion_common::{ - exec_err, internal_err, plan_err, Column, DFSchemaRef, DataFusionError, Diagnostic, - HashMap, Result, ScalarValue, + exec_datafusion_err, exec_err, internal_err, plan_err, Column, DFSchemaRef, + Diagnostic, HashMap, Result, ScalarValue, }; use datafusion_expr::builder::get_struct_unnested_columns; use datafusion_expr::expr::{ @@ -273,9 +273,7 @@ pub fn window_expr_common_partition_keys(window_exprs: &[Expr]) -> Result<&[Expr let result = all_partition_keys .iter() .min_by_key(|s| s.len()) - .ok_or_else(|| { - DataFusionError::Execution("No window expressions found".to_owned()) - })?; + .ok_or_else(|| exec_datafusion_err!("No window expressions found"))?; Ok(result) } diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 8dc2f15eef21..7aca0fdd6e8d 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -18,7 +18,7 @@ use clap::Parser; use datafusion::common::instant::Instant; use datafusion::common::utils::get_available_parallelism; -use datafusion::common::{exec_err, DataFusionError, Result}; +use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result}; use datafusion_sqllogictest::{ df_value_validator, read_dir_recursive, setup_scratch_dir, should_skip_file, should_skip_record, value_normalizer, DataFusion, DataFusionSubstraitRoundTrip, @@ -484,9 +484,7 @@ async fn run_complete_file( ) .await // Can't use e directly because it isn't marked Send, so turn it into a string. - .map_err(|e| { - DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}")) - }); + .map_err(|e| exec_datafusion_err!("Error completing {relative_path:?}: {e}")); pb.finish_and_clear(); @@ -536,9 +534,7 @@ async fn run_complete_file_with_postgres( ) .await // Can't use e directly because it isn't marked Send, so turn it into a string. 
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs
index 8dc2f15eef21..7aca0fdd6e8d 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -18,7 +18,7 @@
 use clap::Parser;
 use datafusion::common::instant::Instant;
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::common::{exec_err, DataFusionError, Result};
+use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result};
 use datafusion_sqllogictest::{
     df_value_validator, read_dir_recursive, setup_scratch_dir, should_skip_file,
     should_skip_record, value_normalizer, DataFusion, DataFusionSubstraitRoundTrip,
@@ -484,9 +484,7 @@ async fn run_complete_file(
     )
     .await
     // Can't use e directly because it isn't marked Send, so turn it into a string.
-    .map_err(|e| {
-        DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
-    });
+    .map_err(|e| exec_datafusion_err!("Error completing {relative_path:?}: {e}"));

     pb.finish_and_clear();

@@ -536,9 +534,7 @@ async fn run_complete_file_with_postgres(
     )
     .await
     // Can't use e directly because it isn't marked Send, so turn it into a string.
-    .map_err(|e| {
-        DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
-    });
+    .map_err(|e| exec_datafusion_err!("Error completing {relative_path:?}: {e}"));

     pb.finish_and_clear();

diff --git a/datafusion/sqllogictest/regenerate/sqllogictests.rs b/datafusion/sqllogictest/regenerate/sqllogictests.rs
index edad16bc84b1..a50c4ae1cb7b 100644
--- a/datafusion/sqllogictest/regenerate/sqllogictests.rs
+++ b/datafusion/sqllogictest/regenerate/sqllogictests.rs
@@ -497,7 +497,7 @@ async fn run_complete_file_with_postgres(
     .await
     // Can't use e directly because it isn't marked Send, so turn it into a string.
     .map_err(|e| {
-        DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}"))
+        exec_datafusion_err!("Failed to complete test file {relative_path:?}: {e}")
     });

     pb.finish_and_clear();

diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
index 05e1f284c560..87108b67424b 100644
--- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
+++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs
@@ -22,7 +22,7 @@ use arrow::array::{Array, AsArray};
 use arrow::datatypes::{Fields, Schema};
 use arrow::util::display::ArrayFormatter;
 use arrow::{array, array::ArrayRef, datatypes::DataType, record_batch::RecordBatch};
-use datafusion::common::DataFusionError;
+use datafusion::common::internal_datafusion_err;
 use datafusion::config::ConfigField;
 use std::path::PathBuf;
 use std::sync::LazyLock;
@@ -37,12 +37,10 @@ pub fn convert_batches(
     for batch in batches {
         // Verify schema
         if !schema.contains(&batch.schema()) {
-            return Err(DFSqlLogicTestError::DataFusion(DataFusionError::Internal(
-                format!(
-                    "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
-                    &schema,
-                    batch.schema()
-                ),
+            return Err(DFSqlLogicTestError::DataFusion(internal_datafusion_err!(
+                "Schema mismatch. Previously had\n{:#?}\n\nGot:\n{:#?}",
+                &schema,
+                batch.schema()
             )));
         }
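The normalize.rs hunk shows a third place the error-value macro is needed: the `DataFusionError` has to be wrapped in the crate's own `DFSqlLogicTestError::DataFusion` variant before being returned, so an `Err(...)`-producing macro would not compose there. A sketch of the same wrapping with a stand-in error enum (the enum and function are hypothetical):

use datafusion_common::{internal_datafusion_err, DataFusionError};

// Stand-in for DFSqlLogicTestError, used only to illustrate the pattern.
#[derive(Debug)]
enum TestError {
    DataFusion(DataFusionError),
}

fn check_row_count(expected: usize, got: usize) -> Result<(), TestError> {
    if expected != got {
        // internal_datafusion_err! yields a plain DataFusionError that can
        // be placed inside another error type's variant.
        return Err(TestError::DataFusion(internal_datafusion_err!(
            "row count mismatch. Expected {expected}, got {got}"
        )));
    }
    Ok(())
}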