Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ macro_rules! config_namespace {
$(
stringify!($field_name) => self.$field_name.set(rem, value),
)*
_ => Err(DataFusionError::Internal(
format!(concat!("Config value \"{}\" not found on ", stringify!($struct_name)), key)
))
_ => _internal_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
)
}
}

Expand Down
21 changes: 8 additions & 13 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ macro_rules! primitive_right {
};
($TERM:expr, /, $SCALAR:ident) => {
internal_err!(
"Can not divide an uninitialized value to a non-floating point value",
"Can not divide an uninitialized value to a non-floating point value"
)
};
($TERM:expr, &, $SCALAR:ident) => {
Expand All @@ -722,11 +722,10 @@ macro_rules! primitive_right {

macro_rules! unsigned_subtraction_error {
($SCALAR:expr) => {{
let msg = format!(
_internal_err!(
"Can not subtract a {} value from an uninitialized value",
$SCALAR
);
Err(DataFusionError::Internal(msg))
)
}};
}

Expand Down Expand Up @@ -1404,9 +1403,7 @@ where
DT_MODE => add_day_time(prior, interval as i64, sign),
MDN_MODE => add_m_d_nano(prior, interval, sign),
_ => {
return Err(DataFusionError::Internal(
"Undefined interval mode for interval calculations".to_string(),
));
return _internal_err!("Undefined interval mode for interval calculations");
}
})
}
Expand Down Expand Up @@ -2241,9 +2238,9 @@ impl ScalarValue {
// figure out the type based on the first element
let data_type = match scalars.peek() {
None => {
return Err(DataFusionError::Internal(
"Empty iterator passed to ScalarValue::iter_to_array".to_string(),
));
return _internal_err!(
"Empty iterator passed to ScalarValue::iter_to_array"
);
}
Some(sv) => sv.get_datatype(),
};
Expand Down Expand Up @@ -3062,9 +3059,7 @@ impl ScalarValue {
Ok(ScalarValue::Decimal256(Some(value), precision, scale))
}
}
_ => Err(DataFusionError::Internal(
"Unsupported decimal type".to_string(),
)),
_ => _internal_err!("Unsupported decimal type"),
}
}

Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/datasource/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
use crate::datasource::TableProvider;

use arrow::datatypes::SchemaRef;
use datafusion_common::{Constraints, DataFusionError};
use datafusion_common::{internal_err, Constraints, DataFusionError};
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource};

/// DataFusion default table source, wrapping TableProvider
Expand Down Expand Up @@ -91,8 +91,6 @@ pub fn source_as_provider(
.downcast_ref::<DefaultTableSource>()
{
Some(source) => Ok(source.table_provider.clone()),
_ => Err(DataFusionError::Internal(
"TableSource was not DefaultTableSource".to_string(),
)),
_ => internal_err!("TableSource was not DefaultTableSource"),
}
}
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ mod tests {
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
Expand Down Expand Up @@ -972,9 +973,7 @@ mod tests {
}
}

Err(DataFusionError::Internal(
"query contains no CsvExec".to_string(),
))
internal_err!("query contains no CsvExec")
}

#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/file_format/file_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

//! File type abstraction

use crate::error::{DataFusionError, Result};

use crate::common::internal_err;
use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION;
use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::error::{DataFusionError, Result};
#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
Expand Down Expand Up @@ -291,9 +291,9 @@ impl FileType {
FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
UNCOMPRESSED => Ok(ext),
_ => Err(DataFusionError::Internal(
"FileCompressionType can be specified for CSV/JSON FileType.".into(),
)),
_ => internal_err!(
"FileCompressionType can be specified for CSV/JSON FileType."
),
},
}
}
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/file_format/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;

use arrow_array::RecordBatch;
use datafusion_common::internal_err;
use datafusion_common::DataFusionError;

