From b20ace9cae6a4156b74aed1953812ce51e476077 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 17 Nov 2025 21:30:07 +0200 Subject: [PATCH 1/7] fix: display the failed sqllogictest file and query that failed in case of a panic --- datafusion/sqllogictest/bin/sqllogictests.rs | 71 ++++++++++++++++--- .../src/engines/currently_executed_sql.rs | 49 +++++++++++++ .../src/engines/datafusion_engine/runner.rs | 22 +++++- .../runner.rs | 17 +++++ datafusion/sqllogictest/src/engines/mod.rs | 3 + .../src/engines/postgres_engine/mod.rs | 18 +++++ datafusion/sqllogictest/src/lib.rs | 1 + 7 files changed, 167 insertions(+), 14 deletions(-) create mode 100644 datafusion/sqllogictest/src/engines/currently_executed_sql.rs diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 7aca0fdd6e8d..a2680b77711c 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -21,8 +21,8 @@ use datafusion::common::utils::get_available_parallelism; use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result}; use datafusion_sqllogictest::{ df_value_validator, read_dir_recursive, setup_scratch_dir, should_skip_file, - should_skip_record, value_normalizer, DataFusion, DataFusionSubstraitRoundTrip, - Filter, TestContext, + should_skip_record, value_normalizer, CurrentlyExecutedSqlTracker, DataFusion, + DataFusionSubstraitRoundTrip, Filter, TestContext, }; use futures::stream::StreamExt; use indicatif::{ @@ -41,6 +41,7 @@ use crate::postgres_container::{ initialize_postgres_container, terminate_postgres_container, }; use datafusion::common::runtime::SpawnedTask; +use futures::FutureExt; use std::ffi::OsStr; use std::fs; use std::path::{Path, PathBuf}; @@ -154,6 +155,11 @@ async fn run_tests() -> Result<()> { let m_style_clone = m_style.clone(); let filters = options.filters.clone(); + let relative_path = test_file.relative_path.clone(); + + // Mutex per file to hold the current sql being run + let current_sql = CurrentlyExecutedSqlTracker::new(); + let current_sql_clone = current_sql.clone(); SpawnedTask::spawn(async move { match ( options.postgres_runner, @@ -167,6 +173,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), + current_sql_clone, ) .await? } @@ -177,12 +184,19 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), + current_sql_clone, ) .await? } (false, true, _) => { - run_complete_file(test_file, validator, m_clone, m_style_clone) - .await? + run_complete_file( + test_file, + validator, + m_clone, + m_style_clone, + current_sql_clone, + ) + .await? } (true, false, _) => { run_test_file_with_postgres( @@ -191,6 +205,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), + current_sql_clone, ) .await? } @@ -200,6 +215,7 @@ async fn run_tests() -> Result<()> { validator, m_clone, m_style_clone, + current_sql_clone, ) .await? } @@ -207,14 +223,29 @@ async fn run_tests() -> Result<()> { Ok(()) as Result<()> }) .join() + .map(move |result| (result, relative_path, current_sql)) }) // run up to num_cpus streams in parallel .buffer_unordered(options.test_threads) - .flat_map(|result| { + .flat_map(|(result, test_file_path, current_sql)| { // Filter out any Ok() leaving only the DataFusionErrors futures::stream::iter(match result { // Tokio panic error - Err(e) => Some(DataFusionError::External(Box::new(e))), + Err(e) => { + let error = DataFusionError::External(Box::new(e)); + let current_sql = current_sql.get_sql(); + + match current_sql { + Some(sql) => Some(error.context(format!( + "failure in {} for sql {sql}", + test_file_path.display() + ))), + None => Some(error.context(format!( + "failure in {} with no currently running sql tracked", + test_file_path.display() + ))), + } + } Ok(thread_result) => thread_result.err(), }) }) @@ -247,6 +278,7 @@ async fn run_test_file_substrait_round_trip( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, ) -> Result<()> { let TestFile { path, @@ -269,7 +301,8 @@ async fn run_test_file_substrait_round_trip( test_ctx.session_ctx().clone(), relative_path.clone(), pb.clone(), - )) + ) + .with_currently_executed_sql_tracker(currently_executed_sql_tracker.clone())) }); runner.add_label("DatafusionSubstraitRoundTrip"); runner.with_column_validator(strict_column_validator); @@ -286,6 +319,7 @@ async fn run_test_file( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, ) -> Result<()> { let TestFile { path, @@ -308,7 +342,8 @@ async fn run_test_file( test_ctx.session_ctx().clone(), relative_path.clone(), pb.clone(), - )) + ) + .with_currently_executed_sql_tracker(currently_executed_sql_tracker.clone())) }); runner.add_label("Datafusion"); runner.with_column_validator(strict_column_validator); @@ -402,6 +437,7 @@ async fn run_test_file_with_postgres( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, ) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { @@ -417,7 +453,11 @@ async fn run_test_file_with_postgres( pb.set_message(format!("{:?}", &relative_path)); let mut runner = sqllogictest::Runner::new(|| { - Postgres::connect(relative_path.clone(), pb.clone()) + Postgres::connect(relative_path.clone(), pb.clone()).map(|conn| { + conn.with_currently_executed_sql_tracker( + currently_executed_sql_tracker.clone(), + ) + }) }); runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); @@ -435,6 +475,7 @@ async fn run_test_file_with_postgres( _mp: MultiProgress, _mp_style: ProgressStyle, _filters: &[Filter], + _currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, ) -> Result<()> { use datafusion::common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") @@ -445,6 +486,7 @@ async fn run_complete_file( validator: Validator, mp: MultiProgress, mp_style: ProgressStyle, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, ) -> Result<()> { let TestFile { path, @@ -470,7 +512,8 @@ async fn run_complete_file( test_ctx.session_ctx().clone(), relative_path.clone(), pb.clone(), - )) + ) + .with_currently_executed_sql_tracker(currently_executed_sql_tracker.clone())) }); let col_separator = " "; @@ -497,6 +540,7 @@ async fn run_complete_file_with_postgres( validator: Validator, mp: MultiProgress, mp_style: ProgressStyle, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, ) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { @@ -516,7 +560,11 @@ async fn run_complete_file_with_postgres( pb.set_message(format!("{:?}", &relative_path)); let mut runner = sqllogictest::Runner::new(|| { - Postgres::connect(relative_path.clone(), pb.clone()) + Postgres::connect(relative_path.clone(), pb.clone()).map(|conn| { + conn.with_currently_executed_sql_tracker( + currently_executed_sql_tracker.clone(), + ) + }) }); runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); @@ -547,6 +595,7 @@ async fn run_complete_file_with_postgres( _validator: Validator, _mp: MultiProgress, _mp_style: ProgressStyle, + _currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, ) -> Result<()> { use datafusion::common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") diff --git a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs new file mode 100644 index 000000000000..d0a0114e4baa --- /dev/null +++ b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs @@ -0,0 +1,49 @@ +use std::sync::{Arc, Mutex}; + +/// Hold the currently executed SQL statement. +/// This is used to save the last executed SQL statement in case of a crash. +/// +/// This can only hold one SQL statement at a time. +#[derive(Clone)] +pub struct CurrentlyExecutedSqlTracker { + /// Lock to store the currently executed SQL statement. + /// It DOES NOT hold the lock for the duration of query execution and only execute the lock + /// when updating the currently executed SQL statement to allow for saving the last executed SQL + /// in case of a crash. + currently_executed_sql: Arc>>, +} + +impl Default for CurrentlyExecutedSqlTracker { + fn default() -> Self { + Self::new() + } +} + +impl CurrentlyExecutedSqlTracker { + pub fn new() -> Self { + Self { + currently_executed_sql: Arc::new(Mutex::new(None)), + } + } + + /// Set the currently executed SQL statement. + pub fn set_sql(&self, sql: impl Into) { + let mut lock = self.currently_executed_sql.lock().unwrap(); + *lock = Some(sql.into()); + } + + /// Get the currently executed SQL statement. + pub fn get_sql(&self) -> Option { + let lock = self.currently_executed_sql.lock().unwrap(); + lock.clone() + } + + /// Clear the currently executed SQL statement only if it matches the provided SQL. + pub fn clear_sql_if_same(&self, sql: impl Into) { + let mut lock = self.currently_executed_sql.lock().unwrap(); + if lock.as_ref() != Some(&sql.into()) { + return; + } + *lock = None; + } +} diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index 45deefdc9bbd..9d4e61403c4e 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -19,6 +19,9 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; use super::{error::Result, normalize, DFSqlLogicTestError}; +use crate::engines::currently_executed_sql::CurrentlyExecutedSqlTracker; +use crate::engines::output::{DFColumnType, DFOutput}; +use crate::is_spark_path; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::physical_plan::common::collect; @@ -30,13 +33,11 @@ use log::{debug, log_enabled, warn}; use sqllogictest::DBOutput; use tokio::time::Instant; -use crate::engines::output::{DFColumnType, DFOutput}; -use crate::is_spark_path; - pub struct DataFusion { ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, } impl DataFusion { @@ -45,6 +46,17 @@ impl DataFusion { ctx, relative_path, pb, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker::default(), + } + } + + pub fn with_currently_executed_sql_tracker( + self, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + ) -> Self { + Self { + currently_executed_sql_tracker, + ..self } } @@ -79,10 +91,14 @@ impl sqllogictest::AsyncDB for DataFusion { ); } + self.currently_executed_sql_tracker.set_sql(sql); + let start = Instant::now(); let result = run_query(&self.ctx, is_spark_path(&self.relative_path), sql).await; let duration = start.elapsed(); + self.currently_executed_sql_tracker.clear_sql_if_same(sql); + if duration.gt(&Duration::from_millis(500)) { self.update_slow_count(); } diff --git a/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs index 2df93f0dede3..16de0cc3aa8e 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; +use crate::engines::currently_executed_sql::CurrentlyExecutedSqlTracker; use crate::engines::datafusion_engine::Result; use crate::engines::output::{DFColumnType, DFOutput}; use crate::{convert_batches, convert_schema_to_types, DFSqlLogicTestError}; @@ -39,6 +40,7 @@ pub struct DataFusionSubstraitRoundTrip { ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, } impl DataFusionSubstraitRoundTrip { @@ -47,6 +49,17 @@ impl DataFusionSubstraitRoundTrip { ctx, relative_path, pb, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker::default(), + } + } + + pub fn with_currently_executed_sql_tracker( + self, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + ) -> Self { + Self { + currently_executed_sql_tracker, + ..self } } @@ -81,10 +94,14 @@ impl sqllogictest::AsyncDB for DataFusionSubstraitRoundTrip { ); } + self.currently_executed_sql_tracker.set_sql(sql); + let start = Instant::now(); let result = run_query_substrait_round_trip(&self.ctx, sql).await; let duration = start.elapsed(); + self.currently_executed_sql_tracker.clear_sql_if_same(sql); + if duration.gt(&Duration::from_millis(500)) { self.update_slow_count(); } diff --git a/datafusion/sqllogictest/src/engines/mod.rs b/datafusion/sqllogictest/src/engines/mod.rs index ef6335ddbed6..4931250c903e 100644 --- a/datafusion/sqllogictest/src/engines/mod.rs +++ b/datafusion/sqllogictest/src/engines/mod.rs @@ -20,6 +20,7 @@ mod conversion; mod datafusion_engine; mod datafusion_substrait_roundtrip_engine; mod output; +mod currently_executed_sql; pub use datafusion_engine::convert_batches; pub use datafusion_engine::convert_schema_to_types; @@ -29,6 +30,8 @@ pub use datafusion_substrait_roundtrip_engine::DataFusionSubstraitRoundTrip; pub use output::DFColumnType; pub use output::DFOutput; +pub use currently_executed_sql::CurrentlyExecutedSqlTracker; + #[cfg(feature = "postgres")] mod postgres_engine; diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 4d310711687f..48c57cc4c2e0 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -24,9 +24,11 @@ use sqllogictest::DBOutput; /// Postgres engine implementation for sqllogictest. use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::{Arc, Mutex}; use std::time::Duration; use super::conversion::*; +use crate::engines::currently_executed_sql::CurrentlyExecutedSqlTracker; use crate::engines::output::{DFColumnType, DFOutput}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use indicatif::ProgressBar; @@ -59,6 +61,7 @@ pub struct Postgres { /// Relative test file path relative_path: PathBuf, pb: ProgressBar, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, } impl Postgres { @@ -118,9 +121,20 @@ impl Postgres { spawned_task: Some(spawned_task), relative_path, pb, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker::default(), }) } + pub fn with_currently_executed_sql_tracker( + self, + currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + ) -> Self { + Self { + currently_executed_sql_tracker, + ..self + } + } + fn get_client(&mut self) -> &mut tokio_postgres::Client { self.client.as_mut().expect("client is shutdown") } @@ -242,6 +256,8 @@ impl sqllogictest::AsyncDB for Postgres { sql ); + self.currently_executed_sql_tracker.set_sql(sql); + let lower_sql = sql.trim_start().to_ascii_lowercase(); let is_query_sql = { @@ -292,6 +308,8 @@ impl sqllogictest::AsyncDB for Postgres { .collect() }; + self.currently_executed_sql_tracker.clear_sql_if_same(sql); + if rows.is_empty() && types.is_empty() { Ok(DBOutput::StatementComplete(0)) } else { diff --git a/datafusion/sqllogictest/src/lib.rs b/datafusion/sqllogictest/src/lib.rs index f3a78607242c..fbad4284f1a9 100644 --- a/datafusion/sqllogictest/src/lib.rs +++ b/datafusion/sqllogictest/src/lib.rs @@ -30,6 +30,7 @@ mod engines; pub use engines::convert_batches; pub use engines::convert_schema_to_types; +pub use engines::CurrentlyExecutedSqlTracker; pub use engines::DFColumnType; pub use engines::DFOutput; pub use engines::DFSqlLogicTestError; From 9d1695853459701fd2087423483c52e8f5b41ce0 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 17 Nov 2025 22:05:11 +0200 Subject: [PATCH 2/7] extract tracking to struct and allow for concurrent running sql --- datafusion/sqllogictest/bin/sqllogictests.rs | 75 +++++++++++-------- .../src/engines/currently_executed_sql.rs | 56 ++++++++------ .../src/engines/datafusion_engine/runner.rs | 19 +++-- .../runner.rs | 19 +++-- datafusion/sqllogictest/src/engines/mod.rs | 2 +- .../src/engines/postgres_engine/mod.rs | 19 +++-- datafusion/sqllogictest/src/lib.rs | 2 +- 7 files changed, 112 insertions(+), 80 deletions(-) diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index a2680b77711c..40a6c9f4d934 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -21,7 +21,7 @@ use datafusion::common::utils::get_available_parallelism; use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result}; use datafusion_sqllogictest::{ df_value_validator, read_dir_recursive, setup_scratch_dir, should_skip_file, - should_skip_record, value_normalizer, CurrentlyExecutedSqlTracker, DataFusion, + should_skip_record, value_normalizer, CurrentlyExecutingSqlTracker, DataFusion, DataFusionSubstraitRoundTrip, Filter, TestContext, }; use futures::stream::StreamExt; @@ -158,8 +158,8 @@ async fn run_tests() -> Result<()> { let relative_path = test_file.relative_path.clone(); // Mutex per file to hold the current sql being run - let current_sql = CurrentlyExecutedSqlTracker::new(); - let current_sql_clone = current_sql.clone(); + let current_sql_tracker = CurrentlyExecutingSqlTracker::new(); + let current_sql_tracker_clone = current_sql_tracker.clone(); SpawnedTask::spawn(async move { match ( options.postgres_runner, @@ -173,7 +173,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), - current_sql_clone, + current_sql_tracker_clone, ) .await? } @@ -184,7 +184,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), - current_sql_clone, + current_sql_tracker_clone, ) .await? } @@ -194,7 +194,7 @@ async fn run_tests() -> Result<()> { validator, m_clone, m_style_clone, - current_sql_clone, + current_sql_tracker_clone, ) .await? } @@ -205,7 +205,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), - current_sql_clone, + current_sql_tracker_clone, ) .await? } @@ -215,7 +215,7 @@ async fn run_tests() -> Result<()> { validator, m_clone, m_style_clone, - current_sql_clone, + current_sql_tracker_clone, ) .await? } @@ -223,7 +223,7 @@ async fn run_tests() -> Result<()> { Ok(()) as Result<()> }) .join() - .map(move |result| (result, relative_path, current_sql)) + .map(move |result| (result, relative_path, current_sql_tracker)) }) // run up to num_cpus streams in parallel .buffer_unordered(options.test_threads) @@ -233,17 +233,30 @@ async fn run_tests() -> Result<()> { // Tokio panic error Err(e) => { let error = DataFusionError::External(Box::new(e)); - let current_sql = current_sql.get_sql(); + let current_sql = current_sql.get_currently_running_sqls(); - match current_sql { - Some(sql) => Some(error.context(format!( - "failure in {} for sql {sql}", - test_file_path.display() - ))), - None => Some(error.context(format!( + if current_sql.is_empty() { + Some(error.context(format!( "failure in {} with no currently running sql tracked", test_file_path.display() - ))), + ))) + } else if current_sql.len() == 1 { + let sql = ¤t_sql[0]; + Some(error.context(format!( + "failure in {} for sql {sql}", + test_file_path.display() + ))) + } else { + let sqls = current_sql + .iter() + .enumerate() + .map(|(i, sql)| format!("\n[{}]: {}", i + 1, sql)) + .collect::(); + Some(error.context(format!( + "failure in {} for multiple currently running sqls: {}", + test_file_path.display(), + sqls + ))) } } Ok(thread_result) => thread_result.err(), @@ -278,7 +291,7 @@ async fn run_test_file_substrait_round_trip( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { let TestFile { path, @@ -302,7 +315,7 @@ async fn run_test_file_substrait_round_trip( relative_path.clone(), pb.clone(), ) - .with_currently_executed_sql_tracker(currently_executed_sql_tracker.clone())) + .with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone())) }); runner.add_label("DatafusionSubstraitRoundTrip"); runner.with_column_validator(strict_column_validator); @@ -319,7 +332,7 @@ async fn run_test_file( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { let TestFile { path, @@ -343,7 +356,7 @@ async fn run_test_file( relative_path.clone(), pb.clone(), ) - .with_currently_executed_sql_tracker(currently_executed_sql_tracker.clone())) + .with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone())) }); runner.add_label("Datafusion"); runner.with_column_validator(strict_column_validator); @@ -437,7 +450,7 @@ async fn run_test_file_with_postgres( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { @@ -454,8 +467,8 @@ async fn run_test_file_with_postgres( let mut runner = sqllogictest::Runner::new(|| { Postgres::connect(relative_path.clone(), pb.clone()).map(|conn| { - conn.with_currently_executed_sql_tracker( - currently_executed_sql_tracker.clone(), + conn.with_currently_executing_sql_tracker( + currently_executing_sql_tracker.clone(), ) }) }); @@ -475,7 +488,7 @@ async fn run_test_file_with_postgres( _mp: MultiProgress, _mp_style: ProgressStyle, _filters: &[Filter], - _currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + _currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { use datafusion::common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") @@ -486,7 +499,7 @@ async fn run_complete_file( validator: Validator, mp: MultiProgress, mp_style: ProgressStyle, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { let TestFile { path, @@ -513,7 +526,7 @@ async fn run_complete_file( relative_path.clone(), pb.clone(), ) - .with_currently_executed_sql_tracker(currently_executed_sql_tracker.clone())) + .with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone())) }); let col_separator = " "; @@ -540,7 +553,7 @@ async fn run_complete_file_with_postgres( validator: Validator, mp: MultiProgress, mp_style: ProgressStyle, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { @@ -561,8 +574,8 @@ async fn run_complete_file_with_postgres( let mut runner = sqllogictest::Runner::new(|| { Postgres::connect(relative_path.clone(), pb.clone()).map(|conn| { - conn.with_currently_executed_sql_tracker( - currently_executed_sql_tracker.clone(), + conn.with_currently_executing_sql_tracker( + currently_executing_sql_tracker.clone(), ) }) }); @@ -595,7 +608,7 @@ async fn run_complete_file_with_postgres( _validator: Validator, _mp: MultiProgress, _mp_style: ProgressStyle, - _currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + _currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { use datafusion::common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") diff --git a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs index d0a0114e4baa..8bfdce54f625 100644 --- a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs +++ b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs @@ -1,49 +1,59 @@ +use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use std::sync::atomic::AtomicUsize; -/// Hold the currently executed SQL statement. -/// This is used to save the last executed SQL statement in case of a crash. -/// -/// This can only hold one SQL statement at a time. +/// Hold the currently executed SQL statements. +/// This is used to save the currently running SQLs in case of a crash. #[derive(Clone)] -pub struct CurrentlyExecutedSqlTracker { +pub struct CurrentlyExecutingSqlTracker { + /// The index of the SQL statement. + /// Used to uniquely identify each SQL statement even if they are the same. + sql_index: Arc, /// Lock to store the currently executed SQL statement. /// It DOES NOT hold the lock for the duration of query execution and only execute the lock /// when updating the currently executed SQL statement to allow for saving the last executed SQL /// in case of a crash. - currently_executed_sql: Arc>>, + currently_executed_sqls: Arc>>, } -impl Default for CurrentlyExecutedSqlTracker { +impl Default for CurrentlyExecutingSqlTracker { fn default() -> Self { Self::new() } } -impl CurrentlyExecutedSqlTracker { +impl CurrentlyExecutingSqlTracker { pub fn new() -> Self { Self { - currently_executed_sql: Arc::new(Mutex::new(None)), + sql_index: Arc::new(AtomicUsize::new(0)), + currently_executed_sqls: Arc::new(Mutex::new(HashMap::new())), } } /// Set the currently executed SQL statement. - pub fn set_sql(&self, sql: impl Into) { - let mut lock = self.currently_executed_sql.lock().unwrap(); - *lock = Some(sql.into()); + /// + /// Returns a key to use to remove the SQL statement when done. + /// + /// We are not returning a guard that will automatically remove the SQL statement when dropped. + /// as on panic the drop can be called, and it will remove the SQL statement before we can log it. + #[must_use = "The returned index must be used to remove the SQL statement when done."] + pub fn set_sql(&self, sql: impl Into) -> usize { + let index = self.sql_index.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let mut lock = self.currently_executed_sqls.lock().unwrap(); + lock.insert(index, sql.into()); + drop(lock); + index } - /// Get the currently executed SQL statement. - pub fn get_sql(&self) -> Option { - let lock = self.currently_executed_sql.lock().unwrap(); - lock.clone() + /// Remove the currently executed SQL statement by the provided key that was returned by [`Self::set_sql`]. + pub fn remove_sql(&self, index: usize) { + let mut lock = self.currently_executed_sqls.lock().unwrap(); + lock.remove(&index); } - /// Clear the currently executed SQL statement only if it matches the provided SQL. - pub fn clear_sql_if_same(&self, sql: impl Into) { - let mut lock = self.currently_executed_sql.lock().unwrap(); - if lock.as_ref() != Some(&sql.into()) { - return; - } - *lock = None; + /// Get the currently executed SQL statements. + pub fn get_currently_running_sqls(&self) -> Vec { + let lock = self.currently_executed_sqls.lock().unwrap(); + lock.values().cloned().collect() } } diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index 9d4e61403c4e..13c107408cf5 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; use super::{error::Result, normalize, DFSqlLogicTestError}; -use crate::engines::currently_executed_sql::CurrentlyExecutedSqlTracker; +use crate::engines::currently_executed_sql::CurrentlyExecutingSqlTracker; use crate::engines::output::{DFColumnType, DFOutput}; use crate::is_spark_path; use arrow::record_batch::RecordBatch; @@ -37,7 +37,7 @@ pub struct DataFusion { ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, } impl DataFusion { @@ -46,16 +46,19 @@ impl DataFusion { ctx, relative_path, pb, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker::default(), + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(), } } - pub fn with_currently_executed_sql_tracker( + /// Add a tracker that will track the currently executed SQL statement. + /// + /// This is useful for logging and debugging purposes. + pub fn with_currently_executing_sql_tracker( self, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Self { Self { - currently_executed_sql_tracker, + currently_executing_sql_tracker, ..self } } @@ -91,13 +94,13 @@ impl sqllogictest::AsyncDB for DataFusion { ); } - self.currently_executed_sql_tracker.set_sql(sql); + let tracked_sql = self.currently_executing_sql_tracker.set_sql(sql); let start = Instant::now(); let result = run_query(&self.ctx, is_spark_path(&self.relative_path), sql).await; let duration = start.elapsed(); - self.currently_executed_sql_tracker.clear_sql_if_same(sql); + self.currently_executing_sql_tracker.remove_sql(tracked_sql); if duration.gt(&Duration::from_millis(500)) { self.update_slow_count(); diff --git a/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs index 16de0cc3aa8e..8ba182763f5e 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; -use crate::engines::currently_executed_sql::CurrentlyExecutedSqlTracker; +use crate::engines::currently_executed_sql::CurrentlyExecutingSqlTracker; use crate::engines::datafusion_engine::Result; use crate::engines::output::{DFColumnType, DFOutput}; use crate::{convert_batches, convert_schema_to_types, DFSqlLogicTestError}; @@ -40,7 +40,7 @@ pub struct DataFusionSubstraitRoundTrip { ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, } impl DataFusionSubstraitRoundTrip { @@ -49,16 +49,19 @@ impl DataFusionSubstraitRoundTrip { ctx, relative_path, pb, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker::default(), + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(), } } - pub fn with_currently_executed_sql_tracker( + /// Add a tracker that will track the currently executed SQL statement. + /// + /// This is useful for logging and debugging purposes. + pub fn with_currently_executing_sql_tracker( self, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Self { Self { - currently_executed_sql_tracker, + currently_executing_sql_tracker, ..self } } @@ -94,13 +97,13 @@ impl sqllogictest::AsyncDB for DataFusionSubstraitRoundTrip { ); } - self.currently_executed_sql_tracker.set_sql(sql); + let tracked_sql = self.currently_executing_sql_tracker.set_sql(sql); let start = Instant::now(); let result = run_query_substrait_round_trip(&self.ctx, sql).await; let duration = start.elapsed(); - self.currently_executed_sql_tracker.clear_sql_if_same(sql); + self.currently_executing_sql_tracker.remove_sql(tracked_sql); if duration.gt(&Duration::from_millis(500)) { self.update_slow_count(); diff --git a/datafusion/sqllogictest/src/engines/mod.rs b/datafusion/sqllogictest/src/engines/mod.rs index 4931250c903e..ed4ea71566b4 100644 --- a/datafusion/sqllogictest/src/engines/mod.rs +++ b/datafusion/sqllogictest/src/engines/mod.rs @@ -30,7 +30,7 @@ pub use datafusion_substrait_roundtrip_engine::DataFusionSubstraitRoundTrip; pub use output::DFColumnType; pub use output::DFOutput; -pub use currently_executed_sql::CurrentlyExecutedSqlTracker; +pub use currently_executed_sql::CurrentlyExecutingSqlTracker; #[cfg(feature = "postgres")] mod postgres_engine; diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 48c57cc4c2e0..8e7c174c4f76 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -28,7 +28,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use super::conversion::*; -use crate::engines::currently_executed_sql::CurrentlyExecutedSqlTracker; +use crate::engines::currently_executed_sql::CurrentlyExecutingSqlTracker; use crate::engines::output::{DFColumnType, DFOutput}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use indicatif::ProgressBar; @@ -61,7 +61,7 @@ pub struct Postgres { /// Relative test file path relative_path: PathBuf, pb: ProgressBar, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, } impl Postgres { @@ -121,16 +121,19 @@ impl Postgres { spawned_task: Some(spawned_task), relative_path, pb, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker::default(), + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(), }) } - pub fn with_currently_executed_sql_tracker( + /// Add a tracker that will track the currently executed SQL statement. + /// + /// This is useful for logging and debugging purposes. + pub fn with_currently_executing_sql_tracker( self, - currently_executed_sql_tracker: CurrentlyExecutedSqlTracker, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Self { Self { - currently_executed_sql_tracker, + currently_executing_sql_tracker, ..self } } @@ -256,7 +259,7 @@ impl sqllogictest::AsyncDB for Postgres { sql ); - self.currently_executed_sql_tracker.set_sql(sql); + let tracked_sql = self.currently_executing_sql_tracker.set_sql(sql); let lower_sql = sql.trim_start().to_ascii_lowercase(); @@ -308,7 +311,7 @@ impl sqllogictest::AsyncDB for Postgres { .collect() }; - self.currently_executed_sql_tracker.clear_sql_if_same(sql); + self.currently_executing_sql_tracker.remove_sql(tracked_sql); if rows.is_empty() && types.is_empty() { Ok(DBOutput::StatementComplete(0)) diff --git a/datafusion/sqllogictest/src/lib.rs b/datafusion/sqllogictest/src/lib.rs index fbad4284f1a9..d9319dbbbdac 100644 --- a/datafusion/sqllogictest/src/lib.rs +++ b/datafusion/sqllogictest/src/lib.rs @@ -30,7 +30,7 @@ mod engines; pub use engines::convert_batches; pub use engines::convert_schema_to_types; -pub use engines::CurrentlyExecutedSqlTracker; +pub use engines::CurrentlyExecutingSqlTracker; pub use engines::DFColumnType; pub use engines::DFOutput; pub use engines::DFSqlLogicTestError; From 4ccde92983ae9e2db2e61f286b343c0ec0035f01 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 17 Nov 2025 22:05:33 +0200 Subject: [PATCH 3/7] add license --- .../src/engines/currently_executed_sql.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs index 8bfdce54f625..5a54ddede794 100644 --- a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs +++ b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; From 22aac842ccefdf57952f1513dd729c9381604a66 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 17 Nov 2025 22:07:14 +0200 Subject: [PATCH 4/7] format and lint --- .../sqllogictest/src/engines/currently_executed_sql.rs | 6 ++++-- datafusion/sqllogictest/src/engines/mod.rs | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs index 5a54ddede794..3e8344d35132 100644 --- a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs +++ b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs @@ -16,8 +16,8 @@ // under the License. use std::collections::HashMap; -use std::sync::{Arc, Mutex}; use std::sync::atomic::AtomicUsize; +use std::sync::{Arc, Mutex}; /// Hold the currently executed SQL statements. /// This is used to save the currently running SQLs in case of a crash. @@ -55,7 +55,9 @@ impl CurrentlyExecutingSqlTracker { /// as on panic the drop can be called, and it will remove the SQL statement before we can log it. #[must_use = "The returned index must be used to remove the SQL statement when done."] pub fn set_sql(&self, sql: impl Into) -> usize { - let index = self.sql_index.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let index = self + .sql_index + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); let mut lock = self.currently_executed_sqls.lock().unwrap(); lock.insert(index, sql.into()); drop(lock); diff --git a/datafusion/sqllogictest/src/engines/mod.rs b/datafusion/sqllogictest/src/engines/mod.rs index ed4ea71566b4..8a6f44de4391 100644 --- a/datafusion/sqllogictest/src/engines/mod.rs +++ b/datafusion/sqllogictest/src/engines/mod.rs @@ -17,10 +17,10 @@ /// Implementation of sqllogictest for datafusion. mod conversion; +mod currently_executed_sql; mod datafusion_engine; mod datafusion_substrait_roundtrip_engine; mod output; -mod currently_executed_sql; pub use datafusion_engine::convert_batches; pub use datafusion_engine::convert_schema_to_types; From f3980641660997345af6061dc3b34f365020bd07 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 17 Nov 2025 22:10:43 +0200 Subject: [PATCH 5/7] rename --- datafusion/sqllogictest/bin/sqllogictests.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 40a6c9f4d934..dde7c24d27fd 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -157,9 +157,9 @@ async fn run_tests() -> Result<()> { let relative_path = test_file.relative_path.clone(); - // Mutex per file to hold the current sql being run - let current_sql_tracker = CurrentlyExecutingSqlTracker::new(); - let current_sql_tracker_clone = current_sql_tracker.clone(); + let currently_running_sql_tracker = CurrentlyExecutingSqlTracker::new(); + let currently_running_sql_tracker_clone = + currently_running_sql_tracker.clone(); SpawnedTask::spawn(async move { match ( options.postgres_runner, @@ -173,7 +173,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), - current_sql_tracker_clone, + currently_running_sql_tracker_clone, ) .await? } @@ -184,7 +184,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), - current_sql_tracker_clone, + currently_running_sql_tracker_clone, ) .await? } @@ -194,7 +194,7 @@ async fn run_tests() -> Result<()> { validator, m_clone, m_style_clone, - current_sql_tracker_clone, + currently_running_sql_tracker_clone, ) .await? } @@ -205,7 +205,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), - current_sql_tracker_clone, + currently_running_sql_tracker_clone, ) .await? } @@ -215,7 +215,7 @@ async fn run_tests() -> Result<()> { validator, m_clone, m_style_clone, - current_sql_tracker_clone, + currently_running_sql_tracker_clone, ) .await? } @@ -223,7 +223,7 @@ async fn run_tests() -> Result<()> { Ok(()) as Result<()> }) .join() - .map(move |result| (result, relative_path, current_sql_tracker)) + .map(move |result| (result, relative_path, currently_running_sql_tracker)) }) // run up to num_cpus streams in parallel .buffer_unordered(options.test_threads) From c660b4ff8449bff83bc2891d7512dbe88b3dc01d Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 17 Nov 2025 22:57:07 +0200 Subject: [PATCH 6/7] fix postgres --- datafusion/sqllogictest/bin/sqllogictests.rs | 20 +++++++++---------- .../src/engines/postgres_engine/mod.rs | 12 ++++++++++- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index dde7c24d27fd..ec705fc9ba06 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -466,11 +466,11 @@ async fn run_test_file_with_postgres( pb.set_message(format!("{:?}", &relative_path)); let mut runner = sqllogictest::Runner::new(|| { - Postgres::connect(relative_path.clone(), pb.clone()).map(|conn| { - conn.with_currently_executing_sql_tracker( - currently_executing_sql_tracker.clone(), - ) - }) + Postgres::connect_with_tracked_sql( + relative_path.clone(), + pb.clone(), + currently_executing_sql_tracker.clone(), + ) }); runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); @@ -573,11 +573,11 @@ async fn run_complete_file_with_postgres( pb.set_message(format!("{:?}", &relative_path)); let mut runner = sqllogictest::Runner::new(|| { - Postgres::connect(relative_path.clone(), pb.clone()).map(|conn| { - conn.with_currently_executing_sql_tracker( - currently_executing_sql_tracker.clone(), - ) - }) + Postgres::connect_with_tracked_sql( + relative_path.clone(), + pb.clone(), + currently_executing_sql_tracker.clone(), + ) }); runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 8e7c174c4f76..16e2f72736cf 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -24,7 +24,6 @@ use sqllogictest::DBOutput; /// Postgres engine implementation for sqllogictest. use std::path::{Path, PathBuf}; use std::str::FromStr; -use std::sync::{Arc, Mutex}; use std::time::Duration; use super::conversion::*; @@ -125,6 +124,17 @@ impl Postgres { }) } + /// Creates a runner for executing queries against an existing postgres connection + /// with a tracker for currently executing SQL statements. + pub async fn connect_with_tracked_sql( + relative_path: PathBuf, + pb: ProgressBar, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, + ) -> Result { + let conn = Self::connect(relative_path, pb).await?; + Ok(conn.with_currently_executing_sql_tracker(currently_executing_sql_tracker)) + } + /// Add a tracker that will track the currently executed SQL statement. /// /// This is useful for logging and debugging purposes. From f24c3d8977ad33ea476251960eabc65703131ace Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Mon, 17 Nov 2025 23:05:54 +0200 Subject: [PATCH 7/7] fix postgres and lock --- .../src/engines/currently_executed_sql.rs | 21 ++++++++++++------- .../src/engines/postgres_engine/mod.rs | 6 +++++- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs index 3e8344d35132..5b1979b4ee9a 100644 --- a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs +++ b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs @@ -58,21 +58,28 @@ impl CurrentlyExecutingSqlTracker { let index = self .sql_index .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let mut lock = self.currently_executed_sqls.lock().unwrap(); - lock.insert(index, sql.into()); - drop(lock); + self.currently_executed_sqls + .lock() + .unwrap_or_else(|e| e.into_inner()) + .insert(index, sql.into()); index } /// Remove the currently executed SQL statement by the provided key that was returned by [`Self::set_sql`]. pub fn remove_sql(&self, index: usize) { - let mut lock = self.currently_executed_sqls.lock().unwrap(); - lock.remove(&index); + self.currently_executed_sqls + .lock() + .unwrap_or_else(|e| e.into_inner()) + .remove(&index); } /// Get the currently executed SQL statements. pub fn get_currently_running_sqls(&self) -> Vec { - let lock = self.currently_executed_sqls.lock().unwrap(); - lock.values().cloned().collect() + self.currently_executed_sqls + .lock() + .unwrap_or_else(|e| e.into_inner()) + .values() + .cloned() + .collect() } } diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 16e2f72736cf..1fafc9626a3f 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -287,11 +287,15 @@ impl sqllogictest::AsyncDB for Postgres { if lower_sql.starts_with("copy") { self.pb.inc(1); - return self.run_copy_command(sql).await; + let result = self.run_copy_command(sql).await; + self.currently_executing_sql_tracker.remove_sql(tracked_sql); + + return result; } if !is_query_sql { self.get_client().execute(sql, &[]).await?; + self.currently_executing_sql_tracker.remove_sql(tracked_sql); self.pb.inc(1); return Ok(DBOutput::StatementComplete(0)); }