diff --git a/benchmarks/README.md b/benchmarks/README.md index bf9b50028b14..c84af9a7e6f3 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -832,3 +832,41 @@ Getting results... cancelling thread done dropping runtime in 83.531417ms ``` + +## Sorted Data Benchmarks + +### Sorted ClickBench + +Benchmark for queries on pre-sorted data to test sort order optimization. +This benchmark uses the full ClickBench dataset (hits.parquet, ~14GB) that has been pre-sorted by the EventTime column. The queries are designed to test DataFusion's performance when the data is already sorted, as is common in time-series workloads. + +The benchmark includes queries that: +- Scan pre-sorted data with ORDER BY clauses that match the sort order +- Test reverse scans on sorted data +- Verify the performance results + +#### Generating Sorted Data + +The sorted dataset is automatically generated from the single-file ClickBench dataset (hits.parquet). You can configure the memory used during the sorting process with the `DATAFUSION_MEMORY_GB` environment variable. The default memory limit is 12GB. +```bash +./bench.sh data clickbench_sorted +``` + +To create the sorted dataset, for example with 16GB of memory, run: + +```bash +DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted +``` + +This command will: +1. Download hits.parquet (the single-file ClickBench dataset) if not present +2. Sort hits.parquet by EventTime in ascending order +3. Save the sorted file as hits_sorted.parquet + +#### Running the Benchmark + +```bash +./bench.sh run clickbench_sorted +``` + +This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag and `-c datafusion.optimizer.prefer_existing_sort=true`, which inform DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 6b94dadeb96b..a91a6c32bd42 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -102,6 +102,9 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parqu clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) +# Sorted Data Benchmarks (ORDER BY Optimization) +clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization) + # H2O.ai Benchmarks (Group By, Join, Window) h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv @@ -324,6 +327,9 @@ main() { compile_profile) data_tpch "1" "parquet" ;; + clickbench_sorted) + clickbench_sorted + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for data generation" usage @@ -459,7 +465,7 @@ main() { h2o_medium_window) run_h2o_window "MEDIUM" "CSV" "window" ;; - h2o_big_window) + h2o_big_window) run_h2o_window "BIG" "CSV" "window" ;; h2o_small_parquet) @@ -511,6 +517,9 @@ main() { compile_profile) run_compile_profile "${PROFILE_ARGS[@]}" ;; + clickbench_sorted) + run_clickbench_sorted + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for run" usage @@ -1260,6 +1269,113 @@ compare_benchmarks() { } +# Creates sorted ClickBench data from hits.parquet (full dataset) +# The data is sorted by EventTime in ascending order +# Uses datafusion-cli to reduce dependencies +clickbench_sorted() { + SORTED_FILE="${DATA_DIR}/hits_sorted.parquet" + ORIGINAL_FILE="${DATA_DIR}/hits.parquet" + + # Default memory limit is 12GB, can be overridden with DATAFUSION_MEMORY_GB env var + MEMORY_LIMIT_GB=${DATAFUSION_MEMORY_GB:-12} + + echo "Creating sorted ClickBench dataset from hits.parquet..." 
+ echo "Configuration:" + echo " Memory limit: ${MEMORY_LIMIT_GB}G" + echo " Row group size: 64K rows" + echo " Compression: uncompressed" + + if [ ! -f "${ORIGINAL_FILE}" ]; then + echo "hits.parquet not found. Running data_clickbench_1 first..." + data_clickbench_1 + fi + + if [ -f "${SORTED_FILE}" ]; then + echo "Sorted hits.parquet already exists at ${SORTED_FILE}" + return 0 + fi + + echo "Sorting hits.parquet by EventTime (this may take several minutes)..." + + pushd "${DATAFUSION_DIR}" > /dev/null + echo "Building datafusion-cli..." + cargo build --release --bin datafusion-cli + DATAFUSION_CLI="${DATAFUSION_DIR}/target/release/datafusion-cli" + popd > /dev/null + + + START_TIME=$(date +%s) + echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')" + echo "Using datafusion-cli to create sorted parquet file..." + "${DATAFUSION_CLI}" << EOF +-- Memory and performance configuration +SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G'; +SET datafusion.execution.spill_compression = 'uncompressed'; +SET datafusion.execution.sort_spill_reservation_bytes = 10485760; -- 10MB +SET datafusion.execution.batch_size = 8192; +SET datafusion.execution.target_partitions = 1; + +-- Parquet output configuration +SET datafusion.execution.parquet.max_row_group_size = 65536; +SET datafusion.execution.parquet.compression = 'uncompressed'; + +-- Execute sort and write +COPY (SELECT * FROM '${ORIGINAL_FILE}' ORDER BY "EventTime") +TO '${SORTED_FILE}' +STORED AS PARQUET; +EOF + + local result=$? 
+ + END_TIME=$(date +%s) + DURATION=$((END_TIME - START_TIME)) + echo "End time: $(date '+%Y-%m-%d %H:%M:%S')" + + if [ $result -eq 0 ]; then + echo "✓ Successfully created sorted ClickBench dataset" + + INPUT_SIZE=$(stat -f%z "${ORIGINAL_FILE}" 2>/dev/null || stat -c%s "${ORIGINAL_FILE}" 2>/dev/null) + OUTPUT_SIZE=$(stat -f%z "${SORTED_FILE}" 2>/dev/null || stat -c%s "${SORTED_FILE}" 2>/dev/null) + INPUT_MB=$((INPUT_SIZE / 1024 / 1024)) + OUTPUT_MB=$((OUTPUT_SIZE / 1024 / 1024)) + + echo " Input: ${INPUT_MB} MB" + echo " Output: ${OUTPUT_MB} MB" + + echo "" + echo "Time Statistics:" + echo " Total duration: ${DURATION} seconds ($(printf '%02d:%02d:%02d' $((DURATION/3600)) $((DURATION%3600/60)) $((DURATION%60))))" + echo " Throughput: $((INPUT_MB / DURATION)) MB/s" + + return 0 + else + echo "✗ Error: Failed to create sorted dataset" + echo "💡 Tip: Try increasing memory with: DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted" + return 1 + fi +} + +# Runs the sorted data benchmark with prefer_existing_sort configuration +run_clickbench_sorted() { + RESULTS_FILE="${RESULTS_DIR}/clickbench_sorted.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running sorted data benchmark with prefer_existing_sort optimization..." 
+ + # Ensure sorted data exists + clickbench_sorted + + # Run benchmark with prefer_existing_sort configuration + # This allows DataFusion to optimize away redundant sorts while maintaining parallelism + debug_run $CARGO_COMMAND --bin dfbench -- clickbench \ + --iterations 5 \ + --path "${DATA_DIR}/hits_sorted.parquet" \ + --queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \ + --sorted-by "EventTime" \ + -c datafusion.optimizer.prefer_existing_sort=true \ + -o "${RESULTS_FILE}" \ + ${QUERY_ARG} +} + setup_venv() { python3 -m venv "$VIRTUAL_ENV" PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt diff --git a/benchmarks/queries/clickbench/queries/sorted_data/q0.sql b/benchmarks/queries/clickbench/queries/sorted_data/q0.sql new file mode 100644 index 000000000000..1170a383bcb2 --- /dev/null +++ b/benchmarks/queries/clickbench/queries/sorted_data/q0.sql @@ -0,0 +1,3 @@ +-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591 +-- set datafusion.execution.parquet.binary_as_string = true +SELECT * FROM hits ORDER BY "EventTime" DESC limit 10; diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs index a550503390c5..796b6a4dec52 100644 --- a/benchmarks/src/clickbench.rs +++ b/benchmarks/src/clickbench.rs @@ -78,6 +78,27 @@ pub struct RunOpt { /// If present, write results json here #[structopt(parse(from_os_str), short = "o", long = "output")] output_path: Option, + + /// Column name that the data is sorted by (e.g., "EventTime") + /// If specified, DataFusion will be informed that the data has this sort order + /// using CREATE EXTERNAL TABLE with WITH ORDER clause. + /// + /// Recommended to use with: -c datafusion.optimizer.prefer_existing_sort=true + /// This allows DataFusion to optimize away redundant sorts while maintaining + /// multi-core parallelism for other operations. 
+ #[structopt(long = "sorted-by")] + sorted_by: Option, + + /// Sort order: ASC or DESC (default: ASC) + #[structopt(long = "sort-order", default_value = "ASC")] + sort_order: String, + + /// Configuration options in the format key=value + /// Can be specified multiple times. + /// + /// Example: -c datafusion.optimizer.prefer_existing_sort=true + #[structopt(short = "c", long = "config")] + config_options: Vec, } /// Get the SQL file path @@ -125,6 +146,37 @@ impl RunOpt { // configure parquet options let mut config = self.common.config()?; + + if self.sorted_by.is_some() { + println!("ℹ️ Data is registered with sort order"); + + let has_prefer_sort = self + .config_options + .iter() + .any(|opt| opt.contains("prefer_existing_sort=true")); + + if !has_prefer_sort { + println!("ℹ️ Consider using -c datafusion.optimizer.prefer_existing_sort=true"); + println!("ℹ️ to optimize queries while maintaining parallelism"); + } + } + + // Apply user-provided configuration options + for config_opt in &self.config_options { + let parts: Vec<&str> = config_opt.splitn(2, '=').collect(); + if parts.len() != 2 { + return Err(exec_datafusion_err!( + "Invalid config option format: '{}'. Expected 'key=value'", + config_opt + )); + } + let key = parts[0]; + let value = parts[1]; + + println!("Setting config: {key} = {value}"); + config = config.set_str(key, value); + } + { let parquet_options = &mut config.options_mut().execution.parquet; // The hits_partitioned dataset specifies string columns @@ -136,10 +188,18 @@ impl RunOpt { parquet_options.pushdown_filters = true; parquet_options.reorder_filters = true; } + + if self.sorted_by.is_some() { + // We should compare the dynamic topk optimization when data is sorted, so we make the + // assumption that filter pushdown is also enabled in this case. 
+ parquet_options.pushdown_filters = true; + parquet_options.reorder_filters = true; + } } let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + self.register_hits(&ctx).await?; let mut benchmark_run = BenchmarkRun::new(); @@ -214,17 +274,54 @@ impl RunOpt { } /// Registers the `hits.parquet` as a table named `hits` + /// If sorted_by is specified, uses CREATE EXTERNAL TABLE with WITH ORDER async fn register_hits(&self, ctx: &SessionContext) -> Result<()> { - let options = Default::default(); let path = self.path.as_os_str().to_str().unwrap(); - ctx.register_parquet("hits", path, options) - .await - .map_err(|e| { - DataFusionError::Context( - format!("Registering 'hits' as {path}"), - Box::new(e), - ) - }) + + // If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER + if let Some(ref sort_column) = self.sorted_by { + println!( + "Registering table with sort order: {} {}", + sort_column, self.sort_order + ); + + // Escape column name with double quotes + let escaped_column = if sort_column.contains('"') { + sort_column.clone() + } else { + format!("\"{sort_column}\"") + }; + + // Build CREATE EXTERNAL TABLE DDL with WITH ORDER clause + // Schema will be automatically inferred from the Parquet file + let create_table_sql = format!( + "CREATE EXTERNAL TABLE hits \ + STORED AS PARQUET \ + LOCATION '{}' \ + WITH ORDER ({} {})", + path, + escaped_column, + self.sort_order.to_uppercase() + ); + + println!("Executing: {create_table_sql}"); + + // Execute the CREATE EXTERNAL TABLE statement + ctx.sql(&create_table_sql).await?.collect().await?; + + Ok(()) + } else { + // Original registration without sort order + let options = Default::default(); + ctx.register_parquet("hits", path, options) + .await + .map_err(|e| { + DataFusionError::Context( + format!("Registering 'hits' as {path}"), + Box::new(e), + ) + }) + } } fn iterations(&self) -> usize {