Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ resolver = "2"

[workspace.package]
authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
edition = "2021"
edition = "2024"
homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
Expand Down Expand Up @@ -192,6 +192,8 @@ or_fun_call = "warn"
unnecessary_lazy_evaluations = "warn"
uninlined_format_args = "warn"
inefficient_to_string = "warn"
collapsible_if = "allow"
let_and_return = "allow"

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = [
Expand Down
3 changes: 3 additions & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,6 @@ insta-cmd = "0.6.0"
rstest = { workspace = true }
testcontainers = { workspace = true }
testcontainers-modules = { workspace = true, features = ["minio"] }

[lints.clippy]
collapsible_if = "allow"
26 changes: 16 additions & 10 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,12 @@ mod tests {
} else {
"/home/user"
};
env::set_var(
if cfg!(windows) { "USERPROFILE" } else { "HOME" },
test_home_path,
);
unsafe {
env::set_var(
if cfg!(windows) { "USERPROFILE" } else { "HOME" },
test_home_path,
)
};
let input = "~/Code/datafusion/benchmarks/data/tpch_sf1/part/part-0.parquet";
let expected = PathBuf::from(test_home_path)
.join("Code")
Expand All @@ -376,12 +378,16 @@ mod tests {
.to_string();
let actual = substitute_tilde(input.to_string());
assert_eq!(actual, expected);
match original_home {
Some(home_path) => env::set_var(
if cfg!(windows) { "USERPROFILE" } else { "HOME" },
home_path.to_str().unwrap(),
),
None => env::remove_var(if cfg!(windows) { "USERPROFILE" } else { "HOME" }),
unsafe {
match original_home {
Some(home_path) => env::set_var(
if cfg!(windows) { "USERPROFILE" } else { "HOME" },
home_path.to_str().unwrap(),
),
None => {
env::remove_var(if cfg!(windows) { "USERPROFILE" } else { "HOME" })
}
}
}
}
}
8 changes: 5 additions & 3 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,10 @@ mod tests {

let location = "s3://bucket/path/FAKE/file.parquet";
// Set it to a non-existent file to avoid reading the default configuration file
std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");
unsafe {
std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials");
}

// No options
let table_url = ListingTableUrl::parse(location)?;
Expand Down Expand Up @@ -746,7 +748,7 @@ mod tests {
let expected_region = "eu-central-1";
let location = "s3://test-bucket/path/file.parquet";
// Set it to a non-existent file to avoid reading the default configuration file
std::env::set_var("AWS_CONFIG_FILE", "data/aws.config");
unsafe { std::env::set_var("AWS_CONFIG_FILE", "data/aws.config") };

let table_url = ListingTableUrl::parse(location)?;
let aws_options = AwsOptions {
Expand Down
3 changes: 1 addition & 2 deletions datafusion-examples/examples/udf/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ struct LocalCsvTableFunc {}

impl TableFunctionImpl for LocalCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)), _)) = exprs.first()
else {
let Some(Expr::Literal(ScalarValue::Utf8(Some(path)), _)) = exprs.first() else {
return plan_err!("read_csv requires at least one string argument");
};

Expand Down
13 changes: 4 additions & 9 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use object_store::{ObjectMeta, ObjectStore};
pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| match expr {
Expr::Column(Column { ref name, .. }) => {
Expr::Column(Column { name, .. }) => {
is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
Expand Down Expand Up @@ -249,16 +249,11 @@ fn populate_partition_values<'a>(
partition_values: &mut HashMap<&'a str, PartitionValue>,
filter: &'a Expr,
) {
if let Expr::BinaryExpr(BinaryExpr {
ref left,
op,
ref right,
}) = filter
{
if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter {
match op {
Operator::Eq => match (left.as_ref(), right.as_ref()) {
(Expr::Column(Column { ref name, .. }), Expr::Literal(val, _))
| (Expr::Literal(val, _), Expr::Column(Column { ref name, .. })) => {
(Expr::Column(Column { name, .. }), Expr::Literal(val, _))
| (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => {
if partition_values
.insert(name, PartitionValue::Single(val.to_string()))
.is_some()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/rounding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ extern crate libc;
any(target_arch = "x86_64", target_arch = "aarch64"),
not(target_os = "windows")
))]
extern "C" {
unsafe extern "C" {
fn fesetround(round: i32);
fn fegetround() -> i32;
}
Expand Down
10 changes: 5 additions & 5 deletions datafusion/common/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,26 +735,26 @@ mod tests {
let non_existing = cwd.join("non-existing-dir").display().to_string();
let non_existing_str = non_existing.as_str();

env::set_var(udf_env, non_existing_str);
unsafe { env::set_var(udf_env, non_existing_str) };
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_err());

env::set_var(udf_env, "");
unsafe { env::set_var(udf_env, "") };
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);

env::set_var(udf_env, " ");
unsafe { env::set_var(udf_env, " ") };
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);

env::set_var(udf_env, existing_str);
unsafe { env::set_var(udf_env, existing_str) };
let res = get_data_dir(udf_env, existing_str);
assert!(res.is_ok());
assert_eq!(res.unwrap(), existing_pb);

env::remove_var(udf_env);
unsafe { env::remove_var(udf_env) };
let res = get_data_dir(udf_env, non_existing_str);
assert!(res.is_err());

Expand Down
58 changes: 29 additions & 29 deletions datafusion/core/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,14 @@ fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_low, utf8_low, utf8_high)
fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut data_gen = DataGenerator::new();

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = gen
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_high_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_high_cardinality_values())
.collect();

if sorted {
Expand All @@ -388,14 +388,14 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high)
fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut data_gen = DataGenerator::new();

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = gen
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_high_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_high_cardinality_values())
.collect();

if sorted {
Expand All @@ -421,15 +421,15 @@ fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (f64, utf8_low, utf8_low, i64)
fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut data_gen = DataGenerator::new();

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = gen
let mut tuples: Vec<_> = data_gen
.i64_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.i64_values())
.collect();

if sorted {
Expand Down Expand Up @@ -459,15 +459,15 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (f64, utf8_view_low, utf8_view_low, i64)
fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut data_gen = DataGenerator::new();

// need to sort by the combined key, so combine them together
let mut tuples: Vec<_> = gen
let mut tuples: Vec<_> = data_gen
.i64_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.i64_values())
.collect();

if sorted {
Expand Down Expand Up @@ -497,8 +497,8 @@ fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_dict)
fn dictionary_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut values = gen.utf8_low_cardinality_values();
let mut data_gen = DataGenerator::new();
let mut values = data_gen.utf8_low_cardinality_values();
if sorted {
values.sort_unstable();
}
Expand All @@ -512,12 +512,12 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_dict, utf8_dict, utf8_dict)
fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
let mut data_gen = DataGenerator::new();
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.collect();

if sorted {
Expand All @@ -543,13 +543,13 @@ fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {

/// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64)
fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
let mut gen = DataGenerator::new();
let mut tuples: Vec<_> = gen
let mut data_gen = DataGenerator::new();
let mut tuples: Vec<_> = data_gen
.utf8_low_cardinality_values()
.into_iter()
.zip(gen.utf8_low_cardinality_values())
.zip(gen.utf8_low_cardinality_values())
.zip(gen.i64_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.utf8_low_cardinality_values())
.zip(data_gen.i64_values())
.collect();

if sorted {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ mod tests {
) -> Result<()> {
let schema = csv_schema();
let generator = CsvBatchGenerator::new(batch_size, line_count);
let mut deserializer = csv_deserializer(batch_size, &schema);
let mut deserializer = csv_deserializer(batch_size, &schema.clone());

for data in generator {
deserializer.digest(data);
Expand Down Expand Up @@ -1346,7 +1346,7 @@ mod tests {
fn csv_deserializer(
batch_size: usize,
schema: &Arc<Schema>,
) -> impl BatchDeserializer<Bytes> {
) -> impl BatchDeserializer<Bytes> + use<> {
let decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(batch_size)
.build_decoder();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ impl SessionContext {
match ddl {
DdlStatement::CreateExternalTable(cmd) => {
(Box::pin(async move { self.create_external_table(&cmd).await })
as std::pin::Pin<Box<dyn futures::Future<Output = _> + Send>>)
as std::pin::Pin<Box<dyn Future<Output = _> + Send>>)
.await
}
DdlStatement::CreateMemoryTable(cmd) => {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ impl DefaultPhysicalPlanner {
let get_sort_keys = |expr: &Expr| match expr {
Expr::WindowFunction(window_fun) => {
let WindowFunctionParams {
ref partition_by,
ref order_by,
partition_by,
order_by,
..
} = &window_fun.as_ref().params;
generate_sort_key(partition_by, order_by)
Expand All @@ -612,8 +612,8 @@ impl DefaultPhysicalPlanner {
match &**expr {
Expr::WindowFunction(window_fun) => {
let WindowFunctionParams {
ref partition_by,
ref order_by,
partition_by,
order_by,
..
} = &window_fun.as_ref().params;
generate_sort_key(partition_by, order_by)
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/tests/config_from_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,31 @@ fn from_env() {
let env_key = "DATAFUSION_OPTIMIZER_FILTER_NULL_JOIN_KEYS";
// valid testing in different cases
for bool_option in ["true", "TRUE", "True", "tRUe"] {
env::set_var(env_key, bool_option);
unsafe { env::set_var(env_key, bool_option) };
let config = ConfigOptions::from_env().unwrap();
env::remove_var(env_key);
unsafe { env::remove_var(env_key) };
assert!(config.optimizer.filter_null_join_keys);
}

// invalid testing
env::set_var(env_key, "ttruee");
unsafe { env::set_var(env_key, "ttruee") };
let err = ConfigOptions::from_env().unwrap_err().strip_backtrace();
assert_eq!(err, "Error parsing 'ttruee' as bool\ncaused by\nExternal error: provided string was not `true` or `false`");
env::remove_var(env_key);
unsafe { env::remove_var(env_key) };

let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";

// for valid testing
env::set_var(env_key, "4096");
unsafe { env::set_var(env_key, "4096") };
let config = ConfigOptions::from_env().unwrap();
assert_eq!(config.execution.batch_size, 4096);

// for invalid testing
env::set_var(env_key, "abc");
unsafe { env::set_var(env_key, "abc") };
let err = ConfigOptions::from_env().unwrap_err().strip_backtrace();
assert_eq!(err, "Error parsing 'abc' as usize\ncaused by\nExternal error: invalid digit found in string");

env::remove_var(env_key);
unsafe { env::remove_var(env_key) };
let config = ConfigOptions::from_env().unwrap();
assert_eq!(config.execution.batch_size, 8192); // set to its default value
}
4 changes: 2 additions & 2 deletions datafusion/core/tests/execution/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn make_lazy_exec_with_range(
};

// Instantiate the generator with the batch and limit
let gen = RangeBatchGenerator {
let batch_gen = RangeBatchGenerator {
schema: Arc::clone(&schema),
boundedness,
value_range: range,
Expand All @@ -145,7 +145,7 @@ fn make_lazy_exec_with_range(
};

// Wrap the generator in a trait object behind Arc<RwLock<_>>
let generator: Arc<RwLock<dyn LazyBatchGenerator>> = Arc::new(RwLock::new(gen));
let generator: Arc<RwLock<dyn LazyBatchGenerator>> = Arc::new(RwLock::new(batch_gen));

// Create a LazyMemoryExec with one partition using our generator
let mut exec = LazyMemoryExec::try_new(schema, vec![generator]).unwrap();
Expand Down
Loading
Loading