Issues with repartition #9701
-
Hello, I'm fairly new to Rust and DataFusion, so please excuse the basic question. I'm trying to understand why `repartition` does not always produce the desired number of partitions. Here's a very basic test I constructed:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn repartition_test() {
        let data_vec = (1..200).map(|x| x.to_string()).collect::<Vec<_>>();
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("foo", DataType::Utf8, false)])),
            vec![Arc::new(StringArray::from(data_vec))],
        )
        .unwrap();
        let result = SessionContext::new()
            .read_batch(batch)
            .unwrap()
            .repartition(Partitioning::Hash(vec![col("foo")], 3))
            .unwrap()
            .collect_partitioned()
            .await
            .unwrap();
        println!("RESULTS look like: {:?}", result);
        assert_eq!(result.len(), 3);
    }
}
```

If I run the test with `cargo test -- tests::repartition_test --exact`, I see the following output:

```
running 1 test
test tests::repartition_test ... FAILED

failures:

---- tests::repartition_test stdout ----
RESULTS look like: [[RecordBatch { schema: Schema { fields: [Field { name: "foo", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [StringArray
[
  "1",
  "2",
  "3",
  "4",
  "5",
  "6",
  "7",
  "8",
  "9",
  "10",
  ...179 elements...,
  "190",
  "191",
  "192",
  "193",
  "194",
  "195",
  "196",
  "197",
  "198",
  "199",
]], row_count: 199 }]]

thread 'tests::repartition_test' panicked at src/main.rs:808:9:
assertion `left == right` failed
  left: 1
 right: 3
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

failures:
    tests::repartition_test

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.02s
```

I'm probably missing something obvious, but shouldn't the top-level vector returned by `collect_partitioned` have 3 elements (i.e. 1 for each partition)?
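Conceptually, `Partitioning::Hash(vec![col("foo")], 3)` should route each row into one of 3 buckets by the hash of `foo`. Here is a std-only sketch of that expectation; note that `DefaultHasher` is just a stand-in for DataFusion's internal hash function, so the per-bucket counts are illustrative only:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route each value to one of `num_partitions` buckets by its hash.
/// Every row lands in exactly one bucket, so the buckets together
/// always contain all input rows.
fn hash_partition(values: &[String], num_partitions: usize) -> Vec<Vec<String>> {
    let mut partitions = vec![Vec::new(); num_partitions];
    for v in values {
        let mut hasher = DefaultHasher::new();
        v.hash(&mut hasher);
        let bucket = (hasher.finish() as usize) % num_partitions;
        partitions[bucket].push(v.clone());
    }
    partitions
}
```

Under this model, partitioning the 199 test strings into 3 buckets always yields a 3-element outer vector, which is what the failing assertion expects from `collect_partitioned`.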
-
First, let's take a look at the query plan. Comparing the stages: during the Logical Plan and Optimized Logical Plan stages the Repartition operator appears as expected, but by the Physical Plan stage it has disappeared.

Reason: after the `EnforceDistribution` physical optimizer rule runs, the requested `Hash` partitioning is rewritten away, so it never appears in the plan that actually executes.
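A later reply in this thread notes that `EnforceDistribution` may replace `Hash` with `RoundRobinBatch`, which distributes whole batches rather than individual rows. A std-only toy sketch (my own simplification, not DataFusion code) of why that matters for this test:

```rust
/// Toy model of round-robin-by-batch: whole batches (not rows) are dealt
/// out cyclically across output partitions. With a single input batch,
/// all rows land in one partition and the others stay empty.
fn round_robin_batches(batches: Vec<Vec<String>>, num_partitions: usize) -> Vec<Vec<Vec<String>>> {
    let mut partitions = vec![Vec::new(); num_partitions];
    for (i, batch) in batches.into_iter().enumerate() {
        partitions[i % num_partitions].push(batch);
    }
    partitions
}
```

Since the test feeds in exactly one `RecordBatch`, batch-level redistribution cannot spread its rows across partitions the way a row-level hash repartition would.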
-
Change your code to look like this:

```rust
async fn repartition_test() {
    let data_vec = (1..200).map(|x| x.to_string()).collect::<Vec<_>>();
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("foo", DataType::Utf8, false)])),
        vec![Arc::new(StringArray::from(data_vec))],
    )
    .unwrap();
    // Build a SessionState with no physical optimizer rules, so
    // EnforceDistribution cannot rewrite the Hash repartition.
    let state = SessionState::new_with_config_rt(
        SessionConfig::new(),
        Arc::new(RuntimeEnv::default()),
    )
    .with_physical_optimizer_rules(vec![]); // without physical optimizers
    let result = SessionContext::new_with_state(state)
        .read_batch(batch)
        .unwrap()
        .repartition(Partitioning::Hash(vec![col("foo")], 3))
        .unwrap()
        .collect_partitioned()
        .await
        .unwrap();
    println!("RESULTS look like: {:?}", result);
    assert_eq!(result.len(), 3);
}
```
-
Here's an expanded test:

```rust
mod tests {
    use datafusion::datasource::MemTable;
    use super::*;

    #[tokio::test]
    async fn repartition_test() {
        let data_vec = (0..100).map(|x| x.to_string()).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("foo", DataType::Utf8, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(StringArray::from(data_vec))],
        )
        .unwrap();
        let n_batches = 4;
        let batches = vec![batch.clone(); n_batches];
        // setting a small batch size helps trigger EnforceDistribution
        let config = SessionConfig::from_env().unwrap().with_batch_size(1);
        let ctx = SessionContext::new_with_config(config);

        /* TEST SQL "DISTRIBUTED BY" */
        let mem_table = MemTable::try_new(schema.clone(), vec![batches.clone()]).unwrap();
        ctx.register_table(TableReference::bare("test_table"), Arc::new(mem_table))
            .unwrap();
        // I naively assumed that "DISTRIBUTED BY foo" would be similar to
        // `.repartition(Partitioning::Hash(vec![col("foo")], ...))` but without
        // the ability to specify the number of partitions.
        // However, it seems that "DISTRIBUTED BY foo" is not enforced at all?
        let query = "SELECT trim(foo) FROM test_table DISTRIBUTED BY foo";
        let explain_sql = ctx
            .sql(format!("EXPLAIN VERBOSE {query}").as_str())
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        let explain_str = datafusion::arrow::util::pretty::pretty_format_batches(&explain_sql)
            .unwrap()
            .to_string();
        println!("===Explain SQL output===\n{explain_str}");
        let result = ctx.sql(query).await.unwrap().collect_partitioned().await.unwrap();
        println!("===SQL result len: {}", result.len());

        /* TEST DATAFRAME REPARTITION */
        let num_partitions_expected = 20;
        let df = ctx
            .read_batches(batches.clone())
            .unwrap()
            .repartition(Partitioning::Hash(vec![col("foo")], num_partitions_expected))
            .unwrap()
            // add some non-trivial select (i.e. trim) to trigger EnforceDistribution
            .select(vec![trim(col("foo"))])
            .unwrap();
        let explain_unoptimized = df.clone().explain(true, false).unwrap().collect().await.unwrap();
        let explain_unoptimized_str =
            datafusion::arrow::util::pretty::pretty_format_batches(&explain_unoptimized)
                .unwrap()
                .to_string();
        println!("===Explain (unoptimized)===\n{explain_unoptimized_str}");
        // let explain_optimized = df.clone().explain(true, true).unwrap().collect().await.unwrap();
        // let explain_optimized_str = datafusion::arrow::util::pretty::pretty_format_batches(&explain_optimized).unwrap().to_string();
        // println!("===Explain (optimized)===\n{explain_optimized_str}");
        // println!("===Physical plan===\n{:?}", df.clone().create_physical_plan().await.unwrap());
        let result = df.collect_partitioned().await.unwrap();
        println!("===RESULTS len: {}", result.len());
        assert_eq!(result.len(), num_partitions_expected);
    }
}
```

When I run this test, two things stand out to me:
About Q2: I believe the purpose of the optimization rule `EnforceDistribution` is to speed up the query while ensuring the correctness of the results. Although it replaces `Hash` with `RoundRobinBatch`, the final result is still correct (by that I mean the output rows, not the partition layout you wanted). In other words, if there were an Aggregation operator downstream of `Repartition(Hash)`, the Hash partitioning would not be replaced.

Therefore, if you want to achieve the desired result, the solution is to remove `EnforceDistribution`, as I mentioned earlier.
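The rule described above can be sketched in a toy model (my own simplification, not DataFusion's real API): a user-requested `Hash` repartition survives only when an operator above it actually requires hash distribution, such as a hash aggregate on the same keys; otherwise a cheaper scheme is substituted.

```rust
// Toy plan model. `Partitioning` and `Plan` here are illustrative stand-ins
// for DataFusion's physical plan types, not the real ones.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Hash(usize),
    RoundRobin(usize),
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Repartition(Partitioning, Box<Plan>),
    HashAggregate(Box<Plan>), // requires hash distribution of its input
}

/// Simplified EnforceDistribution: downgrade Hash to RoundRobin unless
/// some operator above the repartition requires hash distribution.
fn enforce_distribution(plan: Plan, hash_required: bool) -> Plan {
    match plan {
        Plan::Scan => Plan::Scan,
        Plan::HashAggregate(input) => {
            // The aggregate needs equal keys in the same partition,
            // so its input must keep hash distribution.
            Plan::HashAggregate(Box::new(enforce_distribution(*input, true)))
        }
        Plan::Repartition(Partitioning::Hash(n), input) if !hash_required => {
            // Nothing above needs hash distribution: downgrade to round-robin.
            Plan::Repartition(
                Partitioning::RoundRobin(n),
                Box::new(enforce_distribution(*input, false)),
            )
        }
        Plan::Repartition(p, input) => {
            Plan::Repartition(p, Box::new(enforce_distribution(*input, false)))
        }
    }
}
```

In this model, a bare `Repartition(Hash(3), Scan)` is rewritten to round-robin, while the same repartition sitting under a `HashAggregate` is left untouched, matching the behaviour described in this reply.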