use async_trait::async_trait;
Expand Down Expand Up @@ -331,13 +332,11 @@ pub(crate) async fn stateless_serialize_and_write_files(
per_thread_output: bool,
) -> Result<u64> {
if !per_thread_output && (serializers.len() != 1 || writers.len() != 1) {
return Err(DataFusionError::Internal(
"per_thread_output is false, but got more than 1 writer!".into(),
));
return internal_err!("per_thread_output is false, but got more than 1 writer!");
}
let num_partitions = data.len();
if per_thread_output && (num_partitions != writers.len()) {
return Err(DataFusionError::Internal("per_thread_output is true, but did not get 1 writer for each output partition!".into()));
return internal_err!("per_thread_output is true, but did not get 1 writer for each output partition!");
}
let mut row_count = 0;
// Map errors to DatafusionError.
Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_common::{plan_err, project_schema, SchemaExt, ToDFSchema};
use datafusion_common::{internal_err, plan_err, project_schema, SchemaExt, ToDFSchema};
use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, LexOrdering, PhysicalSortExpr};
Expand Down Expand Up @@ -195,9 +195,7 @@ impl ListingTableConfig {
options: Some(options),
})
}
None => Err(DataFusionError::Internal(
"No `ListingOptions` set for inferring schema".into(),
)),
None => internal_err!("No `ListingOptions` set for inferring schema"),
}
}

Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/datasource/physical_plan/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ impl<F: FileOpener> RecordBatchStream for FileStream<F> {
#[cfg(test)]
mod tests {
use arrow_schema::Schema;
use datafusion_common::internal_err;
use datafusion_common::DataFusionError;

use super::*;
Expand Down Expand Up @@ -557,10 +558,7 @@ mod tests {
let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

if self.error_opening_idx.contains(&idx) {
Ok(futures::future::ready(Err(DataFusionError::Internal(
"error opening".to_owned(),
)))
.boxed())
Ok(futures::future::ready(internal_err!("error opening")).boxed())
} else if self.error_scanning_idx.contains(&idx) {
let error = futures::future::ready(Err(ArrowError::IoError(
"error scanning".to_owned(),
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,9 @@ fn swap_join_according_to_unboundedness(
(PartitionMode::CollectLeft, _) => {
swap_hash_join(hash_join, PartitionMode::CollectLeft)
}
(PartitionMode::Auto, _) => Err(DataFusionError::Internal(
"Auto is not acceptable for unbounded input here.".to_string(),
)),
(PartitionMode::Auto, _) => {
internal_err!("Auto is not acceptable for unbounded input here.")
}
}
}

Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ impl ExecutionPlan for AnalyzeExec {
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
Err(DataFusionError::Internal(
"Optimization not supported for ANALYZE".to_string(),
))
internal_err!("Optimization not supported for ANALYZE")
}

/// Get the output partitioning of this plan
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ impl ExecutionPlan for CoalescePartitionsExec {

let input_partitions = self.input.output_partitioning().partition_count();
match input_partitions {
0 => Err(DataFusionError::Internal(
"CoalescePartitionsExec requires at least one input partition".to_owned(),
)),
0 => internal_err!(
"CoalescePartitionsExec requires at least one input partition"
),
1 => {
// bypass any threading / metrics if there is a single partition
self.input.execute(0, context)
Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_common::DataFusionError;
use datafusion_common::{internal_err, DataFusionError};
use datafusion_execution::TaskContext;

/// `DataSink` implements writing streams of [`RecordBatch`]es to
Expand Down Expand Up @@ -232,9 +232,7 @@ impl ExecutionPlan for FileSinkExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if partition != 0 {
return Err(DataFusionError::Internal(
"FileSinkExec can only be called on partition 0!".into(),
));
return internal_err!("FileSinkExec can only be called on partition 0!");
}
let data = self.execute_all_input_streams(context.clone())?;

Expand Down
30 changes: 15 additions & 15 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,9 +966,9 @@ pub fn equal_rows(
equal_rows_elem!(Time32MillisecondArray, l, r, left, right, null_equals_null)
}
_ => {
err = Some(Err(DataFusionError::Internal(
"Unsupported data type in hasher".to_string(),
)));
err = Some(internal_err!(
"Unsupported data type in hasher"
));
false
}
}
Expand All @@ -980,9 +980,9 @@ pub fn equal_rows(
equal_rows_elem!(Time64NanosecondArray, l, r, left, right, null_equals_null)
}
_ => {
err = Some(Err(DataFusionError::Internal(
"Unsupported data type in hasher".to_string(),
)));
err = Some(internal_err!(
"Unsupported data type in hasher"
));
false
}
}
Expand Down Expand Up @@ -1049,16 +1049,16 @@ pub fn equal_rows(
null_equals_null
)
} else {
err = Some(Err(DataFusionError::Internal(
"Inconsistent Decimal data type in hasher, the scale should be same".to_string(),
)));
err = Some(internal_err!(
"Inconsistent Decimal data type in hasher, the scale should be same"
));
false
}
}
_ => {
err = Some(Err(DataFusionError::Internal(
"Unsupported data type in hasher".to_string(),
)));
err = Some(internal_err!(
"Unsupported data type in hasher"
));
false
}
},
Expand Down Expand Up @@ -1148,9 +1148,9 @@ pub fn equal_rows(
}
_ => {
// should not happen
err = Some(Err(DataFusionError::Internal(
"Unsupported data type in hasher".to_string(),
)));
err = Some(internal_err!(
"Unsupported data type in hasher"
));
false
}
}
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ impl ExecutionPlan for SortMergeJoinExec {
self.sort_options.clone(),
self.null_equals_null,
)?)),
_ => Err(DataFusionError::Internal(
"SortMergeJoin wrong number of children".to_string(),
)),
_ => internal_err!("SortMergeJoin wrong number of children"),
}
}

Expand Down
8 changes: 2 additions & 6 deletions datafusion/core/src/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ impl ExecutionPlan for GlobalLimitExec {

// GlobalLimitExec requires a single input partition
if 1 != self.input.output_partitioning().partition_count() {
return Err(DataFusionError::Internal(
"GlobalLimitExec requires a single input partition".to_owned(),
));
return internal_err!("GlobalLimitExec requires a single input partition");
}

let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Expand Down Expand Up @@ -331,9 +329,7 @@ impl ExecutionPlan for LocalLimitExec {
children[0].clone(),
self.fetch,
))),
_ => Err(DataFusionError::Internal(
"LocalLimitExec wrong number of children".to_string(),
)),
_ => internal_err!("LocalLimitExec wrong number of children"),
}
}

Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use self::{
use crate::datasource::physical_plan::FileScanConfig;
use crate::physical_plan::expressions::PhysicalSortExpr;
use datafusion_common::Result;
pub use datafusion_common::{ColumnStatistics, Statistics};
pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -232,9 +232,7 @@ pub fn with_new_children_if_necessary(
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
let old_children = plan.children();
if children.len() != old_children.len() {
Err(DataFusionError::Internal(
"Wrong number of children".to_string(),
))
internal_err!("Wrong number of children")
} else if children.is_empty()
|| children
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,9 @@ impl ExecutionPlan for SortPreservingMergeExec {
.register(&context.runtime_env().memory_pool);

match input_partitions {
0 => Err(DataFusionError::Internal(
0 => internal_err!(
"SortPreservingMergeExec requires at least one input partition"
.to_owned(),
)),
),
1 => {
// bypass if there is only one partition to merge (no metrics in this case either)
let result = self.input.execute(0, context);
Expand Down
Loading