1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 3 additions & 5 deletions datafusion-cli/src/command.rs
@@ -26,9 +26,9 @@ use clap::ValueEnum;
 use datafusion::arrow::array::{ArrayRef, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::common::exec_err;
 use datafusion::common::instant::Instant;
-use datafusion::error::{DataFusionError, Result};
+use datafusion::common::{exec_datafusion_err, exec_err};
+use datafusion::error::Result;
 use std::fs::File;
 use std::io::BufReader;
 use std::str::FromStr;
@@ -84,9 +84,7 @@ impl Command {
             Self::Include(filename) => {
                 if let Some(filename) = filename {
                     let file = File::open(filename).map_err(|e| {
-                        DataFusionError::Execution(format!(
-                            "Error opening {filename:?} {e}"
-                        ))
+                        exec_datafusion_err!("Error opening {filename:?} {e}")
                     })?;
                     exec_from_lines(ctx, &mut BufReader::new(file), print_options)
                         .await?;
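A note on the pattern this PR applies throughout: hand-written `DataFusionError::Execution(format!(...))` constructions become `exec_datafusion_err!(...)` calls, which accept the same format-string arguments. A minimal before/after sketch, assuming the macro produces an equivalent `DataFusionError::Execution` value (the `open_script` helper is hypothetical, for illustration only):

```rust
use datafusion::common::exec_datafusion_err;
use datafusion::error::{DataFusionError, Result};
use std::fs::File;

// Hypothetical helper (not part of the PR): open a file, mapping the
// std::io::Error into a DataFusionError with context.
fn open_script(path: &str) -> Result<File> {
    // Before this PR: spell out the variant and format! by hand.
    let _old_style: Result<File> = File::open(path).map_err(|e| {
        DataFusionError::Execution(format!("Error opening {path:?} {e}"))
    });

    // After this PR: the macro takes the format arguments directly.
    File::open(path).map_err(|e| exec_datafusion_err!("Error opening {path:?} {e}"))
}
```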
5 changes: 1 addition & 4 deletions datafusion-cli/src/object_storage.rs
@@ -295,10 +295,7 @@ pub fn get_gcs_object_store_builder(
 
 fn get_bucket_name(url: &Url) -> Result<&str> {
     url.host_str().ok_or_else(|| {
-        DataFusionError::Execution(format!(
-            "Not able to parse bucket name from url: {}",
-            url.as_str()
-        ))
+        exec_datafusion_err!("Not able to parse bucket name from url: {}", url.as_str())
     })
 }
 
6 changes: 2 additions & 4 deletions datafusion-examples/examples/function_factory.rs
@@ -17,7 +17,7 @@
 
 use arrow::datatypes::DataType;
 use datafusion::common::tree_node::{Transformed, TreeNode};
-use datafusion::common::{exec_err, internal_err, DataFusionError};
+use datafusion::common::{exec_datafusion_err, exec_err, internal_err, DataFusionError};
 use datafusion::error::Result;
 use datafusion::execution::context::{
     FunctionFactory, RegisterFunction, SessionContext, SessionState,
@@ -185,9 +185,7 @@ impl ScalarFunctionWrapper {
     fn parse_placeholder_identifier(placeholder: &str) -> Result<usize> {
         if let Some(value) = placeholder.strip_prefix('$') {
             Ok(value.parse().map(|v: usize| v - 1).map_err(|e| {
-                DataFusionError::Execution(format!(
-                    "Placeholder `{placeholder}` parsing error: {e}!"
-                ))
+                exec_datafusion_err!("Placeholder `{placeholder}` parsing error: {e}!")
             })?)
         } else {
             exec_err!("Placeholder should start with `$`!")
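This hunk shows the two macro families doing different jobs: `exec_datafusion_err!` builds a `DataFusionError` value (handy inside `map_err`/`ok_or_else` closures), while `exec_err!` yields the `Err(...)` form, as its use as the tail expression above confirms. The `internal_err!`/`internal_datafusion_err!` pair used elsewhere in this PR follows the same convention for `DataFusionError::Internal`. A small sketch under that assumption (both function names are illustrative):

```rust
use datafusion::common::{exec_datafusion_err, exec_err};
use datafusion::error::{DataFusionError, Result};

// Builds the error value; callers wrap or map it as needed.
fn placeholder_error(placeholder: &str) -> DataFusionError {
    exec_datafusion_err!("Placeholder `{placeholder}` is not valid")
}

// Yields Err(DataFusionError::Execution(..)), so it can be returned
// directly from a function whose signature is Result<_>.
fn parse_placeholder(placeholder: &str) -> Result<usize> {
    if !placeholder.starts_with('$') {
        return exec_err!("Placeholder should start with `$`!");
    }
    Ok(0)
}
```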
14 changes: 7 additions & 7 deletions datafusion-examples/examples/json_shredding.rs
@@ -25,7 +25,7 @@ use datafusion::assert_batches_eq;
 use datafusion::common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
-use datafusion::common::{assert_contains, Result};
+use datafusion::common::{assert_contains, exec_datafusion_err, Result};
 use datafusion::datasource::listing::{
     ListingTable, ListingTableConfig, ListingTableUrl,
 };
@@ -230,8 +230,8 @@ impl ScalarUDFImpl for JsonGetStr {
         let key = match &args.args[0] {
             ColumnarValue::Scalar(ScalarValue::Utf8(Some(key))) => key,
             _ => {
-                return Err(datafusion::error::DataFusionError::Execution(
-                    "json_get_str first argument must be a string".to_string(),
+                return Err(exec_datafusion_err!(
+                    "json_get_str first argument must be a string"
                 ))
             }
         };
@@ -241,13 +241,13 @@
                 .as_any()
                 .downcast_ref::<StringArray>()
                 .ok_or_else(|| {
-                    datafusion::error::DataFusionError::Execution(
-                        "json_get_str second argument must be a string array".to_string(),
+                    exec_datafusion_err!(
+                        "json_get_str second argument must be a string array"
                     )
                 })?,
             _ => {
-                return Err(datafusion::error::DataFusionError::Execution(
-                    "json_get_str second argument must be a string array".to_string(),
+                return Err(exec_datafusion_err!(
+                    "json_get_str second argument must be a string array"
                 ))
             }
         };
11 changes: 4 additions & 7 deletions datafusion-examples/examples/memory_pool_execution_plan.rs
@@ -27,8 +27,9 @@
 use arrow::record_batch::RecordBatch;
 use arrow_schema::SchemaRef;
 use datafusion::common::record_batch;
+use datafusion::common::{exec_datafusion_err, internal_err};
 use datafusion::datasource::{memory::MemTable, DefaultTableSource};
-use datafusion::error::{DataFusionError, Result};
+use datafusion::error::Result;
 use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
@@ -247,9 +248,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
                 children[0].clone(),
             )))
         } else {
-            Err(DataFusionError::Internal(
-                "BufferingExecutionPlan must have exactly one child".to_string(),
-            ))
+            internal_err!("BufferingExecutionPlan must have exactly one child")
         }
     }
 
@@ -289,9 +288,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
             // In a real implementation, you would create a batch stream from the processed results
             record_batch!(("id", Int32, vec![5]), ("name", Utf8, vec!["Eve"]))
                 .map_err(|e| {
-                    DataFusionError::Execution(format!(
-                        "Failed to create final RecordBatch: {e}",
-                    ))
+                    exec_datafusion_err!("Failed to create final RecordBatch: {e}")
                 })
             }),
         )))
18 changes: 8 additions & 10 deletions datafusion/catalog/src/listing_schema.rs
@@ -25,7 +25,9 @@ use std::sync::{Arc, Mutex};
 use crate::{SchemaProvider, TableProvider, TableProviderFactory};
 
 use crate::Session;
-use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference};
+use datafusion_common::{
+    internal_datafusion_err, DFSchema, DataFusionError, HashMap, TableReference,
+};
 use datafusion_expr::CreateExternalTable;
 
 use async_trait::async_trait;
@@ -109,17 +111,13 @@ impl ListingSchemaProvider {
             let file_name = table
                 .path
                 .file_name()
-                .ok_or_else(|| {
-                    DataFusionError::Internal("Cannot parse file name!".to_string())
-                })?
+                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?
                 .to_str()
-                .ok_or_else(|| {
-                    DataFusionError::Internal("Cannot parse file name!".to_string())
-                })?;
+                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
             let table_name = file_name.split('.').collect_vec()[0];
-            let table_path = table.to_string().ok_or_else(|| {
-                DataFusionError::Internal("Cannot parse file name!".to_string())
-            })?;
+            let table_path = table
+                .to_string()
+                .ok_or_else(|| internal_datafusion_err!("Cannot parse file name!"))?;
 
             if !self.table_exist(table_name) {
                 let table_url = format!("{}/{}", self.authority, table_path);
9 changes: 4 additions & 5 deletions datafusion/common/src/utils/memory.rs
@@ -17,7 +17,8 @@
 
 //! This module provides a function to estimate the memory size of a HashTable prior to allocation
 
-use crate::{DataFusionError, Result};
+use crate::error::_exec_datafusion_err;
+use crate::Result;
 use std::mem::size_of;
 
 /// Estimates the memory size required for a hash table prior to allocation.
@@ -36,7 +37,7 @@ use std::mem::size_of;
 /// buckets.
 /// - One byte overhead for each bucket.
 /// - The fixed size overhead of the collection.
-/// - If the estimation overflows, we return a [`DataFusionError`]
+/// - If the estimation overflows, we return a [`crate::error::DataFusionError`]
 ///
 /// # Examples
 /// ---
@@ -94,9 +95,7 @@ pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result
         .checked_add(fixed_size)
         })
         .ok_or_else(|| {
-            DataFusionError::Execution(
-                "usize overflow while estimating the number of buckets".to_string(),
-            )
+            _exec_datafusion_err!("usize overflow while estimating the number of buckets")
        })
 }
 
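Inside `datafusion-common` itself, this hunk imports `crate::error::_exec_datafusion_err` rather than the public name (the same goes for `_internal_datafusion_err` in `utils/mod.rs` below). A plausible reading of the crate's macro plumbing, offered as an assumption rather than something this diff shows: `#[macro_export]` places macros at the crate root, so `error.rs` re-exports them under underscore-prefixed aliases for unambiguous in-crate use. Roughly:

```rust
// Sketch of the assumed pattern; the real definitions in
// datafusion_common::error are macro-generated and carry more detail
// (e.g. backtrace capture).
#[macro_export]
macro_rules! exec_datafusion_err {
    ($($args:expr),*) => {
        $crate::DataFusionError::Execution(std::format!($($args),*))
    };
}

// Underscore-prefixed alias that in-crate code can import via
// `use crate::error::_exec_datafusion_err;`.
pub use exec_datafusion_err as _exec_datafusion_err;
```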
18 changes: 6 additions & 12 deletions datafusion/common/src/utils/mod.rs
@@ -22,8 +22,8 @@ pub mod memory;
 pub mod proxy;
 pub mod string_utils;
 
-use crate::error::{_exec_datafusion_err, _internal_err};
-use crate::{DataFusionError, Result, ScalarValue};
+use crate::error::{_exec_datafusion_err, _internal_datafusion_err, _internal_err};
+use crate::{Result, ScalarValue};
 use arrow::array::{
     cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
     OffsetSizeTrait,
@@ -147,9 +147,7 @@ pub fn bisect<const SIDE: bool>(
     let low: usize = 0;
     let high: usize = item_columns
         .first()
-        .ok_or_else(|| {
-            DataFusionError::Internal("Column array shouldn't be empty".to_string())
-        })?
+        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
         .len();
     let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
         let cmp = compare_rows(current, target, sort_options)?;
@@ -198,9 +196,7 @@ pub fn linear_search<const SIDE: bool>(
     let low: usize = 0;
     let high: usize = item_columns
         .first()
-        .ok_or_else(|| {
-            DataFusionError::Internal("Column array shouldn't be empty".to_string())
-        })?
+        .ok_or_else(|| _internal_datafusion_err!("Column array shouldn't be empty"))?
         .len();
     let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| {
         let cmp = compare_rows(current, target, sort_options)?;
@@ -365,9 +361,7 @@ pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
         .map(|idx| items.get(*idx.borrow()).cloned())
         .collect::<Option<Vec<T>>>()
         .ok_or_else(|| {
-            DataFusionError::Execution(
-                "Expects indices to be in the range of searched vector".to_string(),
-            )
+            _exec_datafusion_err!("Expects indices to be in the range of searched vector")
         })
 }
 
@@ -808,7 +802,7 @@ pub fn find_indices<T: PartialEq, S: Borrow<T>>(
         .into_iter()
        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
         .collect::<Option<_>>()
-        .ok_or_else(|| DataFusionError::Execution("Target not found".to_string()))
+        .ok_or_else(|| _exec_datafusion_err!("Target not found"))
 }
 
 /// Transposes the given vector of vectors.
12 changes: 6 additions & 6 deletions datafusion/core/src/dataframe/mod.rs
@@ -51,9 +51,9 @@ use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::config::{CsvOptions, JsonOptions};
 use datafusion_common::{
-    exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema,
-    DataFusionError, ParamValues, ScalarValue, SchemaError, TableReference,
-    UnnestOptions,
+    exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
+    Column, DFSchema, DataFusionError, ParamValues, ScalarValue, SchemaError,
+    TableReference, UnnestOptions,
 };
 use datafusion_expr::select_expr::SelectExpr;
 use datafusion_expr::{
@@ -1347,9 +1347,9 @@ impl DataFrame {
             .and_then(|r| r.columns().first())
             .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
             .and_then(|a| a.values().first())
-            .ok_or(DataFusionError::Internal(
-                "Unexpected output when collecting for count()".to_string(),
-            ))? as usize;
+            .ok_or_else(|| {
+                internal_datafusion_err!("Unexpected output when collecting for count()")
+            })? as usize;
         Ok(len)
     }
 
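Beyond the macro swap, the `count()` hunk also changes `ok_or` to `ok_or_else`. The distinction: `ok_or` evaluates its argument eagerly, constructing the error (and its `String`) even on the success path, while `ok_or_else` defers construction to a closure that only runs on `None`. A minimal self-contained illustration (the `expensive_error` and `demo` names are hypothetical):

```rust
fn expensive_error() -> String {
    // Imagine formatting, allocation, or backtrace capture here.
    "Unexpected output when collecting for count()".to_string()
}

fn demo(value: Option<i64>) -> Result<i64, String> {
    // Eager: expensive_error() runs even when `value` is Some(_).
    let _eager = value.ok_or(expensive_error());

    // Lazy: the closure is only invoked when `value` is None.
    value.ok_or_else(expensive_error)
}
```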
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -44,7 +44,8 @@ use arrow::ipc::{root_as_message, CompressionType};
 use datafusion_catalog::Session;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
-    not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
+    internal_datafusion_err, not_impl_err, DataFusionError, GetExt, Statistics,
+    DEFAULT_ARROW_EXTENSION,
 };
 use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_datasource::display::FileGroupDisplay;
@@ -128,8 +129,8 @@ impl FileFormat for ArrowFormat {
         let ext = self.get_ext();
         match file_compression_type.get_variant() {
             CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
-            _ => Err(DataFusionError::Internal(
-                "Arrow FileFormat does not support compression.".into(),
+            _ => Err(internal_datafusion_err!(
+                "Arrow FileFormat does not support compression."
             )),
         }
     }
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/listing/table.rs
@@ -31,8 +31,8 @@ use arrow_schema::Schema;
 use async_trait::async_trait;
 use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
 use datafusion_common::{
-    config_datafusion_err, config_err, internal_err, plan_err, project_schema,
-    stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
+    config_datafusion_err, config_err, internal_datafusion_err, internal_err, plan_err,
+    project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
 };
 use datafusion_datasource::{
     compute_all_files_statistics,
@@ -249,7 +249,7 @@ impl ListingTableConfig {
             .await?
             .next()
             .await
-            .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
+            .ok_or_else(|| internal_datafusion_err!("No files for table"))??;
 
         let (file_extension, maybe_compression_type) =
             ListingTableConfig::infer_file_extension_and_compression_type(
@@ -984,11 +984,11 @@ impl ListingTable {
 
         let file_schema = config
             .file_schema
-            .ok_or_else(|| DataFusionError::Internal("No schema provided.".into()))?;
+            .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
 
-        let options = config.options.ok_or_else(|| {
-            DataFusionError::Internal("No ListingOptions provided".into())
-        })?;
+        let options = config
+            .options
+            .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
 
         // Add the partition columns to the file schema
         let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
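One subtlety in the `ListingTableConfig` hunk is the double `??`. The stream's `next().await` yields `Option<Result<_>>`; `ok_or_else` converts the `Option` layer into a `Result`, giving `Result<Result<_>>`, and each `?` peels one layer. A minimal sketch of the same shape (names are illustrative, not from the PR):

```rust
type Result<T> = std::result::Result<T, String>;

fn first_item(items: Option<Result<i32>>) -> Result<i32> {
    // ok_or_else: Option<Result<i32>> -> Result<Result<i32>>.
    // First `?` unwraps the outer Result (the "no files" case);
    // second `?` unwraps the inner per-item Result.
    let value = items.ok_or_else(|| "No files for table".to_string())??;
    Ok(value)
}
```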