diff --git a/Cargo.toml b/Cargo.toml index e5acbd20224a..48a863b79f47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ resolver = "2" [workspace.package] authors = ["Apache DataFusion "] -edition = "2021" +edition = "2024" homepage = "https://datafusion.apache.org" license = "Apache-2.0" readme = "README.md" @@ -192,6 +192,8 @@ or_fun_call = "warn" unnecessary_lazy_evaluations = "warn" uninlined_format_args = "warn" inefficient_to_string = "warn" +collapsible_if = "allow" +let_and_return = "allow" [workspace.lints.rust] unexpected_cfgs = { level = "warn", check-cfg = [ diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index f3069b492352..397ab4ca51fa 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -76,3 +76,6 @@ insta-cmd = "0.6.0" rstest = { workspace = true } testcontainers = { workspace = true } testcontainers-modules = { workspace = true, features = ["minio"] } + +[lints.clippy] +collapsible_if = "allow" diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index 20d62eabc390..2f8475ad43d6 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -359,10 +359,12 @@ mod tests { } else { "/home/user" }; - env::set_var( - if cfg!(windows) { "USERPROFILE" } else { "HOME" }, - test_home_path, - ); + unsafe { + env::set_var( + if cfg!(windows) { "USERPROFILE" } else { "HOME" }, + test_home_path, + ) + }; let input = "~/Code/datafusion/benchmarks/data/tpch_sf1/part/part-0.parquet"; let expected = PathBuf::from(test_home_path) .join("Code") @@ -376,12 +378,16 @@ mod tests { .to_string(); let actual = substitute_tilde(input.to_string()); assert_eq!(actual, expected); - match original_home { - Some(home_path) => env::set_var( - if cfg!(windows) { "USERPROFILE" } else { "HOME" }, - home_path.to_str().unwrap(), - ), - None => env::remove_var(if cfg!(windows) { "USERPROFILE" } else { "HOME" }), + unsafe { + match original_home { + Some(home_path) => env::set_var( + if cfg!(windows) { "USERPROFILE" } else { "HOME" }, + home_path.to_str().unwrap(), + ), + None => { + env::remove_var(if cfg!(windows) { "USERPROFILE" } else { "HOME" }) + } + } } } } diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index e6e6be42c7ad..ac0308092e16 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -586,8 +586,10 @@ mod tests { let location = "s3://bucket/path/FAKE/file.parquet"; // Set it to a non-existent file to avoid reading the default configuration file - std::env::set_var("AWS_CONFIG_FILE", "data/aws.config"); - std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials"); + unsafe { + std::env::set_var("AWS_CONFIG_FILE", "data/aws.config"); + std::env::set_var("AWS_SHARED_CREDENTIALS_FILE", "data/aws.credentials"); + } // No options let table_url = ListingTableUrl::parse(location)?; @@ -746,7 +748,7 @@ mod tests { let expected_region = "eu-central-1"; let location = "s3://test-bucket/path/file.parquet"; // Set it to a non-existent file to avoid reading the default configuration file - std::env::set_var("AWS_CONFIG_FILE", "data/aws.config"); + unsafe { std::env::set_var("AWS_CONFIG_FILE", "data/aws.config") }; let table_url = ListingTableUrl::parse(location)?; let aws_options = AwsOptions { diff --git a/datafusion-examples/examples/udf/simple_udtf.rs b/datafusion-examples/examples/udf/simple_udtf.rs index 12ee74fc52ee..afebbb3c74b1 100644 --- a/datafusion-examples/examples/udf/simple_udtf.rs +++ b/datafusion-examples/examples/udf/simple_udtf.rs @@ -134,8 +134,7 @@ struct LocalCsvTableFunc {} impl TableFunctionImpl for LocalCsvTableFunc { fn call(&self, exprs: &[Expr]) -> Result> { - let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)), _)) = exprs.first() - else { + let Some(Expr::Literal(ScalarValue::Utf8(Some(path)), _)) = exprs.first() else { return plan_err!("read_csv requires at least one string argument"); }; diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs index 5e69cf1a1402..8eae778756e7 100644 --- a/datafusion/catalog-listing/src/helpers.rs +++ b/datafusion/catalog-listing/src/helpers.rs @@ -53,7 +53,7 @@ use object_store::{ObjectMeta, ObjectStore}; pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool { let mut is_applicable = true; expr.apply(|expr| match expr { - Expr::Column(Column { ref name, .. }) => { + Expr::Column(Column { name, .. }) => { is_applicable &= col_names.contains(&name.as_str()); if is_applicable { Ok(TreeNodeRecursion::Jump) @@ -249,16 +249,11 @@ fn populate_partition_values<'a>( partition_values: &mut HashMap<&'a str, PartitionValue>, filter: &'a Expr, ) { - if let Expr::BinaryExpr(BinaryExpr { - ref left, - op, - ref right, - }) = filter - { + if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = filter { match op { Operator::Eq => match (left.as_ref(), right.as_ref()) { - (Expr::Column(Column { ref name, .. }), Expr::Literal(val, _)) - | (Expr::Literal(val, _), Expr::Column(Column { ref name, .. })) => { + (Expr::Column(Column { name, .. }), Expr::Literal(val, _)) + | (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => { if partition_values .insert(name, PartitionValue::Single(val.to_string())) .is_some() diff --git a/datafusion/common/src/rounding.rs b/datafusion/common/src/rounding.rs index 95eefd3235b5..1796143d7cf1 100644 --- a/datafusion/common/src/rounding.rs +++ b/datafusion/common/src/rounding.rs @@ -47,7 +47,7 @@ extern crate libc; any(target_arch = "x86_64", target_arch = "aarch64"), not(target_os = "windows") ))] -extern "C" { +unsafe extern "C" { fn fesetround(round: i32); fn fegetround() -> i32; } diff --git a/datafusion/common/src/test_util.rs b/datafusion/common/src/test_util.rs index c51dea1c4de0..cebe6102e019 100644 --- a/datafusion/common/src/test_util.rs +++ b/datafusion/common/src/test_util.rs @@ -735,26 +735,26 @@ mod tests { let non_existing = cwd.join("non-existing-dir").display().to_string(); let non_existing_str = non_existing.as_str(); - env::set_var(udf_env, non_existing_str); + unsafe { env::set_var(udf_env, non_existing_str) }; let res = get_data_dir(udf_env, existing_str); assert!(res.is_err()); - env::set_var(udf_env, ""); + unsafe { env::set_var(udf_env, "") }; let res = get_data_dir(udf_env, existing_str); assert!(res.is_ok()); assert_eq!(res.unwrap(), existing_pb); - env::set_var(udf_env, " "); + unsafe { env::set_var(udf_env, " ") }; let res = get_data_dir(udf_env, existing_str); assert!(res.is_ok()); assert_eq!(res.unwrap(), existing_pb); - env::set_var(udf_env, existing_str); + unsafe { env::set_var(udf_env, existing_str) }; let res = get_data_dir(udf_env, existing_str); assert!(res.is_ok()); assert_eq!(res.unwrap(), existing_pb); - env::remove_var(udf_env); + unsafe { env::remove_var(udf_env) }; let res = get_data_dir(udf_env, non_existing_str); assert!(res.is_err()); diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 276151e253f7..04724a856a5b 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -355,14 +355,14 @@ fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches { /// Create a batch of (utf8_low, utf8_low, utf8_high) fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut gen = DataGenerator::new(); + let mut data_gen = DataGenerator::new(); // need to sort by the combined key, so combine them together - let mut tuples: Vec<_> = gen + let mut tuples: Vec<_> = data_gen .utf8_low_cardinality_values() .into_iter() - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.utf8_high_cardinality_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.utf8_high_cardinality_values()) .collect(); if sorted { @@ -388,14 +388,14 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches { /// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high) fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut gen = DataGenerator::new(); + let mut data_gen = DataGenerator::new(); // need to sort by the combined key, so combine them together - let mut tuples: Vec<_> = gen + let mut tuples: Vec<_> = data_gen .utf8_low_cardinality_values() .into_iter() - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.utf8_high_cardinality_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.utf8_high_cardinality_values()) .collect(); if sorted { @@ -421,15 +421,15 @@ fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches { /// Create a batch of (f64, utf8_low, utf8_low, i64) fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut gen = DataGenerator::new(); + let mut data_gen = DataGenerator::new(); // need to sort by the combined key, so combine them together - let mut tuples: Vec<_> = gen + let mut tuples: Vec<_> = data_gen .i64_values() .into_iter() - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.i64_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.i64_values()) .collect(); if sorted { @@ -459,15 +459,15 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches { /// Create a batch of (f64, utf8_view_low, utf8_view_low, i64) fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches { - let mut gen = DataGenerator::new(); + let mut data_gen = DataGenerator::new(); // need to sort by the combined key, so combine them together - let mut tuples: Vec<_> = gen + let mut tuples: Vec<_> = data_gen .i64_values() .into_iter() - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.i64_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.i64_values()) .collect(); if sorted { @@ -497,8 +497,8 @@ fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches { /// Create a batch of (utf8_dict) fn dictionary_streams(sorted: bool) -> PartitionedBatches { - let mut gen = DataGenerator::new(); - let mut values = gen.utf8_low_cardinality_values(); + let mut data_gen = DataGenerator::new(); + let mut values = data_gen.utf8_low_cardinality_values(); if sorted { values.sort_unstable(); } @@ -512,12 +512,12 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches { /// Create a batch of (utf8_dict, utf8_dict, utf8_dict) fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut gen = DataGenerator::new(); - let mut tuples: Vec<_> = gen + let mut data_gen = DataGenerator::new(); + let mut tuples: Vec<_> = data_gen .utf8_low_cardinality_values() .into_iter() - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.utf8_low_cardinality_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.utf8_low_cardinality_values()) .collect(); if sorted { @@ -543,13 +543,13 @@ fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { /// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64) fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches { - let mut gen = DataGenerator::new(); - let mut tuples: Vec<_> = gen + let mut data_gen = DataGenerator::new(); + let mut tuples: Vec<_> = data_gen .utf8_low_cardinality_values() .into_iter() - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.utf8_low_cardinality_values()) - .zip(gen.i64_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.utf8_low_cardinality_values()) + .zip(data_gen.i64_values()) .collect(); if sorted { diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 52fb8ae904eb..ef409d0e74f7 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -1230,7 +1230,7 @@ mod tests { ) -> Result<()> { let schema = csv_schema(); let generator = CsvBatchGenerator::new(batch_size, line_count); - let mut deserializer = csv_deserializer(batch_size, &schema); + let mut deserializer = csv_deserializer(batch_size, &schema.clone()); for data in generator { deserializer.digest(data); @@ -1346,7 +1346,7 @@ mod tests { fn csv_deserializer( batch_size: usize, schema: &Arc, - ) -> impl BatchDeserializer { + ) -> impl BatchDeserializer + use<> { let decoder = ReaderBuilder::new(schema.clone()) .with_batch_size(batch_size) .build_decoder(); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9c7339e6748e..be20e77af26b 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -684,7 +684,7 @@ impl SessionContext { match ddl { DdlStatement::CreateExternalTable(cmd) => { (Box::pin(async move { self.create_external_table(&cmd).await }) - as std::pin::Pin + Send>>) + as std::pin::Pin + Send>>) .await } DdlStatement::CreateMemoryTable(cmd) => { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2ae5aed30df9..cb018589f5c8 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -601,8 +601,8 @@ impl DefaultPhysicalPlanner { let get_sort_keys = |expr: &Expr| match expr { Expr::WindowFunction(window_fun) => { let WindowFunctionParams { - ref partition_by, - ref order_by, + partition_by, + order_by, .. } = &window_fun.as_ref().params; generate_sort_key(partition_by, order_by) @@ -612,8 +612,8 @@ impl DefaultPhysicalPlanner { match &**expr { Expr::WindowFunction(window_fun) => { let WindowFunctionParams { - ref partition_by, - ref order_by, + partition_by, + order_by, .. } = &window_fun.as_ref().params; generate_sort_key(partition_by, order_by) diff --git a/datafusion/core/tests/config_from_env.rs b/datafusion/core/tests/config_from_env.rs index 976597c8a9ac..a4824682f7bf 100644 --- a/datafusion/core/tests/config_from_env.rs +++ b/datafusion/core/tests/config_from_env.rs @@ -24,31 +24,31 @@ fn from_env() { let env_key = "DATAFUSION_OPTIMIZER_FILTER_NULL_JOIN_KEYS"; // valid testing in different cases for bool_option in ["true", "TRUE", "True", "tRUe"] { - env::set_var(env_key, bool_option); + unsafe { env::set_var(env_key, bool_option) }; let config = ConfigOptions::from_env().unwrap(); - env::remove_var(env_key); + unsafe { env::remove_var(env_key) }; assert!(config.optimizer.filter_null_join_keys); } // invalid testing - env::set_var(env_key, "ttruee"); + unsafe { env::set_var(env_key, "ttruee") }; let err = ConfigOptions::from_env().unwrap_err().strip_backtrace(); assert_eq!(err, "Error parsing 'ttruee' as bool\ncaused by\nExternal error: provided string was not `true` or `false`"); - env::remove_var(env_key); + unsafe { env::remove_var(env_key) }; let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE"; // for valid testing - env::set_var(env_key, "4096"); + unsafe { env::set_var(env_key, "4096") }; let config = ConfigOptions::from_env().unwrap(); assert_eq!(config.execution.batch_size, 4096); // for invalid testing - env::set_var(env_key, "abc"); + unsafe { env::set_var(env_key, "abc") }; let err = ConfigOptions::from_env().unwrap_err().strip_backtrace(); assert_eq!(err, "Error parsing 'abc' as usize\ncaused by\nExternal error: invalid digit found in string"); - env::remove_var(env_key); + unsafe { env::remove_var(env_key) }; let config = ConfigOptions::from_env().unwrap(); assert_eq!(config.execution.batch_size, 8192); // set to its default value } diff --git a/datafusion/core/tests/execution/coop.rs b/datafusion/core/tests/execution/coop.rs index b6f406e96750..cfc590361b92 100644 --- a/datafusion/core/tests/execution/coop.rs +++ b/datafusion/core/tests/execution/coop.rs @@ -136,7 +136,7 @@ fn make_lazy_exec_with_range( }; // Instantiate the generator with the batch and limit - let gen = RangeBatchGenerator { + let batch_gen = RangeBatchGenerator { schema: Arc::clone(&schema), boundedness, value_range: range, @@ -145,7 +145,7 @@ fn make_lazy_exec_with_range( }; // Wrap the generator in a trait object behind Arc> - let generator: Arc> = Arc::new(RwLock::new(gen)); + let generator: Arc> = Arc::new(RwLock::new(batch_gen)); // Create a LazyMemoryExec with one partition using our generator let mut exec = LazyMemoryExec::try_new(schema, vec![generator]).unwrap(); diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs index aaf2d1b9bad4..20f8aaa8b5a0 100644 --- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs +++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs @@ -209,8 +209,8 @@ mod test { sort_keys_set: vec![vec!["b".to_string()]], }; - let mut gen = DatasetGenerator::new(config); - let datasets = gen.generate().unwrap(); + let mut dataset_gen = DatasetGenerator::new(config); + let datasets = dataset_gen.generate().unwrap(); // Should Generate 2 datasets assert_eq!(datasets.len(), 2); diff --git a/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs b/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs index 7b157b707a6d..a9bc506f16f4 100644 --- a/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs +++ b/datafusion/core/tests/memory_limit/memory_limit_validation/utils.rs @@ -40,7 +40,7 @@ use datafusion_execution::{ async fn measure_max_rss(f: F) -> (T, usize) where F: FnOnce() -> Fut, - Fut: std::future::Future, + Fut: Future, { // Initialize system information let mut system = System::new_all(); diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 5d8a1d24181c..5ff27ca1d185 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -1057,7 +1057,7 @@ fn make_dict_batches() -> Vec { let batch_size = 50; let mut i = 0; - let gen = std::iter::from_fn(move || { + let batch_gen = std::iter::from_fn(move || { // create values like // 0000000001 // 0000000002 @@ -1080,7 +1080,7 @@ fn make_dict_batches() -> Vec { let num_batches = 5; - let batches: Vec<_> = gen.take(num_batches).collect(); + let batches: Vec<_> = batch_gen.take(num_batches).collect(); batches.iter().enumerate().for_each(|(i, batch)| { println!("Dict batch[{i}] size is: {}", batch.get_array_memory_size()); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 426ec213b324..99aa0de84f35 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -335,8 +335,7 @@ async fn nyc() -> Result<()> { match &optimized_plan { LogicalPlan::Aggregate(Aggregate { input, .. }) => match input.as_ref() { LogicalPlan::TableScan(TableScan { - ref projected_schema, - .. + projected_schema, .. }) => { assert_eq!(2, projected_schema.fields().len()); assert_eq!(projected_schema.field(0).name(), "passenger_count"); diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 2d0e02719c21..52858ec92898 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -515,12 +515,7 @@ impl OptimizerRule for TopKOptimizerRule { return Ok(Transformed::no(plan)); }; - if let LogicalPlan::Sort(Sort { - ref expr, - ref input, - .. - }) = limit.input.as_ref() - { + if let LogicalPlan::Sort(Sort { expr, input, .. }) = limit.input.as_ref() { if expr.len() == 1 { // we found a sort with a single sort expr, replace with a a TopK return Ok(Transformed::yes(LogicalPlan::Extension(Extension { diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs index 2c6611f382ce..d7c6b8a6b849 100644 --- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs @@ -205,7 +205,7 @@ impl TableFunctionImpl for SimpleCsvTableFunc { let mut filepath = String::new(); for expr in exprs { match expr { - Expr::Literal(ScalarValue::Utf8(Some(ref path)), _) => { + Expr::Literal(ScalarValue::Utf8(Some(path)), _) => { filepath.clone_from(path); } expr => new_exprs.push(expr.clone()), diff --git a/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs index 5b1f534ad78b..9fb52e5e90b5 100644 --- a/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs @@ -753,7 +753,7 @@ impl AvroArrowArrayReader<'_, R> { .collect::(), ) as ArrayRef, - DataType::FixedSizeBinary(ref size) => { + DataType::FixedSizeBinary(size) => { Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size( rows.iter().map(|row| { let maybe_value = self.field_lookup(&field_path, row); @@ -762,9 +762,9 @@ impl AvroArrowArrayReader<'_, R> { *size, )?) as ArrayRef } - DataType::List(ref list_field) => { + DataType::List(list_field) => { match list_field.data_type() { - DataType::Dictionary(ref key_ty, _) => { + DataType::Dictionary(key_ty, _) => { self.build_wrapped_list_array(rows, &field_path, key_ty)? } _ => { @@ -784,7 +784,7 @@ impl AvroArrowArrayReader<'_, R> { } } } - DataType::Dictionary(ref key_ty, ref val_ty) => self + DataType::Dictionary(key_ty, val_ty) => self .build_string_dictionary_array( rows, &field_path, diff --git a/datafusion/datasource-avro/src/avro_to_arrow/schema.rs b/datafusion/datasource-avro/src/avro_to_arrow/schema.rs index 3fce0d4826a2..ad5545a86ba1 100644 --- a/datafusion/datasource-avro/src/avro_to_arrow/schema.rs +++ b/datafusion/datasource-avro/src/avro_to_arrow/schema.rs @@ -248,15 +248,9 @@ fn default_field_name(dt: &DataType) -> &str { fn external_props(schema: &AvroSchema) -> HashMap { let mut props = HashMap::new(); match &schema { - AvroSchema::Record(RecordSchema { - doc: Some(ref doc), .. - }) - | AvroSchema::Enum(EnumSchema { - doc: Some(ref doc), .. - }) - | AvroSchema::Fixed(FixedSchema { - doc: Some(ref doc), .. - }) => { + AvroSchema::Record(RecordSchema { doc: Some(doc), .. }) + | AvroSchema::Enum(EnumSchema { doc: Some(doc), .. }) + | AvroSchema::Fixed(FixedSchema { doc: Some(doc), .. }) => { props.insert("avro::doc".to_string(), doc.clone()); } _ => {} diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 8f8720941fad..1feb820dd650 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -205,11 +205,7 @@ impl ExprSchemable for Expr { Expr::ScalarSubquery(subquery) => { Ok(subquery.subquery.schema().field(0).data_type().clone()) } - Expr::BinaryExpr(BinaryExpr { - ref left, - ref right, - ref op, - }) => BinaryTypeCoercer::new( + Expr::BinaryExpr(BinaryExpr { left, right, op }) => BinaryTypeCoercer::new( &left.get_type(schema)?, op, &right.get_type(schema)?, @@ -403,11 +399,9 @@ impl ExprSchemable for Expr { Expr::ScalarSubquery(subquery) => { Ok(subquery.subquery.schema().field(0).is_nullable()) } - Expr::BinaryExpr(BinaryExpr { - ref left, - ref right, - .. - }) => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?), + Expr::BinaryExpr(BinaryExpr { left, right, .. }) => { + Ok(left.nullable(input_schema)? || right.nullable(input_schema)?) + } Expr::Like(Like { expr, pattern, .. }) | Expr::SimilarTo(Like { expr, pattern, .. }) => { Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?) @@ -547,11 +541,7 @@ impl ExprSchemable for Expr { Expr::ScalarSubquery(subquery) => { Ok(Arc::clone(&subquery.subquery.schema().fields()[0])) } - Expr::BinaryExpr(BinaryExpr { - ref left, - ref right, - ref op, - }) => { + Expr::BinaryExpr(BinaryExpr { left, right, op }) => { let (lhs_type, lhs_nullable) = left.data_type_and_nullable(schema)?; let (rhs_type, rhs_nullable) = right.data_type_and_nullable(schema)?; let mut coercer = BinaryTypeCoercer::new(&lhs_type, op, &rhs_type); diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 74fe7a2d009d..172366746dea 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -132,7 +132,7 @@ impl DdlStatement { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.0 { DdlStatement::CreateExternalTable(CreateExternalTable { - ref name, + name, constraints, .. }) => { diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index b60126335598..3be6b44ae21a 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -319,7 +319,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "Is Distinct": is_distinct, }) } - LogicalPlan::Values(Values { ref values, .. }) => { + LogicalPlan::Values(Values { values, .. }) => { let str_values = values .iter() // limit to only 5 values to avoid horrible display @@ -344,10 +344,10 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { }) } LogicalPlan::TableScan(TableScan { - ref source, - ref table_name, - ref filters, - ref fetch, + source, + table_name, + filters, + fetch, .. }) => { let mut object = json!({ @@ -403,7 +403,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { object } - LogicalPlan::Projection(Projection { ref expr, .. }) => { + LogicalPlan::Projection(Projection { expr, .. }) => { json!({ "Node Type": "Projection", "Expressions": expr.iter().map(|e| e.to_string()).collect::>() @@ -443,25 +443,22 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { }) } LogicalPlan::Filter(Filter { - predicate: ref expr, - .. + predicate: expr, .. }) => { json!({ "Node Type": "Filter", "Condition": format!("{}", expr) }) } - LogicalPlan::Window(Window { - ref window_expr, .. - }) => { + LogicalPlan::Window(Window { window_expr, .. }) => { json!({ "Node Type": "WindowAggr", "Expressions": expr_vec_fmt!(window_expr) }) } LogicalPlan::Aggregate(Aggregate { - ref group_expr, - ref aggr_expr, + group_expr, + aggr_expr, .. }) => { json!({ @@ -483,7 +480,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { object } LogicalPlan::Join(Join { - on: ref keys, + on: keys, filter, join_constraint, join_type, @@ -534,11 +531,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { }) } }, - LogicalPlan::Limit(Limit { - ref skip, - ref fetch, - .. - }) => { + LogicalPlan::Limit(Limit { skip, fetch, .. }) => { let mut object = serde_json::json!( { "Node Type": "Limit", @@ -557,7 +550,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { "Node Type": "Subquery" }) } - LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => { + LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { json!({ "Node Type": "Subquery", "Alias": alias.table(), diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ad9a46b004fc..3f7064144cfc 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1776,7 +1776,7 @@ impl LogicalPlan { }) => { write!(f, "RecursiveQuery: is_distinct={is_distinct}") } - LogicalPlan::Values(Values { ref values, .. }) => { + LogicalPlan::Values(Values { values, .. }) => { let str_values: Vec<_> = values .iter() // limit to only 5 values to avoid horrible display @@ -1796,11 +1796,11 @@ impl LogicalPlan { } LogicalPlan::TableScan(TableScan { - ref source, - ref table_name, - ref projection, - ref filters, - ref fetch, + source, + table_name, + projection, + filters, + fetch, .. }) => { let projected_fields = match projection { @@ -1870,7 +1870,7 @@ impl LogicalPlan { Ok(()) } - LogicalPlan::Projection(Projection { ref expr, .. }) => { + LogicalPlan::Projection(Projection { expr, .. }) => { write!(f, "Projection:")?; for (i, expr_item) in expr.iter().enumerate() { if i > 0 { @@ -1902,11 +1902,11 @@ impl LogicalPlan { write!(f, "{}", ddl.display()) } LogicalPlan::Filter(Filter { - predicate: ref expr, + predicate: expr, .. }) => write!(f, "Filter: {expr}"), LogicalPlan::Window(Window { - ref window_expr, .. + window_expr, .. }) => { write!( f, @@ -1915,8 +1915,8 @@ impl LogicalPlan { ) } LogicalPlan::Aggregate(Aggregate { - ref group_expr, - ref aggr_expr, + group_expr, + aggr_expr, .. }) => write!( f, @@ -1939,7 +1939,7 @@ impl LogicalPlan { Ok(()) } LogicalPlan::Join(Join { - on: ref keys, + on: keys, filter, join_constraint, join_type, @@ -2023,7 +2023,7 @@ impl LogicalPlan { LogicalPlan::Subquery(Subquery { .. }) => { write!(f, "Subquery:") } - LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => { + LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => { write!(f, "SubqueryAlias: {alias}") } LogicalPlan::Statement(statement) => { diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index bcaff11bcdb4..ecb872800e73 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -693,7 +693,7 @@ fn get_valid_types( vec![current_types.to_vec()] } TypeSignature::Exact(valid_types) => vec![valid_types.clone()], - TypeSignature::ArraySignature(ref function_signature) => match function_signature { + TypeSignature::ArraySignature(function_signature) => match function_signature { ArrayFunctionSignature::Array { arguments, array_coercion, } => { array_valid_types(function_name, current_types, arguments, array_coercion.as_ref())? } diff --git a/datafusion/expr/src/window_state.rs b/datafusion/expr/src/window_state.rs index cdfb18ee1ddd..3d7356f767a3 100644 --- a/datafusion/expr/src/window_state.rs +++ b/datafusion/expr/src/window_state.rs @@ -170,7 +170,7 @@ impl WindowFrameContext { // comparison of rows. WindowFrameContext::Range { window_frame, - ref mut state, + state, } => state.calculate_range( window_frame, last_range, @@ -183,7 +183,7 @@ impl WindowFrameContext { // or position of NULLs do not impact inequality. WindowFrameContext::Groups { window_frame, - ref mut state, + state, } => state.calculate_range(window_frame, range_columns, length, idx), } } diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index d279951783b4..c9a2041dfeba 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -82,31 +82,39 @@ struct ProviderPrivateData { impl FFI_CatalogProvider { unsafe fn inner(&self) -> &Arc { - let private_data = self.private_data as *const ProviderPrivateData; - &(*private_data).provider + unsafe { + let private_data = self.private_data as *const ProviderPrivateData; + &(*private_data).provider + } } unsafe fn runtime(&self) -> Option { - let private_data = self.private_data as *const ProviderPrivateData; - (*private_data).runtime.clone() + unsafe { + let private_data = self.private_data as *const ProviderPrivateData; + (*private_data).runtime.clone() + } } } unsafe extern "C" fn schema_names_fn_wrapper( provider: &FFI_CatalogProvider, ) -> RVec { - let names = provider.inner().schema_names(); - names.into_iter().map(|s| s.into()).collect() + unsafe { + let names = provider.inner().schema_names(); + names.into_iter().map(|s| s.into()).collect() + } } unsafe extern "C" fn schema_fn_wrapper( provider: &FFI_CatalogProvider, name: RString, ) -> ROption { - let maybe_schema = provider.inner().schema(name.as_str()); - maybe_schema - .map(|schema| FFI_SchemaProvider::new(schema, provider.runtime())) - .into() + unsafe { + let maybe_schema = provider.inner().schema(name.as_str()); + maybe_schema + .map(|schema| FFI_SchemaProvider::new(schema, provider.runtime())) + .into() + } } unsafe extern "C" fn register_schema_fn_wrapper( @@ -114,16 +122,18 @@ unsafe extern "C" fn register_schema_fn_wrapper( name: RString, schema: &FFI_SchemaProvider, ) -> RResult, RString> { - let runtime = provider.runtime(); - let provider = provider.inner(); - let schema = Arc::new(ForeignSchemaProvider::from(schema)); + unsafe { + let runtime = provider.runtime(); + let provider = provider.inner(); + let schema = Arc::new(ForeignSchemaProvider::from(schema)); - let returned_schema = - rresult_return!(provider.register_schema(name.as_str(), schema)) - .map(|schema| FFI_SchemaProvider::new(schema, runtime)) - .into(); + let returned_schema = + rresult_return!(provider.register_schema(name.as_str(), schema)) + .map(|schema| FFI_SchemaProvider::new(schema, runtime)) + .into(); - RResult::ROk(returned_schema) + RResult::ROk(returned_schema) + } } unsafe extern "C" fn deregister_schema_fn_wrapper( @@ -131,44 +141,51 @@ unsafe extern "C" fn deregister_schema_fn_wrapper( name: RString, cascade: bool, ) -> RResult, RString> { - let runtime = provider.runtime(); - let provider = provider.inner(); - - let maybe_schema = - rresult_return!(provider.deregister_schema(name.as_str(), cascade)); - - RResult::ROk( - maybe_schema - .map(|schema| FFI_SchemaProvider::new(schema, runtime)) - .into(), - ) + unsafe { + let runtime = provider.runtime(); + let provider = provider.inner(); + + let maybe_schema = + rresult_return!(provider.deregister_schema(name.as_str(), cascade)); + + RResult::ROk( + maybe_schema + .map(|schema| FFI_SchemaProvider::new(schema, runtime)) + .into(), + ) + } } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProvider) { - let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(provider.private_data as *mut ProviderPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper( provider: &FFI_CatalogProvider, ) -> FFI_CatalogProvider { - let old_private_data = provider.private_data as *const ProviderPrivateData; - let runtime = (*old_private_data).runtime.clone(); - - let private_data = Box::into_raw(Box::new(ProviderPrivateData { - provider: Arc::clone(&(*old_private_data).provider), - runtime, - })) as *mut c_void; - - FFI_CatalogProvider { - schema_names: schema_names_fn_wrapper, - schema: schema_fn_wrapper, - register_schema: register_schema_fn_wrapper, - deregister_schema: deregister_schema_fn_wrapper, - clone: clone_fn_wrapper, - release: release_fn_wrapper, - version: super::version, - private_data, + unsafe { + let old_private_data = provider.private_data as *const ProviderPrivateData; + let runtime = (*old_private_data).runtime.clone(); + + let private_data = Box::into_raw(Box::new(ProviderPrivateData { + provider: Arc::clone(&(*old_private_data).provider), + runtime, + })) as *mut c_void; + + FFI_CatalogProvider { + schema_names: schema_names_fn_wrapper, + schema: schema_fn_wrapper, + register_schema: register_schema_fn_wrapper, + deregister_schema: deregister_schema_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + } } } diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index b09f06d318c1..8525b3ddbb56 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -70,21 +70,27 @@ struct ProviderPrivateData { impl FFI_CatalogProviderList { unsafe fn inner(&self) -> &Arc { - let private_data = self.private_data as *const ProviderPrivateData; - &(*private_data).provider + unsafe { + let private_data = self.private_data as *const ProviderPrivateData; + &(*private_data).provider + } } unsafe fn runtime(&self) -> Option { - let private_data = self.private_data as *const ProviderPrivateData; - (*private_data).runtime.clone() + unsafe { + let private_data = self.private_data as *const ProviderPrivateData; + (*private_data).runtime.clone() + } } } unsafe extern "C" fn catalog_names_fn_wrapper( provider: &FFI_CatalogProviderList, ) -> RVec { - let names = provider.inner().catalog_names(); - names.into_iter().map(|s| s.into()).collect() + unsafe { + let names = provider.inner().catalog_names(); + names.into_iter().map(|s| s.into()).collect() + } } unsafe extern "C" fn register_catalog_fn_wrapper( @@ -92,52 +98,61 @@ unsafe extern "C" fn register_catalog_fn_wrapper( name: RString, catalog: &FFI_CatalogProvider, ) -> ROption { - let runtime = provider.runtime(); - let provider = provider.inner(); - let catalog = Arc::new(ForeignCatalogProvider::from(catalog)); - - provider - .register_catalog(name.into(), catalog) - .map(|catalog| FFI_CatalogProvider::new(catalog, runtime)) - .into() + unsafe { + let runtime = provider.runtime(); + let provider = provider.inner(); + let catalog = Arc::new(ForeignCatalogProvider::from(catalog)); + + provider + .register_catalog(name.into(), catalog) + .map(|catalog| FFI_CatalogProvider::new(catalog, runtime)) + .into() + } } unsafe extern "C" fn catalog_fn_wrapper( provider: &FFI_CatalogProviderList, name: RString, ) -> ROption { - let runtime = provider.runtime(); - let provider = provider.inner(); - provider - .catalog(name.as_str()) - .map(|catalog| FFI_CatalogProvider::new(catalog, runtime)) - .into() + unsafe { + let runtime = provider.runtime(); + let provider = provider.inner(); + provider + .catalog(name.as_str()) + .map(|catalog| FFI_CatalogProvider::new(catalog, runtime)) + .into() + } } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProviderList) { - let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(provider.private_data as *mut ProviderPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper( provider: &FFI_CatalogProviderList, ) -> FFI_CatalogProviderList { - let old_private_data = provider.private_data as *const ProviderPrivateData; - let runtime = (*old_private_data).runtime.clone(); - - let private_data = Box::into_raw(Box::new(ProviderPrivateData { - provider: Arc::clone(&(*old_private_data).provider), - runtime, - })) as *mut c_void; - - FFI_CatalogProviderList { - register_catalog: register_catalog_fn_wrapper, - catalog_names: catalog_names_fn_wrapper, - catalog: catalog_fn_wrapper, - clone: clone_fn_wrapper, - release: release_fn_wrapper, - version: super::version, - private_data, + unsafe { + let old_private_data = provider.private_data as *const ProviderPrivateData; + let runtime = (*old_private_data).runtime.clone(); + + let private_data = Box::into_raw(Box::new(ProviderPrivateData { + provider: Arc::clone(&(*old_private_data).provider), + runtime, + })) as *mut c_void; + + FFI_CatalogProviderList { + register_catalog: register_catalog_fn_wrapper, + catalog_names: catalog_names_fn_wrapper, + catalog: catalog_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + } } } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 70c957d8c373..e50d27c7fe2a 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -79,66 +79,83 @@ pub struct ExecutionPlanPrivateData { unsafe extern "C" fn properties_fn_wrapper( plan: &FFI_ExecutionPlan, ) -> FFI_PlanProperties { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; + unsafe { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan = &(*private_data).plan; - plan.properties().into() + plan.properties().into() + } } unsafe extern "C" fn children_fn_wrapper( plan: &FFI_ExecutionPlan, ) -> RVec { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; - let ctx = &(*private_data).context; - let runtime = &(*private_data).runtime; - - let children: Vec<_> = plan - .children() - .into_iter() - .map(|child| { - FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone()) - }) - .collect(); - - children.into() + unsafe { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan = &(*private_data).plan; + let ctx = &(*private_data).context; + let runtime = &(*private_data).runtime; + + let children: Vec<_> = plan + .children() + .into_iter() + .map(|child| { + FFI_ExecutionPlan::new( + Arc::clone(child), + Arc::clone(ctx), + runtime.clone(), + ) + }) + .collect(); + + children.into() + } } unsafe extern "C" fn execute_fn_wrapper( plan: &FFI_ExecutionPlan, partition: usize, ) -> RResult { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; - let ctx = &(*private_data).context; - let runtime = (*private_data).runtime.clone(); - - rresult!(plan - .execute(partition, Arc::clone(ctx)) - .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))) + unsafe { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan = &(*private_data).plan; + let ctx = &(*private_data).context; + let runtime = (*private_data).runtime.clone(); + + rresult!(plan + .execute(partition, Arc::clone(ctx)) + .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))) + } } unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan = &(*private_data).plan; + unsafe { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan = &(*private_data).plan; - plan.name().into() + plan.name().into() + } } unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) { - let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan { - let private_data = plan.private_data as *const ExecutionPlanPrivateData; - let plan_data = &(*private_data); - - FFI_ExecutionPlan::new( - Arc::clone(&plan_data.plan), - Arc::clone(&plan_data.context), - plan_data.runtime.clone(), - ) + unsafe { + let private_data = plan.private_data as *const ExecutionPlanPrivateData; + let plan_data = &(*private_data); + + FFI_ExecutionPlan::new( + Arc::clone(&plan_data.plan), + Arc::clone(&plan_data.context), + plan_data.runtime.clone(), + ) + } } impl Clone for FFI_ExecutionPlan { diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 48c2698a58c7..ef1f37f912ee 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -84,68 +84,80 @@ struct PlanPropertiesPrivateData { unsafe extern "C" fn output_partitioning_fn_wrapper( properties: &FFI_PlanProperties, ) -> RResult, RString> { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; + unsafe { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; - let codec = DefaultPhysicalExtensionCodec {}; - let partitioning_data = - rresult_return!(serialize_partitioning(props.output_partitioning(), &codec)); - let output_partitioning = partitioning_data.encode_to_vec(); + let codec = DefaultPhysicalExtensionCodec {}; + let partitioning_data = + rresult_return!(serialize_partitioning(props.output_partitioning(), &codec)); + let output_partitioning = partitioning_data.encode_to_vec(); - ROk(output_partitioning.into()) + ROk(output_partitioning.into()) + } } unsafe extern "C" fn emission_type_fn_wrapper( properties: &FFI_PlanProperties, ) -> FFI_EmissionType { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - props.emission_type.into() + unsafe { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + props.emission_type.into() + } } unsafe extern "C" fn boundedness_fn_wrapper( properties: &FFI_PlanProperties, ) -> FFI_Boundedness { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - props.boundedness.into() + unsafe { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + props.boundedness.into() + } } unsafe extern "C" fn output_ordering_fn_wrapper( properties: &FFI_PlanProperties, ) -> RResult, RString> { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; - - let codec = DefaultPhysicalExtensionCodec {}; - let output_ordering = match props.output_ordering() { - Some(ordering) => { - let physical_sort_expr_nodes = rresult_return!( - serialize_physical_sort_exprs(ordering.to_owned(), &codec) - ); - let ordering_data = PhysicalSortExprNodeCollection { - physical_sort_expr_nodes, - }; - - ordering_data.encode_to_vec() - } - None => Vec::default(), - }; - ROk(output_ordering.into()) + unsafe { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + + let codec = DefaultPhysicalExtensionCodec {}; + let output_ordering = match props.output_ordering() { + Some(ordering) => { + let physical_sort_expr_nodes = rresult_return!( + serialize_physical_sort_exprs(ordering.to_owned(), &codec) + ); + let ordering_data = PhysicalSortExprNodeCollection { + physical_sort_expr_nodes, + }; + + ordering_data.encode_to_vec() + } + None => Vec::default(), + }; + ROk(output_ordering.into()) + } } unsafe extern "C" fn schema_fn_wrapper(properties: &FFI_PlanProperties) -> WrappedSchema { - let private_data = properties.private_data as *const PlanPropertiesPrivateData; - let props = &(*private_data).props; + unsafe { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; - let schema: SchemaRef = Arc::clone(props.eq_properties.schema()); - schema.into() + let schema: SchemaRef = Arc::clone(props.eq_properties.schema()); + schema.into() + } } unsafe extern "C" fn release_fn_wrapper(props: &mut FFI_PlanProperties) { - let private_data = - Box::from_raw(props.private_data as *mut PlanPropertiesPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(props.private_data as *mut PlanPropertiesPrivateData); + drop(private_data); + } } impl Drop for FFI_PlanProperties { diff --git a/datafusion/ffi/src/record_batch_stream.rs b/datafusion/ffi/src/record_batch_stream.rs index 1739235d1703..8ab66ade418b 100644 --- a/datafusion/ffi/src/record_batch_stream.rs +++ b/datafusion/ffi/src/record_batch_stream.rs @@ -95,16 +95,20 @@ impl FFI_RecordBatchStream { unsafe impl Send for FFI_RecordBatchStream {} unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> WrappedSchema { - let private_data = stream.private_data as *const RecordBatchStreamPrivateData; - let stream = &(*private_data).rbs; + unsafe { + let private_data = stream.private_data as *const RecordBatchStreamPrivateData; + let stream = &(*private_data).rbs; - (*stream).schema().into() + (*stream).schema().into() + } } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_RecordBatchStream) { - let private_data = - Box::from_raw(provider.private_data as *mut RecordBatchStreamPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(provider.private_data as *mut RecordBatchStreamPrivateData); + drop(private_data); + } } fn record_batch_to_wrapped_array( @@ -136,18 +140,20 @@ unsafe extern "C" fn poll_next_fn_wrapper( stream: &FFI_RecordBatchStream, cx: &mut FfiContext, ) -> FfiPoll>> { - let private_data = stream.private_data as *mut RecordBatchStreamPrivateData; - let stream = &mut (*private_data).rbs; + unsafe { + let private_data = stream.private_data as *mut RecordBatchStreamPrivateData; + let stream = &mut (*private_data).rbs; - let _guard = (*private_data).runtime.as_ref().map(|rt| rt.enter()); + let _guard = (*private_data).runtime.as_ref().map(|rt| rt.enter()); - let poll_result = cx.with_context(|std_cx| { - (*stream) - .try_poll_next_unpin(std_cx) - .map(maybe_record_batch_to_wrapped_stream) - }); + let poll_result = cx.with_context(|std_cx| { + (*stream) + .try_poll_next_unpin(std_cx) + .map(maybe_record_batch_to_wrapped_stream) + }); - poll_result.into() + poll_result.into() + } } impl RecordBatchStream for FFI_RecordBatchStream { diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index b5970d5881d6..4f574d0467e7 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -92,40 +92,48 @@ struct ProviderPrivateData { impl FFI_SchemaProvider { unsafe fn inner(&self) -> &Arc { - let private_data = self.private_data as *const ProviderPrivateData; - &(*private_data).provider + unsafe { + let private_data = self.private_data as *const ProviderPrivateData; + &(*private_data).provider + } } unsafe fn runtime(&self) -> Option { - let private_data = self.private_data as *const ProviderPrivateData; - (*private_data).runtime.clone() + unsafe { + let private_data = self.private_data as *const ProviderPrivateData; + (*private_data).runtime.clone() + } } } unsafe extern "C" fn table_names_fn_wrapper( provider: &FFI_SchemaProvider, ) -> RVec { - let provider = provider.inner(); + unsafe { + let provider = provider.inner(); - let table_names = provider.table_names(); - table_names.into_iter().map(|s| s.into()).collect() + let table_names = provider.table_names(); + table_names.into_iter().map(|s| s.into()).collect() + } } unsafe extern "C" fn table_fn_wrapper( provider: &FFI_SchemaProvider, name: RString, ) -> FfiFuture, RString>> { - let runtime = provider.runtime(); - let provider = Arc::clone(provider.inner()); + unsafe { + let runtime = provider.runtime(); + let provider = Arc::clone(provider.inner()); - async move { - let table = rresult_return!(provider.table(name.as_str()).await) - .map(|t| FFI_TableProvider::new(t, true, runtime)) - .into(); + async move { + let table = rresult_return!(provider.table(name.as_str()).await) + .map(|t| FFI_TableProvider::new(t, true, runtime)) + .into(); - RResult::ROk(table) + RResult::ROk(table) + } + .into_ffi() } - .into_ffi() } unsafe extern "C" fn register_table_fn_wrapper( @@ -133,64 +141,73 @@ unsafe extern "C" fn register_table_fn_wrapper( name: RString, table: FFI_TableProvider, ) -> RResult, RString> { - let runtime = provider.runtime(); - let provider = provider.inner(); + unsafe { + let runtime = provider.runtime(); + let provider = provider.inner(); - let table = Arc::new(ForeignTableProvider(table)); + let table = Arc::new(ForeignTableProvider(table)); - let returned_table = rresult_return!(provider.register_table(name.into(), table)) - .map(|t| FFI_TableProvider::new(t, true, runtime)); + let returned_table = rresult_return!(provider.register_table(name.into(), table)) + .map(|t| FFI_TableProvider::new(t, true, runtime)); - RResult::ROk(returned_table.into()) + RResult::ROk(returned_table.into()) + } } unsafe extern "C" fn deregister_table_fn_wrapper( provider: &FFI_SchemaProvider, name: RString, ) -> RResult, RString> { - let runtime = provider.runtime(); - let provider = provider.inner(); + unsafe { + let runtime = provider.runtime(); + let provider = provider.inner(); - let returned_table = rresult_return!(provider.deregister_table(name.as_str())) - .map(|t| FFI_TableProvider::new(t, true, runtime)); + let returned_table = rresult_return!(provider.deregister_table(name.as_str())) + .map(|t| FFI_TableProvider::new(t, true, runtime)); - RResult::ROk(returned_table.into()) + RResult::ROk(returned_table.into()) + } } unsafe extern "C" fn table_exist_fn_wrapper( provider: &FFI_SchemaProvider, name: RString, ) -> bool { - provider.inner().table_exist(name.as_str()) + unsafe { provider.inner().table_exist(name.as_str()) } } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) { - let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(provider.private_data as *mut ProviderPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper( provider: &FFI_SchemaProvider, ) -> FFI_SchemaProvider { - let old_private_data = provider.private_data as *const ProviderPrivateData; - let runtime = (*old_private_data).runtime.clone(); - - let private_data = Box::into_raw(Box::new(ProviderPrivateData { - provider: Arc::clone(&(*old_private_data).provider), - runtime, - })) as *mut c_void; - - FFI_SchemaProvider { - owner_name: provider.owner_name.clone(), - table_names: table_names_fn_wrapper, - clone: clone_fn_wrapper, - release: release_fn_wrapper, - version: super::version, - private_data, - table: table_fn_wrapper, - register_table: register_table_fn_wrapper, - deregister_table: deregister_table_fn_wrapper, - table_exist: table_exist_fn_wrapper, + unsafe { + let old_private_data = provider.private_data as *const ProviderPrivateData; + let runtime = (*old_private_data).runtime.clone(); + + let private_data = Box::into_raw(Box::new(ProviderPrivateData { + provider: Arc::clone(&(*old_private_data).provider), + runtime, + })) as *mut c_void; + + FFI_SchemaProvider { + owner_name: provider.owner_name.clone(), + table_names: table_names_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + table: table_fn_wrapper, + register_table: register_table_fn_wrapper, + deregister_table: deregister_table_fn_wrapper, + table_exist: table_exist_fn_wrapper, + } } } diff --git a/datafusion/ffi/src/session_config.rs b/datafusion/ffi/src/session_config.rs index a07b66c60196..696ba64529f3 100644 --- a/datafusion/ffi/src/session_config.rs +++ b/datafusion/ffi/src/session_config.rs @@ -64,36 +64,42 @@ unsafe impl Sync for FFI_SessionConfig {} unsafe extern "C" fn config_options_fn_wrapper( config: &FFI_SessionConfig, ) -> RHashMap { - let private_data = config.private_data as *mut SessionConfigPrivateData; - let config_options = &(*private_data).config; + unsafe { + let private_data = config.private_data as *mut SessionConfigPrivateData; + let config_options = &(*private_data).config; - let mut options = RHashMap::default(); - for config_entry in config_options.entries() { - if let Some(value) = config_entry.value { - options.insert(config_entry.key.into(), value.into()); + let mut options = RHashMap::default(); + for config_entry in config_options.entries() { + if let Some(value) = config_entry.value { + options.insert(config_entry.key.into(), value.into()); + } } - } - options + options + } } unsafe extern "C" fn release_fn_wrapper(config: &mut FFI_SessionConfig) { - let private_data = - Box::from_raw(config.private_data as *mut SessionConfigPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(config.private_data as *mut SessionConfigPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper(config: &FFI_SessionConfig) -> FFI_SessionConfig { - let old_private_data = config.private_data as *mut SessionConfigPrivateData; - let old_config = Arc::clone(&(*old_private_data).config); + unsafe { + let old_private_data = config.private_data as *mut SessionConfigPrivateData; + let old_config = Arc::clone(&(*old_private_data).config); - let private_data = Box::new(SessionConfigPrivateData { config: old_config }); + let private_data = Box::new(SessionConfigPrivateData { config: old_config }); - FFI_SessionConfig { - config_options: config_options_fn_wrapper, - private_data: Box::into_raw(private_data) as *mut c_void, - clone: clone_fn_wrapper, - release: release_fn_wrapper, + FFI_SessionConfig { + config_options: config_options_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + } } } diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 890511997a70..9f6ada9fea4a 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -167,19 +167,23 @@ struct ProviderPrivateData { } unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; + unsafe { + let private_data = provider.private_data as *const ProviderPrivateData; + let provider = &(*private_data).provider; - provider.schema().into() + provider.schema().into() + } } unsafe extern "C" fn table_type_fn_wrapper( provider: &FFI_TableProvider, ) -> FFI_TableType { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; + unsafe { + let private_data = provider.private_data as *const ProviderPrivateData; + let provider = &(*private_data).provider; - provider.table_type().into() + provider.table_type().into() + } } fn supports_filters_pushdown_internal( @@ -213,12 +217,14 @@ unsafe extern "C" fn supports_filters_pushdown_fn_wrapper( provider: &FFI_TableProvider, filters_serialized: RVec, ) -> RResult, RString> { - let private_data = provider.private_data as *const ProviderPrivateData; - let provider = &(*private_data).provider; + unsafe { + let private_data = provider.private_data as *const ProviderPrivateData; + let provider = &(*private_data).provider; - supports_filters_pushdown_internal(provider, &filters_serialized) - .map_err(|e| e.to_string().into()) - .into() + supports_filters_pushdown_internal(provider, &filters_serialized) + .map_err(|e| e.to_string().into()) + .into() + } } unsafe extern "C" fn scan_fn_wrapper( @@ -228,51 +234,54 @@ unsafe extern "C" fn scan_fn_wrapper( filters_serialized: RVec, limit: ROption, ) -> FfiFuture> { - let private_data = provider.private_data as *mut ProviderPrivateData; - let internal_provider = &(*private_data).provider; - let session_config = session_config.clone(); - let runtime = &(*private_data).runtime; - - async move { - let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); - let session = SessionStateBuilder::new() - .with_default_features() - .with_config(config.0) - .build(); - let ctx = SessionContext::new_with_state(session); - - let filters = match filters_serialized.is_empty() { - true => vec![], - false => { - let default_ctx = SessionContext::new(); - let codec = DefaultLogicalExtensionCodec {}; - - let proto_filters = - rresult_return!(LogicalExprList::decode(filters_serialized.as_ref())); - - rresult_return!(parse_exprs( - proto_filters.expr.iter(), - &default_ctx, - &codec - )) - } - }; + unsafe { + let private_data = provider.private_data as *mut ProviderPrivateData; + let internal_provider = &(*private_data).provider; + let session_config = session_config.clone(); + let runtime = &(*private_data).runtime; + + async move { + let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); + let session = SessionStateBuilder::new() + .with_default_features() + .with_config(config.0) + .build(); + let ctx = SessionContext::new_with_state(session); + + let filters = match filters_serialized.is_empty() { + true => vec![], + false => { + let default_ctx = SessionContext::new(); + let codec = DefaultLogicalExtensionCodec {}; + + let proto_filters = rresult_return!(LogicalExprList::decode( + filters_serialized.as_ref() + )); + + rresult_return!(parse_exprs( + proto_filters.expr.iter(), + &default_ctx, + &codec + )) + } + }; - let projections: Vec<_> = projections.into_iter().collect(); + let projections: Vec<_> = projections.into_iter().collect(); - let plan = rresult_return!( - internal_provider - .scan(&ctx.state(), Some(&projections), &filters, limit.into()) - .await - ); + let plan = rresult_return!( + internal_provider + .scan(&ctx.state(), Some(&projections), &filters, limit.into()) + .await + ); - RResult::ROk(FFI_ExecutionPlan::new( - plan, - ctx.task_ctx(), - runtime.clone(), - )) + RResult::ROk(FFI_ExecutionPlan::new( + plan, + ctx.task_ctx(), + runtime.clone(), + )) + } + .into_ffi() } - .into_ffi() } unsafe extern "C" fn insert_into_fn_wrapper( @@ -281,63 +290,71 @@ unsafe extern "C" fn insert_into_fn_wrapper( input: &FFI_ExecutionPlan, insert_op: FFI_InsertOp, ) -> FfiFuture> { - let private_data = provider.private_data as *mut ProviderPrivateData; - let internal_provider = &(*private_data).provider; - let session_config = session_config.clone(); - let input = input.clone(); - let runtime = &(*private_data).runtime; - - async move { - let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); - let session = SessionStateBuilder::new() - .with_default_features() - .with_config(config.0) - .build(); - let ctx = SessionContext::new_with_state(session); - - let input = rresult_return!(ForeignExecutionPlan::try_from(&input).map(Arc::new)); - - let insert_op = InsertOp::from(insert_op); - - let plan = rresult_return!( - internal_provider - .insert_into(&ctx.state(), input, insert_op) - .await - ); - - RResult::ROk(FFI_ExecutionPlan::new( - plan, - ctx.task_ctx(), - runtime.clone(), - )) + unsafe { + let private_data = provider.private_data as *mut ProviderPrivateData; + let internal_provider = &(*private_data).provider; + let session_config = session_config.clone(); + let input = input.clone(); + let runtime = &(*private_data).runtime; + + async move { + let config = rresult_return!(ForeignSessionConfig::try_from(&session_config)); + let session = SessionStateBuilder::new() + .with_default_features() + .with_config(config.0) + .build(); + let ctx = SessionContext::new_with_state(session); + + let input = + rresult_return!(ForeignExecutionPlan::try_from(&input).map(Arc::new)); + + let insert_op = InsertOp::from(insert_op); + + let plan = rresult_return!( + internal_provider + .insert_into(&ctx.state(), input, insert_op) + .await + ); + + RResult::ROk(FFI_ExecutionPlan::new( + plan, + ctx.task_ctx(), + runtime.clone(), + )) + } + .into_ffi() } - .into_ffi() } unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) { - let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(provider.private_data as *mut ProviderPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider { - let old_private_data = provider.private_data as *const ProviderPrivateData; - let runtime = (*old_private_data).runtime.clone(); - - let private_data = Box::into_raw(Box::new(ProviderPrivateData { - provider: Arc::clone(&(*old_private_data).provider), - runtime, - })) as *mut c_void; - - FFI_TableProvider { - schema: schema_fn_wrapper, - scan: scan_fn_wrapper, - table_type: table_type_fn_wrapper, - supports_filters_pushdown: provider.supports_filters_pushdown, - insert_into: provider.insert_into, - clone: clone_fn_wrapper, - release: release_fn_wrapper, - version: super::version, - private_data, + unsafe { + let old_private_data = provider.private_data as *const ProviderPrivateData; + let runtime = (*old_private_data).runtime.clone(); + + let private_data = Box::into_raw(Box::new(ProviderPrivateData { + provider: Arc::clone(&(*old_private_data).provider), + runtime, + })) as *mut c_void; + + FFI_TableProvider { + schema: schema_fn_wrapper, + scan: scan_fn_wrapper, + table_type: table_type_fn_wrapper, + supports_filters_pushdown: provider.supports_filters_pushdown, + insert_into: provider.insert_into, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + version: super::version, + private_data, + } } } diff --git a/datafusion/ffi/src/udaf/accumulator.rs b/datafusion/ffi/src/udaf/accumulator.rs index 80b872159f48..edb448f0b9ca 100644 --- a/datafusion/ffi/src/udaf/accumulator.rs +++ b/datafusion/ffi/src/udaf/accumulator.rs @@ -82,14 +82,18 @@ pub struct AccumulatorPrivateData { impl FFI_Accumulator { #[inline] unsafe fn inner_mut(&mut self) -> &mut Box { - let private_data = self.private_data as *mut AccumulatorPrivateData; - &mut (*private_data).accumulator + unsafe { + let private_data = self.private_data as *mut AccumulatorPrivateData; + &mut (*private_data).accumulator + } } #[inline] unsafe fn inner(&self) -> &dyn Accumulator { - let private_data = self.private_data as *const AccumulatorPrivateData; - (*private_data).accumulator.deref() + unsafe { + let private_data = self.private_data as *const AccumulatorPrivateData; + (*private_data).accumulator.deref() + } } } @@ -97,85 +101,97 @@ unsafe extern "C" fn update_batch_fn_wrapper( accumulator: &mut FFI_Accumulator, values: RVec, ) -> RResult<(), RString> { - let accumulator = accumulator.inner_mut(); + unsafe { + let accumulator = accumulator.inner_mut(); - let values_arrays = values - .into_iter() - .map(|v| v.try_into().map_err(DataFusionError::from)) - .collect::>>(); - let values_arrays = rresult_return!(values_arrays); + let values_arrays = values + .into_iter() + .map(|v| v.try_into().map_err(DataFusionError::from)) + .collect::>>(); + let values_arrays = rresult_return!(values_arrays); - rresult!(accumulator.update_batch(&values_arrays)) + rresult!(accumulator.update_batch(&values_arrays)) + } } unsafe extern "C" fn evaluate_fn_wrapper( accumulator: &mut FFI_Accumulator, ) -> RResult, RString> { - let accumulator = accumulator.inner_mut(); + unsafe { + let accumulator = accumulator.inner_mut(); - let scalar_result = rresult_return!(accumulator.evaluate()); - let proto_result: datafusion_proto::protobuf::ScalarValue = - rresult_return!((&scalar_result).try_into()); + let scalar_result = rresult_return!(accumulator.evaluate()); + let proto_result: datafusion_proto::protobuf::ScalarValue = + rresult_return!((&scalar_result).try_into()); - RResult::ROk(proto_result.encode_to_vec().into()) + RResult::ROk(proto_result.encode_to_vec().into()) + } } unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_Accumulator) -> usize { - accumulator.inner().size() + unsafe { accumulator.inner().size() } } unsafe extern "C" fn state_fn_wrapper( accumulator: &mut FFI_Accumulator, ) -> RResult>, RString> { - let accumulator = accumulator.inner_mut(); - - let state = rresult_return!(accumulator.state()); - let state = state - .into_iter() - .map(|state_val| { - datafusion_proto::protobuf::ScalarValue::try_from(&state_val) - .map_err(DataFusionError::from) - .map(|v| RVec::from(v.encode_to_vec())) - }) - .collect::>>() - .map(|state_vec| state_vec.into()); - - rresult!(state) + unsafe { + let accumulator = accumulator.inner_mut(); + + let state = rresult_return!(accumulator.state()); + let state = state + .into_iter() + .map(|state_val| { + datafusion_proto::protobuf::ScalarValue::try_from(&state_val) + .map_err(DataFusionError::from) + .map(|v| RVec::from(v.encode_to_vec())) + }) + .collect::>>() + .map(|state_vec| state_vec.into()); + + rresult!(state) + } } unsafe extern "C" fn merge_batch_fn_wrapper( accumulator: &mut FFI_Accumulator, states: RVec, ) -> RResult<(), RString> { - let accumulator = accumulator.inner_mut(); + unsafe { + let accumulator = accumulator.inner_mut(); - let states = rresult_return!(states - .into_iter() - .map(|state| ArrayRef::try_from(state).map_err(DataFusionError::from)) - .collect::>>()); + let states = rresult_return!(states + .into_iter() + .map(|state| ArrayRef::try_from(state).map_err(DataFusionError::from)) + .collect::>>()); - rresult!(accumulator.merge_batch(&states)) + rresult!(accumulator.merge_batch(&states)) + } } unsafe extern "C" fn retract_batch_fn_wrapper( accumulator: &mut FFI_Accumulator, values: RVec, ) -> RResult<(), RString> { - let accumulator = accumulator.inner_mut(); + unsafe { + let accumulator = accumulator.inner_mut(); - let values_arrays = values - .into_iter() - .map(|v| v.try_into().map_err(DataFusionError::from)) - .collect::>>(); - let values_arrays = rresult_return!(values_arrays); + let values_arrays = values + .into_iter() + .map(|v| v.try_into().map_err(DataFusionError::from)) + .collect::>>(); + let values_arrays = rresult_return!(values_arrays); - rresult!(accumulator.retract_batch(&values_arrays)) + rresult!(accumulator.retract_batch(&values_arrays)) + } } unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_Accumulator) { - let private_data = - Box::from_raw(accumulator.private_data as *mut AccumulatorPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(accumulator.private_data as *mut AccumulatorPrivateData); + drop(private_data); + } } impl From> for FFI_Accumulator { diff --git a/datafusion/ffi/src/udaf/groups_accumulator.rs b/datafusion/ffi/src/udaf/groups_accumulator.rs index 58a18c69db7c..83516d312af0 100644 --- a/datafusion/ffi/src/udaf/groups_accumulator.rs +++ b/datafusion/ffi/src/udaf/groups_accumulator.rs @@ -98,14 +98,18 @@ pub struct GroupsAccumulatorPrivateData { impl FFI_GroupsAccumulator { #[inline] unsafe fn inner_mut(&mut self) -> &mut Box { - let private_data = self.private_data as *mut GroupsAccumulatorPrivateData; - &mut (*private_data).accumulator + unsafe { + let private_data = self.private_data as *mut GroupsAccumulatorPrivateData; + &mut (*private_data).accumulator + } } #[inline] unsafe fn inner(&self) -> &dyn GroupsAccumulator { - let private_data = self.private_data as *const GroupsAccumulatorPrivateData; - (*private_data).accumulator.deref() + unsafe { + let private_data = self.private_data as *const GroupsAccumulatorPrivateData; + (*private_data).accumulator.deref() + } } } @@ -135,46 +139,54 @@ unsafe extern "C" fn update_batch_fn_wrapper( opt_filter: ROption, total_num_groups: usize, ) -> RResult<(), RString> { - let accumulator = accumulator.inner_mut(); - let values = rresult_return!(process_values(values)); - let group_indices: Vec = group_indices.into_iter().collect(); - let opt_filter = rresult_return!(process_opt_filter(opt_filter)); - - rresult!(accumulator.update_batch( - &values, - &group_indices, - opt_filter.as_ref(), - total_num_groups - )) + unsafe { + let accumulator = accumulator.inner_mut(); + let values = rresult_return!(process_values(values)); + let group_indices: Vec = group_indices.into_iter().collect(); + let opt_filter = rresult_return!(process_opt_filter(opt_filter)); + + rresult!(accumulator.update_batch( + &values, + &group_indices, + opt_filter.as_ref(), + total_num_groups + )) + } } unsafe extern "C" fn evaluate_fn_wrapper( accumulator: &mut FFI_GroupsAccumulator, emit_to: FFI_EmitTo, ) -> RResult { - let accumulator = accumulator.inner_mut(); + unsafe { + let accumulator = accumulator.inner_mut(); - let result = rresult_return!(accumulator.evaluate(emit_to.into())); + let result = rresult_return!(accumulator.evaluate(emit_to.into())); - rresult!(WrappedArray::try_from(&result)) + rresult!(WrappedArray::try_from(&result)) + } } unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_GroupsAccumulator) -> usize { - let accumulator = accumulator.inner(); - accumulator.size() + unsafe { + let accumulator = accumulator.inner(); + accumulator.size() + } } unsafe extern "C" fn state_fn_wrapper( accumulator: &mut FFI_GroupsAccumulator, emit_to: FFI_EmitTo, ) -> RResult, RString> { - let accumulator = accumulator.inner_mut(); - - let state = rresult_return!(accumulator.state(emit_to.into())); - rresult!(state - .into_iter() - .map(|arr| WrappedArray::try_from(&arr).map_err(DataFusionError::from)) - .collect::>>()) + unsafe { + let accumulator = accumulator.inner_mut(); + + let state = rresult_return!(accumulator.state(emit_to.into())); + rresult!(state + .into_iter() + .map(|arr| WrappedArray::try_from(&arr).map_err(DataFusionError::from)) + .collect::>>()) + } } unsafe extern "C" fn merge_batch_fn_wrapper( @@ -184,17 +196,19 @@ unsafe extern "C" fn merge_batch_fn_wrapper( opt_filter: ROption, total_num_groups: usize, ) -> RResult<(), RString> { - let accumulator = accumulator.inner_mut(); - let values = rresult_return!(process_values(values)); - let group_indices: Vec = group_indices.into_iter().collect(); - let opt_filter = rresult_return!(process_opt_filter(opt_filter)); - - rresult!(accumulator.merge_batch( - &values, - &group_indices, - opt_filter.as_ref(), - total_num_groups - )) + unsafe { + let accumulator = accumulator.inner_mut(); + let values = rresult_return!(process_values(values)); + let group_indices: Vec = group_indices.into_iter().collect(); + let opt_filter = rresult_return!(process_opt_filter(opt_filter)); + + rresult!(accumulator.merge_batch( + &values, + &group_indices, + opt_filter.as_ref(), + total_num_groups + )) + } } unsafe extern "C" fn convert_to_state_fn_wrapper( @@ -202,22 +216,26 @@ unsafe extern "C" fn convert_to_state_fn_wrapper( values: RVec, opt_filter: ROption, ) -> RResult, RString> { - let accumulator = accumulator.inner(); - let values = rresult_return!(process_values(values)); - let opt_filter = rresult_return!(process_opt_filter(opt_filter)); - let state = - rresult_return!(accumulator.convert_to_state(&values, opt_filter.as_ref())); - - rresult!(state - .iter() - .map(|arr| WrappedArray::try_from(arr).map_err(DataFusionError::from)) - .collect::>>()) + unsafe { + let accumulator = accumulator.inner(); + let values = rresult_return!(process_values(values)); + let opt_filter = rresult_return!(process_opt_filter(opt_filter)); + let state = + rresult_return!(accumulator.convert_to_state(&values, opt_filter.as_ref())); + + rresult!(state + .iter() + .map(|arr| WrappedArray::try_from(arr).map_err(DataFusionError::from)) + .collect::>>()) + } } unsafe extern "C" fn release_fn_wrapper(accumulator: &mut FFI_GroupsAccumulator) { - let private_data = - Box::from_raw(accumulator.private_data as *mut GroupsAccumulatorPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(accumulator.private_data as *mut GroupsAccumulatorPrivateData); + drop(private_data); + } } impl From> for FFI_GroupsAccumulator { diff --git a/datafusion/ffi/src/udaf/mod.rs b/datafusion/ffi/src/udaf/mod.rs index ce5611590b67..266119365d74 100644 --- a/datafusion/ffi/src/udaf/mod.rs +++ b/datafusion/ffi/src/udaf/mod.rs @@ -156,8 +156,10 @@ pub struct AggregateUDFPrivateData { impl FFI_AggregateUDF { unsafe fn inner(&self) -> &Arc { - let private_data = self.private_data as *const AggregateUDFPrivateData; - &(*private_data).udaf + unsafe { + let private_data = self.private_data as *const AggregateUDFPrivateData; + &(*private_data).udaf + } } } @@ -165,87 +167,99 @@ unsafe extern "C" fn return_field_fn_wrapper( udaf: &FFI_AggregateUDF, arg_fields: RVec, ) -> RResult { - let udaf = udaf.inner(); + unsafe { + let udaf = udaf.inner(); - let arg_fields = rresult_return!(rvec_wrapped_to_vec_fieldref(&arg_fields)); + let arg_fields = rresult_return!(rvec_wrapped_to_vec_fieldref(&arg_fields)); - let return_field = udaf - .return_field(&arg_fields) - .and_then(|v| { - FFI_ArrowSchema::try_from(v.as_ref()).map_err(DataFusionError::from) - }) - .map(WrappedSchema); + let return_field = udaf + .return_field(&arg_fields) + .and_then(|v| { + FFI_ArrowSchema::try_from(v.as_ref()).map_err(DataFusionError::from) + }) + .map(WrappedSchema); - rresult!(return_field) + rresult!(return_field) + } } unsafe extern "C" fn accumulator_fn_wrapper( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, ) -> RResult { - let udaf = udaf.inner(); + unsafe { + let udaf = udaf.inner(); - let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); + let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); - rresult!(udaf - .accumulator(accumulator_args.into()) - .map(FFI_Accumulator::from)) + rresult!(udaf + .accumulator(accumulator_args.into()) + .map(FFI_Accumulator::from)) + } } unsafe extern "C" fn create_sliding_accumulator_fn_wrapper( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, ) -> RResult { - let udaf = udaf.inner(); + unsafe { + let udaf = udaf.inner(); - let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); + let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); - rresult!(udaf - .create_sliding_accumulator(accumulator_args.into()) - .map(FFI_Accumulator::from)) + rresult!(udaf + .create_sliding_accumulator(accumulator_args.into()) + .map(FFI_Accumulator::from)) + } } unsafe extern "C" fn create_groups_accumulator_fn_wrapper( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, ) -> RResult { - let udaf = udaf.inner(); + unsafe { + let udaf = udaf.inner(); - let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); + let accumulator_args = &rresult_return!(ForeignAccumulatorArgs::try_from(args)); - rresult!(udaf - .create_groups_accumulator(accumulator_args.into()) - .map(FFI_GroupsAccumulator::from)) + rresult!(udaf + .create_groups_accumulator(accumulator_args.into()) + .map(FFI_GroupsAccumulator::from)) + } } unsafe extern "C" fn groups_accumulator_supported_fn_wrapper( udaf: &FFI_AggregateUDF, args: FFI_AccumulatorArgs, ) -> bool { - let udaf = udaf.inner(); - - ForeignAccumulatorArgs::try_from(args) - .map(|a| udaf.groups_accumulator_supported((&a).into())) - .unwrap_or_else(|e| { - log::warn!("Unable to parse accumulator args. {e}"); - false - }) + unsafe { + let udaf = udaf.inner(); + + ForeignAccumulatorArgs::try_from(args) + .map(|a| udaf.groups_accumulator_supported((&a).into())) + .unwrap_or_else(|e| { + log::warn!("Unable to parse accumulator args. {e}"); + false + }) + } } unsafe extern "C" fn with_beneficial_ordering_fn_wrapper( udaf: &FFI_AggregateUDF, beneficial_ordering: bool, ) -> RResult, RString> { - let udaf = udaf.inner().as_ref().clone(); + unsafe { + let udaf = udaf.inner().as_ref().clone(); - let result = rresult_return!(udaf.with_beneficial_ordering(beneficial_ordering)); - let result = rresult_return!(result - .map(|func| func.with_beneficial_ordering(beneficial_ordering)) - .transpose()) - .flatten() - .map(|func| FFI_AggregateUDF::from(Arc::new(func))); + let result = rresult_return!(udaf.with_beneficial_ordering(beneficial_ordering)); + let result = rresult_return!(result + .map(|func| func.with_beneficial_ordering(beneficial_ordering)) + .transpose()) + .flatten() + .map(|func| FFI_AggregateUDF::from(Arc::new(func))); - RResult::ROk(result.into()) + RResult::ROk(result.into()) + } } unsafe extern "C" fn state_fields_fn_wrapper( @@ -256,77 +270,87 @@ unsafe extern "C" fn state_fields_fn_wrapper( ordering_fields: RVec>, is_distinct: bool, ) -> RResult>, RString> { - let udaf = udaf.inner(); + unsafe { + let udaf = udaf.inner(); - let input_fields = &rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields)); - let return_field = rresult_return!(Field::try_from(&return_field.0)).into(); + let input_fields = &rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields)); + let return_field = rresult_return!(Field::try_from(&return_field.0)).into(); - let ordering_fields = &rresult_return!(ordering_fields - .into_iter() - .map(|field_bytes| datafusion_proto_common::Field::decode(field_bytes.as_ref())) - .collect::, DecodeError>>()); + let ordering_fields = &rresult_return!(ordering_fields + .into_iter() + .map(|field_bytes| datafusion_proto_common::Field::decode( + field_bytes.as_ref() + )) + .collect::, DecodeError>>()); + + let ordering_fields = + &rresult_return!(parse_proto_fields_to_fields(ordering_fields)) + .into_iter() + .map(Arc::new) + .collect::>(); + + let args = StateFieldsArgs { + name: name.as_str(), + input_fields, + return_field, + ordering_fields, + is_distinct, + }; - let ordering_fields = &rresult_return!(parse_proto_fields_to_fields(ordering_fields)) + let state_fields = rresult_return!(udaf.state_fields(args)); + let state_fields = rresult_return!(state_fields + .iter() + .map(|f| f.as_ref()) + .map(datafusion_proto::protobuf::Field::try_from) + .map(|v| v.map_err(DataFusionError::from)) + .collect::>>()) .into_iter() - .map(Arc::new) - .collect::>(); - - let args = StateFieldsArgs { - name: name.as_str(), - input_fields, - return_field, - ordering_fields, - is_distinct, - }; + .map(|field| field.encode_to_vec().into()) + .collect(); - let state_fields = rresult_return!(udaf.state_fields(args)); - let state_fields = rresult_return!(state_fields - .iter() - .map(|f| f.as_ref()) - .map(datafusion_proto::protobuf::Field::try_from) - .map(|v| v.map_err(DataFusionError::from)) - .collect::>>()) - .into_iter() - .map(|field| field.encode_to_vec().into()) - .collect(); - - RResult::ROk(state_fields) + RResult::ROk(state_fields) + } } unsafe extern "C" fn order_sensitivity_fn_wrapper( udaf: &FFI_AggregateUDF, ) -> FFI_AggregateOrderSensitivity { - udaf.inner().order_sensitivity().into() + unsafe { udaf.inner().order_sensitivity().into() } } unsafe extern "C" fn coerce_types_fn_wrapper( udaf: &FFI_AggregateUDF, arg_types: RVec, ) -> RResult, RString> { - let udaf = udaf.inner(); + unsafe { + let udaf = udaf.inner(); - let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); + let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); - let arg_fields = arg_types - .iter() - .map(|dt| Field::new("f", dt.clone(), true)) - .map(Arc::new) - .collect::>(); - let return_types = rresult_return!(fields_with_aggregate_udf(&arg_fields, udaf)) - .into_iter() - .map(|f| f.data_type().to_owned()) - .collect::>(); + let arg_fields = arg_types + .iter() + .map(|dt| Field::new("f", dt.clone(), true)) + .map(Arc::new) + .collect::>(); + let return_types = rresult_return!(fields_with_aggregate_udf(&arg_fields, udaf)) + .into_iter() + .map(|f| f.data_type().to_owned()) + .collect::>(); - rresult!(vec_datatype_to_rvec_wrapped(&return_types)) + rresult!(vec_datatype_to_rvec_wrapped(&return_types)) + } } unsafe extern "C" fn release_fn_wrapper(udaf: &mut FFI_AggregateUDF) { - let private_data = Box::from_raw(udaf.private_data as *mut AggregateUDFPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(udaf.private_data as *mut AggregateUDFPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper(udaf: &FFI_AggregateUDF) -> FFI_AggregateUDF { - Arc::clone(udaf.inner()).into() + unsafe { Arc::clone(udaf.inner()).into() } } impl Clone for FFI_AggregateUDF { diff --git a/datafusion/ffi/src/udf/mod.rs b/datafusion/ffi/src/udf/mod.rs index 5e59cfc5ecb0..6a186bd3ac37 100644 --- a/datafusion/ffi/src/udf/mod.rs +++ b/datafusion/ffi/src/udf/mod.rs @@ -127,49 +127,55 @@ unsafe extern "C" fn return_type_fn_wrapper( udf: &FFI_ScalarUDF, arg_types: RVec, ) -> RResult { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; + unsafe { + let private_data = udf.private_data as *const ScalarUDFPrivateData; + let udf = &(*private_data).udf; - let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); + let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); - let return_type = udf - .return_type(&arg_types) - .and_then(|v| FFI_ArrowSchema::try_from(v).map_err(DataFusionError::from)) - .map(WrappedSchema); + let return_type = udf + .return_type(&arg_types) + .and_then(|v| FFI_ArrowSchema::try_from(v).map_err(DataFusionError::from)) + .map(WrappedSchema); - rresult!(return_type) + rresult!(return_type) + } } unsafe extern "C" fn return_field_from_args_fn_wrapper( udf: &FFI_ScalarUDF, args: FFI_ReturnFieldArgs, ) -> RResult { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; + unsafe { + let private_data = udf.private_data as *const ScalarUDFPrivateData; + let udf = &(*private_data).udf; - let args: ForeignReturnFieldArgsOwned = rresult_return!((&args).try_into()); - let args_ref: ForeignReturnFieldArgs = (&args).into(); + let args: ForeignReturnFieldArgsOwned = rresult_return!((&args).try_into()); + let args_ref: ForeignReturnFieldArgs = (&args).into(); - let return_type = udf - .return_field_from_args((&args_ref).into()) - .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(DataFusionError::from)) - .map(WrappedSchema); + let return_type = udf + .return_field_from_args((&args_ref).into()) + .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(DataFusionError::from)) + .map(WrappedSchema); - rresult!(return_type) + rresult!(return_type) + } } unsafe extern "C" fn coerce_types_fn_wrapper( udf: &FFI_ScalarUDF, arg_types: RVec, ) -> RResult, RString> { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; + unsafe { + let private_data = udf.private_data as *const ScalarUDFPrivateData; + let udf = &(*private_data).udf; - let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); + let arg_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)); - let return_types = rresult_return!(data_types_with_scalar_udf(&arg_types, udf)); + let return_types = rresult_return!(data_types_with_scalar_udf(&arg_types, udf)); - rresult!(vec_datatype_to_rvec_wrapped(&return_types)) + rresult!(vec_datatype_to_rvec_wrapped(&return_types)) + } } unsafe extern "C" fn invoke_with_args_fn_wrapper( @@ -179,61 +185,67 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper( number_rows: usize, return_field: WrappedSchema, ) -> RResult { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf = &(*private_data).udf; - - let args = args - .into_iter() - .map(|arr| { - from_ffi(arr.array, &arr.schema.0) - .map(|v| ColumnarValue::Array(arrow::array::make_array(v))) - }) - .collect::>(); + unsafe { + let private_data = udf.private_data as *const ScalarUDFPrivateData; + let udf = &(*private_data).udf; - let args = rresult_return!(args); - let return_field = rresult_return!(Field::try_from(&return_field.0)).into(); + let args = args + .into_iter() + .map(|arr| { + from_ffi(arr.array, &arr.schema.0) + .map(|v| ColumnarValue::Array(arrow::array::make_array(v))) + }) + .collect::>(); - let arg_fields = arg_fields - .into_iter() - .map(|wrapped_field| { - Field::try_from(&wrapped_field.0) - .map(Arc::new) - .map_err(DataFusionError::from) + let args = rresult_return!(args); + let return_field = rresult_return!(Field::try_from(&return_field.0)).into(); + + let arg_fields = arg_fields + .into_iter() + .map(|wrapped_field| { + Field::try_from(&wrapped_field.0) + .map(Arc::new) + .map_err(DataFusionError::from) + }) + .collect::>>(); + let arg_fields = rresult_return!(arg_fields); + + let args = ScalarFunctionArgs { + args, + arg_fields, + number_rows, + return_field, + // TODO: pass config options: https://github.com/apache/datafusion/issues/17035 + config_options: Arc::new(ConfigOptions::default()), + }; + + let result = rresult_return!(udf + .invoke_with_args(args) + .and_then(|r| r.to_array(number_rows))); + + let (result_array, result_schema) = rresult_return!(to_ffi(&result.to_data())); + + RResult::ROk(WrappedArray { + array: result_array, + schema: WrappedSchema(result_schema), }) - .collect::>>(); - let arg_fields = rresult_return!(arg_fields); - - let args = ScalarFunctionArgs { - args, - arg_fields, - number_rows, - return_field, - // TODO: pass config options: https://github.com/apache/datafusion/issues/17035 - config_options: Arc::new(ConfigOptions::default()), - }; - - let result = rresult_return!(udf - .invoke_with_args(args) - .and_then(|r| r.to_array(number_rows))); - - let (result_array, result_schema) = rresult_return!(to_ffi(&result.to_data())); - - RResult::ROk(WrappedArray { - array: result_array, - schema: WrappedSchema(result_schema), - }) + } } unsafe extern "C" fn release_fn_wrapper(udf: &mut FFI_ScalarUDF) { - let private_data = Box::from_raw(udf.private_data as *mut ScalarUDFPrivateData); - drop(private_data); + unsafe { + let private_data = Box::from_raw(udf.private_data as *mut ScalarUDFPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper(udf: &FFI_ScalarUDF) -> FFI_ScalarUDF { - let private_data = udf.private_data as *const ScalarUDFPrivateData; - let udf_data = &(*private_data); + unsafe { + let private_data = udf.private_data as *const ScalarUDFPrivateData; + let udf_data = &(*private_data); - Arc::clone(&udf_data.udf).into() + Arc::clone(&udf_data.udf).into() + } } impl Clone for FFI_ScalarUDF { diff --git a/datafusion/ffi/src/udtf.rs b/datafusion/ffi/src/udtf.rs index edd5273c70a8..97008a74cd27 100644 --- a/datafusion/ffi/src/udtf.rs +++ b/datafusion/ffi/src/udtf.rs @@ -105,8 +105,11 @@ unsafe extern "C" fn call_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(udtf: &mut FFI_TableFunction) { - let private_data = Box::from_raw(udtf.private_data as *mut TableFunctionPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(udtf.private_data as *mut TableFunctionPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper(udtf: &FFI_TableFunction) -> FFI_TableFunction { diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index 9f56e2d4788b..1ac33fb17401 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -116,8 +116,10 @@ pub struct WindowUDFPrivateData { impl FFI_WindowUDF { unsafe fn inner(&self) -> &Arc { - let private_data = self.private_data as *const WindowUDFPrivateData; - &(*private_data).udf + unsafe { + let private_data = self.private_data as *const WindowUDFPrivateData; + &(*private_data).udf + } } } @@ -125,13 +127,16 @@ unsafe extern "C" fn partition_evaluator_fn_wrapper( udwf: &FFI_WindowUDF, args: FFI_PartitionEvaluatorArgs, ) -> RResult { - let inner = udwf.inner(); + unsafe { + let inner = udwf.inner(); - let args = rresult_return!(ForeignPartitionEvaluatorArgs::try_from(args)); + let args = rresult_return!(ForeignPartitionEvaluatorArgs::try_from(args)); - let evaluator = rresult_return!(inner.partition_evaluator_factory((&args).into())); + let evaluator = + rresult_return!(inner.partition_evaluator_factory((&args).into())); - RResult::ROk(evaluator.into()) + RResult::ROk(evaluator.into()) + } } unsafe extern "C" fn field_fn_wrapper( @@ -139,68 +144,76 @@ unsafe extern "C" fn field_fn_wrapper( input_fields: RVec, display_name: RString, ) -> RResult { - let inner = udwf.inner(); + unsafe { + let inner = udwf.inner(); - let input_fields = rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields)); + let input_fields = rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields)); - let field = rresult_return!(inner.field(WindowUDFFieldArgs::new( - &input_fields, - display_name.as_str() - ))); + let field = rresult_return!(inner.field(WindowUDFFieldArgs::new( + &input_fields, + display_name.as_str() + ))); - let schema = Arc::new(Schema::new(vec![field])); + let schema = Arc::new(Schema::new(vec![field])); - RResult::ROk(WrappedSchema::from(schema)) + RResult::ROk(WrappedSchema::from(schema)) + } } unsafe extern "C" fn coerce_types_fn_wrapper( udwf: &FFI_WindowUDF, arg_types: RVec, ) -> RResult, RString> { - let inner = udwf.inner(); - - let arg_fields = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)) - .into_iter() - .map(|dt| Field::new("f", dt, false)) - .map(Arc::new) - .collect::>(); - - let return_fields = rresult_return!(fields_with_window_udf(&arg_fields, inner)); - let return_types = return_fields - .into_iter() - .map(|f| f.data_type().to_owned()) - .collect::>(); - - rresult!(vec_datatype_to_rvec_wrapped(&return_types)) + unsafe { + let inner = udwf.inner(); + + let arg_fields = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)) + .into_iter() + .map(|dt| Field::new("f", dt, false)) + .map(Arc::new) + .collect::>(); + + let return_fields = rresult_return!(fields_with_window_udf(&arg_fields, inner)); + let return_types = return_fields + .into_iter() + .map(|f| f.data_type().to_owned()) + .collect::>(); + + rresult!(vec_datatype_to_rvec_wrapped(&return_types)) + } } unsafe extern "C" fn release_fn_wrapper(udwf: &mut FFI_WindowUDF) { - let private_data = Box::from_raw(udwf.private_data as *mut WindowUDFPrivateData); - drop(private_data); + unsafe { + let private_data = Box::from_raw(udwf.private_data as *mut WindowUDFPrivateData); + drop(private_data); + } } unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF { - // let private_data = udf.private_data as *const WindowUDFPrivateData; - // let udf_data = &(*private_data); - - // let private_data = Box::new(WindowUDFPrivateData { - // udf: Arc::clone(&udf_data.udf), - // }); - let private_data = Box::new(WindowUDFPrivateData { - udf: Arc::clone(udwf.inner()), - }); - - FFI_WindowUDF { - name: udwf.name.clone(), - aliases: udwf.aliases.clone(), - volatility: udwf.volatility.clone(), - partition_evaluator: partition_evaluator_fn_wrapper, - sort_options: udwf.sort_options.clone(), - coerce_types: coerce_types_fn_wrapper, - field: field_fn_wrapper, - clone: clone_fn_wrapper, - release: release_fn_wrapper, - private_data: Box::into_raw(private_data) as *mut c_void, + unsafe { + // let private_data = udf.private_data as *const WindowUDFPrivateData; + // let udf_data = &(*private_data); + + // let private_data = Box::new(WindowUDFPrivateData { + // udf: Arc::clone(&udf_data.udf), + // }); + let private_data = Box::new(WindowUDFPrivateData { + udf: Arc::clone(udwf.inner()), + }); + + FFI_WindowUDF { + name: udwf.name.clone(), + aliases: udwf.aliases.clone(), + volatility: udwf.volatility.clone(), + partition_evaluator: partition_evaluator_fn_wrapper, + sort_options: udwf.sort_options.clone(), + coerce_types: coerce_types_fn_wrapper, + field: field_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + } } } diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs index 14cf23b919aa..1f3c52326131 100644 --- a/datafusion/ffi/src/udwf/partition_evaluator.rs +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -87,13 +87,17 @@ pub struct PartitionEvaluatorPrivateData { impl FFI_PartitionEvaluator { unsafe fn inner_mut(&mut self) -> &mut Box { - let private_data = self.private_data as *mut PartitionEvaluatorPrivateData; - &mut (*private_data).evaluator + unsafe { + let private_data = self.private_data as *mut PartitionEvaluatorPrivateData; + &mut (*private_data).evaluator + } } unsafe fn inner(&self) -> &(dyn PartitionEvaluator + 'static) { - let private_data = self.private_data as *mut PartitionEvaluatorPrivateData; - (*private_data).evaluator.as_ref() + unsafe { + let private_data = self.private_data as *mut PartitionEvaluatorPrivateData; + (*private_data).evaluator.as_ref() + } } } @@ -102,19 +106,24 @@ unsafe extern "C" fn evaluate_all_fn_wrapper( values: RVec, num_rows: usize, ) -> RResult { - let inner = evaluator.inner_mut(); - - let values_arrays = values - .into_iter() - .map(|v| v.try_into().map_err(DataFusionError::from)) - .collect::>>(); - let values_arrays = rresult_return!(values_arrays); - - let return_array = inner - .evaluate_all(&values_arrays, num_rows) - .and_then(|array| WrappedArray::try_from(&array).map_err(DataFusionError::from)); - - rresult!(return_array) + unsafe { + let inner = evaluator.inner_mut(); + + let values_arrays = values + .into_iter() + .map(|v| v.try_into().map_err(DataFusionError::from)) + .collect::>>(); + let values_arrays = rresult_return!(values_arrays); + + let return_array = + inner + .evaluate_all(&values_arrays, num_rows) + .and_then(|array| { + WrappedArray::try_from(&array).map_err(DataFusionError::from) + }); + + rresult!(return_array) + } } unsafe extern "C" fn evaluate_fn_wrapper( @@ -122,21 +131,24 @@ unsafe extern "C" fn evaluate_fn_wrapper( values: RVec, range: FFI_Range, ) -> RResult, RString> { - let inner = evaluator.inner_mut(); - - let values_arrays = values - .into_iter() - .map(|v| v.try_into().map_err(DataFusionError::from)) - .collect::>>(); - let values_arrays = rresult_return!(values_arrays); - - // let return_array = (inner.evaluate(&values_arrays, &range.into())); - // .and_then(|array| WrappedArray::try_from(&array).map_err(DataFusionError::from)); - let scalar_result = rresult_return!(inner.evaluate(&values_arrays, &range.into())); - let proto_result: datafusion_proto::protobuf::ScalarValue = - rresult_return!((&scalar_result).try_into()); - - RResult::ROk(proto_result.encode_to_vec().into()) + unsafe { + let inner = evaluator.inner_mut(); + + let values_arrays = values + .into_iter() + .map(|v| v.try_into().map_err(DataFusionError::from)) + .collect::>>(); + let values_arrays = rresult_return!(values_arrays); + + // let return_array = (inner.evaluate(&values_arrays, &range.into())); + // .and_then(|array| WrappedArray::try_from(&array).map_err(DataFusionError::from)); + let scalar_result = + rresult_return!(inner.evaluate(&values_arrays, &range.into())); + let proto_result: datafusion_proto::protobuf::ScalarValue = + rresult_return!((&scalar_result).try_into()); + + RResult::ROk(proto_result.encode_to_vec().into()) + } } unsafe extern "C" fn evaluate_all_with_rank_fn_wrapper( @@ -144,18 +156,22 @@ unsafe extern "C" fn evaluate_all_with_rank_fn_wrapper( num_rows: usize, ranks_in_partition: RVec, ) -> RResult { - let inner = evaluator.inner(); + unsafe { + let inner = evaluator.inner(); - let ranks_in_partition = ranks_in_partition - .into_iter() - .map(Range::from) - .collect::>(); + let ranks_in_partition = ranks_in_partition + .into_iter() + .map(Range::from) + .collect::>(); - let return_array = inner - .evaluate_all_with_rank(num_rows, &ranks_in_partition) - .and_then(|array| WrappedArray::try_from(&array).map_err(DataFusionError::from)); + let return_array = inner + .evaluate_all_with_rank(num_rows, &ranks_in_partition) + .and_then(|array| { + WrappedArray::try_from(&array).map_err(DataFusionError::from) + }); - rresult!(return_array) + rresult!(return_array) + } } unsafe extern "C" fn get_range_fn_wrapper( @@ -163,16 +179,20 @@ unsafe extern "C" fn get_range_fn_wrapper( idx: usize, n_rows: usize, ) -> RResult { - let inner = evaluator.inner(); - let range = inner.get_range(idx, n_rows).map(FFI_Range::from); + unsafe { + let inner = evaluator.inner(); + let range = inner.get_range(idx, n_rows).map(FFI_Range::from); - rresult!(range) + rresult!(range) + } } unsafe extern "C" fn release_fn_wrapper(evaluator: &mut FFI_PartitionEvaluator) { - let private_data = - Box::from_raw(evaluator.private_data as *mut PartitionEvaluatorPrivateData); - drop(private_data); + unsafe { + let private_data = + Box::from_raw(evaluator.private_data as *mut PartitionEvaluatorPrivateData); + drop(private_data); + } } impl From> for FFI_PartitionEvaluator { diff --git a/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs b/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs index 6381db63122d..5491c09214f9 100644 --- a/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs +++ b/datafusion/optimizer/src/analyzer/resolve_grouping_function.rs @@ -150,7 +150,7 @@ fn analyze_internal(plan: LogicalPlan) -> Result> { fn is_grouping_function(expr: &Expr) -> bool { // TODO: Do something better than name here should grouping be a built // in expression? - matches!(expr, Expr::AggregateFunction(AggregateFunction { ref func, .. }) if func.name() == "grouping") + matches!(expr, Expr::AggregateFunction(AggregateFunction { func, .. }) if func.name() == "grouping") } fn contains_grouping_function(exprs: &[Expr]) -> bool { diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index efdd6fcb6265..90d9d43af711 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -783,8 +783,10 @@ mod test { #[test] pub fn test_evaluate_selection_with_non_empty_record_batch() { + let batch = + unsafe { RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }; test_evaluate_selection( - unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, + &batch, &BooleanArray::from(vec![true; 10]), &ColumnarValue::Array(Arc::new(Int64Array::from(vec![1; 10]))), ); @@ -793,36 +795,32 @@ mod test { #[test] pub fn test_evaluate_selection_with_non_empty_record_batch_with_larger_false_selection( ) { - test_evaluate_selection_error( - unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, - &BooleanArray::from(vec![false; 20]), - ); + let batch = + unsafe { RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }; + test_evaluate_selection_error(&batch, &BooleanArray::from(vec![false; 20])); } #[test] pub fn test_evaluate_selection_with_non_empty_record_batch_with_larger_true_selection( ) { - test_evaluate_selection_error( - unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, - &BooleanArray::from(vec![true; 20]), - ); + let batch = + unsafe { RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }; + test_evaluate_selection_error(&batch, &BooleanArray::from(vec![true; 20])); } #[test] pub fn test_evaluate_selection_with_non_empty_record_batch_with_smaller_false_selection( ) { - test_evaluate_selection_error( - unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, - &BooleanArray::from(vec![false; 5]), - ); + let batch = + unsafe { RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }; + test_evaluate_selection_error(&batch, &BooleanArray::from(vec![false; 5])); } #[test] pub fn test_evaluate_selection_with_non_empty_record_batch_with_smaller_true_selection( ) { - test_evaluate_selection_error( - unsafe { &RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }, - &BooleanArray::from(vec![true; 5]), - ); + let batch = + unsafe { RecordBatch::new_unchecked(Arc::new(Schema::empty()), vec![], 10) }; + test_evaluate_selection_error(&batch, &BooleanArray::from(vec![true; 5])); } } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 1e5c7e702440..1eadc074ba98 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -278,8 +278,7 @@ impl PhysicalExpr for BinaryExpr { let result_type = self.data_type(input_schema)?; // If the left-hand side is an array and the right-hand side is a non-null scalar, try the optimized kernel. - if let (ColumnarValue::Array(array), ColumnarValue::Scalar(ref scalar)) = - (&lhs, &rhs) + if let (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) = (&lhs, &rhs) { if !scalar.is_null() { if let Some(result_array) = diff --git a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs index 974aea3b6292..1915c2c0bc38 100644 --- a/datafusion/physical-plan/src/aggregates/topk/hash_table.rs +++ b/datafusion/physical-plan/src/aggregates/topk/hash_table.rs @@ -131,20 +131,24 @@ impl ArrowHashTable for StringHashTable { } unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { - self.map.update_heap_idx(mapper); + unsafe { + self.map.update_heap_idx(mapper); + } } unsafe fn heap_idx_at(&self, map_idx: usize) -> usize { - self.map.heap_idx_at(map_idx) + unsafe { self.map.heap_idx_at(map_idx) } } unsafe fn take_all(&mut self, indexes: Vec) -> ArrayRef { - let ids = self.map.take_all(indexes); - match self.data_type { - DataType::Utf8 => Arc::new(StringArray::from(ids)), - DataType::LargeUtf8 => Arc::new(LargeStringArray::from(ids)), - DataType::Utf8View => Arc::new(StringViewArray::from(ids)), - _ => unreachable!(), + unsafe { + let ids = self.map.take_all(indexes); + match self.data_type { + DataType::Utf8 => Arc::new(StringArray::from(ids)), + DataType::LargeUtf8 => Arc::new(LargeStringArray::from(ids)), + DataType::Utf8View => Arc::new(StringViewArray::from(ids)), + _ => unreachable!(), + } } } @@ -154,61 +158,63 @@ impl ArrowHashTable for StringHashTable { replace_idx: usize, mapper: &mut Vec<(usize, usize)>, ) -> (usize, bool) { - let id = match self.data_type { - DataType::Utf8 => { - let ids = self - .owned - .as_any() - .downcast_ref::() - .expect("Expected StringArray for DataType::Utf8"); - if ids.is_null(row_idx) { - None - } else { - Some(ids.value(row_idx)) + unsafe { + let id = match self.data_type { + DataType::Utf8 => { + let ids = self + .owned + .as_any() + .downcast_ref::() + .expect("Expected StringArray for DataType::Utf8"); + if ids.is_null(row_idx) { + None + } else { + Some(ids.value(row_idx)) + } } - } - DataType::LargeUtf8 => { - let ids = self - .owned - .as_any() - .downcast_ref::() - .expect("Expected LargeStringArray for DataType::LargeUtf8"); - if ids.is_null(row_idx) { - None - } else { - Some(ids.value(row_idx)) + DataType::LargeUtf8 => { + let ids = self + .owned + .as_any() + .downcast_ref::() + .expect("Expected LargeStringArray for DataType::LargeUtf8"); + if ids.is_null(row_idx) { + None + } else { + Some(ids.value(row_idx)) + } } - } - DataType::Utf8View => { - let ids = self - .owned - .as_any() - .downcast_ref::() - .expect("Expected StringViewArray for DataType::Utf8View"); - if ids.is_null(row_idx) { - None - } else { - Some(ids.value(row_idx)) + DataType::Utf8View => { + let ids = self + .owned + .as_any() + .downcast_ref::() + .expect("Expected StringViewArray for DataType::Utf8View"); + if ids.is_null(row_idx) { + None + } else { + Some(ids.value(row_idx)) + } } + _ => panic!("Unsupported data type"), + }; + + let hash = self.rnd.hash_one(id); + if let Some(map_idx) = self + .map + .find(hash, |mi| id == mi.as_ref().map(|id| id.as_str())) + { + return (map_idx, false); } - _ => panic!("Unsupported data type"), - }; - let hash = self.rnd.hash_one(id); - if let Some(map_idx) = self - .map - .find(hash, |mi| id == mi.as_ref().map(|id| id.as_str())) - { - return (map_idx, false); - } - - // we're full and this is a better value, so remove the worst - let heap_idx = self.map.remove_if_full(replace_idx); + // we're full and this is a better value, so remove the worst + let heap_idx = self.map.remove_if_full(replace_idx); - // add the new group - let id = id.map(|id| id.to_string()); - let map_idx = self.map.insert(hash, id, heap_idx, mapper); - (map_idx, true) + // add the new group + let id = id.map(|id| id.to_string()); + let map_idx = self.map.insert(hash, id, heap_idx, mapper); + (map_idx, true) + } } } @@ -246,25 +252,29 @@ where } unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { - self.map.update_heap_idx(mapper); + unsafe { + self.map.update_heap_idx(mapper); + } } unsafe fn heap_idx_at(&self, map_idx: usize) -> usize { - self.map.heap_idx_at(map_idx) + unsafe { self.map.heap_idx_at(map_idx) } } unsafe fn take_all(&mut self, indexes: Vec) -> ArrayRef { - let ids = self.map.take_all(indexes); - let mut builder: PrimitiveBuilder = - PrimitiveArray::builder(ids.len()).with_data_type(self.kt.clone()); - for id in ids.into_iter() { - match id { - None => builder.append_null(), - Some(id) => builder.append_value(id), + unsafe { + let ids = self.map.take_all(indexes); + let mut builder: PrimitiveBuilder = + PrimitiveArray::builder(ids.len()).with_data_type(self.kt.clone()); + for id in ids.into_iter() { + match id { + None => builder.append_null(), + Some(id) => builder.append_value(id), + } } + let ids = builder.finish(); + Arc::new(ids) } - let ids = builder.finish(); - Arc::new(ids) } unsafe fn find_or_insert( @@ -273,24 +283,26 @@ where replace_idx: usize, mapper: &mut Vec<(usize, usize)>, ) -> (usize, bool) { - let ids = self.owned.as_primitive::(); - let id: Option = if ids.is_null(row_idx) { - None - } else { - Some(ids.value(row_idx)) - }; + unsafe { + let ids = self.owned.as_primitive::(); + let id: Option = if ids.is_null(row_idx) { + None + } else { + Some(ids.value(row_idx)) + }; - let hash: u64 = id.hash(&self.rnd); - if let Some(map_idx) = self.map.find(hash, |mi| id == *mi) { - return (map_idx, false); - } + let hash: u64 = id.hash(&self.rnd); + if let Some(map_idx) = self.map.find(hash, |mi| id == *mi) { + return (map_idx, false); + } - // we're full and this is a better value, so remove the worst - let heap_idx = self.map.remove_if_full(replace_idx); + // we're full and this is a better value, so remove the worst + let heap_idx = self.map.remove_if_full(replace_idx); - // add the new group - let map_idx = self.map.insert(hash, id, heap_idx, mapper); - (map_idx, true) + // add the new group + let map_idx = self.map.insert(hash, id, heap_idx, mapper); + (map_idx, true) + } } } @@ -312,22 +324,28 @@ impl TopKHashTable { } pub unsafe fn heap_idx_at(&self, map_idx: usize) -> usize { - let bucket = unsafe { self.map.bucket(map_idx) }; - bucket.as_ref().heap_idx + unsafe { + let bucket = self.map.bucket(map_idx); + bucket.as_ref().heap_idx + } } pub unsafe fn remove_if_full(&mut self, replace_idx: usize) -> usize { - if self.map.len() >= self.limit { - self.map.erase(self.map.bucket(replace_idx)); - 0 // if full, always replace top node - } else { - self.map.len() // if we're not full, always append to end + unsafe { + if self.map.len() >= self.limit { + self.map.erase(self.map.bucket(replace_idx)); + 0 // if full, always replace top node + } else { + self.map.len() // if we're not full, always append to end + } } } unsafe fn update_heap_idx(&mut self, mapper: &[(usize, usize)]) { - for (m, h) in mapper { - self.map.bucket(*m).as_mut().heap_idx = *h + unsafe { + for (m, h) in mapper { + self.map.bucket(*m).as_mut().heap_idx = *h + } } } @@ -368,12 +386,14 @@ impl TopKHashTable { } pub unsafe fn take_all(&mut self, idxs: Vec) -> Vec { - let ids = idxs - .into_iter() - .map(|idx| self.map.bucket(idx).as_ref().id.clone()) - .collect(); - self.map.clear(); - ids + unsafe { + let ids = idxs + .into_iter() + .map(|idx| self.map.bucket(idx).as_ref().id.clone()) + .collect(); + self.map.clear(); + ids + } } } diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index e9de1d9e9a9e..0c7404bb0e8f 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -1018,11 +1018,11 @@ fn encode_scalar_nested_value( )) })?; - let gen = IpcDataGenerator {}; + let ipc_gen = IpcDataGenerator {}; let mut dict_tracker = DictionaryTracker::new(false); let write_options = IpcWriteOptions::default(); let mut compression_context = CompressionContext::default(); - let (encoded_dictionaries, encoded_message) = gen + let (encoded_dictionaries, encoded_message) = ipc_gen .encode( &batch, &mut dict_tracker, diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index 920e277b8ccc..1901b7987daa 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -85,3 +85,6 @@ datafusion-functions-window-common = { workspace = true } doc-comment = { workspace = true } pretty_assertions = "1.4" tokio = { workspace = true, features = ["rt-multi-thread"] } + +[lints.clippy] +collapsible_if = "allow" diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 2774b5b6ba7c..7f233af09961 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -307,16 +307,16 @@ pub fn serialize_expr( } Expr::WindowFunction(window_fun) => { let expr::WindowFunction { - ref fun, + fun, params: expr::WindowFunctionParams { - ref args, - ref partition_by, - ref order_by, - ref window_frame, - ref null_treatment, - ref distinct, - ref filter, + args, + partition_by, + order_by, + window_frame, + null_treatment, + distinct, + filter, }, } = window_fun.as_ref(); let mut buf = Vec::new(); @@ -361,14 +361,14 @@ pub fn serialize_expr( } } Expr::AggregateFunction(expr::AggregateFunction { - ref func, + func, params: AggregateFunctionParams { - ref args, - ref distinct, - ref filter, - ref order_by, - ref null_treatment, + args, + distinct, + filter, + order_by, + null_treatment, }, }) => { let mut buf = Vec::new(); diff --git a/datafusion/sql/src/unparser/ast.rs b/datafusion/sql/src/unparser/ast.rs index 2cf26009ac0f..35c0c447517f 100644 --- a/datafusion/sql/src/unparser/ast.rs +++ b/datafusion/sql/src/unparser/ast.rs @@ -711,10 +711,10 @@ impl From for BuilderError { impl fmt::Display for BuilderError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Self::UninitializedField(ref field) => { + Self::UninitializedField(field) => { write!(f, "`{field}` must be initialized") } - Self::ValidationError(ref error) => write!(f, "{error}"), + Self::ValidationError(error) => write!(f, "{error}"), } } } diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 1b6c3433f79f..0ad776db58a0 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -311,7 +311,7 @@ pub(super) fn subquery_alias_inner_query_and_columns( // Projection: j1.j1_id AS id // Projection: j1.j1_id for (i, inner_expr) in inner_projection.expr.iter().enumerate() { - let Expr::Alias(ref outer_alias) = &outer_projections.expr[i] else { + let Expr::Alias(outer_alias) = &outer_projections.expr[i] else { return (plan, vec![]); }; diff --git a/rustfmt.toml b/rustfmt.toml index 4522e520a469..0bd199d9e7cf 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -16,6 +16,7 @@ # under the License. edition = "2021" +style_edition = "2021" max_width = 90 # ignore generated files